# project/analysis/analyzers/analyzer/default.py
import logging
from collections import defaultdict, OrderedDict
from analysis.util import json_path
from . import Result, LogSettings, Analyzer, ResultStore
class LocationAnalyzer(Analyzer):
    """
    Collect every spatial log entry for later retrieval.
    """
    __name__ = "Location"
    log = logging.getLogger(__name__)

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # spatial entries, kept in arrival order
        self.entries = []

    def process(self, entry: dict) -> bool:
        # keep only entries whose type is configured as spatial
        if entry[self.settings.type_field] in self.settings.spatials:
            self.entries.append(entry)
        return False

    def result(self, store: ResultStore, **kwargs) -> None:
        self.log.debug(len(self.entries))
        # shallow copy so the Result is decoupled from this analyzer
        store.add(Result(type(self), list(self.entries), name=kwargs['name']))
class LogEntryCountAnalyzer(Analyzer):
    """
    count occurrences of log entry types
    """
    # TODO: more flexible, e.g. min/max lat/long
    __name__ = "LogEntryCount"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # per-type occurrence counter; defaultdict(int) starts every key at 0
        # (idiomatic replacement for defaultdict(lambda: 0))
        self.store = defaultdict(int)

    def process(self, entry: dict) -> bool:
        # count this entry's type; never consumes the entry
        self.store[entry[self.settings.type_field]] += 1
        return False

    def result(self, store: ResultStore) -> None:
        # plain-dict copy so the Result is independent of this analyzer
        store.add(Result(type(self), dict(self.store)))
class LogEntrySequenceAnalyzer(Analyzer):
    """
    Record the type of every log entry, in order of appearance.
    """
    __name__ = "LogEntrySequence"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # ordered list of entry-type identifiers
        self.store = []

    def process(self, entry: dict) -> bool:
        self.store.append(entry[self.settings.type_field])
        return False

    def result(self, store: ResultStore) -> None:
        # shallow copy keeps the Result independent of further processing
        store.add(Result(type(self), list(self.store)))
class ActionSequenceAnalyzer(LogEntrySequenceAnalyzer):
    """
    Record the sequence of non-spatial ("action") log entry types.
    """
    __name__ = "ActionSequenceAnalyzer"

    def process(self, entry: dict) -> bool:
        entry_type = entry[self.settings.type_field]
        # only non-spatial types count as actions
        if entry_type not in self.settings.spatials:
            self.store.append(entry_type)
        return False
class CategorizerStub(Analyzer):
    """
    Open a new category (named by ``self.key``) in a ResultStore.
    """
    __name__ = "Categorizer"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # category key written to the store; subclasses are expected to override it
        self.key = "default"

    def process(self, entry: dict) -> bool:
        # stub: the actual categorization logic lives in subclasses
        raise NotImplementedError()

    def result(self, store: ResultStore, name=None) -> None:
        store.new_category(self.key)
class Store(Analyzer):
    """
    Keep every log entry verbatim.
    """
    __name__ = "Store"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # all processed entries, in order
        self.store: list = []

    def process(self, entry: dict) -> bool:
        self.store.append(entry)
        return False

    def result(self, store: ResultStore) -> None:
        # hand over a shallow copy so later processing can't mutate the Result
        store.add(Result(type(self), list(self.store)))
class ProgressAnalyzer(Analyzer):
    """track spatial and ingame progress"""
    __name__ = "ProgressAnalyzer"

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        # both maps are keyed by entry timestamp, insertion-ordered
        self.spatial = OrderedDict()
        self.board = OrderedDict()

    def process(self, entry: dict) -> bool:
        entry_type = entry[self.settings.type_field]
        timestamp = entry["timestamp"]
        if entry_type in self.settings.spatials:
            # condensed spatial fix: position + accuracy at this timestamp
            self.spatial[timestamp] = {
                'timestamp': timestamp,
                'coordinates': json_path(entry, "location.coordinates"),
                'accuracy': entry['accuracy'],
            }
        if entry_type in self.settings.boards:
            # board entries are stored verbatim
            self.board[timestamp] = entry
        return False

    def result(self, store: ResultStore) -> None:
        store.add(Result(type(self), {"spatials": self.spatial, "boards": self.board}))
