213 lines
5.8 KiB
Python
213 lines
5.8 KiB
Python
import logging
|
|
from collections import defaultdict, OrderedDict
|
|
|
|
from analysis.util import json_path
|
|
from . import Result, LogSettings, Analyzer, ResultStore
|
|
|
|
|
|
class LocationAnalyzer(Analyzer):
    """Collect the log entries whose type is listed in ``settings.spatials``."""

    __name__ = "Location"
    # Inside the class body ``__name__`` resolves to the attribute assigned
    # on the previous line, so the logger is registered as "Location".
    log = logging.getLogger(__name__)

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # Spatial entries in the order they were processed.
        self.entries = []

    def process(self, entry: dict) -> bool:
        """Remember ``entry`` if its type is spatial; always return False."""
        is_spatial = entry[self.settings.type_field] in self.settings.spatials
        if is_spatial:
            self.entries.append(entry)
        return False

    def result(self, store: ResultStore, **kwargs) -> None:
        """Add a copy of the collected entries to ``store``.

        A ``name`` keyword argument is required; a KeyError is raised
        when it is missing.
        """
        collected = list(self.entries)
        store.add(Result(type(self), collected, name=kwargs['name']))
|
|
|
|
|
|
class LogEntryCountAnalyzer(Analyzer):
    """Count how often each log entry type occurs."""
    # TODO: make this more flexible, e.g. min/max lat/long

    __name__ = "LogEntryCount"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # Maps entry type -> number of occurrences; unseen types start at 0.
        self.store = defaultdict(int)

    def process(self, entry: dict) -> bool:
        """Increment the counter of ``entry``'s type; always return False."""
        entry_type = entry[self.settings.type_field]
        self.store[entry_type] += 1
        return False

    def result(self, store: ResultStore) -> None:
        """Add the counters, copied into a plain dict, to ``store``."""
        counts = dict(self.store)
        store.add(Result(type(self), counts))
|
|
|
|
|
|
class LogEntrySequenceAnalyzer(Analyzer):
    """Record the type of every log entry in processing order."""

    __name__ = "LogEntrySequence"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # Entry types, one item per processed entry.
        self.store = []

    def process(self, entry: dict) -> bool:
        """Append the type of ``entry`` to the sequence; always return False."""
        self.store.append(entry[self.settings.type_field])
        return False

    def result(self, store: ResultStore) -> None:
        """Add a copy of the recorded sequence to ``store``."""
        sequence = list(self.store)
        store.add(Result(type(self), sequence))
|
|
|
|
|
|
class ActionSequenceAnalyzer(LogEntrySequenceAnalyzer):
    """Record the sequence of log entry types that are not spatial."""

    __name__ = "ActionSequenceAnalyzer"

    def process(self, entry: dict) -> bool:
        """Append the type of ``entry`` unless it is listed in
        ``settings.spatials``; always return False.
        """
        kind = entry[self.settings.type_field]
        if kind not in self.settings.spatials:
            self.store.append(kind)
        return False
|
|
|
|
|
|
class CategorizerStub(Analyzer):
    """Open a new category in a ResultStore when results are collected."""

    __name__ = "Categorizer"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # Category key used by ``result``; initialised to "default".
        self.key = "default"

    def process(self, entry: dict) -> bool:
        """Not implemented in the stub; subclasses provide the behaviour."""
        raise NotImplementedError()

    def result(self, store: ResultStore, name=None) -> None:
        """Print the category label and create the category in ``store``.

        With a truthy ``name`` the category is the tuple ``(name, key)``
        and ``name`` is printed; otherwise the bare ``key`` is used for both.
        """
        if name:
            label = name
            category = (name, self.key)
        else:
            label = self.key
            category = self.key
        print(label)
        store.new_category(category)
|
|
|
|
class SimpleCategorizer(CategorizerStub):
    """Categorizer whose ``process`` accepts every entry and does nothing."""

    def process(self, entry):
        """Ignore ``entry`` and return False."""
        return False
|
|
|
|
|
|
class Store(Analyzer):
    """Keep every processed log entry."""

    __name__ = "Store"

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # All entries in the order they were processed.
        self.store: list = []

    def process(self, entry: dict) -> bool:
        """Append ``entry`` to the stored log; always return False."""
        self.store.append(entry)
        return False

    def result(self, store: ResultStore) -> None:
        """Add a copy of the stored entries to ``store``."""
        snapshot = list(self.store)
        store.add(Result(type(self), snapshot))
|
|
|
|
|
|
class ProgressAnalyzer(Analyzer):
    """Track spatial and in-game (board) progress, keyed by entry timestamp."""

    __name__ = "ProgressAnalyzer"

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        # timestamp -> reduced spatial record (timestamp, coordinates, accuracy)
        self.spatial = OrderedDict()
        # timestamp -> complete board entry
        self.board = OrderedDict()

    def process(self, entry: dict) -> bool:
        """File ``entry`` under its timestamp; always return False.

        Spatial entries are reduced to timestamp, coordinates and accuracy;
        board entries are stored unchanged. An entry whose type is in both
        ``settings.spatials`` and ``settings.boards`` is recorded twice.
        """
        kind = entry[self.settings.type_field]
        if kind in self.settings.spatials:
            record = {
                'timestamp': entry['timestamp'],
                'coordinates': json_path(entry, "location.coordinates"),
                'accuracy': entry['accuracy'],
            }
            self.spatial[entry["timestamp"]] = record
        if kind in self.settings.boards:
            self.board[entry["timestamp"]] = entry
        return False

    def result(self, store: ResultStore) -> None:
        """Add both progress mappings to ``store`` as a single result."""
        progress = {"spatials": self.spatial, "boards": self.board}
        store.add(Result(type(self), progress))
