diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 21101c04..bbe2fc9f 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -33,7 +33,7 @@ class ModbusConnector(Connector, threading.Thread): 'MessagesSent': 0} super().__init__() self.__gateway = gateway - self.__connector_type = connector_type + self._connector_type = connector_type self.__master = None self.__config = config.get("server") self.__configure_master() @@ -70,11 +70,11 @@ class ModbusConnector(Connector, threading.Thread): try: for device in self.__config["devices"]: if self.__config.get("converter") is not None: - converter = TBUtility.check_and_import(self.__connector_type, self.__config["converter"])(device) + converter = TBUtility.check_and_import(self._connector_type, self.__config["converter"])(device) else: converter = BytesModbusUplinkConverter(device) if self.__config.get("downlink_converter") is not None: - downlink_converter = TBUtility.check_and_import(self.__connector_type, self.__config["downlink_converter"])(device) + downlink_converter = TBUtility.check_and_import(self._connector_type, self.__config["downlink_converter"])(device) else: downlink_converter = BytesModbusDownlinkConverter(device) if device.get('deviceName') not in self.__gateway.get_devices(): diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index a35bb676..201b8bb1 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -30,7 +30,7 @@ class MqttConnector(Connector, Thread): super().__init__() self.__log = log self.config = config - self.__connector_type = connector_type + self._connector_type = connector_type self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} self.__gateway = gateway @@ -147,7 +147,7 @@ class MqttConnector(Connector, Thread): converter = None if mapping["converter"]["type"] == "custom": try: - module = TBUtility.check_and_import(self.__connector_type, mapping["converter"]["extension"]) + module = TBUtility.check_and_import(self._connector_type, mapping["converter"]["extension"]) if module is not None: self.__log.debug('Custom converter for topic %s - found!', mapping["topicFilter"]) converter = module(mapping) @@ -246,7 +246,7 @@ class MqttConnector(Connector, Thread): if message.topic in request.get("topicFilter") or\ (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): founded_device_name = None - founded_device_type + founded_device_type = 'default' if request.get("deviceNameJsonExpression"): founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) if request.get("deviceNameTopicExpression"): diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index fb7928df..84196ce9 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -22,6 +22,7 @@ from logging.handlers import MemoryHandler from os import remove from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler +from thingsboard_gateway.tb_utility.tb_utility import TBUtility log = getLogger("service") @@ -129,6 +130,8 @@ class RemoteConfigurator: self.__gateway.connectors_configs[connector['type']] = [] self.__gateway.connectors_configs[connector['type']].append( {"name": connector["name"], "config": {connector['configuration']: input_connector["config"]}}) + connector_class = TBUtility.check_and_import(connector["type"], self.__gateway._default_connectors.get(connector["type"], connector.get("class"))) + self.__gateway._implemented_connectors[connector["type"]] = connector_class except Exception as e: log.exception(e) @@ -148,6 +151,7 @@ class RemoteConfigurator: self.__gateway.connectors_configs = self.__old_connectors_configs for connector_name in self.__gateway.available_connectors: self.__gateway.available_connectors[connector_name].close() + self.__gateway._load_connectors(self.__old_general_configuration_file) self.__gateway._connect_with_connectors() log.exception(e) return False diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index f69bc275..b0f433c6 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -20,7 +20,7 @@ from yaml import safe_dump, safe_load from simplejson import load, loads, dumps from os import listdir, path from sys import getsizeof -from threading import Thread +from threading import Thread, RLock from random import choice from string import ascii_lowercase from queue import Queue @@ -37,6 +37,7 @@ main_handler = logging.handlers.MemoryHandler(-1) class TBGatewayService: def __init__(self, config_file=None): + self.__lock = RLock() if config_file is None: config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep) with open(config_file) as general_config: @@ -87,9 +88,9 @@ class TBGatewayService: self.__remote_configurator.send_current_configuration() self.__load_persistent_devices() self.__published_events = Queue(0) - self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, - name="Send data to Thingsboard Thread") - self.__send_thread.start() + self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, + name="Send data to Thingsboard Thread") + self._send_thread.start() try: gateway_statistic_send = 0 @@ -118,7 +119,8 @@ class TBGatewayService: if cur_time - gateway_statistic_send > 5000.0 and self.tb_client.is_connected(): summary_messages = self.__form_statistics() - self.tb_client.client.send_telemetry(summary_messages) + with self.__lock: + self.tb_client.client.send_telemetry(summary_messages) gateway_statistic_send = time.time()*1000 # self.__check_shared_attributes() except KeyboardInterrupt: @@ -154,10 +156,10 @@ class TBGatewayService: if new_configuration is not None and self.__remote_configurator is not None: try: confirmed = self.__remote_configurator.process_configuration(new_configuration) - if confirmed: - self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, - name="Send data to Thingsboard Thread") - self.__send_thread.start() + # if confirmed: + # self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, + # name="Send data to Thingsboard Thread") + # self._send_thread.start() self.__remote_configurator.send_current_configuration() except Exception as e: log.exception(e) @@ -267,7 +269,8 @@ class TBGatewayService: try: if self.tb_client.is_connected(): size = getsizeof(devices_data_in_event_pack) - events = self._event_storage.get_event_pack() + with self.__lock: + events = self._event_storage.get_event_pack() if events: for event in events: try: @@ -330,24 +333,25 @@ class TBGatewayService: def __send_data(self, devices_data_in_event_pack): try: - for device in devices_data_in_event_pack: - if devices_data_in_event_pack[device].get("attributes"): - if device == self.name: - self.__published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"])) - else: - self.__published_events.put(self.tb_client.client.gw_send_attributes(device, - devices_data_in_event_pack[ - device][ - "attributes"])) - if devices_data_in_event_pack[device].get("telemetry"): - if device == self.name: - self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"])) - else: - self.__published_events.put(self.tb_client.client.gw_send_telemetry(device, - devices_data_in_event_pack[ - device][ - "telemetry"])) - devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}} + with self.__lock: + for device in devices_data_in_event_pack: + if devices_data_in_event_pack[device].get("attributes"): + if device == self.name: + self.__published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"])) + else: + self.__published_events.put(self.tb_client.client.gw_send_attributes(device, + devices_data_in_event_pack[ + device][ + "attributes"])) + if devices_data_in_event_pack[device].get("telemetry"): + if device == self.name: + self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"])) + else: + self.__published_events.put(self.tb_client.client.gw_send_telemetry(device, + devices_data_in_event_pack[ + device][ + "telemetry"])) + devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}} except Exception as e: log.exception(e) @@ -373,8 +377,9 @@ class TBGatewayService: if self.connectors_configs.get(module): log.debug("Connector \"%s\" for RPC request \"%s\" found", module, content["method"]) for connector_name in self.available_connectors: - log.debug("Sending command RPC %s to connector %s", content["method"], connector_name) - result = self.available_connectors[connector_name].server_side_rpc_handler(content) + if self.available_connectors[connector_name]._connector_type == module: + log.debug("Sending command RPC %s to connector %s", content["method"], connector_name) + result = self.available_connectors[connector_name].server_side_rpc_handler(content) else: log.error("Connector \"%s\" not found", module) result = {"error": "%s - connector not found in available connectors." % module, "code": 404} @@ -402,11 +407,11 @@ class TBGatewayService: if success_sent: rpc_response["success"] = True if device is not None and success_sent is not None: - self.tb_client.client.gw_send_rpc_reply(device, req_id, rpc_response) + self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response)) elif device is not None and req_id is not None and content is not None: self.tb_client.client.gw_send_rpc_reply(device, req_id, content) elif device is None and success_sent is not None: - self.tb_client.client.send_rpc_reply(req_id, rpc_response, quality_of_service=1) + self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1) elif device is None and content is not None: self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1) except Exception as e: @@ -441,7 +446,6 @@ class TBGatewayService: self.available_connectors[connector].statistics['MessagesReceived'] telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \ self.available_connectors[connector].statistics['MessagesSent'] - self.tb_client.client.send_telemetry(telemetry) summary_messages['eventsProduced'] += telemetry[ str(connector_camel_case + ' EventsProduced').replace(' ', '')] summary_messages['eventsSent'] += telemetry[ diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index be78992d..3518884d 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -24,6 +24,7 @@ GATEWAY_ATTRIBUTES_REQUEST_TOPIC = "v1/gateway/attributes/request" GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = "v1/gateway/attributes/response" GATEWAY_MAIN_TOPIC = "v1/gateway/" GATEWAY_RPC_TOPIC = "v1/gateway/rpc" +GATEWAY_RPC_RESPONSE_TOPIC = "v1/gateway/rpc/response" log = logging.getLogger("tb_connection") log.setLevel(logging.DEBUG) @@ -43,6 +44,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self._client.on_connect = self._on_connect self._client.on_message = self._on_message self._client.on_subscribe = self._on_subscribe + self._client._on_unsubscribe = self._on_unsubscribe self._gw_subscriptions = {} self.gateway = gateway @@ -52,6 +54,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])] = GATEWAY_ATTRIBUTES_TOPIC self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")[1])] = GATEWAY_RPC_TOPIC + self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_RPC_RESPONSE_TOPIC def _on_subscribe(self, client, userdata, mid, granted_qos): subscription = self._gw_subscriptions.get(mid) @@ -63,6 +66,9 @@ class TBGatewayMqttClient(TBDeviceMqttClient): log.debug("Service subscription to topic %s - successfully completed.", subscription) del(self._gw_subscriptions[mid]) + def _on_unsubscribe(self, *args): + log.debug(args) + def get_subscriptions_in_progress(self): return True if self._gw_subscriptions else False