# -*- coding: utf-8 -*- # Copyright (C) 2012-2013 Matthias Bolte # Copyright (C) 2011-2012 Olaf Lüke # # Redistribution and use in source and binary forms of this file, # with or without modification, are permitted. from threading import Thread, Lock, Semaphore # current_thread for python 2.6, currentThread for python 2.5 try: from threading import current_thread except ImportError: from threading import currentThread as current_thread # Queue for python 2, queue for python 3 try: from Queue import Queue, Empty except ImportError: from queue import Queue, Empty import struct import socket import types import sys import time # use normal tuples instead of namedtuples in python version below 2.6 if sys.hexversion < 0x02060000: def namedtuple(typename, field_names, verbose=False, rename=False): def ntuple(*args): return args return ntuple else: from collections import namedtuple def get_uid_from_data(data): return struct.unpack('> 4) & 0x0F def get_error_code_from_data(data): return (struct.unpack('> 6) & 0x03 BASE58 = '123456789abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ' def base58encode(value): encoded = '' while value >= 58: div, mod = divmod(value, 58) encoded = BASE58[mod] + encoded value = div encoded = BASE58[value] + encoded return encoded def base58decode(encoded): value = 0 column_multiplier = 1 for c in encoded[::-1]: column = BASE58.index(c) value += column * column_multiplier column_multiplier *= 58 return value def uid64_to_uid32(uid64): value1 = uid64 & 0xFFFFFFFF value2 = (uid64 >> 32) & 0xFFFFFFFF uid32 = (value1 & 0x00000FFF) uid32 |= (value1 & 0x0F000000) >> 12 uid32 |= (value2 & 0x0000003F) << 16 uid32 |= (value2 & 0x000F0000) << 6 uid32 |= (value2 & 0x3F000000) << 2 return uid32 class Error(Exception): TIMEOUT = -1 NOT_ADDED = -6 # obsolete since v2.0 ALREADY_CONNECTED = -7 NOT_CONNECTED = -8 INVALID_PARAMETER = -9 NOT_SUPPORTED = -10 UNKNOWN_ERROR_CODE = -11 def __init__(self, value, description): self.value = value self.description = description def __str__(self): return str(self.value) + ': ' + str(self.description) class Device: RESPONSE_EXPECTED_INVALID_FUNCTION_ID = 0 RESPONSE_EXPECTED_ALWAYS_TRUE = 1 # getter RESPONSE_EXPECTED_ALWAYS_FALSE = 2 # callback RESPONSE_EXPECTED_TRUE = 3 # setter RESPONSE_EXPECTED_FALSE = 4 # setter, default def __init__(self, uid, ipcon): """ Creates the device object with the unique device ID *uid* and adds it to the IPConnection *ipcon*. """ uid_ = base58decode(uid) if uid_ > 0xFFFFFFFF: uid_ = uid64_to_uid32(uid_) self.uid = uid_ self.ipcon = ipcon self.api_version = (0, 0, 0) self.registered_callbacks = {} self.callback_formats = {} self.expected_response_function_id = None # protected by request_lock self.expected_response_sequence_number = None # protected by request_lock self.response_queue = Queue() self.request_lock = Lock() self.auth_key = None self.response_expected = [Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID] * 256 self.response_expected[IPConnection.FUNCTION_ENUMERATE] = Device.RESPONSE_EXPECTED_ALWAYS_FALSE self.response_expected[IPConnection.FUNCTION_ADC_CALIBRATE] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.FUNCTION_GET_ADC_CALIBRATION] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.FUNCTION_READ_BRICKLET_UID] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.FUNCTION_WRITE_BRICKLET_UID] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.FUNCTION_READ_BRICKLET_PLUGIN] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.FUNCTION_WRITE_BRICKLET_PLUGIN] = Device.RESPONSE_EXPECTED_ALWAYS_TRUE self.response_expected[IPConnection.CALLBACK_ENUMERATE] = Device.RESPONSE_EXPECTED_ALWAYS_FALSE ipcon.devices[self.uid] = self # FIXME: maybe use a weakref here def get_api_version(self): """ Returns the API version (major, minor, revision) of the bindings for this device. """ return self.api_version def get_response_expected(self, function_id): """ Returns the response expected flag for the function specified by the *function_id* parameter. It is *true* if the function is expected to send a response, *false* otherwise. For getter functions this is enabled by default and cannot be disabled, because those functions will always send a response. For callback configuration functions it is enabled by default too, but can be disabled via the set_response_expected function. For setter functions it is disabled by default and can be enabled. Enabling the response expected flag for a setter function allows to detect timeouts and other error conditions calls of this setter as well. The device will then send a response for this purpose. If this flag is disabled for a setter function then no response is send and errors are silently ignored, because they cannot be detected. """ if function_id < 0 or function_id >= len(self.response_expected): raise ValueError('Function ID {0} out of range'.format(function_id)) flag = self.response_expected[function_id] if flag == Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID: raise ValueError('Invalid function ID {0}'.format(function_id)) return flag in [Device.RESPONSE_EXPECTED_ALWAYS_TRUE, Device.RESPONSE_EXPECTED_TRUE] def set_response_expected(self, function_id, response_expected): """ Changes the response expected flag of the function specified by the *function_id* parameter. This flag can only be changed for setter (default value: *false*) and callback configuration functions (default value: *true*). For getter functions it is always enabled and callbacks it is always disabled. Enabling the response expected flag for a setter function allows to detect timeouts and other error conditions calls of this setter as well. The device will then send a response for this purpose. If this flag is disabled for a setter function then no response is send and errors are silently ignored, because they cannot be detected. """ if function_id < 0 or function_id >= len(self.response_expected): raise ValueError('Function ID {0} out of range'.format(function_id)) flag = self.response_expected[function_id] if flag == Device.RESPONSE_EXPECTED_INVALID_FUNCTION_ID: raise ValueError('Invalid function ID {0}'.format(function_id)) if flag in [Device.RESPONSE_EXPECTED_ALWAYS_TRUE, Device.RESPONSE_EXPECTED_ALWAYS_FALSE]: raise ValueError('Response Expected flag cannot be changed for function ID {0}'.format(function_id)) if bool(response_expected): self.response_expected[function_id] = Device.RESPONSE_EXPECTED_TRUE else: self.response_expected[function_id] = Device.RESPONSE_EXPECTED_FALSE def set_response_expected_all(self, response_expected): """ Changes the response expected flag for all setter and callback configuration functions of this device at once. """ if bool(response_expected): flag = Device.RESPONSE_EXPECTED_TRUE else: flag = Device.RESPONSE_EXPECTED_FALSE for i in range(len(self.response_expected)): if self.response_expected[i] in [Device.RESPONSE_EXPECTED_TRUE, Device.RESPONSE_EXPECTED_FALSE]: self.response_expected[i] = flag class IPConnection: FUNCTION_ENUMERATE = 254 FUNCTION_ADC_CALIBRATE = 251 FUNCTION_GET_ADC_CALIBRATION = 250 FUNCTION_READ_BRICKLET_UID = 249 FUNCTION_WRITE_BRICKLET_UID = 248 FUNCTION_READ_BRICKLET_PLUGIN = 247 FUNCTION_WRITE_BRICKLET_PLUGIN = 246 CALLBACK_ENUMERATE = 253 CALLBACK_CONNECTED = 0 CALLBACK_DISCONNECTED = 1 BROADCAST_UID = 0 PLUGIN_CHUNK_SIZE = 32 # enumeration_type parameter to the enumerate callback ENUMERATION_TYPE_AVAILABLE = 0 ENUMERATION_TYPE_CONNECTED = 1 ENUMERATION_TYPE_DISCONNECTED = 2 # connect_reason parameter to the connected callback CONNECT_REASON_REQUEST = 0 CONNECT_REASON_AUTO_RECONNECT = 1 # disconnect_reason parameter to the disconnected callback DISCONNECT_REASON_REQUEST = 0 DISCONNECT_REASON_ERROR = 1 DISCONNECT_REASON_SHUTDOWN = 2 # returned by get_connection_state CONNECTION_STATE_DISCONNECTED = 0 CONNECTION_STATE_CONNECTED = 1 CONNECTION_STATE_PENDING = 2 # auto-reconnect in process QUEUE_EXIT = 0 QUEUE_META = 1 QUEUE_PACKET = 2 def __init__(self): """ Creates an IP Connection object that can be used to enumerate the available devices. It is also required for the constructor of Bricks and Bricklets. """ self.host = None self.port = None self.timeout = 2.5 self.auto_reconnect = True self.auto_reconnect_allowed = False self.auto_reconnect_pending = False self.sequence_number_lock = Lock() self.next_sequence_number = 0 self.auth_key = None self.devices = {} self.registered_callbacks = {} self.socket = None self.socket_lock = Lock() self.receive_flag = False self.receive_thread = None self.callback_queue = None self.callback_thread = None self.waiter = Semaphore() def connect(self, host, port): """ Creates a TCP/IP connection to the given *host* and *port*. The host and port can point to a Brick Daemon or to a WIFI/Ethernet Extension. Devices can only be controlled when the connection was established successfully. Blocks until the connection is established and throws an exception if there is no Brick Daemon or WIFI/Ethernet Extension listening at the given host and port. """ with self.socket_lock: if self.socket is not None: raise Error(Error.ALREADY_CONNECTED, 'Already connected to {0}:{1}'.format(self.host, self.port)) self.host = host self.port = port self.connect_unlocked(False) def disconnect(self): """ Disconnects the TCP/IP connection from the Brick Daemon or the WIFI/Ethernet Extension. """ with self.socket_lock: self.auto_reconnect_allowed = False if self.auto_reconnect_pending: # abort potentially pending auto reconnect self.auto_reconnect_pending = False else: if self.socket is None: raise Error(Error.NOT_CONNECTED, 'Not connected') # end receive thread self.receive_flag = False try: self.socket.shutdown(socket.SHUT_RDWR) except socket.error: pass if self.receive_thread is not None: self.receive_thread.join() # FIXME: use a timeout? self.receive_thread = None # close socket self.socket.close() self.socket = None # end callback thread callback_queue = self.callback_queue callback_thread = self.callback_thread self.callback_queue = None self.callback_thread = None # do this outside of socket_lock to allow calling (dis-)connect from # the callbacks while blocking on the join call here callback_queue.put((IPConnection.QUEUE_META, (IPConnection.CALLBACK_DISCONNECTED, IPConnection.DISCONNECT_REASON_REQUEST))) callback_queue.put((IPConnection.QUEUE_EXIT, None)) if current_thread() is not callback_thread: callback_thread.join() def get_connection_state(self): """ Can return the following states: - CONNECTION_STATE_DISCONNECTED: No connection is established. - CONNECTION_STATE_CONNECTED: A connection to the Brick Daemon or the WIFI/Ethernet Extension is established. - CONNECTION_STATE_PENDING: IP Connection is currently trying to connect. """ if self.socket is not None: return IPConnection.CONNECTION_STATE_CONNECTED elif self.auto_reconnect_pending: return IPConnection.CONNECTION_STATE_PENDING else: return IPConnection.CONNECTION_STATE_DISCONNECTED def set_auto_reconnect(self, auto_reconnect): """ Enables or disables auto-reconnect. If auto-reconnect is enabled, the IP Connection will try to reconnect to the previously given host and port, if the connection is lost. Default value is *True*. """ self.auto_reconnect = bool(auto_reconnect) if not self.auto_reconnect: # abort potentially pending auto reconnect self.auto_reconnect_allowed = False def get_auto_reconnect(self): """ Returns *true* if auto-reconnect is enabled, *false* otherwise. """ return self.auto_reconnect def set_timeout(self, timeout): """ Sets the timeout in seconds for getters and for setters for which the response expected flag is activated. Default timeout is 2.5. """ timeout = float(timeout) if timeout < 0: raise ValueError('Timeout cannot be negative') self.timeout = timeout def get_timeout(self): """ Returns the timeout as set by set_timeout. """ return self.timeout def enumerate(self): """ Broadcasts an enumerate request. All devices will respond with an enumerate callback. """ request, _, _ = self.create_packet_header(None, 8, IPConnection.FUNCTION_ENUMERATE) self.send(request) def wait(self): """ Stops the current thread until unwait is called. This is useful if you rely solely on callbacks for events, if you want to wait for a specific callback or if the IP Connection was created in a thread. Wait and unwait act in the same way as "acquire" and "release" of a semaphore. """ self.waiter.acquire() def unwait(self): """ Unwaits the thread previously stopped by wait. Wait and unwait act in the same way as "acquire" and "release" of a semaphore. """ self.waiter.release() def register_callback(self, id, callback): """ Registers a callback with ID *id* to the function *callback*. """ self.registered_callbacks[id] = callback def connect_unlocked(self, is_auto_reconnect): # NOTE: assumes that socket_lock is locked if self.callback_thread is None: try: self.callback_queue = Queue() self.callback_thread = Thread(name='Callback-Processor', target=self.callback_loop, args=(self.callback_queue, )) self.callback_thread.daemon = True self.callback_thread.start() except: self.callback_queue = None self.callback_thread = None raise try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.connect((self.host, self.port)) except: self.socket = None raise try: self.receive_flag = True self.receive_thread = Thread(name='Brickd-Receiver', target=self.receive_loop) self.receive_thread.daemon = True self.receive_thread.start() except: def cleanup(): # end receive thread self.receive_flag = False try: self.socket.shutdown(socket.SHUT_RDWR) except socket.error: pass if self.receive_thread is not None: self.receive_thread.join() # FIXME: use a timeout? self.receive_thread = None # close socket self.socket.close() self.socket = None # end callback thread if not is_auto_reconnect: self.callback_queue.put((IPConnection.QUEUE_EXIT, None)) if current_thread() is not self.callback_thread: self.callback_thread.join() self.callback_queue = None self.callback_thread = None cleanup() raise self.auto_reconnect_allowed = False self.auto_reconnect_pending = False if is_auto_reconnect: connect_reason = IPConnection.CONNECT_REASON_AUTO_RECONNECT else: connect_reason = IPConnection.CONNECT_REASON_REQUEST self.callback_queue.put((IPConnection.QUEUE_META, (IPConnection.CALLBACK_CONNECTED, connect_reason))) def receive_loop(self): if sys.hexversion < 0x03000000: pending_data = '' else: pending_data = bytes() while self.receive_flag: try: data = self.socket.recv(8192) except socket.error: self.auto_reconnect_allowed = True self.receive_flag = False self.callback_queue.put((IPConnection.QUEUE_META, (IPConnection.CALLBACK_DISCONNECTED, IPConnection.DISCONNECT_REASON_ERROR))) return if len(data) == 0: if self.receive_flag: self.auto_reconnect_allowed = True self.receive_flag = False self.callback_queue.put((IPConnection.QUEUE_META, (IPConnection.CALLBACK_DISCONNECTED, IPConnection.DISCONNECT_REASON_SHUTDOWN))) return pending_data += data while True: if len(pending_data) < 8: # Wait for complete header break length = get_length_from_data(pending_data) if len(pending_data) < length: # Wait for complete packet break packet = pending_data[0:length] pending_data = pending_data[length:] self.handle_response(packet) def dispatch_meta(self, function_id, parameter): if function_id == IPConnection.CALLBACK_CONNECTED: if IPConnection.CALLBACK_CONNECTED in self.registered_callbacks and \ self.registered_callbacks[IPConnection.CALLBACK_CONNECTED] is not None: self.registered_callbacks[IPConnection.CALLBACK_CONNECTED](parameter) elif function_id == IPConnection.CALLBACK_DISCONNECTED: # need to do this here, the receive_loop is not allowed to # hold the socket_lock because this could cause a deadlock # with a concurrent call to the (dis-)connect function with self.socket_lock: if self.socket is not None: self.socket.close() self.socket = None # FIXME: wait a moment here, otherwise the next connect # attempt will succeed, even if there is no open server # socket. the first receive will then fail directly time.sleep(0.1) if IPConnection.CALLBACK_DISCONNECTED in self.registered_callbacks and \ self.registered_callbacks[IPConnection.CALLBACK_DISCONNECTED] is not None: self.registered_callbacks[IPConnection.CALLBACK_DISCONNECTED](parameter) if parameter != IPConnection.DISCONNECT_REASON_REQUEST and \ self.auto_reconnect and self.auto_reconnect_allowed: self.auto_reconnect_pending = True retry = True # block here until reconnect. this is okay, there is no # callback to deliver when there is no connection while retry: retry = False with self.socket_lock: if self.auto_reconnect_allowed and self.socket is None: try: self.connect_unlocked(True) except: retry = True else: self.auto_reconnect_pending = False if retry: time.sleep(0.1) def dispatch_packet(self, packet): uid = get_uid_from_data(packet) length = get_length_from_data(packet) function_id = get_function_id_from_data(packet) payload = packet[8:] if function_id == IPConnection.CALLBACK_ENUMERATE and \ IPConnection.CALLBACK_ENUMERATE in self.registered_callbacks: uid, connected_uid, position, hardware_version, \ firmware_version, device_identifier, enumeration_type = \ self.deserialize_data(payload, '8s 8s c 3B 3B H B') cb = self.registered_callbacks[IPConnection.CALLBACK_ENUMERATE] cb(uid, connected_uid, position, hardware_version, firmware_version, device_identifier, enumeration_type) return if uid not in self.devices: return device = self.devices[uid] if function_id in device.registered_callbacks and \ device.registered_callbacks[function_id] is not None: cb = device.registered_callbacks[function_id] form = device.callback_formats[function_id] if len(form) == 0: cb() elif len(form) == 1: cb(self.deserialize_data(payload, form)) else: cb(*self.deserialize_data(payload, form)) def callback_loop(self, callback_queue): while True: kind, data = callback_queue.get() if kind == IPConnection.QUEUE_EXIT: return elif kind == IPConnection.QUEUE_META: self.dispatch_meta(*data) elif kind == IPConnection.QUEUE_PACKET: if not self.receive_flag: # don't dispatch callbacks when the receive thread isn't running continue self.dispatch_packet(data) def deserialize_data(self, data, form): ret = [] for f in form.split(' '): f = '<' + f length = struct.calcsize(f) x = struct.unpack(f, data[:length]) if len(x) > 1: ret.append(x) elif 's' in f: ret.append(self.trim_deserialized_string(x[0])) else: ret.append(x[0]) data = data[length:] if len(ret) == 1: return ret[0] return ret def trim_deserialized_string(self, s): if sys.hexversion >= 0x03000000: s = s.decode('ascii') i = s.find(chr(0)) if i >= 0: s = s[:i] return s def send(self, packet): with self.socket_lock: if self.socket is None: raise Error(Error.NOT_CONNECTED, 'Not connected') try: self.socket.send(packet) except socket.error: pass def send_request(self, device, function_id, data, form, form_ret): length = 8 + struct.calcsize('<' + form) request, response_expected, sequence_number = \ self.create_packet_header(device, length, function_id) def pack_string(f, d): if sys.hexversion < 0x03000000: if type(d) == types.UnicodeType: return struct.pack('<' + f, d.encode('ascii')) else: return struct.pack('<' + f, d) else: if isinstance(d, str): return struct.pack('<' + f, bytes(d, 'ascii')) else: return struct.pack('<' + f, d) for f, d in zip(form.split(' '), data): if len(f) > 1 and not 's' in f and not 'c' in f: request += struct.pack('<' + f, *d) elif 's' in f: request += pack_string(f, d) elif 'c' in f: if len(f) > 1: if int(f.replace('c', '')) != len(d): raise ValueError('Incorrect char list length'); for k in d: request += pack_string('c', k) else: request += pack_string(f, d) else: request += struct.pack('<' + f, d) if response_expected: with device.request_lock: device.expected_response_function_id = function_id device.expected_response_sequence_number = sequence_number try: self.send(request) response = device.response_queue.get(True, self.timeout) except Empty: msg = 'Did not receive response for function {0} in time'.format(function_id) raise Error(Error.TIMEOUT, msg) finally: device.expected_response_function_id = None device.expected_response_sequence_number = None error_code = get_error_code_from_data(response) if error_code == 0: # no error pass elif error_code == 1: msg = 'Got invalid parameter for function {0}'.format(function_id) raise Error(Error.INVALID_PARAMETER, msg) elif error_code == 2: msg = 'Function {0} is not supported'.format(function_id) raise Error(Error.NOT_SUPPORTED, msg) else: msg = 'Function {0} returned an unknown error'.format(function_id) raise Error(Error.UNKNOWN_ERROR_CODE, msg) if len(form_ret) > 0: return self.deserialize_data(response[8:], form_ret) else: self.send(request) def get_next_sequence_number(self): with self.sequence_number_lock: sequence_number = self.next_sequence_number + 1 self.next_sequence_number = sequence_number % 15 return sequence_number def handle_response(self, packet): function_id = get_function_id_from_data(packet) sequence_number = get_sequence_number_from_data(packet) if sequence_number == 0 and function_id == IPConnection.CALLBACK_ENUMERATE: if IPConnection.CALLBACK_ENUMERATE in self.registered_callbacks: self.callback_queue.put((IPConnection.QUEUE_PACKET, packet)) return uid = get_uid_from_data(packet) if not uid in self.devices: # Response from an unknown device, ignoring it return device = self.devices[uid] if sequence_number == 0: if function_id in device.registered_callbacks: self.callback_queue.put((IPConnection.QUEUE_PACKET, packet)) return if device.expected_response_function_id == function_id and \ device.expected_response_sequence_number == sequence_number: device.response_queue.put(packet) return # Response seems to be OK, but can't be handled, most likely # a callback without registered function def create_packet_header(self, device, length, function_id): uid = IPConnection.BROADCAST_UID sequence_number = self.get_next_sequence_number() r_bit = 0 a_bit = 0 if device is not None: uid = device.uid if device.get_response_expected(function_id): r_bit = 1 if device.auth_key is not None: a_bit = 1 else: if self.auth_key is not None: a_bit = 1 sequence_number_and_options = \ (sequence_number << 4) | (r_bit << 3) | (a_bit << 2) return (struct.pack('