project/analysis/loaders/neocart.py

70 lines
1.9 KiB
Python

import logging
from datetime import datetime, timezone

from lxml import etree

from .loader import Loader
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Prefix-to-URI map passed as ``namespaces=`` to the XPath queries in this
# module; "gpx" is bound to the GPX 1.1 namespace URI.
NS = {'gpx':"http://www.topografix.com/GPX/1/1"}
class NeoCartLoader(Loader):
    """Loader that reads track points from a GPX 1.1 file.

    Every ``gpx:trkpt`` element is converted into a dict holding a
    GeoJSON-style point, a millisecond timestamp and, when present, the
    attributes of a nested ``gpx:event`` element.
    """

    def load(self, file: str):
        """Parse *file* and store the converted track points in ``self.entries``.

        The XML parser runs in recovery mode. Track points that cannot be
        converted (``ValueError`` from :meth:`parse_point`) are logged
        together with their XML and skipped; the remaining points are kept.

        Args:
            file: Path of the GPX file to read.
        """
        parser = etree.XMLParser(recover=True)
        # Binary mode lets lxml take the encoding from the XML declaration;
        # the context manager guarantees the handle is closed again.
        with open(file, "rb") as src:
            tree = etree.parse(src, parser=parser)

        self.entries = []
        for point in tree.xpath("//gpx:trkpt", namespaces=NS):
            try:
                self.entries.append(self.parse_point(point))
            except ValueError:
                # log.exception records the error message and traceback.
                log.exception(
                    "skipping unparsable track point:\n%s",
                    etree.tostring(point, pretty_print=True).decode(),
                )

    def parse_point(self, point):
        """Convert one ``gpx:trkpt`` element into an entry dict.

        Args:
            point: The lxml element of the track point.

        Returns:
            A dict with the keys ``location`` (``{"type": "Point",
            "coordinates": [lon, lat]}``), ``timestamp`` (integer
            milliseconds since the Unix epoch), ``event`` (dict of event
            data, empty when the point has no event) and ``type``
            (``"event"`` when event data is present, else ``"location"``).

        Raises:
            ValueError: If a coordinate or the time is missing or cannot be
                parsed, if the point does not contain exactly one
                ``gpx:time`` element, or if it contains more than one
                ``gpx:event`` element.
        """
        lat, lon = self._parse_coordinates(point)
        timestamp = self._parse_timestamp(point)
        event = self._parse_event(point)
        return {
            "location": {
                "type": "Point",
                "coordinates": [lon, lat],
            },
            "timestamp": timestamp,
            "event": event,
            "type": "event" if event else "location",
        }

    @staticmethod
    def _parse_coordinates(point):
        """Return ``(lat, lon)`` of *point* as floats."""
        raw_lat = point.get("lat")
        if raw_lat is None:
            raise ValueError("track point has no 'lat' attribute")

        if raw_lat.count(".") > 1:
            # More than one decimal point: the attribute holds more than a
            # single number. Split at the first "." found at or after
            # index 4 and use the remainder as the longitude.
            # NOTE(review): presumably lat and lon were fused into one
            # attribute by the producer -- confirm against sample data.
            log.warning("recreate lat/lon from: %s", raw_lat)
            log.warning("%s", etree.tostring(point, pretty_print=True).decode())
            start_offset = 4
            offset = raw_lat.index(".", start_offset)
            raw_lat, raw_lon = raw_lat[:offset], raw_lat[offset:]
        else:
            raw_lon = point.get("lon")
            if raw_lon is None:
                raise ValueError("track point has no 'lon' attribute")

        return float(raw_lat), float(raw_lon)

    @staticmethod
    def _parse_timestamp(point):
        """Return the time of *point* as integer milliseconds since the epoch."""
        times = point.xpath("gpx:time", namespaces=NS)
        if len(times) != 1:
            raise ValueError(
                f"expected exactly one time element, found {len(times)}"
            )
        time_text = times[0].text
        if time_text is None:
            raise ValueError("time element is empty")

        dt = datetime.strptime(time_text, "%Y-%m-%dT%H:%M:%SZ")
        # The trailing "Z" marks UTC. strptime returns a naive datetime, and
        # timestamp() would read a naive value as local time, so the zone is
        # attached explicitly to keep the result machine-independent.
        return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)

    @staticmethod
    def _parse_event(point):
        """Return the event data of *point* as a dict (empty if no event)."""
        events = point.xpath(".//gpx:event", namespaces=NS)
        if len(events) > 1:
            raise ValueError(
                f"expected at most one event element, found {len(events)}"
            )
        if not events:
            return {}

        node = events[0]
        event = dict(node.attrib)
        # Text following the event element is kept as well: either as a
        # parsed key/value pair or verbatim under "__tail__".
        tail = (node.tail or "").strip()
        if tail:
            try:
                # base case: trailing 'geoid="0"/>'
                key, quoted = tail.split("=")
                event[key] = quoted.split('"')[1]
            except (ValueError, IndexError):
                # Not a single key="value" pair; keep the raw text instead.
                event['__tail__'] = tail
        return event

    def get_entry(self) -> object:
        """Yield the entries stored by :meth:`load`, in the order they were added."""
        yield from self.entries