weatherstation/Setup.py

140 lines
4.5 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
from functools import partial
import traceback
# The Tinkerforge bindings are a hard requirement for this module: if they
# cannot be imported, report it on stdout and abort the import with an error.
try:
    from tinkerforge.ip_connection import IPConnection
    from tinkerforge.bricklet_temperature import Temperature
    from tinkerforge.bricklet_humidity import Humidity
    from tinkerforge.bricklet_ambient_light import AmbientLight
    from tinkerforge.bricklet_barometer import Barometer
    from tinkerforge.bricklet_temperature_ir import BrickletTemperatureIR
except ImportError:
    _missing_tinkerforge = "package 'tinkerforge' not installed, canceling"
    print(_missing_tinkerforge)
    raise Exception(_missing_tinkerforge)
from settings import SensorType
from settings import settings
class ConnectionSetup(object):
def __init__(self, log):
self.log = log
def setupConnection(self, host):
ipcon = IPConnection()
ipcon.connect(host['name'], host['port'])
return (ipcon)
def setupConnectionAndSensors(self, host, sensors, cb_generic):
hostname = host['name']
port = host['port']
ipcon = IPConnection()
ipcon.connect(hostname, port)
sensorSetup = SensorSetup(ipcon, sensors, cb_generic, self.log)
connectedSensors = sensorSetup.setupSensors()
return (ipcon, connectedSensors)
def disconnectAny(self, connections):
for connection in connections:
if not connection.get_connection_state() is IPConnection.CONNECTION_STATE_DISCONNECTED:
connection.disconnect()
class SensorSetup(object):
    """Instantiate bricklet objects for configured sensors and attach callbacks.

    Args:
        connection: Connection object passed to every bricklet constructor.
        sensors: Mapping of sensor name to a sequence whose item ``0`` is the
            device id and item ``1`` is the ``SensorType``.
        cb_generic: Callable invoked with the measured value plus the keyword
            arguments ``sensor`` (name) and ``type`` (sensor type).
        log: Logger-like object providing ``debug``, ``info`` and ``error``.
    """

    def __init__(self, connection, sensors, cb_generic, log):
        self.connection = connection
        self.sensors = sensors
        self.cb_generic = cb_generic
        self.log = log
        # Bricklet instances by device id, so one device can serve several
        # sensor types (e.g. the two readings of the IR temperature bricklet).
        self._previous_sensors = {}
        # Per sensor type:
        #   'obj'   - bricklet class, constructed as obj(id, connection)
        #   'setcb' - unbound method that sets the callback period
        #   'get'   - unbound method that reads the current value
        #   'cb'    - callback id handed to register_callback()
        self._configs = {
            SensorType.temp: {
                'obj': Temperature,
                'setcb': Temperature.set_temperature_callback_period,
                'get': Temperature.get_temperature,
                'cb': Temperature.CALLBACK_TEMPERATURE,
            },
            SensorType.humi: {
                'obj': Humidity,
                'setcb': Humidity.set_humidity_callback_period,
                'get': Humidity.get_humidity,
                'cb': Humidity.CALLBACK_HUMIDITY,
            },
            SensorType.ambi: {
                'obj': AmbientLight,
                'setcb': AmbientLight.set_illuminance_callback_period,
                'get': AmbientLight.get_illuminance,
                'cb': AmbientLight.CALLBACK_ILLUMINANCE,
            },
            SensorType.baro: {
                'obj': Barometer,
                # Must be the setter: it is called with (obj, cbtime) below.
                'setcb': Barometer.set_air_pressure_callback_period,
                'get': Barometer.get_air_pressure,
                'cb': Barometer.CALLBACK_AIR_PRESSURE,
            },
            SensorType.iram: {
                'obj': BrickletTemperatureIR,
                'setcb': BrickletTemperatureIR.set_ambient_temperature_callback_period,
                'get': BrickletTemperatureIR.get_ambient_temperature,
                'cb': BrickletTemperatureIR.CALLBACK_AMBIENT_TEMPERATURE,
            },
            SensorType.irob: {
                'obj': BrickletTemperatureIR,
                'setcb': BrickletTemperatureIR.set_object_temperature_callback_period,
                'get': BrickletTemperatureIR.get_object_temperature,
                'cb': BrickletTemperatureIR.CALLBACK_OBJECT_TEMPERATURE,
            },
        }

    def parametrizedCallback(self, name, type):
        """Return ``cb_generic`` with ``sensor=name`` and ``type=type`` bound."""
        return partial(self.cb_generic, sensor=name, type=type)

    def __setupSensor__(self, callback, id, cbtime, var):
        """Create (or reuse) the bricklet for ``id`` and hook up ``callback``.

        Args:
            callback: Callable receiving the sensor value.
            id: Device id; also the key under which the instance is cached.
            cbtime: Callback period handed to ``var['setcb']``.
            var: One entry of ``self._configs``.

        Returns:
            The bricklet instance.
        """
        if id in self._previous_sensors:
            self.log.debug("reusing instance for %s", id)
            obj = self._previous_sensors[id]  # same device, additional callback
        else:
            self.log.debug("new instance for %s", id)
            obj = var['obj'](id, self.connection)
            self._previous_sensors[id] = obj  # remember for further callbacks
        var['setcb'](obj, cbtime)  # set callback period
        # Report the current reading once right away. The keyword is spelled
        # `supress` because that is what the callback is called with here; its
        # meaning is defined by cb_generic, which is not part of this class.
        callback(var['get'](obj), supress=True)
        obj.register_callback(var['cb'], callback)
        return obj

    def genericSensorSetup(self, name, sensor):
        """Set up one configured sensor and log the outcome.

        Args:
            name: Sensor name, used for logging and bound into the callback.
            sensor: Sequence with the device id at index ``0`` and the
                ``SensorType`` at index ``1``.

        Returns:
            The bricklet instance, or ``None`` when the type is unsupported or
            the setup raised an exception (the error is logged, not re-raised).
        """
        status = "setup device " + sensor[0] + " (" + name + "):"
        # Validate the type before touching settings, so an unsupported type
        # yields the log message below rather than a lookup error.
        if sensor[1] is SensorType.rain:
            self.log.error("rain is not yet implemented (%s, %s)", sensor[0], name)
            return None
        if sensor[1] not in self._configs:
            self.log.error("%s FAIL (unknown type)", status)
            return None
        callback = self.parametrizedCallback(name, type=sensor[1])
        # The first entry of the per-type properties is the callback period.
        cbtime = settings.sensor_properties[sensor[1]][0]
        obj = None
        try:
            obj = self.__setupSensor__(callback, sensor[0], cbtime, self._configs[sensor[1]])
            self.log.info("%s OK", status)
        except Exception as e:
            # Best effort: one failing device must not abort the other sensors.
            self.log.error("%s FAIL:: %s (%s)", status, e, traceback.format_exc())
        return obj

    def setupSensors(self):
        """Set up all configured sensors.

        Returns:
            List with one entry per configured sensor, in iteration order of
            ``self.sensors``; an entry is ``None`` when that sensor's setup
            failed or its type is unsupported.
        """
        return [self.genericSensorSetup(name, self.sensors[name]) for name in self.sensors]