import logging from collections import defaultdict, namedtuple, OrderedDict from types import SimpleNamespace from typing import List, NamedTuple from analysis.util import json_path, combinate from analysis.util.download import download_board, get_board_data from . import Result, LogSettings, Analyzer, ResultStore from .default import CategorizerStub, Store logger = logging.getLogger(__name__) class BoardDurationAnalyzer(Analyzer): """ calculate display duration of boards """ __name__ = "BoardDuration" def render(self) -> str: return "\n".join(["{}\t{}".format(entry["active"], entry["id"]) for entry in self.result().get()]) def result(self, store: ResultStore) -> None: result = [] last_timestamp = None last_board = None for board in self.store: board_id, timestamp = board["id"], board["timestamp"] if not last_timestamp is None: result.append(self.save_entry(last_board, last_timestamp, timestamp - last_timestamp)) last_timestamp = timestamp last_board = board_id # TODO: last board? store.add(Result(type(self), result)) def process(self, entry: dict) -> bool: entry_type = entry[self.settings.type_field] if entry_type in self.settings.boards: self.store.append(self.save_entry(entry["board_id"], entry["timestamp"])) # TODO: make configurable return False def save_entry(self, board_id: str, timestamp: int, active: int = None): entry = {"id": board_id, "timestamp": timestamp} if not active is None: entry["active"] = active return entry def __init__(self, settings: LogSettings): super().__init__(settings) self.store = [] self.last = {} class TypedBoardDuration(Analyzer): __name__ = "BoardDuration" def result(self, store: ResultStore) -> None: pass def process(self, entry: dict) -> bool: entry_type = entry[self.settings.type_field] if entry_type in self.settings.boards: pass def add_board(self, entry): board_data = get_board_data(self.settings.source, ) def add_location(self, entry): self.track['coordinates'].append(json_path(entry, self.settings.custom['coordinates'])) def add_track(self, **props): self.track['properties'] = props self.tracks.append(self.track) self.track = dict(self.template) def __init__(self, settings: LogSettings): super().__init__(settings) self.last_board = {} self.tracks = [] self.template = {"type": "LineString", "coordinates": [], "properties": {}} self.track = dict(self.template) class SimulationRoundsAnalyzer(Analyzer): __name__ = "SimuRounds" def __init__(self, settings: LogSettings): super().__init__(settings) self.store = defaultdict(lambda: -1) # TODO verify def result(self, store: ResultStore) -> None: store.add(Result(type(self), dict(self.store))) def process(self, entry: dict) -> bool: entry_type = entry[self.settings.type_field] if entry_type in self.settings.custom['simulation_rounds']: if entry["answers"][self.settings.type_field] in self.settings.custom["simu_data"]: simu_id = entry['answers']["@id"] self.store[simu_id] += 1 return False class ActivationSequenceAnalyzer(Analyzer): __name__ = "ActivationSequence" def __init__(self, settings: LogSettings): super().__init__(settings) self.store = [] def result(self, store: ResultStore) -> None: store.add(Result(type(self), self.store)) def process(self, entry: dict) -> bool: if entry[self.settings.type_field] in self.settings.sequences['start']: if entry['cache']: self.store.append(entry['cache']['@id']) else: logger.error("null cache") return False class BiogamesCategorizer(CategorizerStub): # TODO: refactor __name__ = "BiogamesCategorizer" def __init__(self, settings: LogSettings): super().__init__(settings) def process(self, entry: dict) -> bool: if self.key is "default": if entry[self.settings.type_field] in self.settings.custom['instance_start']: self.key = entry[self.settings.custom['instance_id']] return False class GameField_InstanceCategorizer(CategorizerStub): # TODO: refactor __name__ = "BiogamesCategorizer" def __init__(self, settings: LogSettings): super().__init__(settings) def process(self, entry: dict) -> bool: if self.key is "default": if entry[self.settings.type_field] in self.settings.custom['instance_start']: try: self.key = json_path(entry, self.settings.custom['instance_config_id']) + "_" + entry[self.settings.custom['instance_id']] + "_" + str(entry["timestamp"]) except KeyError as e: print(entry) raise e return False class GameFieldInstanceGroup(Analyzer): __name__ = "BiogamesGamefieldInstanceGroupAnalizer" def __init__(self, settings: LogSettings): super().__init__(settings) self.metadata = None def process(self, entry: dict) -> bool: if not self.metadata: if entry[self.settings.type_field] in self.settings.custom['instance_start']: try: self.metadata = {"instance_config_id": json_path(entry, self.settings.custom['instance_config_id']), "instance_id": entry[self.settings.custom['instance_id']], "timestamp": str(entry["timestamp"]), "player_group_name": entry['player_group_name'] } except KeyError as e: print(entry) raise e return False def result(self, store: ResultStore, **kwargs) -> None: store.add(Result(type(self), self.metadata)) class ActivityMapper(Analyzer): __name__ = "ActivityMapper" classes = { "sequence.simulation.": "simu", "sequence.question.": "question", "error": "error" } colors = { "simu": "blue", "question": "orange", "image": "green", "audio": "red", "video": "purple", "other": "brown", "map": "violet", "error": "grey" } def __init__(self, settings: LogSettings) -> None: super().__init__(settings) self.store: List[self.State] = [] self.timeline = [] self.last_board = {} self.last_board_type = "other" self.last_coordinate = None self.last_timestamp = None self.tracks = [] self.track = None self.instance_config_id: str = None self.filters = SimpleNamespace() self.filters.start = lambda entry: combinate(self.settings.custom["sequences2"]["start"], entry) self.filters.end = lambda entry: combinate(self.settings.custom["sequences2"]["end"], entry) self.State: NamedTuple = namedtuple("State", ["sequence", "events", "track", "timestamp"]) def result(self, store: ResultStore, **kwargs) -> None: for board in self.timeline: if board[self.settings.type_field] in self.settings.boards: if board["extra_data"]["activity_type"] == "simu": board["image"] = "simu.png" continue try: local_file = download_board(board["board_id"], self.instance_config_id, board["sequence_id"], self.settings.source) if local_file: board['image'] = local_file else: raise ValueError except Exception as e: board['image'] = "ERROR_FETCHING_FILE" logger.error("error downloading board! %s %s %s", self.instance_config_id, board["sequence_id"], board["board_id"]) logger.exception(e) else: board["image"] = "map.png" store.add(Result(type(self), { "type": "FeatureCollection", "features": self.tracks, "properties": { "instance": self.instance_config_id, "boards": self.timeline, "colors": self.colors, }, })) def process(self, entry: dict) -> bool: if self.track is None: self.track = self.new_track(entry['timestamp']) if self.instance_config_id is None: if entry[self.settings.type_field] in self.settings.custom['instance_start']: self.instance_config_id = json_path(entry, self.settings.custom['instance_config_id']) self.update_board_type(entry) if entry[self.settings.type_field] in self.settings.spatials: self.add_location(entry) elif entry[self.settings.type_field] in self.settings.boards: board_data = get_board_data(self.settings.source, self.instance_config_id, entry["sequence_id"], entry["board_id"]) entry["extra_data"] = board_data entry["extra_data"]["activity_type"] = self.last_board_type entry['coordinate'] = self.new_coordinate() self.timeline.append(entry) return False def update_board_type(self, entry): type = self.classify_entry(entry) if not type == self.last_board_type: self.add_track(activity_type=self.last_board_type, end_timestamp=entry['timestamp']) self.last_board_type = type def classify_entry(self, entry): entry_type = entry[self.settings.type_field] if self.filters.end(entry): data = {"extra_data": {"activity_type": "map"}, "coordinate": self.new_coordinate()} data.update(entry) self.timeline.append(data) return "map" if not entry_type in self.settings.boards: return self.last_board_type board_data = get_board_data(self.settings.source, self.instance_config_id, entry["sequence_id"], entry["board_id"]) for pattern in self.classes: if pattern in board_data['class']: return self.classes[pattern] if board_data['has_video']: return "video" elif board_data['has_audio']: return "audio" elif board_data['has_image']: return "image" return "other" def new_coordinate(self): return {"type": "Point", "coordinates": self.last_coordinate} def add_location(self, entry): coordinates = json_path(entry, self.settings.custom['coordinates']) self.track["geometry"]['coordinates'].append(coordinates) self.track['properties']['coordTimes'].append(entry['timestamp']) #FIXME self.last_coordinate = coordinates self.last_timestamp = entry['timestamp'] def add_track(self, **props): self.track['properties'].update(props) if "activity_type" in self.track['properties'] and self.track['properties']['activity_type'] in self.colors: if not "stroke" in self.track['properties']: self.track['properties']['stroke'] = self.colors[self.track['properties']['activity_type']] self.tracks.append(self.track) self.track = self.new_track(props['end_timestamp']) if self.last_coordinate: self.track["geometry"]['coordinates'].append(self.last_coordinate) self.track['properties']['coordTimes'].append(self.last_timestamp) def new_track(self, timestamp): return {"type": "Feature", "geometry": {"type": "LineString", "coordinates": []}, "properties": {'start_timestamp': timestamp, 'coordTimes': []}} class BiogamesDuration(Analyzer): __name__ = "BiogamesDuration" def __init__(self, settings: LogSettings) -> None: super().__init__(settings) self.first = None self.last = None self.sequences = defaultdict(list) self.filters = SimpleNamespace() self.filters.start = lambda entry: combinate(self.settings.custom["sequences2"]["start"], entry) self.filters.end = lambda entry: combinate(self.settings.custom["sequences2"]["end"], entry) self.sequence = None self.sequence_start = None self.cache = "None" def process(self, entry: dict) -> bool: if not self.first: self.first = entry['timestamp'] self.last = entry['timestamp'] if not self.sequence and self.filters.start(entry): self.sequence = entry['sequence_id'] self.sequence_start = entry['timestamp'] elif self.sequence and self.filters.end(entry): self.sequences[f"{self.cache}+{self.sequence}"].append((self.sequence_start, entry['timestamp'])) self.sequences[f"only+{self.sequence}"].append((self.sequence_start, entry['timestamp'])) self.sequence = None self.sequence_start = 0 self.cache = "None" if entry['@class'] in self.settings.sequences['start']: if entry['cache']: self.cache = entry['cache']['@id'] else: self.cache = "None" return False def result(self, store: ResultStore, name=None) -> None: results = {"start": self.first, "end": self.last, "duration": self.last - self.first} for sid in self.sequences: seq = self.sequences[sid] #print([end-start for start,end in seq]) results[f"sequence_{sid}_duration"] = sum([end-start for start,end in seq]) store.add(Result(type(self), results)) class BiogamesTasks(Analyzer): __name__ = "BiogamesTasks" DATA_CLASSES = ("de.findevielfalt.games.game2.instance.log.entry.LogEntryQuestion", ) BOARD_CLASSES = ("de.findevielfalt.games.game2.instance.log.entry.ShowBoardLogEntry",) def __init__(self, settings: LogSettings) -> None: super().__init__(settings) self.settings: LogSettings = settings self.tasks = {} self.last_board = None self.instance_config_id: str = None def process(self, entry: dict) -> bool: if self.instance_config_id is None: if entry[self.settings.type_field] in self.settings.custom['instance_start']: self.instance_config_id = json_path(entry, self.settings.custom['instance_config_id']) if self.is_task(entry) and self.last_board: entry['__duration'] = entry['timestamp'] - self.last_board['timestamp'] self.tasks[self.ids()] = entry if self.is_board(entry): self.last_board = entry return False def result(self, store: ResultStore, name=None) -> None: results = {} for ids in self.tasks: task = self.tasks[ids] for action in task['selected_actions']: if self.is_dollar_action(action): results[ids] = {"duration": task['__duration'], "result": action['increment']} store.add(Result(type(self), results)) def ids(self): return f"{self.instance_config_id}_{self.last_board['sequence_id']}_{self.last_board['board_id']}" def is_task(self, entry) -> bool: return entry['@class'] in self.DATA_CLASSES def is_board(self, entry) -> bool: return entry['@class'] in self.BOARD_CLASSES def is_dollar_action(self, action): return action['@class'] in ("de.findevielfalt.games.game2.instance.action.IncrementDiversityDollarAction") class BiogamesStore(Store): __name__ = "BiogamesStore" def result(self, store: ResultStore) -> None: result = OrderedDict() for event in self.store: if event[self.settings.type_field] in self.settings.boards: sequence_id = json_path(event, json_path(self.settings.custom, "sequences2.id_field")) board_id = event["board_id"] local_file = download_board( board_id=board_id, instance_config_id=json_path(self.store[0], self.settings.custom["instance_config_id"]), sequence_id=sequence_id, source=self.settings.source) if local_file is not None: event["image"] = local_file[16:] result[event["timestamp"]] = event store.add(Result(type(self), result)) def process(self, entry: dict) -> bool: self.store.append(entry) return False class InstanceConfig(Analyzer): __name__ = "InstanceConfig" def __init__(self, settings: LogSettings): super().__init__(settings) self.store = {} def process(self, entry: dict): if entry[self.settings.type_field] in self.settings.custom["instance_start"]: print(entry) self.store["instance_id"] = json_path(entry, self.settings.custom["instance_config_id"]) def result(self, store: ResultStore, name=None): store.add(Result(type(self), dict(self.store), name=name)) class SimulationOrderAnalyzer(Analyzer): __name__ = "SimuOrder" def __init__(self, settings: LogSettings): super().__init__(settings) self.store = defaultdict(lambda: -1) # TODO verify self.order = [] def result(self, store: ResultStore, name=None) -> None: store.add(Result(type(self), [self.store[sim] for sim in self.order], name=name)) def process(self, entry: dict) -> bool: entry_type = entry[self.settings.type_field] if entry_type in self.settings.custom['simulation_rounds']: if entry["answers"][self.settings.type_field] in self.settings.custom["simu_data"]: simu_id = entry['answers']["@id"] self.store[simu_id] += 1 if not simu_id in self.order: self.order.append(simu_id) return False class SimulationCategorizer(CategorizerStub): # TODO: refactor categorizer __name__ = "SimulationCategorizer" # TODO: rename -.- (InstanceConfigIDCategorizer) def process(self, entry: dict) -> bool: if self.key is "default": if entry[self.settings.type_field] in self.settings.custom['instance_start']: try: self.key = json_path(entry, self.settings.custom['instance_config_id']) except KeyError as e: print(entry) raise e return False class SimulationFlagsAnalyzer(Analyzer): __name__ = "SimuFlags" def __init__(self, settings: LogSettings) -> None: super().__init__(settings) self.store = [] def process(self, entry: dict) -> bool: entry_type = entry[self.settings.type_field] if entry_type in self.settings.custom['simulation_rounds']: if entry["answers"][self.settings.type_field] in self.settings.custom["simu_data"]: self.store.append(entry) return False def result(self, store: ResultStore, name=None) -> None: store.add(Result(type(self), self.store, name=name))