import logging from collections import KeysView from typing import Type, Sized, Collection from analysis.analyzers.settings import LogSettings log: logging.Logger = logging.getLogger(__name__) class Result: def __init__(self, analysis: Type, result: Sized, name: str = None): self.result = result self.__analysis__ = analysis log.debug("set" + str(len(self.result))) self.name = name def analysis(self): return self.__analysis__ def get(self): log.debug("get" + str(len(self.result))) return self.result def __repr__(self): return "" class ResultStore: """Store Results""" def __init__(self, store_entry: Type[Collection] = list, store_action: callable = list.append, key_index=None) -> None: self.store = {} self.category = None self.entry: Type[Collection] = store_entry self.action: callable = store_action self.key_index = key_index def new_category(self, key) -> None: if not self.key_index is None: key = key[self.key_index] self.category = key if not key in self.store: self.store[key] = self.entry() def add(self, entry: Result) -> None: self.action(self.store[self.category], entry) def get_store(self) -> dict: return dict(self.store) def get_all(self) -> Collection[Result]: """ Throw all categories together :return: """ result = [] for key in sorted(self.store): result += self.store[key] return result def get_categories(self) -> KeysView: return self.store.keys() def get_category(self, key): if key not in self.store: return self.entry() log.error("get_category %s %s", key, len(self.store[key])) return self.store[key] def serializable(self): values = {} for key in self.store: values[key] = [{ "analysis": str(result.analysis()), "result": result.get(), "name": result.name } for result in self.store[key]] return values class Analyzer: """Operate on log entries, one at a time""" def __init__(self, settings: LogSettings) -> None: self.settings: LogSettings = settings def process(self, entry: dict) -> bool: """ Process an entry :param entry: Entry to process :return: True if consumed, False for further analysis """ raise NotImplementedError() def result(self, store: ResultStore, name=None) -> None: raise NotImplementedError() def name(self) -> str: return self.__name__