diff --git a/thingsboard_gateway/connectors/ble/ble_connector.py b/thingsboard_gateway/connectors/ble/ble_connector.py index 317162e9..defc383c 100644 --- a/thingsboard_gateway/connectors/ble/ble_connector.py +++ b/thingsboard_gateway/connectors/ble/ble_connector.py @@ -29,7 +29,7 @@ class BLEConnector(Connector, Thread): def __init__(self, gateway, config, connector_type): super().__init__() self.__connector_type = connector_type - self.__default_services = [x for x in range(0x1800, 0x183A)] + self.__default_services = list(range(0x1800, 0x183A)) self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} self.__gateway = gateway @@ -39,8 +39,8 @@ class BLEConnector(Connector, Thread): self._connected = False self.__stopped = False - self.__previous_scan_time = time.time()-10000 - self.__previous_read_time = time.time()-10000 + self.__previous_scan_time = time.time() - 10000 + self.__previous_read_time = time.time() - 10000 self.__check_interval_seconds = self.__config['checkIntervalSeconds'] if self.__config.get( 'checkIntervalSeconds') is not None else 10 self.__rescan_time = self.__config['rescanIntervalSeconds'] if self.__config.get( @@ -82,30 +82,30 @@ class BLEConnector(Connector, Thread): def on_attributes_update(self, content): log.debug(content) - try: - for device in self.__devices_around: - if self.__devices_around[device]['device_config'].get('name') == content['device']: - for requests in self.__devices_around[device]['device_config']["attributeUpdates"]: - for service in self.__devices_around[device]['services']: - if requests['characteristicUUID'] in self.__devices_around[device]['services'][service]: - characteristic = self.__devices_around[device]['services'][service][requests['characteristicUUID']]['characteristic'] - if 'WRITE' in characteristic.propertiesToString(): - if content['data'].get(requests['attributeOnThingsBoard']) is not None: - try: - self.__check_and_reconnect(device) - resp = characteristic.write(content['data'][requests['attributeOnThingsBoard']].encode('UTF-8'), True) - except BTLEDisconnectError: - self.__check_and_reconnect(device) - resp = characteristic.write(content['data'][requests['attributeOnThingsBoard']].encode('UTF-8'), True) - except Exception as e: - log.exception(e) - else: - log.error('Cannot process attribute update request for device: %s with data: %s and config: %s', - device, - content, - self.__devices_around[device]['device_config']["attributeUpdates"]) - except Exception as e: - log.exception(e) + for device in self.__devices_around: + if self.__devices_around[device]['device_config'].get('name') == content['device']: + for requests in self.__devices_around[device]['device_config']["attributeUpdates"]: + for service in self.__devices_around[device]['services']: + if requests['characteristicUUID'] in self.__devices_around[device]['services'][service]: + characteristic = self.__devices_around[device]['services'][service][requests['characteristicUUID']]['characteristic'] + if 'WRITE' in characteristic.propertiesToString(): + if content['data'].get(requests['attributeOnThingsBoard']) is not None: + try: + self.__check_and_reconnect(device) + content_to_write = content['data'][requests['attributeOnThingsBoard']].encode('UTF-8') + characteristic.write(content_to_write, True) + except BTLEDisconnectError: + self.__check_and_reconnect(device) + content_to_write = content['data'][requests['attributeOnThingsBoard']].encode('UTF-8') + characteristic.write(content_to_write, True) + except Exception as e: + log.exception(e) + else: + log.error( + 'Cannot process attribute update request for device: %s with data: %s and config: %s', + device, + content, + self.__devices_around[device]['device_config']["attributeUpdates"]) def server_side_rpc_handler(self, content): log.debug(content) @@ -122,12 +122,12 @@ class BLEConnector(Connector, Thread): if requests['methodProcessing'].upper() == 'WRITE': try: self.__check_and_reconnect(device) - response = characteristic.write(content['data'].get('params', '').encode('UTF-8'), requests.get('withResponse', False)) + response = characteristic.write(content['data'].get('params', '').encode('UTF-8'), + requests.get('withResponse', False)) except BTLEDisconnectError: self.__check_and_reconnect(device) - response = characteristic.write(content['data'].get('params', '').encode('UTF-8'), requests.get('withResponse', False)) - except Exception as e: - log.exception(e) + response = characteristic.write(content['data'].get('params', '').encode('UTF-8'), + requests.get('withResponse', False)) elif requests['methodProcessing'].upper() == 'READ': try: self.__check_and_reconnect(device) @@ -135,32 +135,29 @@ class BLEConnector(Connector, Thread): except BTLEDisconnectError: self.__check_and_reconnect(device) response = characteristic.read() - except Exception as e: - log.exception(e) elif requests['methodProcessing'].upper() == 'NOTIFY': try: self.__check_and_reconnect(device) - delegate = self.__notify_handler(self.__devices_around[device], characteristic.handle) + delegate = self.__notify_handler(self.__devices_around[device], + characteristic.handle) response = delegate.data except BTLEDisconnectError: self.__check_and_reconnect(device) - delegate = self.__notify_handler(self.__devices_around[device], characteristic.handle) + delegate = self.__notify_handler(self.__devices_around[device], + characteristic.handle) response = delegate.data - except Exception as e: - log.exception(e) if response is not None: log.debug('Response from device: %s', response) if requests['withResponse']: response = 'success' - self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(response)) - - - + self.__gateway.send_rpc_reply(content['device'], content['data']['id'], + str(response)) else: - log.error('Method for rpc request - not supported by characteristic or not found in the config.\nDevice: %s with data: %s and config: %s', - device, - content, - self.__devices_around[device]['device_config']["serverSideRpc"]) + log.error( + 'Method for rpc request - not supported by characteristic or not found in the config.\nDevice: %s with data: %s and config: %s', + device, + content, + self.__devices_around[device]['device_config']["serverSideRpc"]) except Exception as e: log.exception(e) @@ -173,7 +170,8 @@ class BLEConnector(Connector, Thread): def device_add(self, device): for interested_device in self.__devices_around: - if device.addr.upper() == interested_device and self.__devices_around[interested_device].get('scanned_device') is None: + if device.addr.upper() == interested_device and self.__devices_around[interested_device].get( + 'scanned_device') is None: self.__devices_around[interested_device]['scanned_device'] = device self.__devices_around[interested_device]['is_new_device'] = True log.debug('Device with address: %s - found.', device.addr.upper()) @@ -181,8 +179,10 @@ class BLEConnector(Connector, Thread): def __get_services_and_chars(self): for device in self.__devices_around: try: - if self.__devices_around.get(device) is not None and self.__devices_around[device].get('scanned_device') is not None: - log.debug('Connecting to device with address: %s', self.__devices_around[device]['scanned_device'].addr.upper()) + if self.__devices_around.get(device) is not None and self.__devices_around[device].get( + 'scanned_device') is not None: + log.debug('Connecting to device with address: %s', + self.__devices_around[device]['scanned_device'].addr.upper()) if self.__devices_around[device].get('peripheral') is None: peripheral = Peripheral(self.__devices_around[device]['scanned_device']) self.__devices_around[device]['peripheral'] = peripheral @@ -222,16 +222,17 @@ class BLEConnector(Connector, Thread): except Exception as e: log.exception(e) characteristic_uuid = str(characteristic.uuid).upper() - if self.__devices_around[device]['services'][service_uuid].get(characteristic_uuid) is None: - self.__devices_around[device]['services'][service_uuid][characteristic_uuid] = { - 'characteristic': characteristic, - 'handle': characteristic.handle, - 'descriptors': {}} + if self.__devices_around[device]['services'][service_uuid].get( + characteristic_uuid) is None: + self.__devices_around[device]['services'][service_uuid][characteristic_uuid] = {'characteristic': characteristic, + 'handle': characteristic.handle, + 'descriptors': {}} for descriptor in descriptors: log.debug(descriptor.handle) log.debug(str(descriptor.uuid)) log.debug(str(descriptor)) - self.__devices_around[device]['services'][service_uuid][characteristic_uuid]['descriptors'][descriptor.handle] = descriptor + self.__devices_around[device]['services'][service_uuid][ + characteristic_uuid]['descriptors'][descriptor.handle] = descriptor except BTLEDisconnectError: self.__check_and_reconnect(device) else: @@ -262,12 +263,15 @@ class BLEConnector(Connector, Thread): log.exception(e) def __new_device_processing(self, device): - default_services_on_device = [service for service in self.__devices_around[device]['services'].keys() if int(service.split('-')[0], 16) in self.__default_services] + default_services_on_device = [service for service in self.__devices_around[device]['services'].keys() if + int(service.split('-')[0], 16) in self.__default_services] log.debug('Default services found on device %s :%s', device, default_services_on_device) converter = BytesBLEUplinkConverter(self.__devices_around[device]['device_config']) converted_data = None for service in default_services_on_device: - characteristics = [char for char in self.__devices_around[device]['services'][service].keys() if self.__devices_around[device]['services'][service][char]['characteristic'].supportsRead()] + characteristics = [char for char in self.__devices_around[device]['services'][service].keys() if + self.__devices_around[device]['services'][service][char][ + 'characteristic'].supportsRead()] for char in characteristics: read_config = {'characteristicUUID': char, 'method': 'READ', @@ -325,10 +329,11 @@ class BLEConnector(Connector, Thread): characteristic_uuid_from_config = characteristic_processing_conf.get('characteristicUUID') if characteristic_uuid_from_config is None: log.error('Characteristic not found in config: %s', pformat(characteristic_processing_conf)) - return + return None if self.__devices_around[device]['services'][service].get(characteristic_uuid_from_config) is None: continue - characteristic = self.__devices_around[device]['services'][service][characteristic_uuid_from_config]['characteristic'] + characteristic = self.__devices_around[device]['services'][service][characteristic_uuid_from_config][ + 'characteristic'] self.__check_and_reconnect(device) data = None if characteristic_processing_conf.get('method', '_').upper().split()[0] == "READ": @@ -346,20 +351,24 @@ class BLEConnector(Connector, Thread): self.__notify_delegators[device] = {} if self.__notify_delegators[device].get(handle) is None: self.__notify_delegators[device][handle] = {'function': self.__notify_handler, - 'args': ( - self.__devices_around[device], - handle, - self.__notify_delegators[device].get(handle)), - 'delegate': None, + 'args': (self.__devices_around[device], + handle, + self.__notify_delegators[device].get(handle)), + 'delegate': None } - self.__notify_delegators[device][handle]['delegate'] = self.__notify_delegators[device][handle]['function'](*self.__notify_delegators[device][handle]['args']) + self.__notify_delegators[device][handle]['delegate'] = self.__notify_delegators[device][handle][ + 'function'](*self.__notify_delegators[device][handle]['args']) data = self.__notify_delegators[device][handle]['delegate'].data else: - self.__notify_delegators[device][handle]['args'] = (self.__devices_around[device], handle, self.__notify_delegators[device][handle]['delegate']) - self.__notify_delegators[device][handle]['delegate'] = self.__notify_delegators[device][handle]['function'](*self.__notify_delegators[device][handle]['args']) + self.__notify_delegators[device][handle]['args'] = (self.__devices_around[device], + handle, + self.__notify_delegators[device][handle]['delegate']) + self.__notify_delegators[device][handle]['delegate'] = self.__notify_delegators[device][handle][ + 'function'](*self.__notify_delegators[device][handle]['args']) data = self.__notify_delegators[device][handle]['delegate'].data if data is None: - log.error('Cannot process characteristic: %s with config:\n%s', str(characteristic.uuid).upper(), pformat(characteristic_processing_conf)) + log.error('Cannot process characteristic: %s with config:\n%s', str(characteristic.uuid).upper(), + pformat(characteristic_processing_conf)) else: log.debug('data: %s', data) return data @@ -367,7 +376,8 @@ class BLEConnector(Connector, Thread): def __scan_ble(self): log.debug("Scanning for devices...") try: - self.__scanner.scan(self.__config.get('scanTimeSeconds', 5), passive=self.__config.get('passiveScanMode', False)) + self.__scanner.scan(self.__config.get('scanTimeSeconds', 5), + passive=self.__config.get('passiveScanMode', False)) except BTLEManagementError as e: log.error('BLE working only with root user.') log.error('Or you can try this command:\nsudo setcap ' @@ -385,7 +395,7 @@ class BLEConnector(Connector, Thread): if self.__config.get('devices') is None: log.error('Devices not found in configuration file. BLE Connector stopped.') self._connected = False - return + return None for interest_device in self.__config.get('devices'): keys_in_config = ['attributes', 'telemetry'] if interest_device.get('MACAddress') is not None: @@ -397,27 +407,34 @@ class BLEConnector(Connector, Thread): converter = None if type_section.get('converter') is not None: try: - module = TBUtility.check_and_import(self.__connector_type, type_section['converter']) + module = TBUtility.check_and_import(self.__connector_type, + type_section['converter']) if module is not None: - log.debug('Custom converter for device %s - found!', interest_device['MACAddress']) + log.debug('Custom converter for device %s - found!', + interest_device['MACAddress']) converter = module(interest_device) else: - log.error("\n\nCannot find extension module for device %s .\nPlease check your configuration.\n", interest_device['MACAddress']) + log.error( + "\n\nCannot find extension module for device %s .\nPlease check your configuration.\n", + interest_device['MACAddress']) except Exception as e: log.exception(e) else: converter = default_converter if converter is not None: if interest_uuid.get(type_section["characteristicUUID"].upper()) is None: - interest_uuid[type_section["characteristicUUID"].upper()] = [{'section_config': type_section, - 'type': key_type, - 'converter': converter}] + interest_uuid[type_section["characteristicUUID"].upper()] = [ + {'section_config': type_section, + 'type': key_type, + 'converter': converter}] else: - interest_uuid[type_section["characteristicUUID"].upper()].append({'section_config': type_section, - 'type': key_type, - 'converter': converter}) + interest_uuid[type_section["characteristicUUID"].upper()].append( + {'section_config': type_section, + 'type': key_type, + 'converter': converter}) else: - log.error("No characteristicUUID found in configuration section for %s:\n%s\n", key_type, pformat(type_section)) + log.error("No characteristicUUID found in configuration section for %s:\n%s\n", key_type, + pformat(type_section)) if self.__devices_around.get(interest_device['MACAddress'].upper()) is None: self.__devices_around[interest_device['MACAddress'].upper()] = {} self.__devices_around[interest_device['MACAddress'].upper()]['device_config'] = interest_device @@ -431,6 +448,6 @@ class ScanDelegate(DefaultDelegate): DefaultDelegate.__init__(self) self.__connector = ble_connector - def handleDiscovery(self, dev, is_new_device, is_new_data): + def handleDiscovery(self, dev, is_new_device, _): if is_new_device: self.__connector.device_add(dev) diff --git a/thingsboard_gateway/connectors/ble/ble_uplink_converter.py b/thingsboard_gateway/connectors/ble/ble_uplink_converter.py index a9c22041..04a15870 100644 --- a/thingsboard_gateway/connectors/ble/ble_uplink_converter.py +++ b/thingsboard_gateway/connectors/ble/ble_uplink_converter.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.converter import Converter, ABC, log, abstractmethod +from thingsboard_gateway.connectors.converter import Converter, log, abstractmethod -class BLEUplinkConverter(ABC): +class BLEUplinkConverter(Converter): @abstractmethod def convert(self, config, data): diff --git a/thingsboard_gateway/connectors/ble/bytes_ble_uplink_converter.py b/thingsboard_gateway/connectors/ble/bytes_ble_uplink_converter.py index a24a1d36..fbc6d4af 100644 --- a/thingsboard_gateway/connectors/ble/bytes_ble_uplink_converter.py +++ b/thingsboard_gateway/connectors/ble/bytes_ble_uplink_converter.py @@ -25,10 +25,8 @@ # limitations under the License. from pprint import pformat -import codecs + from thingsboard_gateway.connectors.ble.ble_uplink_converter import BLEUplinkConverter, log -import struct -from thingsboard_gateway.tb_utility.tb_utility import TBUtility class BytesBLEUplinkConverter(BLEUplinkConverter): diff --git a/thingsboard_gateway/connectors/modbus/bytes_modbus_downlink_converter.py b/thingsboard_gateway/connectors/modbus/bytes_modbus_downlink_converter.py index 1f3b3e21..b1ade67a 100644 --- a/thingsboard_gateway/connectors/modbus/bytes_modbus_downlink_converter.py +++ b/thingsboard_gateway/connectors/modbus/bytes_modbus_downlink_converter.py @@ -31,7 +31,7 @@ class BytesModbusDownlinkConverter(ModbusConverter): builder = BinaryPayloadBuilder(byteorder=Endian.Big) else: log.warning("byte order is not BIG or LITTLE") - return + return None reg_count = config.get("registerCount", 1) value = config["value"] if config.get("tag") is not None: @@ -46,14 +46,14 @@ class BytesModbusDownlinkConverter(ModbusConverter): else: log.warning("unsupported amount of registers with double type for device %s in Downlink converter", self.__config["deviceName"]) - return + return None elif "Float" in tags: if reg_count == 2: builder.add_32bit_float(value) else: log.warning("unsupported amount of registers with float type for device %s in Downlink converter", self.__config["deviceName"]) - return + return None elif "Integer" in tags or "DWord" in tags or "DWord/Integer" in tags or "Word" in tags: if reg_count == 1: builder.add_16bit_int(value) @@ -64,7 +64,7 @@ class BytesModbusDownlinkConverter(ModbusConverter): else: log.warning("unsupported amount of registers with integer/word/dword type for device %s in Downlink converter", self.__config["deviceName"]) - return + return None else: log.warning("unsupported hardware data type for device %s in Downlink converter", self.__config["deviceName"]) @@ -83,4 +83,4 @@ class BytesModbusDownlinkConverter(ModbusConverter): else: log.warning("Unsupported function code, for device %s in Downlink converter", self.__config["deviceName"]) - return + return None diff --git a/thingsboard_gateway/connectors/modbus/bytes_modbus_uplink_converter.py b/thingsboard_gateway/connectors/modbus/bytes_modbus_uplink_converter.py index aee5924a..941270f3 100644 --- a/thingsboard_gateway/connectors/modbus/bytes_modbus_uplink_converter.py +++ b/thingsboard_gateway/connectors/modbus/bytes_modbus_uplink_converter.py @@ -52,6 +52,7 @@ class BytesModbusUplinkConverter(ModbusConverter): try: decoder = BinaryPayloadDecoder.fromRegisters(registers, byteorder=endian_order) except TypeError: + # pylint: disable=E1123 decoder = BinaryPayloadDecoder.fromRegisters(registers, endian=endian_order) assert decoder is not None decoded_data = self.__decode_from_registers(decoder, configuration) diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 3df02e25..e4f7cf58 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -16,12 +16,15 @@ import time import threading from random import choice from string import ascii_lowercase -from thingsboard_gateway.tb_utility.tb_utility import TBUtility + from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient, ModbusRtuFramer, ModbusSocketFramer from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse -from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse +from pymodbus.register_write_message import WriteMultipleRegistersResponse, \ + WriteSingleRegisterResponse from pymodbus.register_read_message import ReadRegistersResponseBase from pymodbus.exceptions import ConnectionException + +from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter @@ -157,12 +160,13 @@ class ModbusConnector(Connector, threading.Thread): log.debug("Data has not been changed.") elif self.__devices[device]["config"].get("sendDataOnlyOnChange") is None or not self.__devices[device]["config"].get("sendDataOnlyOnChange"): self.statistics['MessagesReceived'] += 1 - to_send = {"deviceName": converted_data["deviceName"], "deviceType": converted_data["deviceType"]} + to_send = {"deviceName": converted_data["deviceName"], + "deviceType": converted_data["deviceType"]} # if converted_data["telemetry"] != self.__devices[device]["telemetry"]: self.__devices[device]["last_telemetry"] = converted_data["telemetry"] to_send["telemetry"] = converted_data["telemetry"] # if converted_data["attributes"] != self.__devices[device]["attributes"]: - self.__devices[device]["last_telemetry"] = converted_data["attributes"] + self.__devices[device]["last_attributes"] = converted_data["attributes"] to_send["attributes"] = converted_data["attributes"] self.__gateway.send_to_storage(self.get_name(), to_send) self.statistics['MessagesSent'] += 1 @@ -234,10 +238,10 @@ class ModbusConnector(Connector, threading.Thread): log.exception(e) if response is not None: log.debug(response) - if type(response) in (WriteMultipleRegistersResponse, - WriteMultipleCoilsResponse, - WriteSingleCoilResponse, - WriteSingleRegisterResponse): + if isinstance(response, (WriteMultipleRegistersResponse, + WriteMultipleCoilsResponse, + WriteSingleCoilResponse, + WriteSingleRegisterResponse)): response = True else: response = False diff --git a/thingsboard_gateway/connectors/modbus/modbus_converter.py b/thingsboard_gateway/connectors/modbus/modbus_converter.py index 5fac5a1e..7d23ab04 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_converter.py +++ b/thingsboard_gateway/connectors/modbus/modbus_converter.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.converter import Converter, ABC, abstractmethod, log +from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log -class ModbusConverter(ABC): +class ModbusConverter(Converter): @abstractmethod def convert(self, config, data): pass diff --git a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py index ad388f41..b4cf0a94 100644 --- a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py @@ -69,5 +69,4 @@ class JsonMqttUplinkConverter(MqttUplinkConverter): except Exception as e: log.error('Error in converter, for config: \n%s\n and message: \n%s\n', dumps(self.__config), str(data)) log.exception(e) - finally: - return dict_result \ No newline at end of file + return dict_result diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 201b8bb1..b1f1b69f 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -15,14 +15,13 @@ import time import string import random +from threading import Thread from re import match, fullmatch, search import ssl from paho.mqtt.client import Client from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.mqtt.json_mqtt_uplink_converter import JsonMqttUplinkConverter -from threading import Thread from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from simplejson import loads class MqttConnector(Connector, Thread): @@ -37,7 +36,7 @@ class MqttConnector(Connector, Thread): self.__broker = config.get('broker') self.__mapping = config.get('mapping') self.__server_side_rpc = config.get('serverSideRpc') - self.__service_config = {"connectRequests": None, "disconnectRequests": None} + self.__service_config = {"connectRequests": [], "disconnectRequests": []} self.__attribute_updates = [] self.__get_service_config(config) self.__sub_topics = {} @@ -63,9 +62,9 @@ class MqttConnector(Connector, Thread): tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) except Exception as e: - self.__log.error("Cannot setup connection to broker %s using SSL. Please check your configuration.\nError: %s", - self.get_name(), - e) + self.__log.error("Cannot setup connection to broker %s using SSL. Please check your configuration.\nError: ", + self.get_name()) + self.__log.exception(e) self._client.tls_insecure_set(False) self._client.on_connect = self._on_connect self._client.on_message = self._on_message @@ -138,10 +137,7 @@ class MqttConnector(Connector, Thread): } if rc == 0: self._connected = True - self.__log.info('%s connected to %s:%s - successfully.', - self.get_name(), - self.__broker["host"], - self.__broker.get("port", "1883")) + self.__log.info('%s connected to %s:%s - successfully.', self.get_name(), self.__broker["host"], self.__broker.get("port", "1883")) for mapping in self.__mapping: try: converter = None @@ -152,7 +148,7 @@ class MqttConnector(Connector, Thread): self.__log.debug('Custom converter for topic %s - found!', mapping["topicFilter"]) converter = module(mapping) else: - self.__log.error("\n\nCannot find extension module for %s topic.\n\Please check your configuration.\n", mapping["topicFilter"]) + self.__log.error("\n\nCannot find extension module for %s topic.\nPlease check your configuration.\n", mapping["topicFilter"]) except Exception as e: self.__log.exception(e) else: @@ -191,9 +187,8 @@ class MqttConnector(Connector, Thread): def _on_log(self, *args): self.__log.debug(args) - # pass - def _on_subscribe(self, client, userdata, mid, granted_qos): + def _on_subscribe(self, _, __, mid, granted_qos): try: if granted_qos[0] == 128: self.__log.error('"%s" subscription failed to topic %s subscription message id = %i', self.get_name(), self.__subscribes_sent.get(mid), mid) @@ -234,95 +229,85 @@ class MqttConnector(Connector, Thread): continue else: self.__log.error('Cannot find converter for topic:"%s"!', message.topic) - return + return None except Exception as e: log.exception(e) - return + return None elif self.__service_config.get("connectRequests"): - connect_requests = [connect_request for connect_request in self.__service_config.get("connectRequests")] - if connect_requests: - for request in connect_requests: - if request.get("topicFilter"): - if message.topic in request.get("topicFilter") or\ - (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): - founded_device_name = None - founded_device_type = 'default' - if request.get("deviceNameJsonExpression"): - founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) - if request.get("deviceNameTopicExpression"): - device_name_expression = request["deviceNameTopicExpression"] - founded_device_name = search(device_name_expression, message.topic) - if request.get("deviceTypeJsonExpression"): - founded_device_type = TBUtility.get_value(request["deviceTypeJsonExpression"], content) - if request.get("deviceTypeTopicExpression"): - device_type_expression = request["deviceTypeTopicExpression"] - founded_device_type = search(device_type_expression, message.topic) - if founded_device_name is not None and founded_device_name not in self.__gateway.get_devices(): - self.__gateway.add_device(founded_device_name, {"connector": self}, device_type=founded_device_type) - else: - self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", - message.topic, - content) + for request in self.__service_config["connectRequests"]: + if request.get("topicFilter"): + if message.topic in request.get("topicFilter") or\ + (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): + founded_device_name = None + founded_device_type = 'default' + if request.get("deviceNameJsonExpression"): + founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) + if request.get("deviceNameTopicExpression"): + device_name_expression = request["deviceNameTopicExpression"] + founded_device_name = search(device_name_expression, message.topic) + if request.get("deviceTypeJsonExpression"): + founded_device_type = TBUtility.get_value(request["deviceTypeJsonExpression"], content) + if request.get("deviceTypeTopicExpression"): + device_type_expression = request["deviceTypeTopicExpression"] + founded_device_type = search(device_type_expression, message.topic) + if founded_device_name is not None and founded_device_name not in self.__gateway.get_devices(): + self.__gateway.add_device(founded_device_name, {"connector": self}, device_type=founded_device_type) else: - self.__log.error("\"topicFilter\" in connect requests config not found.") - else: - self.__log.error("Connection requests in config not found.") - - elif self.__service_config.get("disconnectRequests") is not None: - disconnect_requests = [disconnect_request for disconnect_request in self.__service_config.get("disconnectRequests")] - if disconnect_requests: - for request in disconnect_requests: - if request.get("topicFilter") is not None: - if message.topic in request.get("topicFilter") or\ - (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): - founded_device_name = None - if request.get("deviceNameJsonExpression"): - founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) - if request.get("deviceNameTopicExpression"): - device_name_expression = request["deviceNameTopicExpression"] - founded_device_name = search(device_name_expression, message.topic) - if founded_device_name is not None and founded_device_name in self.__gateway.get_devices(): - self.__gateway.del_device(founded_device_name) - else: - self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", - message.topic, - content) + self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", + message.topic, + content) + else: + self.__log.error("\"topicFilter\" in connect requests config not found.") + elif self.__service_config.get("disconnectRequests"): + for request in self.__service_config["disconnectRequests"]: + if request.get("topicFilter") is not None: + if message.topic in request.get("topicFilter") or\ + (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): + founded_device_name = None + if request.get("deviceNameJsonExpression"): + founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) + if request.get("deviceNameTopicExpression"): + device_name_expression = request["deviceNameTopicExpression"] + founded_device_name = search(device_name_expression, message.topic) + if founded_device_name is not None and founded_device_name in self.__gateway.get_devices(): + self.__gateway.del_device(founded_device_name) else: - self.__log.error("\"topicFilter\" in connect requests config not found.") - else: - self.__log.error("Disconnection requests in config not found.") + self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", + message.topic, + content) + else: + self.__log.error("\"topicFilter\" in connect requests config not found.") elif message.topic in self.__gateway.rpc_requests_in_progress: self.__gateway.rpc_with_reply_processing(message.topic, content) else: self.__log.debug("Received message to topic \"%s\" with unknown interpreter data: \n\n\"%s\"", - message.topic, - content) + message.topic, + content) def on_attributes_update(self, content): - attribute_updates_config = [update for update in self.__attribute_updates] - if attribute_updates_config: - for attribute_update in attribute_updates_config: + if self.__attribute_updates: + for attribute_update in self.__attribute_updates: if match(attribute_update["deviceNameFilter"], content["device"]) and \ content["data"].get(attribute_update["attributeFilter"]): - topic = attribute_update["topicExpression"]\ - .replace("${deviceName}", content["device"])\ - .replace("${attributeKey}", attribute_update["attributeFilter"])\ - .replace("${attributeValue}", content["data"][attribute_update["attributeFilter"]]) - data = '' + try: + topic = attribute_update["topicExpression"]\ + .replace("${deviceName}", content["device"])\ + .replace("${attributeKey}", attribute_update["attributeFilter"])\ + .replace("${attributeValue}", content["data"][attribute_update["attributeFilter"]]) + except KeyError as e: + log.exception("Cannot form topic, key %s - not found", e) + raise e try: data = attribute_update["valueExpression"]\ .replace("${attributeKey}", attribute_update["attributeFilter"])\ .replace("${attributeValue}", content["data"][attribute_update["attributeFilter"]]) - except Exception as e: - self.__log.error(e) + except KeyError as e: + log.exception("Cannot form topic, key %s - not found", e) + raise e self._client.publish(topic, data).wait_for_publish() - self.__log.debug("Attribute Update data: %s for device %s to topic: %s", - data, - content["device"], - topic) + self.__log.debug("Attribute Update data: %s for device %s to topic: %s", data, content["device"], topic) else: - self.__log.error("Not found deviceName by filter in message or attributeFilter in message with data: %s", - content) + self.__log.error("Cannot found deviceName by filter in message or attributeFilter in message with data: %s", content) else: self.__log.error("Attribute updates config not found.") diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/mqtt_uplink_converter.py index 9b80a5c6..4b4981bd 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_uplink_converter.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.converter import Converter, ABC, abstractmethod, log +from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log -class MqttUplinkConverter(ABC): +class MqttUplinkConverter(Converter): @abstractmethod def convert(self, config, data): diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 14f10100..ed947ad5 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -13,15 +13,15 @@ # limitations under the License. import re -import regex -from simplejson import dumps import time -from threading import Thread -from random import choice -from string import ascii_lowercase -from opcua import Client, ua +from concurrent.futures import TimeoutError as FuturesTimeoutError from copy import deepcopy -from concurrent.futures import TimeoutError +from random import choice +from threading import Thread +from string import ascii_lowercase +import regex +from opcua import Client, ua +from simplejson import dumps from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.opcua.opcua_uplink_converter import OpcUaUplinkConverter @@ -59,7 +59,7 @@ class OpcUaConnector(Thread, Connector): policy = self.__server_conf["security"] if cert is None or private_key is None: log.exception("Error in ssl configuration - cert or privateKey parameter not found") - raise + raise RuntimeError("Error in ssl configuration - cert or privateKey parameter not found") security_string = policy+','+security_mode+','+cert+','+private_key if ca_cert is not None: security_string = security_string + ',' + ca_cert @@ -75,6 +75,7 @@ class OpcUaConnector(Thread, Connector): self.setName(self.__server_conf.get("name", 'OPC-UA ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + " Connector") self.__opcua_nodes = {} self._subscribed = {} + self.__sub = None self.data_to_send = [] self.__sub_handler = SubHandler(self) self.__stopped = False @@ -92,7 +93,7 @@ class OpcUaConnector(Thread, Connector): def run(self): while not self.__connected: try: - self.__connected = self.client.connect() + self.client.connect() try: self.client.load_type_definitions() except Exception as e: @@ -162,7 +163,7 @@ class OpcUaConnector(Thread, Connector): self.__connected = False self._subscribed = {} self.__sub = None - except TimeoutError: + except FuturesTimeoutError: self.__connected = False self._subscribed = {} self.__sub = None @@ -206,7 +207,7 @@ class OpcUaConnector(Thread, Connector): arguments_from_config = method["arguments"] arguments = content["data"].get("params") if content["data"].get("params") is not None else arguments_from_config try: - if type(arguments) is list: + if isinstance(arguments, list): result = method["node"].call_method(method[rpc_method], *arguments) elif arguments is not None: result = method["node"].call_method(method[rpc_method], arguments) @@ -360,7 +361,7 @@ class OpcUaConnector(Thread, Connector): name_expression, str(device_name_from_node)) else: log.error("Device name node not found with expression: %s", name_expression) - return + return None else: full_device_name = name_expression result_device_dict["deviceName"] = full_device_name @@ -392,14 +393,16 @@ class OpcUaConnector(Thread, Connector): log.error("Device node not found with expression: %s", TBUtility.get_value(device["deviceNodePattern"], get_tag=True)) return result - def __search_node(self, current_node, fullpath, search_method=False, result=[]): + def __search_node(self, current_node, fullpath, search_method=False, result=None): + if result is None: + result = [] try: - if regex.match("ns=\d*;[isgb]=.*", fullpath, regex.IGNORECASE): + if regex.match(r"ns=\d*;[isgb]=.*", fullpath, regex.IGNORECASE): if self.__show_map: log.debug("Looking for node with config") node = self.client.get_node(fullpath) if node is None: - log.warn("NODE NOT FOUND - using configuration %s", fullpath) + log.warning("NODE NOT FOUND - using configuration %s", fullpath) else: log.debug("Found in %s", node) result.append(node) @@ -411,13 +414,11 @@ class OpcUaConnector(Thread, Connector): if self.__show_map: log.debug("SHOW MAP: Current node path: %s", new_node_path) new_node_class = new_node.get_node_class() - # regex_fullmatch = re.fullmatch(fullpath, new_node_path.replace('\\\\.', '.')) or new_node_path.replace('\\\\', '\\') == fullpath regex_fullmatch = regex.fullmatch(fullpath_pattern, new_node_path.replace('\\\\.', '.')) or \ new_node_path.replace('\\\\', '\\') == fullpath.replace('\\\\', '\\') or \ new_node_path.replace('\\\\', '\\') == fullpath regex_search = fullpath_pattern.fullmatch(new_node_path.replace('\\\\.', '.'), partial=True) or \ new_node_path.replace('\\\\', '\\') in fullpath.replace('\\\\', '\\') - # regex_search = re.search(new_node_path, fullpath.replace('\\\\', '\\')) if regex_fullmatch: if self.__show_map: log.debug("SHOW MAP: Current node path: %s - NODE FOUND", new_node_path.replace('\\\\', '\\')) @@ -439,9 +440,9 @@ class OpcUaConnector(Thread, Connector): log.exception(e) def _check_path(self, config_path, node): - if regex.match("ns=\d*;[isgb]=.*", config_path, regex.IGNORECASE): + if regex.match(r"ns=\d*;[isgb]=.*", config_path, regex.IGNORECASE): return config_path - if re.search("^root", config_path.lower()) is None: + if re.search(r"^root", config_path.lower()) is None: node_path = '\\\\.'.join( char.split(":")[1] for char in node.get_path(200000, True)) if config_path[-3:] != '\\.': @@ -450,7 +451,8 @@ class OpcUaConnector(Thread, Connector): information_path = node_path + config_path.replace('\\', '\\\\') else: information_path = config_path - return information_path[:] + result = information_path[:] + return result @property def subscribed(self): @@ -463,7 +465,7 @@ class SubHandler(object): def datachange_notification(self, node, val, data): try: - log.debug("Python: New data change event on node %s, with val: %s", node, val) + log.debug("Python: New data change event on node %s, with val: %s and data %s", node, val, str(data)) subscription = self.connector.subscribed[node] converted_data = subscription["converter"].convert((subscription["config_path"], subscription["path"]), val) self.connector.statistics['MessagesReceived'] += 1 diff --git a/thingsboard_gateway/connectors/opcua/opcua_converter.py b/thingsboard_gateway/connectors/opcua/opcua_converter.py index 9c857813..e409d0ba 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_converter.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -from abc import ABC, abstractmethod -from thingsboard_gateway.connectors.converter import Converter, ABC, abstractmethod, log +from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log -class OpcUaConverter(ABC): +class OpcUaConverter(Converter): @abstractmethod def convert(self, config, data): pass diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 88685815..dcbd45c1 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from re import fullmatch + from thingsboard_gateway.connectors.opcua.opcua_converter import OpcUaConverter, log from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from re import fullmatch class OpcUaUplinkConverter(OpcUaConverter): diff --git a/thingsboard_gateway/connectors/request/request_connector.py b/thingsboard_gateway/connectors/request/request_connector.py index 3491f8b9..b6eab5c6 100644 --- a/thingsboard_gateway/connectors/request/request_connector.py +++ b/thingsboard_gateway/connectors/request/request_connector.py @@ -27,6 +27,7 @@ import requests from requests import Timeout from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException +# pylint: disable=E1101 requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ':ADH-AES128-SHA256' @@ -127,7 +128,7 @@ class RequestConnector(Connector, Thread): log.debug('Custom converter for url %s - found!', endpoint["url"]) converter = module(endpoint) else: - log.error("\n\nCannot find extension module for %s url.\n\Please check your configuration.\n", endpoint["url"]) + log.error("\n\nCannot find extension module for %s url.\nPlease check your configuration.\n", endpoint["url"]) else: converter = JsonRequestUplinkConverter(endpoint) self.__requests_in_progress.append({"config": endpoint, @@ -155,15 +156,15 @@ class RequestConnector(Connector, Thread): rpc_request_dict = {**rpc_request, "converter": converter} self.__rpc_requests.append(rpc_request_dict) - def __send_request(self, request, converter_queue, log): + def __send_request(self, request, converter_queue, logger): url = "" try: request["next_time"] = time() + request["config"].get("scanPeriod", 10) request_url_from_config = request["config"]["url"] request_url_from_config = str('/' + request_url_from_config) if request_url_from_config[0] != '/' else request_url_from_config - log.debug(request_url_from_config) + logger.debug(request_url_from_config) url = self.__host + request_url_from_config - log.debug(url) + logger.debug(url) request_timeout = request["config"].get("timeout", 1) params = { "method": request["config"].get("httpMethod", "GET"), @@ -173,10 +174,10 @@ class RequestConnector(Connector, Thread): "verify": self.__ssl_verify, "auth": self.__security } - log.debug(url) + logger.debug(url) if request["config"].get("httpHeaders") is not None: params["headers"] = request["config"]["httpHeaders"] - log.debug("Request to %s will be sent", url) + logger.debug("Request to %s will be sent", url) response = request["request"](**params) if response and response.ok: if not converter_queue.full(): @@ -188,16 +189,16 @@ class RequestConnector(Connector, Thread): if len(data_to_storage) == 3: converter_queue.put(data_to_storage) else: - log.error("Request to URL: %s finished with code: %i", url, response.status_code) + logger.error("Request to URL: %s finished with code: %i", url, response.status_code) except Timeout: - log.error("Timeout error on request %s.", url) + logger.error("Timeout error on request %s.", url) except RequestException as e: - log.error("Cannot connect to %s. Connection error.", url) - log.debug(e) + logger.error("Cannot connect to %s. Connection error.", url) + logger.debug(e) except ConnectionError: - log.error("Cannot connect to %s. Connection error.", url) + logger.error("Cannot connect to %s. Connection error.", url) except Exception as e: - log.exception(e) + logger.exception(e) def __process_data(self): try: diff --git a/thingsboard_gateway/connectors/request/request_converter.py b/thingsboard_gateway/connectors/request/request_converter.py index 6718b9ee..b494b6d0 100644 --- a/thingsboard_gateway/connectors/request/request_converter.py +++ b/thingsboard_gateway/connectors/request/request_converter.py @@ -12,11 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC, abstractmethod -from thingsboard_gateway.connectors.converter import Converter, ABC, abstractmethod, log +from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log -class RequestConverter(ABC): +class RequestConverter(Converter): @abstractmethod def convert(self, config, data): pass diff --git a/thingsboard_gateway/connectors/request/request_uplink_converter.py b/thingsboard_gateway/connectors/request/request_uplink_converter.py index aec32c12..8931c27f 100644 --- a/thingsboard_gateway/connectors/request/request_uplink_converter.py +++ b/thingsboard_gateway/connectors/request/request_uplink_converter.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.converter import Converter, ABC, abstractmethod, log +from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log -class RequestUplinkConverter(ABC): +class RequestUplinkConverter(Converter): @abstractmethod def convert(self, config, data): diff --git a/thingsboard_gateway/extensions/mqtt/custom_mqtt_uplink_converter.py b/thingsboard_gateway/extensions/mqtt/custom_mqtt_uplink_converter.py index c50a3eda..ddcf3aad 100644 --- a/thingsboard_gateway/extensions/mqtt/custom_mqtt_uplink_converter.py +++ b/thingsboard_gateway/extensions/mqtt/custom_mqtt_uplink_converter.py @@ -23,9 +23,7 @@ class CustomMqttUplinkConverter(MqttUplinkConverter): def convert(self, topic, body): try: - ''' getting device name from topic, next line will get all data after last '/' symbol - in this case: if topic = 'devices/temperature/sensor1' device name will be 'sensor1'.''' - self.dict_result["deviceName"] = topic.split("/")[-1] + self.dict_result["deviceName"] = topic.split("/")[-1] # getting all data after last '/' symbol in this case: if topic = 'devices/temperature/sensor1' device name will be 'sensor1'. self.dict_result["deviceType"] = "Thermostat" # just hardcode this self.dict_result["telemetry"] = [] # template for telemetry array bytes_to_read = body.replace("0x", "") # Replacing the 0x (if '0x' in body), needs for converting to bytearray @@ -33,7 +31,7 @@ class CustomMqttUplinkConverter(MqttUplinkConverter): if self.__config.get("extension-config") is not None: for telemetry_key in self.__config["extension-config"]: # Processing every telemetry key in config for extension value = 0 - for current_byte_position in range(self.__config["extension-config"][telemetry_key]): # reading every value with value length from config + for _ in range(self.__config["extension-config"][telemetry_key]): # reading every value with value length from config value = value*256 + converted_bytes.pop(0) # process and remove byte from processing telemetry_to_send = {telemetry_key.replace("Bytes", ""): value} # creating telemetry data for sending into Thingsboard self.dict_result["telemetry"].append(telemetry_to_send) # adding data to telemetry array @@ -43,4 +41,4 @@ class CustomMqttUplinkConverter(MqttUplinkConverter): except Exception as e: log.exception('Error in converter, for config: \n%s\n and message: \n%s\n', dumps(self.__config), body) - log.error(e) + log.exception(e) diff --git a/thingsboard_gateway/extensions/request/custom_request_uplink_converter.py b/thingsboard_gateway/extensions/request/custom_request_uplink_converter.py index 287815fa..dd041915 100644 --- a/thingsboard_gateway/extensions/request/custom_request_uplink_converter.py +++ b/thingsboard_gateway/extensions/request/custom_request_uplink_converter.py @@ -23,7 +23,7 @@ class CustomRequestUplinkConverter(RequestConverter): self.__config = config.get('converter') self.dict_result = {} - def convert(self, topic, body): + def convert(self, _, body): try: data = body["data"]["value"] self.dict_result["deviceName"] = TBUtility.get_value(self.__config.get("deviceNameJsonExpression"), body, expression_instead_none=True) diff --git a/thingsboard_gateway/extensions/serial/custom_serial_connector.py b/thingsboard_gateway/extensions/serial/custom_serial_connector.py index 5c051c69..5c9c16a3 100644 --- a/thingsboard_gateway/extensions/serial/custom_serial_connector.py +++ b/thingsboard_gateway/extensions/serial/custom_serial_connector.py @@ -52,7 +52,7 @@ class CustomSerialConnector(Thread, Connector): # Define a connector class, i or not self.devices[device]["serial"].isOpen(): # Connect only if serial not available earlier or it is closed. self.devices[device]["serial"] = None while self.devices[device]["serial"] is None or not self.devices[device]["serial"].isOpen(): # Try connect - '''connection to serial port with parameters from configuration file or default''' + # connection to serial port with parameters from configuration file or default self.devices[device]["serial"] = serial.Serial( port=self.__config.get('port', '/dev/ttyUSB0'), baudrate=self.__config.get('baudrate', 9600), @@ -112,21 +112,21 @@ class CustomSerialConnector(Thread, Connector): # Define a connector class, i try: while True: for device in self.devices: - serial = self.devices[device]["serial"] + device_serial_port = self.devices[device]["serial"] ch = b'' data_from_device = b'' - while ch != b'\n': + while ch != b'\n': # We will read until receive LF symbol try: try: - ch = serial.read(1) # Reading data from serial + ch = device_serial_port.read(1) # Read one symbol per time except AttributeError as e: - if serial is None: + if device_serial_port is None: self.__connect_to_devices() # if port not found - try to connect to it raise e data_from_device = data_from_device + ch except Exception as e: log.exception(e) - continue + break try: converted_data = self.devices[device]['converter'].convert(self.devices[device]['device_config'], data_from_device) self.__gateway.send_to_storage(self.get_name(), converted_data) diff --git a/thingsboard_gateway/extensions/serial/custom_serial_converter.py b/thingsboard_gateway/extensions/serial/custom_serial_converter.py index 0f3c4969..a0126e35 100644 --- a/thingsboard_gateway/extensions/serial/custom_serial_converter.py +++ b/thingsboard_gateway/extensions/serial/custom_serial_converter.py @@ -46,5 +46,6 @@ class CustomSerialUplinkConverter(Converter): data_to_convert = data_to_convert[from_byte:] converted_data = {config_object['key']: data_to_convert.decode('UTF-8')} self.result_dict[key].append(converted_data) + log.debug("Converted data: %s", self.result_dict) return self.result_dict diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index 7c1d9601..6a632683 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -25,7 +25,7 @@ from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler from thingsboard_gateway.tb_utility.tb_utility import TBUtility -log = getLogger("service") +LOG = getLogger("service") class RemoteConfigurator: @@ -50,7 +50,7 @@ class RemoteConfigurator: if not self.in_process: self.in_process = True # while not self.__gateway._published_events.empty(): - # log.debug("Waiting for end of the data processing...") + # LOG.debug("Waiting for end of the data processing...") # sleep(1) decoded_configuration = b64decode(configuration) self.__new_configuration = loads(decoded_configuration) @@ -58,7 +58,7 @@ class RemoteConfigurator: self.__new_general_configuration_file = self.__new_configuration.get("thingsboard") self.__new_logs_configuration = b64decode(self.__new_general_configuration_file.pop("logs")).decode('UTF-8').replace('}}', '\n') if self.__old_configuration != decoded_configuration: - log.info("Remote configuration received: \n %s", decoded_configuration) + LOG.info("Remote configuration received: \n %s", decoded_configuration) result = self.__process_connectors_configuration() self.in_process = False if result: @@ -67,13 +67,13 @@ class RemoteConfigurator: else: return False else: - log.info("Remote configuration is the same.") + LOG.info("Remote configuration is the same.") else: - log.error("Remote configuration is already in processing") + LOG.error("Remote configuration is already in processing") return False except Exception as e: self.in_process = False - log.exception(e) + LOG.exception(e) def send_current_configuration(self): try: @@ -91,18 +91,18 @@ class RemoteConfigurator: self.__old_configuration = encoded_current_configuration self.__gateway.tb_client.client.send_attributes( {"current_configuration": encoded_current_configuration.decode("UTF-8")}) - log.debug('Current configuration has been sent to ThingsBoard: %s', json_current_configuration) + LOG.debug('Current configuration has been sent to ThingsBoard: %s', json_current_configuration) except Exception as e: - log.exception(e) + LOG.exception(e) def __process_connectors_configuration(self): - log.info("Processing remote connectors configuration...") + LOG.info("Processing remote connectors configuration...") if self.__apply_new_connectors_configuration(): self.__write_new_configuration_files() self.__apply_storage_configuration() if self.__safe_apply_connection_configuration(): - log.info("Remote configuration has been applied.") - with open(self.__gateway._config_dir + "tb_gateway.yaml", "w") as general_configuration_file: + LOG.info("Remote configuration has been applied.") + with open(self.__gateway.get_config_path() + "tb_gateway.yaml", "w") as general_configuration_file: safe_dump(self.__new_general_configuration_file, general_configuration_file) self.__old_connectors_configs = {} self.__new_connectors_configs = {} @@ -115,9 +115,9 @@ class RemoteConfigurator: else: self.__update_logs_configuration() self.__old_general_configuration_file.pop("logs") - with open(self.__gateway._config_dir + "tb_gateway.yaml", "w") as general_configuration_file: + with open(self.__gateway.get_config_path() + "tb_gateway.yaml", "w") as general_configuration_file: safe_dump(self.__old_general_configuration_file, general_configuration_file) - log.error("A remote general configuration applying has been failed.") + LOG.error("A remote general configuration applying has been failed.") self.__old_connectors_configs = {} self.__new_connectors_configs = {} self.__new_logs_configuration = None @@ -137,7 +137,7 @@ class RemoteConfigurator: connector_class = TBUtility.check_and_import(connector["type"], self.__gateway._default_connectors.get(connector["type"], connector.get("class"))) self.__gateway._implemented_connectors[connector["type"]] = connector_class except Exception as e: - log.exception(e) + LOG.exception(e) def __apply_new_connectors_configuration(self): try: @@ -146,9 +146,9 @@ class RemoteConfigurator: try: self.__gateway.available_connectors[connector_name].close() except Exception as e: - log.exception(e) + LOG.exception(e) self.__gateway._connect_with_connectors() - log.debug("New connectors configuration has been applied") + LOG.debug("New connectors configuration has been applied") self.__old_connectors_configs = {} return True except Exception as e: @@ -157,7 +157,7 @@ class RemoteConfigurator: self.__gateway.available_connectors[connector_name].close() self.__gateway._load_connectors(self.__old_general_configuration_file) self.__gateway._connect_with_connectors() - log.exception(e) + LOG.exception(e) return False def __write_new_configuration_files(self): @@ -168,10 +168,10 @@ class RemoteConfigurator: for connector_config_section in self.__new_connectors_configs[connector_type]: for connector_file in connector_config_section["config"]: connector_config = connector_config_section["config"][connector_file] - with open(self.__gateway._config_dir + connector_file, "w") as config_file: + with open(self.__gateway.get_config_path() + connector_file, "w") as config_file: dump(connector_config, config_file, sort_keys=True, indent=2) new_connectors_files.append(connector_file) - log.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, + LOG.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, connector_file) break self.__old_general_configuration_file["connectors"] = self.__new_general_configuration_file["connectors"] @@ -179,11 +179,11 @@ class RemoteConfigurator: for old_connector_config_section in self.__old_connectors_configs[old_connector_type]: for old_connector_file in old_connector_config_section["config"]: if old_connector_file not in new_connectors_files: - remove(self.__gateway._config_dir + old_connector_file) - log.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, + remove(self.__gateway.get_config_path() + old_connector_file) + LOG.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, old_connector_type) except Exception as e: - log.exception(e) + LOG.exception(e) def __safe_apply_connection_configuration(self): apply_start = time() * 1000 @@ -200,14 +200,14 @@ class RemoteConfigurator: sleep(.1) if not connection_state: self.__revert_configuration() - log.info("The gateway cannot connect to the ThingsBoard server with a new configuration.") + LOG.info("The gateway cannot connect to the ThingsBoard server with a new configuration.") return False else: self.__old_tb_client.stop() self.__gateway.subscribe_to_required_topics() return True except Exception as e: - log.exception(e) + LOG.exception(e) self.__revert_configuration() return False @@ -215,56 +215,50 @@ class RemoteConfigurator: if self.__old_general_configuration_file["storage"] != self.__new_general_configuration_file["storage"]: self.__old_event_storage = self.__gateway._event_storage try: - self.__gateway._event_storage = self.__gateway._event_storage_types[ - self.__new_general_configuration_file["storage"]["type"]]( - self.__new_general_configuration_file["storage"]) + storage_class = self.__gateway._event_storage_types[self.__new_general_configuration_file["storage"]["type"]] + self.__gateway._event_storage = storage_class(self.__new_general_configuration_file["storage"]) self.__old_event_storage = None except Exception as e: - log.exception(e) + LOG.exception(e) self.__gateway._event_storage = self.__old_event_storage def __revert_configuration(self): try: - log.info("Remote general configuration will be restored.") + LOG.info("Remote general configuration will be restored.") self.__new_general_configuration_file = self.__old_general_configuration_file self.__gateway.tb_client.disconnect() self.__gateway.tb_client.stop() self.__gateway.tb_client = TBClient(self.__old_general_configuration_file["thingsboard"]) self.__gateway.tb_client.connect() self.__gateway.subscribe_to_required_topics() - log.debug("%s connection has been restored", str(self.__gateway.tb_client.client._client)) + LOG.debug("%s connection has been restored", str(self.__gateway.tb_client.client._client)) except Exception as e: - log.exception("Exception on reverting configuration occurred:") - log.exception(e) + LOG.exception("Exception on reverting configuration occurred:") + LOG.exception(e) def __get_current_logs_configuration(self): try: - with open(self.__gateway._config_dir + 'logs.conf', 'r') as logs: + with open(self.__gateway.get_config_path() + 'logs.conf', 'r') as logs: current_logs_configuration = logs.read() return current_logs_configuration except Exception as e: - log.exception(e) + LOG.exception(e) def __update_logs_configuration(self): + global LOG try: - global log - log = getLogger('service') - remote_handler_current_state = self.__gateway.remote_handler.activated - remote_handler_current_level = self.__gateway.remote_handler.current_log_level - logs_conf_file_path = self.__gateway._config_dir + 'logs.conf' + LOG = getLogger('service') + logs_conf_file_path = self.__gateway.get_config_path() + 'logs.conf' new_logging_level = findall(r'level=(.*)', self.__new_logs_configuration.replace("NONE", "NOTSET"))[-1] with open(logs_conf_file_path, 'w') as logs: logs.write(self.__new_logs_configuration.replace("NONE", "NOTSET")+"\r\n") - # fileConfig(logs_conf_file_path) - # self.__gateway.main_handler = MemoryHandler(-1) self.__gateway.main_handler.setLevel(new_logging_level) - # self.__gateway.remote_handler = TBLoggerHandler(self.__gateway) self.__gateway.main_handler.setTarget(self.__gateway.remote_handler) if new_logging_level == "NOTSET": self.__gateway.remote_handler.deactivate() else: self.__gateway.remote_handler.activate(new_logging_level) - log.debug("Logs configuration has been updated.") + LOG.debug("Logs configuration has been updated.") except Exception as e: - log.exception(e) + LOG.exception(e) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index e63f0e4a..ba6aff52 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -251,7 +251,7 @@ class TBGatewayService: if not connector_name == self.name: if not TBUtility.validate_converted_data(data): log.error("Data from %s connector is invalid.", connector_name) - return + return None if data["deviceName"] not in self.get_devices(): self.add_device(data["deviceName"], {"connector": self.available_connectors[connector_name]}, wait_for_publish=True, device_type=data["deviceType"]) diff --git a/thingsboard_gateway/storage/event_storage_reader.py b/thingsboard_gateway/storage/event_storage_reader.py index 0c425d9b..4528d5a6 100644 --- a/thingsboard_gateway/storage/event_storage_reader.py +++ b/thingsboard_gateway/storage/event_storage_reader.py @@ -86,7 +86,7 @@ class EventStorageReader: # No more records to read for now continue except IOError as e: - log.warning("[{}] Failed to read file!".format(self.new_pos.get_file(), e)) + log.warning("[%s] Failed to read file! Error: %s", self.new_pos.get_file(), e) break except Exception as e: log.exception(e) @@ -130,7 +130,7 @@ class EventStorageReader: return self.buffered_reader except IOError as e: - log.error("Failed to initialize buffered reader!", e) + log.error("Failed to initialize buffered reader! Error: %s", e) raise RuntimeError("Failed to initialize buffered reader!", e) def read_state_file(self): @@ -138,13 +138,13 @@ class EventStorageReader: state_data_node = {} try: with BufferedReader(FileIO(self.settings.get_data_folder_path() + - self.files.get_state_file(), 'r')) as br: - state_data_node = load(br) + self.files.get_state_file(), 'r')) as buffered_reader: + state_data_node = load(buffered_reader) except JSONDecodeError: log.error("Failed to decode JSON from state file") state_data_node = 0 except IOError as e: - log.warning("Failed to fetch info from state file!", e) + log.warning("Failed to fetch info from state file! Error: %s", e) reader_file = None reader_pos = 0 if state_data_node: @@ -169,7 +169,7 @@ class EventStorageReader: with open(self.settings.get_data_folder_path() + self.files.get_state_file(), 'w') as outfile: outfile.write(dumps(state_file_node)) except IOError as e: - log.warning("Failed to update state file!", e) + log.warning("Failed to update state file! Error: %s", e) except Exception as e: log.exception(e) diff --git a/thingsboard_gateway/storage/event_storage_writer.py b/thingsboard_gateway/storage/event_storage_writer.py index 06688110..1c04ff5c 100644 --- a/thingsboard_gateway/storage/event_storage_writer.py +++ b/thingsboard_gateway/storage/event_storage_writer.py @@ -71,7 +71,7 @@ class EventStorageWriter: else: return self.buffered_writer except IOError as e: - log.error("Failed to initialize buffered writer!", e) + log.error("Failed to initialize buffered writer! Error: %s", e) raise RuntimeError("Failed to initialize buffered writer!", e) def create_datafile(self): @@ -87,7 +87,7 @@ class EventStorageWriter: os_open(file_path, O_CREAT | O_EXCL) return full_file_name except IOError as e: - log.error("Failed to create a new file!", e) + log.error("Failed to create a new file! Error: %s", e) def get_number_of_records_in_file(self, file): if self.current_file_records_count[0] <= 0: diff --git a/thingsboard_gateway/storage/file_event_storage.py b/thingsboard_gateway/storage/file_event_storage.py index 85e3baea..c4f1274e 100644 --- a/thingsboard_gateway/storage/file_event_storage.py +++ b/thingsboard_gateway/storage/file_event_storage.py @@ -12,16 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import time +import json from thingsboard_gateway.storage.event_storage import EventStorage, log from thingsboard_gateway.storage.event_storage_files import EventStorageFiles from thingsboard_gateway.storage.event_storage_writer import EventStorageWriter from thingsboard_gateway.storage.event_storage_reader import EventStorageReader from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings -from random import choice -from string import ascii_lowercase -import os -import time -import json class FileEventStorage(EventStorage): @@ -54,7 +52,7 @@ class FileEventStorage(EventStorage): try: os.makedirs(path) except OSError as e: - log.error('Failed to create data folder!', e) + log.error('Failed to create data folder! Error: %s', e) def init_data_files(self): data_files = [] @@ -86,5 +84,4 @@ class FileEventStorage(EventStorage): file.close() return prefix + filename + '.txt' except IOError as e: - log.error("Failed to create a new file!", e) - pass + log.error("Failed to create a new file! Error: %s", e) diff --git a/thingsboard_gateway/storage/memory_event_storage.py b/thingsboard_gateway/storage/memory_event_storage.py index 865dd177..897b6b7b 100644 --- a/thingsboard_gateway/storage/memory_event_storage.py +++ b/thingsboard_gateway/storage/memory_event_storage.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.storage.event_storage import EventStorage, log import queue -from logging import getLogger +from thingsboard_gateway.storage.event_storage import EventStorage, log class MemoryEventStorage(EventStorage): @@ -23,6 +22,7 @@ class MemoryEventStorage(EventStorage): self.__events_per_time = config.get("read_records_count", 1000) self.__events_queue = queue.Queue(self.__queue_len) self.__event_pack = [] + log.debug("Memory storage created with following configuration: \nMax size: %i\n Read records per time: %i", self.__queue_len, self.__events_per_time) def put(self, event): if not self.__events_queue.full(): diff --git a/thingsboard_gateway/tb_client/tb_device_mqtt.py b/thingsboard_gateway/tb_client/tb_device_mqtt.py index 8caab8a3..be5391f4 100644 --- a/thingsboard_gateway/tb_client/tb_device_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_device_mqtt.py @@ -16,15 +16,18 @@ import logging import queue import ssl import time -from simplejson import loads, dumps from threading import RLock from threading import Thread -from thingsboard_gateway.tb_utility.tb_utility import TBUtility import paho.mqtt.client as paho + +from simplejson import dumps from jsonschema import Draft7Validator from jsonschema import ValidationError +from thingsboard_gateway.tb_utility.tb_utility import TBUtility + + KV_SCHEMA = { "type": "object", "patternProperties": @@ -190,8 +193,7 @@ class TBDeviceMqttClient: self._client.subscribe(RPC_RESPONSE_TOPIC + '+', qos=1) else: if rc in result_codes: - log.error("connection FAIL with error {rc} {explanation}".format(rc=rc, - explanation=result_codes[rc])) + log.error("connection FAIL with error %s %s", rc, result_codes[rc]) else: log.error("connection FAIL with unknown error") @@ -250,8 +252,8 @@ class TBDeviceMqttClient: with self._lock: # callbacks for everything if self.__device_sub_dict.get("*"): - for x in self.__device_sub_dict["*"]: - dict_results.append(self.__device_sub_dict["*"][x]) + for subscription_id in self.__device_sub_dict["*"]: + dict_results.append(self.__device_sub_dict["*"][subscription_id]) # specific callback keys = content.keys() keys_list = [] @@ -290,9 +292,9 @@ class TBDeviceMqttClient: self._client.reconnect_delay_set(min_delay, max_delay) def send_rpc_reply(self, req_id, resp, quality_of_service=1, wait_for_publish=False): - if quality_of_service != 0 and quality_of_service != 1: + if quality_of_service not in (0, 1): log.error("Quality of service (qos) value must be 0 or 1") - return + return None info = self._client.publish(RPC_RESPONSE_TOPIC + req_id, resp, qos=quality_of_service) if wait_for_publish: info.wait_for_publish() @@ -313,14 +315,13 @@ class TBDeviceMqttClient: def publish_data(self, data, topic, qos): data = dumps(data) - if qos != 0 and qos != 1: + if qos not in (0, 1): log.exception("Quality of service (qos) value must be 0 or 1") raise TBQoSException("Quality of service (qos) value must be 0 or 1") - else: - return TBPublishInfo(self._client.publish(topic, data, qos)) + return TBPublishInfo(self._client.publish(topic, data, qos)) def send_telemetry(self, telemetry, quality_of_service=1): - if type(telemetry) is not list: + if not isinstance(telemetry, list): telemetry = [telemetry] self.validate(DEVICE_TS_OR_KV_VALIDATOR, telemetry) return self.publish_data(telemetry, TELEMETRY_TOPIC, quality_of_service) @@ -331,14 +332,13 @@ class TBDeviceMqttClient: def unsubscribe_from_attribute(self, subscription_id): with self._lock: - for x in self.__device_sub_dict: - if self.__device_sub_dict[x].get(subscription_id): - del self.__device_sub_dict[x][subscription_id] - log.debug("Unsubscribed from {attribute}, subscription id {sub_id}".format(attribute=x, - sub_id=subscription_id)) + for attribute in self.__device_sub_dict: + if self.__device_sub_dict[attribute].get(subscription_id): + del self.__device_sub_dict[attribute][subscription_id] + log.debug("Unsubscribed from %s, subscription id %i", attribute, subscription_id) if subscription_id == '*': self.__device_sub_dict = {} - self.__device_sub_dict = dict((k, v) for k, v in self.__device_sub_dict.items() if v is not {}) + self.__device_sub_dict = dict((k, v) for k, v in self.__device_sub_dict.items() if v) def subscribe_to_all_attributes(self, callback): return self.subscribe_to_attribute("*", callback) @@ -350,7 +350,7 @@ class TBDeviceMqttClient: self.__device_sub_dict.update({key: {self.__device_max_sub_id: callback}}) else: self.__device_sub_dict[key].update({self.__device_max_sub_id: callback}) - log.debug("Subscribed to {key} with id {id}".format(key=key, id=self.__device_max_sub_id)) + log.debug("Subscribed to %s with id %i", key, self.__device_max_sub_id) return self.__device_max_sub_id def request_attributes(self, client_keys=None, shared_keys=None, callback=None): @@ -398,8 +398,7 @@ class TBDeviceMqttClient: current_ts_in_millis = int(round(time.time() * 1000)) if current_ts_in_millis > item["ts"]: break - else: - time.sleep(0.001) + time.sleep(0.001) with self._lock: callback = None if item.get("attribute_request_id"): diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index ff846bc9..77028d13 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -136,19 +136,17 @@ class TBGatewayMqttClient(TBDeviceMqttClient): return self.publish_data({device: attributes}, GATEWAY_MAIN_TOPIC + "attributes", quality_of_service) def gw_send_telemetry(self, device, telemetry, quality_of_service=1): - if type(telemetry) is not list: + if isinstance(telemetry, list): telemetry = [telemetry] # self.validate(DEVICE_TS_KV_VALIDATOR, telemetry) return self.publish_data({device: telemetry}, GATEWAY_MAIN_TOPIC + "telemetry", quality_of_service, ) - #TODO ADD "type" to connection request - def gw_connect_device(self, device_name, device_type): info = self._client.publish(topic=GATEWAY_MAIN_TOPIC + "connect", payload=dumps({"device": device_name, "type": device_type}), qos=1) self.__connected_devices.add(device_name) # if self.gateway: # self.gateway.on_device_connected(device_name, self.__devices_server_side_rpc_request_handler) - log.debug("Connected device {name}".format(name=device_name)) + log.debug("Connected device %s", device_name) return info def gw_disconnect_device(self, device_name): @@ -157,7 +155,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self.__connected_devices.remove(device_name) # if self.gateway: # self.gateway.on_device_disconnected(self, device_name) - log.debug("Disconnected device {name}".format(name=device_name)) + log.debug("Disconnected device %s", device_name) return info def gw_subscribe_to_all_attributes(self, callback): @@ -168,7 +166,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): def gw_subscribe_to_attribute(self, device, attribute, callback): if device not in self.__connected_devices: - log.error("Device {name} not connected".format(name=device)) + log.error("Device %s not connected", device) return False with self._lock: self.__max_sub_id += 1 @@ -177,16 +175,15 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self.__sub_dict.update({key: {self.__max_sub_id: callback}}) else: self.__sub_dict[key].update({self.__max_sub_id: callback}) - log.debug("Subscribed to {key} with id {id}".format(key=key, id=self.__max_sub_id)) + log.debug("Subscribed to %s with id %i", key, self.__max_sub_id) return self.__max_sub_id def gw_unsubscribe(self, subscription_id): with self._lock: - for x in self.__sub_dict: - if self.__sub_dict[x].get(subscription_id): - del self.__sub_dict[x][subscription_id] - log.debug("Unsubscribed from {attribute}, subscription id {sub_id}".format(attribute=x, - sub_id=subscription_id)) + for attribute in self.__sub_dict: + if self.__sub_dict[attribute].get(subscription_id): + del self.__sub_dict[attribute][subscription_id] + log.debug("Unsubscribed from %s, subscription id %i", attribute, subscription_id) if subscription_id == '*': self.__sub_dict = {} @@ -194,9 +191,9 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self.devices_server_side_rpc_request_handler = handler def gw_send_rpc_reply(self, device, req_id, resp, quality_of_service=1): - if quality_of_service != 0 and quality_of_service != 1: + if quality_of_service not in (0, 1): log.error("Quality of service (qos) value must be 0 or 1") - return + return None info = self._client.publish(GATEWAY_RPC_TOPIC, dumps({"device": device, "id": req_id, "data": resp}), qos=quality_of_service)