|
|
|
|
|
|
class MetaDataAnalyzer(Analyzer):
    """Collect metadata values configured in ``settings.custom["metadata"]``."""

    __name__ = "MetaDataAnalyzer"

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        # metadata name -> value extracted from the most recent matching entry
        self.store = {}

    def process(self, entry: dict) -> bool:
        """Extract the configured metadata fields present in ``entry``.

        Does nothing when ``settings.custom`` has no "metadata" item.
        Always returns False.

        NOTE(review): the shape of the "metadata" setting is not visible
        here. This assumes either a mapping of ``name -> entry key``, an
        iterable of entry keys (each key doubling as its name), or a single
        key given as a string -- confirm against the configuration files.
        """
        if "metadata" not in self.settings.custom:
            return False
        metadata = self.settings.custom["metadata"]
        if isinstance(metadata, str):
            # A lone string is one key; iterating it would yield characters.
            metadata = [metadata]
        for mdata in metadata:
            # The previous code assigned the whole "metadata" container to
            # ``key``, so the membership test below could never work as
            # intended; look up the key that belongs to this item instead.
            key = metadata[mdata] if isinstance(metadata, dict) else mdata
            if key in entry:
                self.store[mdata] = json_path(entry, key)
        return False

    def result(self, store: ResultStore, name=None) -> None:
        """Add a copy of the collected metadata to ``store``.

        ``name`` is accepted but not used.
        """
        store.add(Result(type(self), dict(self.store)))
|
|
|
|
|
|
def write_logentry_count_csv(LogEntryCountCSV, store, render, analyzers):
    """Render every category of ``store`` and write the summary as CSV.

    ``LogEntryCountCSV.summary`` is reset to None, then ``render`` is called
    once per category with ``analyzers.LogEntryCountAnalyzer``, the category
    data and ``name=<category>``. If ``LogEntryCountCSV.summary`` is truthy
    afterwards it is read as ``name -> {header: count}`` and written to
    ``logentrycount.csv`` in the current working directory: one row per
    name, one column per header, 0 for missing counts. The header row shows
    only the part of each header after its last ".".

    Nothing is written when the summary stays empty.
    """
    import csv

    LogEntryCountCSV.summary = None
    for cat in store.get_categories():
        data = store.get_category(cat)
        render(analyzers.LogEntryCountAnalyzer, data, name=cat)

    summary = LogEntryCountCSV.summary
    if not summary:
        return

    # First pass: gather every header in first-seen order (a dict serves as
    # an ordered set). The headers must be complete before any row is built;
    # otherwise early rows end up with fewer columns than the header row.
    seen = {}
    for name in summary:
        for head in summary[name]:
            seen.setdefault(head)
    headers = list(seen)

    # Second pass: one full-width row per name.
    lines = []
    for name in summary:
        counts = summary[name]
        row = [name]
        row.extend(counts[head] if head in counts else 0 for head in headers)
        lines.append(row)

    with open('logentrycount.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE)
        writer.writerow(["name"] + [h.split(".")[-1] for h in headers])
        writer.writerows(lines)
|
|
|
|
|
|
def write_simulation_flag_csv(store):
    """Dump ``store`` to ``simus.json`` and a flat overview to ``simus.csv``.

    Both files are created in the current working directory.

    ``simus.json`` receives ``store.serializable()`` with an indent of 2.
    ``simus.csv`` starts with a header row, followed for every key of
    ``store.get_store()`` by a line holding the key, then per result in
    ``store.store[key]`` a line holding the result name, then one line per
    item of ``result.get()`` with the simulation id, the answered flag, the
    sizes of the universe state and of the selected actions, the raw
    timestamp and the timestamp as local date/time (the raw value is
    divided by 1000 before conversion).
    """
    import json
    from datetime import datetime

    # Serialise before opening so a failure does not leave an empty file,
    # and use a context manager so the handle is closed deterministically.
    payload = store.serializable()
    with open("simus.json", "w") as jsonfile:
        json.dump(payload, jsonfile, indent=2)

    with open("simus.csv", "w") as csvfile:
        csvfile.write("instanceconfig,log,simu,answered,universe_state,selected_actions,timestamp,time\n")
        for key in store.get_store():
            csvfile.write("{}\n".format(key))
            for result in store.store[key]:
                csvfile.write(",{}\n".format(result.name))
                for item in result.get():
                    answers = item['answers']
                    universe_state = answers['universe_state']
                    selected = item['selected_actions']
                    csvfile.write(",,{},{},{},{},{},{}\n".format(
                        answers['@id'],
                        answers['answered'],
                        # Empty or missing collections count as 0.
                        len(universe_state) if universe_state else 0,
                        len(selected) if selected else 0,
                        item['timestamp'],
                        str(datetime.fromtimestamp(item['timestamp'] / 1000))
                    ))