class MetaDataAnalyzer(Analyzer):
    """Collect the metadata fields configured under settings.custom["metadata"]."""
    __name__ = "MetaDataAnalyzer"

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        # metadata name -> extracted value (last occurrence wins)
        self.store = {}

    def process(self, entry: dict) -> bool:
        if "metadata" not in self.settings.custom:
            return False
        meta = self.settings.custom["metadata"]
        for name in meta:
            # BUG FIX: the original bound the lookup key to the whole
            # ``custom["metadata"]`` collection instead of the current item,
            # so the ``in entry`` test could never match (and raises
            # TypeError for an unhashable list key).
            # A dict maps name -> json path; any other iterable is taken as
            # a plain list of paths. TODO(review): confirm the configured
            # shape against the settings loader.
            path = meta[name] if isinstance(meta, dict) else name
            # NOTE(review): this only checks top-level keys, while json_path
            # appears to support dotted paths elsewhere -- confirm.
            if path in entry:
                self.store[name] = json_path(entry, path)
        return False  # bug fix: process() must return a bool like its siblings

    def result(self, store: ResultStore, name=None) -> None:
        store.add(Result(type(self), dict(self.store)))
def write_logentry_count_csv(LogEntryCountCSV, store, render, analyzers):
    """Render LogEntryCount results for every category and dump them as a
    CSV matrix (one row per result name, one column per entry type) into
    ``logentrycount.csv``.

    LogEntryCountCSV -- object whose ``summary`` attribute the ``render``
        callback fills with ``{name: {header: count}}``; reset to None first.
    store  -- ResultStore providing get_categories()/get_category().
    render -- callable invoked as render(analyzer_cls, data, name=category).
    analyzers -- namespace exposing LogEntryCountAnalyzer.

    NOTE: the decompiler-residue ``global`` declarations were dropped; the
    function now only uses locals.
    """
    import csv

    LogEntryCountCSV.summary = None
    for category in store.get_categories():
        render(analyzers.LogEntryCountAnalyzer,
               store.get_category(category), name=category)

    if not LogEntryCountCSV.summary:
        return

    # First pass: collect the complete header set, so that every row gets
    # the same column order. (The original built rows while headers were
    # still growing, leaving earlier rows short of trailing columns.)
    headers = []
    for data in LogEntryCountCSV.summary.values():
        for head in data:
            if head not in headers:
                headers.append(head)

    # Second pass: one row per result name; missing counts default to 0.
    rows = [
        [name] + [data.get(head, 0) for head in headers]
        for name, data in LogEntryCountCSV.summary.items()
    ]

    with open('logentrycount.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE)
        # only the last path component of each header is kept ("a.b" -> "b")
        writer.writerow(["name"] + [h.split(".")[-1] for h in headers])
        writer.writerows(rows)
def write_simulation_flag_csv(store):
    """Dump simulation results to ``simus.json`` and a hierarchical
    ``simus.csv``.

    The CSV uses three "indentation" levels: the store key (no leading
    comma), the result name (one leading comma) and one line per
    simulation entry (two leading commas), matching the column header.

    store -- ResultStore-like object providing serializable(), get_store()
        and a ``store`` mapping of key -> list of Results.

    NOTE: the decompiler-residue ``global`` declarations were dropped; the
    function now only uses locals.
    """
    import json
    from datetime import datetime

    # close the JSON file deterministically (the original leaked the handle)
    with open("simus.json", "w") as jsonfile:
        json.dump(store.serializable(), jsonfile, indent=2)

    with open("simus.csv", "w") as csvfile:
        csvfile.write("instanceconfig,log,simu,answered,universe_state,selected_actions,timestamp,time\n")
        for key in store.get_store():
            csvfile.write("{}\n".format(key))
            for result in store.store[key]:
                csvfile.write(",{}\n".format(result.name))
                for entry in result.get():
                    answers = entry['answers']
                    # falsy universe_state / selected_actions count as 0
                    csvfile.write(",,{},{},{},{},{},{}\n".format(
                        answers['@id'],
                        answers['answered'],
                        len(answers['universe_state']) if answers['universe_state'] else 0,
                        len(entry['selected_actions']) if entry['selected_actions'] else 0,
                        entry['timestamp'],
                        # timestamps are in milliseconds -> local datetime
                        str(datetime.fromtimestamp(entry['timestamp'] / 1000))
                    ))