project/analyzers/analyzer/default.py

114 lines
2.5 KiB
Python

import logging
from collections import defaultdict
from . import Result, LogSettings, Analyzer, ResultStore
class LocationAnalyzer(Analyzer):
    """
    collect every log entry whose type is listed in ``settings.spatials``

    Matching entries are kept, in arrival order, in ``self.entries`` and are
    handed to the ResultStore as a list copy when ``result`` is called.
    """
    __name__ = "Location"
    # Inside the class body ``__name__`` resolves to the attribute assigned
    # just above, so this logger is named "Location", not after the module.
    log = logging.getLogger(__name__)

    def __init__(self, settings: LogSettings):
        """Forward ``settings`` to the base class and start with no entries."""
        super().__init__(settings)
        self.entries = []

    def process(self, entry: dict) -> bool:
        """
        Remember ``entry`` if its type is one of the spatial types.

        The type is read from ``entry[self.settings.type_field]``; a missing
        key raises ``KeyError``.  Always returns ``False`` (presumably
        meaning "entry not consumed" -- confirm against the Analyzer
        contract).
        """
        entry_type = entry[self.settings.type_field]
        if entry_type in self.settings.spatials:
            self.entries.append(entry)
        return False

    def result(self, store: ResultStore) -> None:
        """Log the number of collected entries and add a copy to ``store``."""
        collected = self.entries
        self.log.debug(len(collected))
        # Hand over a shallow copy so later appends do not alter the Result.
        store.add(Result(type(self), list(collected)))
class LogEntryCountAnalyzer(Analyzer):
    """
    count occurrences of log entry types

    ``self.store`` maps each entry type (the value found under
    ``settings.type_field``) to the number of entries seen with that type.
    """
    __name__ = "LogEntryCount"

    def __init__(self, settings: LogSettings):
        """Forward ``settings`` to the base class and start with empty counts."""
        super().__init__(settings)
        # ``int`` is the idiomatic zero factory: unseen keys start at 0 exactly
        # as with the former ``lambda: 0``, and unlike a lambda it is picklable.
        self.store = defaultdict(int)

    def process(self, entry: dict) -> bool:
        """
        Increment the counter for the type of ``entry``.

        A missing ``settings.type_field`` key raises ``KeyError``.  Always
        returns ``False`` (presumably meaning "entry not consumed" --
        confirm against the Analyzer contract).
        """
        self.store[entry[self.settings.type_field]] += 1
        return False

    def result(self, store: ResultStore) -> None:
        """Add the counts to ``store`` as a plain ``dict`` copy."""
        store.add(Result(type(self), dict(self.store)))
class LogEntrySequenceAnalyzer(Analyzer):
    """
    record the type of every log entry, in the order processed

    The sequence lives in ``self.store`` (a list of entry types).
    """
    __name__ = "LogEntrySequence"

    def __init__(self, settings: LogSettings):
        """Forward ``settings`` to the base class and start an empty sequence."""
        super().__init__(settings)
        self.store = []

    def process(self, entry: dict) -> bool:
        """
        Append the type of ``entry`` to the sequence.

        The type is read from ``entry[self.settings.type_field]``; a missing
        key raises ``KeyError``.  Always returns ``False`` (presumably
        meaning "entry not consumed" -- confirm against the Analyzer
        contract).
        """
        self.store.append(entry[self.settings.type_field])
        return False

    def result(self, store: ResultStore) -> None:
        """Add a list copy of the recorded sequence to ``store``."""
        sequence = list(self.store)
        store.add(Result(type(self), sequence))
class ActionSequenceAnalyzer(LogEntrySequenceAnalyzer):
    """
    record the sequence of log entry types, leaving out spatial ones

    Storage and ``result`` are inherited from LogEntrySequenceAnalyzer; only
    ``process`` differs, skipping types listed in ``settings.spatials``.
    """
    __name__ = "ActionSequenceAnalyzer"

    def process(self, entry: dict) -> bool:
        """
        Append the type of ``entry`` unless it is a spatial type.

        The type is read from ``entry[self.settings.type_field]``; a missing
        key raises ``KeyError``.  Always returns ``False``, whether or not
        the type was recorded.
        """
        kind = entry[self.settings.type_field]
        if kind not in self.settings.spatials:
            self.store.append(kind)
        return False
class CategorizerStub(Analyzer):
    """
    open a new category in a ResultStore

    ``process`` is not implemented here; ``result`` calls
    ``store.new_category`` with ``self.key``, which is "default" after
    construction.
    """
    __name__ = "Categorizer"

    def __init__(self, settings: LogSettings):
        """Forward ``settings`` to the base class and set the category key."""
        super().__init__(settings)
        self.key = "default"

    def process(self, entry: dict) -> bool:
        """Not implemented in this stub; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def result(self, store: ResultStore) -> None:
        """Ask ``store`` to create a new category named by ``self.key``."""
        category_key = self.key
        store.new_category(category_key)
class Store(Analyzer):
    """
    keep every processed log entry

    Entries are appended unfiltered to ``self.store`` in the order they are
    processed.
    """
    __name__ = "Store"

    def __init__(self, settings: LogSettings):
        """Forward ``settings`` to the base class and start with an empty log."""
        super().__init__(settings)
        self.store: list = []

    def process(self, entry: dict) -> bool:
        """
        Append ``entry`` itself (not a copy) to the stored log.

        Always returns ``False`` (presumably meaning "entry not consumed" --
        confirm against the Analyzer contract).
        """
        self.store.append(entry)
        return False

    def result(self, store: ResultStore) -> None:
        """Add a shallow list copy of all stored entries to ``store``."""
        everything = list(self.store)
        store.add(Result(type(self), everything))