"""Helpers for running configured analyzers over log files and collecting results."""
import logging
from typing import List

from analysis.analyzers.analyzer import ResultStore, Analyzer
from analysis.analyzers.settings import LogSettings

log: logging.Logger = logging.getLogger(__name__)


def process_log(logfile: str, settings: LogSettings, loaders) -> List[Analyzer]:
    """Load one log and feed each of its entries to the configured analyzers.

    The loader is looked up in ``loaders`` by ``settings.log_format`` and
    instantiated without arguments. Every factory in ``settings.analyzers``
    is called with ``settings`` to build the analyzer list. Each entry
    yielded by ``loader.get_entry()`` is offered to the analyzers in order
    until one of them returns a truthy value from ``process``.

    Args:
        logfile: Value passed to ``loader.load``.
        settings: Provides ``log_format`` and ``analyzers``.
        loaders: Mapping from log format to a loader factory.

    Returns:
        The analyzers after all entries have been processed.

    Raises:
        RuntimeError: If ``loader.load`` raises an ordinary exception.
    """
    loader = loaders[settings.log_format]()
    try:
        loader.load(logfile)
    except Exception as e:
        # Only ordinary errors are wrapped; KeyboardInterrupt and SystemExit
        # must propagate untouched so the process can still be stopped.
        raise RuntimeError(e) from e

    log.debug("build analyzers")
    analyzers: List[Analyzer] = [analyzer(settings) for analyzer in settings.analyzers]

    log.debug("process entries")
    for entry in loader.get_entry():
        for analyzer in analyzers:
            try:
                if analyzer.process(entry):
                    # First analyzer that reports success consumes the entry.
                    break
            except KeyError as e:
                # Best effort: log the failure and offer the entry to the
                # next analyzer instead of aborting the whole run.
                log.exception(e)
    return analyzers


def run_analysis(log_ids: list, settings, loaders) -> ResultStore:
    """Run ``process_log`` for every id and gather the results in one store.

    Each element of ``log_ids`` is passed to ``process_log`` as its
    ``logfile`` argument. Every returned analyzer then writes into the
    shared store through ``result(store, name=log_id)``.

    Args:
        log_ids: Identifiers to process, in order.
        settings: Forwarded unchanged to ``process_log``.
        loaders: Forwarded unchanged to ``process_log``.

    Returns:
        The ``ResultStore`` that all analyzers wrote into.
    """
    store: ResultStore = ResultStore()
    for log_id in log_ids:
        log.info("LOG_ID: %s", log_id)
        for analysis in process_log(log_id, settings, loaders):
            log.info("* Result for %s", analysis.name())
            analysis.result(store, name=log_id)
    return store


def _read_stripped_lines(path: str) -> List[str]:
    """Return every line of the file at ``path`` with surrounding whitespace removed."""
    with open(path) as src:
        return [line.strip() for line in src]


def load_ids(name: str) -> List[str]:
    """Read the file ``name`` and return its lines, stripped, one id per line.

    Blank lines are kept and appear as empty strings in the result.
    """
    return _read_stripped_lines(name)


def grep(log_ids, source, settings) -> List[str]:
    """Collect the lines of ``source`` that contain any of the given ids.

    Matching is a plain substring test. Results are grouped by id, in the
    order of ``log_ids``, and keep file order within each id. A line that
    contains several ids is returned once per matching id.

    NOTE: an empty string in ``log_ids`` matches every line, because ``""``
    is a substring of any string.

    Args:
        log_ids: Iterable of substrings to look for.
        source: Path of the text file to search.
        settings: Unused; kept so existing callers keep working.

    Returns:
        The matching lines with surrounding whitespace stripped.
    """
    with open(source) as src:
        lines = src.readlines()

    logs = []
    for log_id in log_ids:
        for line in lines:
            if log_id in line:
                logs.append(line.strip())
    return logs


def src_file(filename) -> List[str]:
    """Read ``filename`` and return its lines with surrounding whitespace removed.

    Behaves exactly like ``load_ids``; blank lines are kept as empty strings.
    """
    return _read_stripped_lines(filename)