97 lines
2.1 KiB
Python
97 lines
2.1 KiB
Python
import json
|
|
from collections import defaultdict, Iterable
|
|
|
|
from log_analyzer import LogSettings
|
|
|
|
|
|
class Analyzer:
|
|
def __init__(self, settings: LogSettings):
|
|
self.settings = settings
|
|
|
|
def process(self, entry: dict) -> bool:
|
|
raise NotImplementedError()
|
|
|
|
def result(self) -> Iterable:
|
|
raise NotImplementedError()
|
|
|
|
def name(self):
|
|
return self.__name__
|
|
|
|
|
|
class LocationAnalyzer(Analyzer):
|
|
"""
|
|
store spatial log entries
|
|
"""
|
|
entries = []
|
|
__name__ = "Location"
|
|
|
|
class Formats:
|
|
geojson = 0
|
|
|
|
def __init__(self, settings: LogSettings):
|
|
super().__init__(settings)
|
|
|
|
def result(self) -> list:
|
|
return self.entries
|
|
|
|
def render(self, format: int = Formats.geojson):
|
|
if format is self.Formats.geojson:
|
|
return json.dumps([entry['location']['coordinates'] for entry in self.entries])
|
|
raise NotImplementedError()
|
|
|
|
def process(self, entry: dict) -> bool:
|
|
if entry[self.settings.type_field] in self.settings.spatials:
|
|
self.entries.append(entry)
|
|
return False
|
|
|
|
|
|
class LogEntryCountAnalyzer(Analyzer):
|
|
"""
|
|
count occurrences of log entry types
|
|
"""
|
|
__name__ = "LogEntryCount"
|
|
|
|
def result(self) -> dict:
|
|
return dict(self.store)
|
|
|
|
def process(self, entry: dict) -> bool:
|
|
self.store[entry[self.settings.type_field]] += 1
|
|
return False
|
|
|
|
def __init__(self, settings: LogSettings):
|
|
super().__init__(settings)
|
|
self.store = defaultdict(lambda: 0)
|
|
|
|
|
|
class LogEntrySequenceAnalyzer(Analyzer):
|
|
"""
|
|
store sequence of all log entry types
|
|
"""
|
|
__name__ = "LogEntrySequence"
|
|
|
|
def result(self) -> list:
|
|
return self.store
|
|
|
|
def process(self, entry: dict) -> bool:
|
|
entry_type = entry[self.settings.type_field]
|
|
self.store.append(entry_type)
|
|
return False
|
|
|
|
def __init__(self, settings: LogSettings):
|
|
super().__init__(settings)
|
|
self.store = []
|
|
|
|
|
|
class ActionSequenceAnalyzer(LogEntrySequenceAnalyzer):
|
|
"""
|
|
find sequence of non-spatial log entry types
|
|
"""
|
|
__name__ = "ActionSequenceAnalyzer"
|
|
|
|
def process(self, entry: dict) -> bool:
|
|
entry_type = entry[self.settings.type_field]
|
|
if entry_type in self.settings.spatials:
|
|
return False
|
|
self.store.append(entry_type)
|
|
return False
|