# project/tasks/tasks.py — Celery tasks for running log analysis jobs.
# (file listing metadata: 96 lines, 2.8 KiB, Python)
import json
import logging
import shutil
import uuid
import os.path
import os
import redis as redis_lib
import time
from celery import Celery
from analysis import log_analyzer as la
from analysis.analyzers import KMLRender, ActivityMapperRender
from analysis.analyzers.render.biogames import OEBRender
from clients.webclients import CLIENTS
# Redis DB index for job-status records — presumably shared with the Flask
# web app (hence the name); verify against the web app's config.
FLASK_DB = 1
# Hostname of the Redis server (likely a compose/k8s service name — confirm).
REDIS_HOST = "redis"
# Directory where rendered analysis artifacts are moved for retrieval.
DATA_PATH = "/data/results/"
# Maps a renderer name from the analysis settings to its renderer class.
RENDERERS = { # TODO
"KMLRender": KMLRender,
"ActivityMapper": ActivityMapperRender,
"OEBRender": OEBRender
}
# Celery application using Redis as both result backend and message broker.
app = Celery('tasks', backend='redis://redis', broker='redis://redis')
# Client used to read/write the per-user JSON status blobs.
redis = redis_lib.StrictRedis(host=REDIS_HOST, db=FLASK_DB)
log: logging.Logger = logging.getLogger(__name__)
def update_status(username, name, state, **kwargs):
    """Merge a job state transition into the user's status record in Redis.

    The record at key ``username`` is a JSON object mapping job names to
    status dicts. For job ``name`` this stamps the transition time, sets the
    status label, and merges any extra fields.

    Parameters:
        username: Redis key holding the JSON status record.
        name: job name used as the key inside the record.
        state: ``(timestamp_field, status_label)`` pair,
            e.g. ``('start', 'RUNNING')``.
        **kwargs: additional fields merged into the job's status entry.
    """
    raw = redis.get(username)
    # BUG FIX: redis.get returns None for an absent key; the original passed
    # that straight to json.loads (TypeError) and then indexed a possibly
    # missing job name (KeyError). Start from an empty record/entry instead.
    status = json.loads(raw) if raw else {}
    entry = status.setdefault(name, {})
    entry[state[0]] = time.strftime("%c")
    entry['status'] = state[1]
    entry.update(kwargs)
    redis.set(username, json.dumps(status))
@app.task
def analyze(config, log_ids, **kwargs):
    """Celery task: download selected logs, run the analysis, render results.

    Progress is reported into Redis via ``update_status``; rendered files are
    moved into ``DATA_PATH/<uid>`` and their paths recorded in the final
    FINISHED status. Any failure is caught at this task boundary and reported
    as an ERROR status instead of propagating.

    Parameters:
        config: analysis configuration passed to ``la.parse_settings``.
        log_ids: ids of the logs to fetch (matched against the client's
            ``@id`` field).
        **kwargs: must contain ``username``, ``name``, ``clientName``,
            ``host`` and ``cookies``.
    """
    update_status(kwargs['username'], kwargs['name'], ('load', 'LOADING'))
    try:
        log.info("start analysis")
        # Build the web client for the source system and resolve the
        # requested log ids to their download URLs.
        client = CLIENTS[kwargs['clientName']](host=kwargs['host'], **kwargs['cookies'])
        logs = client.list()
        id_urls = {str(x['@id']): x['file_url'] for x in logs}
        urls = [id_urls[i] for i in log_ids]
        tmpdir = client.download_files(urls)
        try:
            # BUG FIX: the original called log.info(tmpdir.name, [...]) —
            # the list became a %-format argument with no placeholder,
            # breaking the log record. Use lazy %-style args properly.
            log.info("downloaded to %s: %s", tmpdir.name, list(os.scandir(tmpdir.name)))
            uid = str(uuid.uuid4())
            update_status(kwargs['username'], kwargs['name'], ('start', 'RUNNING'), uid=uid)
            results = []
            settings = la.parse_settings(config)
            store = la.run_analysis([p.path for p in os.scandir(tmpdir.name)],
                                    settings, la.LOADERS)
            os.mkdir(os.path.join(DATA_PATH, uid))
            render = RENDERERS[settings.render[0]]()  # TODO
            files = []
            if settings.render[0] == "OEBRender":
                files.append(render.render_store(store))
            else:
                for category in store.get_categories():
                    data = store.get_category(category)
                    log.debug("category %r (%s)", category, type(category))
                    # BUG FIX: accumulate renders across categories; the
                    # original reassigned `files`, keeping only the last
                    # category's output (render() returns an iterable —
                    # the original iterated its result directly).
                    files.extend(render.render(data, name=category[1]))
            log.debug("rendered files: %s", files)
            for file in files:
                try:
                    _, tail = os.path.split(file)
                    target = os.path.join(DATA_PATH, uid, tail)
                    log.debug("moving %s -> %s", file, target)
                    shutil.move(file, target)
                    results.append(target)
                except FileNotFoundError as e:
                    # Best-effort: skip renders that vanished, keep the rest.
                    log.exception(e)
            update_status(kwargs['username'], kwargs['name'], ('done', 'FINISHED'), results=results)
        finally:
            # BUG FIX: always remove the downloaded temp files — the original
            # leaked the temp directory whenever the analysis raised.
            tmpdir.cleanup()
    except Exception as e:
        # Task boundary: record the failure in the status record for the UI.
        log.exception(e)
        update_status(kwargs['username'], kwargs['name'], ('abort', 'ERROR'), exception=str(e))
def status_update(key, status_key, status):
    """Set one field on the JSON status record stored at *key*.

    Creates the record if it does not yet exist, then triggers a Redis
    save so the change is persisted to disk.
    """
    raw = redis.get(key)
    # Absent key -> start from an empty record; otherwise decode and merge.
    payload = json.loads(raw) if raw else {}
    payload[status_key] = status
    redis.set(key, json.dumps(payload))
    redis.save()