diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index b3f63778..56af989b 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -59,12 +59,16 @@ class RemoteConfigurator: self.__update_logs_configuration() if self.__old_configuration != decoded_configuration: log.info("Remote configuration received: \n %s", decoded_configuration) - self.__process_connectors_configuration() - self.__old_configuration = self.__new_configuration - # self.send_current_configuration() - self.in_process = False + result = self.__process_connectors_configuration() + self.in_process = False + if result: + self.__old_configuration = self.__new_configuration + return True + else: + return False else: log.error("Remote configuration is already in processing") + return False except Exception as e: self.in_process = False log.exception(e) @@ -89,7 +93,7 @@ class RemoteConfigurator: log.exception(e) def __process_connectors_configuration(self): - log.debug("Processing remote connectors configuration...") + log.info("Processing remote connectors configuration...") if self.__apply_new_connectors_configuration(): self.__write_new_configuration_files() if self.__safe_apply_connection_configuration(): diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 49172010..15d3be3f 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -38,10 +38,10 @@ main_handler = logging.handlers.MemoryHandler(-1) class TBGatewayService: def __init__(self, config_file=None): if config_file is None: - config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml' + config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep) with open(config_file) as general_config: config = safe_load(general_config) - self._config_dir = path.dirname(path.abspath(config_file)) + '/' + self._config_dir = path.dirname(path.abspath(config_file)) + path.sep logging.config.fileConfig(self._config_dir + "logs.conf") global log log = logging.getLogger('service') @@ -161,10 +161,11 @@ class TBGatewayService: new_configuration = shared_attributes.get("configuration") if shared_attributes is not None and shared_attributes.get("configuration") is not None else content.get("configuration") if new_configuration is not None and self.__remote_configurator is not None: try: - self.__remote_configurator.process_configuration(new_configuration) - self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, - name="Send data to Thingsboard Thread") - self.__send_thread.start() + confirmed = self.__remote_configurator.process_configuration(new_configuration) + if confirmed: + self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, + name="Send data to Thingsboard Thread") + self.__send_thread.start() self.__remote_configurator.send_current_configuration() except Exception as e: log.exception(e) @@ -195,32 +196,15 @@ class TBGatewayService: raise Exception("Configuration for connectors not found, check your config file.") for connector in main_config['connectors']: try: - # if connector.get('class') is not None: - # try: - # connector_class = TBUtility.check_and_import(connector['type'], connector['class']) - # self._implemented_connectors[connector['type']] = connector_class - # except Exception as e: - # log.error("Exception when loading the custom connector:") - # log.exception(e) - # elif connector.get("type") is not None and connector["type"] in self._default_connectors: - try: - connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class"))) - self._implemented_connectors[connector["type"]] = connector_class - except Exception as e: - log.error("Error on loading connector:") - log.exception(e) - # else: - # log.error("Connector with config %s - not found", safe_dump(connector)) + connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class"))) + self._implemented_connectors[connector["type"]] = connector_class with open(self._config_dir + connector['configuration'], 'r') as conf_file: - try: - connector_conf = load(conf_file) - if not self.connectors_configs.get(connector['type']): - self.connectors_configs[connector['type']] = [] - self.connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}}) - except Exception as e: - log.exception(e) - + connector_conf = load(conf_file) + if not self.connectors_configs.get(connector['type']): + self.connectors_configs[connector['type']] = [] + self.connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}}) except Exception as e: + log.error("Error on loading connector:") log.exception(e) def _connect_with_connectors(self): @@ -229,21 +213,15 @@ class TBGatewayService: for config in connector_config["config"]: try: connector = None - try: - connector = self._implemented_connectors[connector_type](self, connector_config["config"][config], - connector_type) - connector.setName(connector_config["name"]) - self.available_connectors[connector.get_name()] = connector - connector.open() - except Exception as e: - log.exception(e) - if connector is not None: - connector.close() + connector = self._implemented_connectors[connector_type](self, connector_config["config"][config], + connector_type) + connector.setName(connector_config["name"]) + self.available_connectors[connector.get_name()] = connector + connector.open() except Exception as e: log.exception(e) - - def __send_statistic(self): - self.tb_client.client.gw_send_telemetry() + if connector is not None: + connector.close() def send_to_storage(self, connector_name, data): if not connector_name == self.name: @@ -273,12 +251,19 @@ class TBGatewayService: json_data = dumps(data) save_result = self._event_storage.put(json_data) if not save_result: - log.error('Data from device "%s" cannot be saved, connector name is %s.', + log.error('Data from the device "%s" cannot be saved, connector name is %s.', data["deviceName"], connector_name) + def check_size(self, size, devices_data_in_event_pack): + if size >= 48000: + self.__send_data(devices_data_in_event_pack) + size = 0 + return size + def __read_data_from_storage(self): devices_data_in_event_pack = {} + log.debug("Send data Thread has been started successfully.") while True: try: if self.tb_client.is_connected(): @@ -298,41 +283,29 @@ class TBGatewayService: if type(current_event["telemetry"]) == list: for item in current_event["telemetry"]: size += getsizeof(item) - if size >= 48000: - if not self.tb_client.is_connected(): break - self.__send_data(devices_data_in_event_pack) - size = 0 + size = self.check_size(size, devices_data_in_event_pack) devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item) else: if not self.tb_client.is_connected(): break size += getsizeof(current_event["telemetry"]) - if size >= 48000: - self.__send_data(devices_data_in_event_pack) - size = 0 + size = self.check_size(size, devices_data_in_event_pack) devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"]) if current_event.get("attributes"): if type(current_event["attributes"]) == list: for item in current_event["attributes"]: if not self.tb_client.is_connected(): break size += getsizeof(item) - if size >= 48000: - self.__send_data(devices_data_in_event_pack) - size = 0 - devices_data_in_event_pack[current_event["deviceName"]][ - "attributes"].update( - item.items()) + size = self.check_size(size, devices_data_in_event_pack) + devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(item.items()) else: if not self.tb_client.is_connected(): break size += getsizeof(current_event["attributes"].items()) - if size >= 48000: - self.__send_data(devices_data_in_event_pack) - size = 0 + size = self.check_size(size, devices_data_in_event_pack) devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update( current_event["attributes"].items()) if devices_data_in_event_pack: if not self.tb_client.is_connected(): break self.__send_data(devices_data_in_event_pack) - if self.tb_client.is_connected(): success = True while not self.__published_events.empty(): @@ -365,14 +338,14 @@ class TBGatewayService: else: self.__published_events.put(self.tb_client.client.gw_send_attributes(device, devices_data_in_event_pack[ - device][ - "attributes"])) + device][ + "attributes"])) if devices_data_in_event_pack[device].get("telemetry"): if device == self.name: self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"])) else: self.__published_events.put(self.tb_client.client.gw_send_telemetry(device, - devices_data_in_event_pack[ + devices_data_in_event_pack[ device][ "telemetry"])) devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}} @@ -488,4 +461,4 @@ class TBGatewayService: if __name__ == '__main__': - TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + path.sep + 'config' + path.sep +'tb_gateway.yaml') + TBGatewayService(path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep))