project/analyzers/analyzer/__init__.py

91 lines
2.1 KiB
Python

import logging
from collections import KeysView
from typing import Type, Sized, Collection
from analyzers.settings import LogSettings
log: logging.Logger = logging.getLogger(__name__)
class Result:
def __init__(self, analysis: Type, result: Sized):
self.result = result
self.__analysis__ = analysis
log.debug("set" + str(len(self.result)))
def analysis(self):
return self.__analysis__
def get(self):
log.debug("get" + str(len(self.result)))
return self.result
def __repr__(self):
return "<Result " + str(self.__analysis__) + ": " + str(type(self.result)) + " " + str(len(self.result)) + ">"
class ResultStore:
"""Store Results"""
def __init__(self, store_entry: Type[Collection] = list, store_action: callable = list.append) -> None:
self.store = {}
self.category = None
self.entry: Type[Collection] = store_entry
self.action: callable = store_action
def new_category(self, key) -> None:
self.category = key
if not key in self.store:
self.store[key] = self.entry()
def add(self, entry: Result) -> None:
self.action(self.store[self.category], entry)
def get_store(self) -> dict:
return dict(self.store)
def get_all(self) -> Collection[Result]:
"""
Throw all categories together
:return:
"""
result = []
for key in self.store:
result += self.store[key]
return result
def get_categories(self) -> KeysView:
return self.store.keys()
def get_category(self, key):
if key not in self.store:
return self.entry
return self.store[key]
def serializable(self):
values = {}
for key in self.store:
values[key] = [{"analysis": str(result.analysis()), "result": result.get()} for result in self.store[key]]
return values
class Analyzer:
"""Operate on log entries, one at a time"""
def __init__(self, settings: LogSettings) -> None:
self.settings: LogSettings = settings
def process(self, entry: dict) -> bool:
"""
Process an entry
:param entry: Entry to process
:return: True if consumed, False for further analysis
"""
raise NotImplementedError()
def result(self, store: ResultStore) -> None:
raise NotImplementedError()
def name(self) -> str:
return self.__name__