diff --git a/.gitignore b/.gitignore index 93bb8caf..46b6f4df 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ thingsboard_gateway/storage/data/ /data/ /logs/ __pycache__ +/thingsboard_gateway/config/ diff --git a/thingsboard_gateway/connectors/connector.py b/thingsboard_gateway/connectors/connector.py index 4a10ed6e..64fc471b 100644 --- a/thingsboard_gateway/connectors/connector.py +++ b/thingsboard_gateway/connectors/connector.py @@ -15,7 +15,7 @@ import logging from abc import ABC, abstractmethod -log = logging.getLogger("connector") +log = logging.getLogger("tb_gateway.connector") class Connector(ABC): diff --git a/thingsboard_gateway/connectors/converter.py b/thingsboard_gateway/connectors/converter.py index d7a2f597..4aad7a31 100644 --- a/thingsboard_gateway/connectors/converter.py +++ b/thingsboard_gateway/connectors/converter.py @@ -15,7 +15,7 @@ import logging from abc import ABC, abstractmethod -log = logging.getLogger("converter") +log = logging.getLogger("tb_gateway.converter") class Converter(ABC): diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 2bc4ec0d..af6347ed 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -109,12 +109,12 @@ class MqttConnector(Connector, Thread): time.sleep(1) def close(self): + self.__stopped = True try: self._client.disconnect() - except: - pass + except Exception as e: + log.exception(e) self._client.loop_stop() - self.__stopped = True self.__log.info('%s has been stopped.', self.get_name()) def get_name(self): diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index 45c36200..13df3394 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -17,7 +17,7 @@ import time from thingsboard_gateway.tb_client.tb_gateway_mqtt import TBGatewayMqttClient import threading -log = logging.getLogger("tb_connection") +log = logging.getLogger("tb_gateway.tb_connection") class TBClient(threading.Thread): @@ -52,7 +52,10 @@ class TBClient(threading.Thread): self.start() def _on_log(self, *args): - log.debug(args) + if "exception" in args[-1]: + log.exception(args[-1]) + else: + log.debug(args) def pause(self): self.__paused = True @@ -67,6 +70,10 @@ class TBClient(threading.Thread): log.debug('TB client %s connected to ThingsBoard', str(client)) self.client._on_connect(client, userdata, flags, rc, *extra_params) + def _on_subscribe(self, client, userdata, flags, rc, *extra_params): + log.debug('TB client %s connected to ThingsBoard', str(client)) + self.client(client, userdata, flags, rc, *extra_params) + def _on_disconnect(self, client, userdata, rc): log.info("TB client %s has been disconnected.", str(client)) self.client._on_disconnect(client, userdata, rc) @@ -104,7 +111,7 @@ class TBClient(threading.Thread): log.exception(e) time.sleep(10) - while True: + while not self.__stopped: try: if not self.__stopped: time.sleep(1) diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index 59d546f4..c24b9370 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -19,8 +19,9 @@ from time import time, sleep from logging import getLogger from os import remove from thingsboard_gateway.gateway.tb_client import TBClient +from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler -log = getLogger("service") +log = getLogger("tb_gateway.service") class RemoteConfigurator: @@ -56,7 +57,7 @@ class RemoteConfigurator: current_configuration[connector].append(config_file[config]) current_configuration["thingsboard"] = self.__old_general_configuration_file encoded_current_configuration = b64encode(dumps(current_configuration).encode()) - self.__gateway.tb_client.client.send_attributes({"current_configuration": encoded_current_configuration.decode("UTF-8")}).get() + self.__gateway.tb_client.client.send_attributes({"current_configuration": encoded_current_configuration.decode("UTF-8")}) def __process_connectors_configuration(self): log.debug("Processing remote connectors configuration...") @@ -101,13 +102,6 @@ class RemoteConfigurator: def __write_new_configuration_files(self): try: general_edited = True - # general_edited = False - # if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file: - # general_edited = False - # if self.__new_general_configuration_file["thingsboard"] != self.__old_general_configuration_file["thingsboard"]: - # general_edited = True - # if self.__new_general_configuration_file["storage"] != self.__old_general_configuration_file["storage"]: - # general_edited = True self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs self.__new_general_configuration_file["connectors"] = [] @@ -162,7 +156,7 @@ class RemoteConfigurator: log.exception(e) self.__revert_configuration() return False - connection_state = False + # connection_state = False try: tb_client = TBClient(self.__new_general_configuration_file["thingsboard"]) tb_client.connect() diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 8e3b366f..e3c17719 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -29,7 +29,7 @@ from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage from thingsboard_gateway.storage.file_event_storage import FileEventStorage from thingsboard_gateway.gateway.tb_gateway_remote_configurator import RemoteConfigurator -log = logging.getLogger('service') +log = logging.getLogger('tb_gateway.service') class TBGatewayService: @@ -41,7 +41,7 @@ class TBGatewayService: self._config_dir = path.dirname(path.abspath(config_file)) + '/' logging.config.fileConfig(self._config_dir + "logs.conf") global log - log = logging.getLogger('service') + log = logging.getLogger('tb_gateway.service') self.available_connectors = {} self.__connector_incoming_messages = {} self.__connected_devices = {} @@ -70,16 +70,26 @@ class TBGatewayService: "file": FileEventStorage, } self._event_storage = self._event_storage_types[config["storage"]["type"]](config["storage"]) - self.__load_connectors(config) + self._connectors_configs = {} + self._load_connectors(config) self._connect_with_connectors() self.__remote_configurator = None if config["thingsboard"].get("remoteConfiguration"): try: self.__remote_configurator = RemoteConfigurator(self, config) - self.__check_shared_attributes() + + def check_attribute_after_connection(gateway:TBGatewayService): + while not gateway.tb_client.is_connected(): + time.sleep(1) + log.debug("Request for shared attribute has been sent.") + info = gateway.tb_client.client.request_attributes(callback=gateway._attributes_parse) + + self.__checking_thread = Thread(target=check_attribute_after_connection, + args=(self,), + name="Check shared attributes on connect", + daemon=True).start() + except Exception as e: - self.__load_connectors(config) - self._connect_with_connectors() log.exception(e) if self.__remote_configurator is not None: self.__remote_configurator.send_current_configuration() @@ -100,7 +110,11 @@ class TBGatewayService: self.cancel_rpc_request(rpc_in_progress) time.sleep(0.1) else: - time.sleep(1) + try: + time.sleep(1) + except Exception as e: + log.exception(e) + break if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected(): summary_messages = {"eventsProduced": 0, "eventsSent": 0} @@ -119,35 +133,51 @@ class TBGatewayService: str(connector_camel_case + ' EventsSent').replace(' ', '')] self.tb_client.client.send_telemetry(summary_messages) gateway_statistic_send = time.time() + except KeyboardInterrupt as e: + log.info("Stopping...") + self.__close_connectors() + log.info("The gateway has been stopped.") + self.tb_client.stop() except Exception as e: log.exception(e) - for device in self.__connected_devices: - log.debug("Close connection for device %s", device) - try: - current_connector = self.__connected_devices[device].get("connector") - if current_connector is not None: - current_connector.close() - log.debug("Connector %s closed connection.", current_connector.get_name()) - except Exception as e: - log.error(e) + self.__close_connectors() + log.info("The gateway has been stopped.") + self.tb_client.stop() - def __attributes_parse(self, content, *args): + def __close_connectors(self): + for current_connector in self.available_connectors: + try: + self.available_connectors[current_connector].close() + log.debug("Connector %s closed connection.", current_connector) + log.debug(current_connector) + except Exception as e: + log.error(e) + + def _attributes_parse(self, content, *args): try: - shared_attributes = content.get("shared") - client_attributes = content.get("client") - if shared_attributes is not None: - if self.__remote_configurator is not None and shared_attributes.get("configuration"): + log.debug("Received data: %s", content) + if content is not None: + shared_attributes = content.get("shared") + client_attributes = content.get("client") + if shared_attributes is not None: + if self.__remote_configurator is not None and shared_attributes.get("configuration"): + try: + self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) + except Exception as e: + log.exception(e) + if client_attributes is not None: + log.debug("Client attributes received (%s).", ", ".join([attr for attr in client_attributes.keys()])) + if self.__remote_configurator is not None and content.get("configuration"): try: - self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) + self.__remote_configurator.process_configuration(content.get("configuration")) except Exception as e: log.exception(e) - elif client_attributes is not None: - log.debug("Client attributes received") - if self.__remote_configurator is not None and content.get("configuration"): - try: - self.__remote_configurator.process_configuration(content.get("configuration")) - except Exception as e: - log.exception(e) + if (shared_attributes is not None and shared_attributes.get('RemoteLoggingLevel') == 'NONE') or content.get("RemoteLoggingLevel") == 'NONE': + self.remote_handler.deactivate() + log.info('Remote logging has being deactivated.') + elif (shared_attributes is not None and shared_attributes.get('RemoteLoggingLevel') is not None) or content.get("RemoteLoggingLevel") is not None: + self.remote_handler.activate(content.get('RemoteLoggingLevel')) + log.info('Remote logging has being activated.') except Exception as e: log.exception(e) @@ -155,9 +185,9 @@ class TBGatewayService: return self._config_dir def __check_shared_attributes(self): - self.tb_client.client.request_attributes(callback=self.__attributes_parse) + self.tb_client.client.request_attributes(callback=self._attributes_parse).wait_for_publish() - def __load_connectors(self, config): + def _load_connectors(self, config): self._connectors_configs = {} if not config.get("connectors"): raise Exception("Configuration for connectors not found, check your config file.") @@ -235,7 +265,7 @@ class TBGatewayService: def __read_data_from_storage(self): devices_data_in_event_pack = {} - while True: + while not True: try: if self.tb_client.is_connected(): size = getsizeof(devices_data_in_event_pack) @@ -303,7 +333,7 @@ class TBGatewayService: del devices_data_in_event_pack devices_data_in_event_pack = {} else: - break + continue else: time.sleep(.01) else: @@ -364,14 +394,7 @@ class TBGatewayService: except Exception as e: log.exception(e) else: - if content.get('RemoteLoggingLevel') == 'NONE': - self.remote_handler.deactivate() - log.info('Remote logging has being deactivated.') - elif content.get('RemoteLoggingLevel') is not None: - self.remote_handler.activate(content.get('RemoteLoggingLevel')) - log.info('Remote logging has being activated.') - else: - self.__attributes_parse(content) + self._attributes_parse(content) def add_device(self, device_name, content, wait_for_publish=False): if device_name not in self.__saved_devices: diff --git a/thingsboard_gateway/gateway/tb_logger.py b/thingsboard_gateway/gateway/tb_logger.py index 31322a35..b963f909 100644 --- a/thingsboard_gateway/gateway/tb_logger.py +++ b/thingsboard_gateway/gateway/tb_logger.py @@ -20,7 +20,7 @@ from time import time class TBLoggerHandler(logging.Handler): def __init__(self, gateway): - super().__init__(logging.ERROR) + super().__init__(logging.DEBUG) self.__gateway = gateway self.activated = False self.log_levels = { @@ -43,7 +43,7 @@ class TBLoggerHandler(logging.Handler): for logger in self.loggers: log = logging.getLogger(logger) log.addHandler(self.__gateway.main_handler) - self.__current_log_level = 'ERROR' + self.__current_log_level = 'DEBUG' def emit(self, record): pass @@ -53,11 +53,12 @@ class TBLoggerHandler(logging.Handler): for logger in self.loggers: if log_level is not None and self.log_levels.get(log_level) is not None: log = logging.getLogger(logger) + # log.addHandler(self) self.__current_log_level = log_level log.setLevel(self.log_levels[log_level]) except Exception as e: - log = logging.getLogger('service') - log.error(e) + log = logging.getLogger('tb_gateway.service') + log.exception(e) self.activated = True def handle(self, record): diff --git a/thingsboard_gateway/storage/event_storage.py b/thingsboard_gateway/storage/event_storage.py index 37727d6d..a98bc0cb 100644 --- a/thingsboard_gateway/storage/event_storage.py +++ b/thingsboard_gateway/storage/event_storage.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from logging import getLogger from abc import ABC, abstractmethod +log = getLogger("tb_gateway.storage") class EventStorage(ABC): diff --git a/thingsboard_gateway/storage/event_storage_reader.py b/thingsboard_gateway/storage/event_storage_reader.py index 2ad20a14..0c425d9b 100644 --- a/thingsboard_gateway/storage/event_storage_reader.py +++ b/thingsboard_gateway/storage/event_storage_reader.py @@ -17,8 +17,9 @@ from os import remove from os.path import exists from base64 import b64decode from simplejson import load, dumps, JSONDecodeError +from thingsboard_gateway.storage.file_event_storage import log from thingsboard_gateway.storage.event_storage_files import EventStorageFiles -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log +from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings from thingsboard_gateway.storage.event_storage_reader_pointer import EventStorageReaderPointer diff --git a/thingsboard_gateway/storage/event_storage_writer.py b/thingsboard_gateway/storage/event_storage_writer.py index a827f680..33c304d3 100644 --- a/thingsboard_gateway/storage/event_storage_writer.py +++ b/thingsboard_gateway/storage/event_storage_writer.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from thingsboard_gateway.storage.file_event_storage import log from thingsboard_gateway.storage.event_storage_files import EventStorageFiles -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log +from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings from time import time from io import BufferedWriter, FileIO from os import linesep, open as os_open, O_CREAT, O_EXCL diff --git a/thingsboard_gateway/storage/file_event_storage.py b/thingsboard_gateway/storage/file_event_storage.py index 6f7a8099..85e3baea 100644 --- a/thingsboard_gateway/storage/file_event_storage.py +++ b/thingsboard_gateway/storage/file_event_storage.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.storage.event_storage import EventStorage +from thingsboard_gateway.storage.event_storage import EventStorage, log from thingsboard_gateway.storage.event_storage_files import EventStorageFiles from thingsboard_gateway.storage.event_storage_writer import EventStorageWriter from thingsboard_gateway.storage.event_storage_reader import EventStorageReader -from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log +from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings from random import choice from string import ascii_lowercase import os diff --git a/thingsboard_gateway/storage/file_event_storage_settings.py b/thingsboard_gateway/storage/file_event_storage_settings.py index 87136b64..eef7b375 100644 --- a/thingsboard_gateway/storage/file_event_storage_settings.py +++ b/thingsboard_gateway/storage/file_event_storage_settings.py @@ -12,10 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from logging import getLogger - -log = getLogger('storage') - class FileEventStorageSettings: def __init__(self, config): diff --git a/thingsboard_gateway/storage/memory_event_storage.py b/thingsboard_gateway/storage/memory_event_storage.py index 96231b3a..865dd177 100644 --- a/thingsboard_gateway/storage/memory_event_storage.py +++ b/thingsboard_gateway/storage/memory_event_storage.py @@ -12,12 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.storage.event_storage import EventStorage +from thingsboard_gateway.storage.event_storage import EventStorage, log import queue from logging import getLogger -log = getLogger("storage") - class MemoryEventStorage(EventStorage): def __init__(self, config): diff --git a/thingsboard_gateway/tb_client/tb_device_mqtt.py b/thingsboard_gateway/tb_client/tb_device_mqtt.py index 018ff0bc..a38a30d8 100644 --- a/thingsboard_gateway/tb_client/tb_device_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_device_mqtt.py @@ -83,7 +83,8 @@ ATTRIBUTES_TOPIC = 'v1/devices/me/attributes' ATTRIBUTES_TOPIC_REQUEST = 'v1/devices/me/attributes/request/' ATTRIBUTES_TOPIC_RESPONSE = 'v1/devices/me/attributes/response/' TELEMETRY_TOPIC = 'v1/devices/me/telemetry' -log = logging.getLogger("tb_connection") +log = logging.getLogger("tb_gateway.tb_connection") +log.setLevel(logging.DEBUG) class TBTimeoutException(Exception): @@ -375,6 +376,7 @@ class TBDeviceMqttClient: self.__attr_request_number += 1 self._attr_request_dict.update({self.__attr_request_number: callback}) attr_request_number = self.__attr_request_number + log.debug(attr_request_number) return attr_request_number def __timeout_check(self): @@ -382,7 +384,7 @@ class TBDeviceMqttClient: try: item = self.__timeout_queue.get() if item is not None: - while True: + while not True: current_ts_in_millis = int(round(time.time() * 1000)) if current_ts_in_millis > item["ts"]: break diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index 85320d1b..d682f1e8 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -25,7 +25,8 @@ GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = "v1/gateway/attributes/response" GATEWAY_MAIN_TOPIC = "v1/gateway/" GATEWAY_RPC_TOPIC = "v1/gateway/rpc" -log = logging.getLogger("tb_connection") +log = logging.getLogger("tb_gateway.tb_connection") +log.setLevel(logging.DEBUG) class TBGatewayAPI: