1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00
Files
thingsboard-gateway/thingsboard_gateway/gateway/tb_gateway_service.py
2024-07-02 11:58:50 +03:00

1679 lines
87 KiB
Python

# Copyright 2024. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging.config
import logging.handlers
import multiprocessing.managers
import os.path
import subprocess
from os import execv, listdir, path, pathsep, stat, system
from platform import system as platform_system
from queue import SimpleQueue
from random import choice
from signal import signal, SIGINT
from string import ascii_lowercase, hexdigits
from sys import argv, executable, getsizeof
from threading import RLock, Thread, main_thread, current_thread
from time import sleep, time, monotonic
from simplejson import JSONDecodeError, dumps, load, loads
from yaml import safe_load
from thingsboard_gateway.connectors.connector import Connector
from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME, RENAMING_PARAMETER, CONNECTOR_NAME_PARAMETER, DEVICE_TYPE_PARAMETER, \
CONNECTOR_ID_PARAMETER, ATTRIBUTES_FOR_REQUEST
from thingsboard_gateway.gateway.device_filter import DeviceFilter
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
from thingsboard_gateway.gateway.statistics_service import StatisticsService
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage
from thingsboard_gateway.storage.sqlite.sqlite_event_storage import SQLiteEventStorage
from thingsboard_gateway.tb_utility.tb_gateway_remote_configurator import RemoteConfigurator
from thingsboard_gateway.tb_utility.tb_handler import TBLoggerHandler
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_logger import TbLogger
from thingsboard_gateway.tb_utility.tb_remote_shell import RemoteShell
from thingsboard_gateway.tb_utility.tb_updater import TBUpdater
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
# gRPC support is optional: GRPC_LOADED records whether the gRPC modules could be imported.
GRPC_LOADED = False
try:
    from thingsboard_gateway.gateway.grpc_service.grpc_connector import GrpcConnector
    from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager
    GRPC_LOADED = True
except ImportError:
    print("Cannot load GRPC connector!")

    # Empty placeholders so both names still resolve when the gRPC modules are missing.
    class GrpcConnector:
        pass

    class TBGRPCServerManager:
        pass
# Module-wide service logger; starts as None and is rebound in TBGatewayService.__init__.
log: TbLogger = None  # type: ignore
# Memory handler that TBGatewayService.__init__ attaches to ``log``.
main_handler = logging.handlers.MemoryHandler(-1)

# Connector type (as written in the configuration) -> name of the connector class to import.
DEFAULT_CONNECTORS = {
    "mqtt": "MqttConnector",
    "modbus": "ModbusConnector",
    "opcua": "OpcUaConnector",
    "ble": "BLEConnector",
    "request": "RequestConnector",
    "can": "CanConnector",
    "bacnet": "BACnetConnector",
    "odbc": "OdbcConnector",
    "rest": "RESTConnector",
    "snmp": "SNMPConnector",
    "ftp": "FTPConnector",
    "socket": "SocketConnector",
    "xmpp": "XMPPConnector",
    "ocpp": "OcppConnector",
}

# Used when the "thingsboard" section has no "statistics" entry.
DEFAULT_STATISTIC = {
    'enable': True,
    'statsSendPeriodInSeconds': 3600
}

# Used when the "thingsboard" section has no "deviceFiltering" entry.
DEFAULT_DEVICE_FILTER = {
    'enable': False
}
def load_file(path_to_file):
    """Open *path_to_file* for reading and return its parsed JSON content."""
    with open(path_to_file, 'r') as json_source:
        return load(json_source)
class GatewayManager(multiprocessing.managers.BaseManager):
    """BaseManager subclass that additionally keeps a reference to a gateway object."""

    def __init__(self, address=None, authkey=b''):
        """Create the manager; no gateway is attached until add_gateway() is called."""
        super().__init__(address=address, authkey=authkey)
        self.gateway = None

    def has_gateway(self):
        """Return True once a gateway has been attached with add_gateway()."""
        return self.gateway is not None

    def add_gateway(self, gateway):
        """Attach *gateway* to this manager."""
        self.gateway = gateway

    def shutdown(self) -> None:
        """Shut the manager down without recursing.

        BaseManager.__exit__ simply calls ``self.shutdown()``, so delegating to it
        from here recursed until RecursionError whenever the manager had not been
        started. BaseManager.start() stores its finalizer as an *instance*
        attribute named ``shutdown``; run it if it exists, otherwise there is
        nothing to shut down.
        """
        finalizer = vars(self).get('shutdown')
        if finalizer is not None:
            finalizer()
class TBGatewayService:
# Default timeout; presumably seconds - TODO confirm against the code that reads it.
DEFAULT_TIMEOUT = 5
# Method names passed as ``exposed`` when ``get_gateway`` is registered on GatewayManager.
EXPOSED_GETTERS = [
    'ping',
    'get_status',
    'get_storage_name',
    'get_storage_events_count',
    'get_available_connectors',
    'get_connector_status',
    'get_connector_config'
]
def __init__(self, config_file=None):
    """Start the gateway service and block until it is stopped.

    :param config_file: path to the general configuration file; when None the
        bundled ``config/tb_gateway.json`` next to the package is used.
    """
    self.__init_variables()
    if current_thread() is main_thread():
        signal(SIGINT, lambda _, __: self.__stop_gateway())
    self.__lock = RLock()
    self.__process_async_actions_thread = Thread(target=self.__process_async_device_actions,
                                                 name="Async device actions processing thread", daemon=True)
    # Resolve the default path BEFORE deriving the config directory:
    # path.abspath(None) raises TypeError, which made the default argument unusable.
    if config_file is None:
        config_file = (path.dirname(path.dirname(path.abspath(__file__))) +
                       '/config/tb_gateway.json'.replace('/', path.sep))
    self._config_dir = path.dirname(path.abspath(config_file)) + path.sep
    # A broken logs.json must not prevent startup; the error is reported via telemetry below.
    logging_error = None
    try:
        with open(self._config_dir + 'logs.json', 'r') as file:
            log_config = load(file)
        logging.config.dictConfig(log_config)
    except Exception as e:
        logging_error = e
    global log
    log = TbLogger('service', gateway=self, level='INFO')
    global main_handler
    self.main_handler = main_handler
    log.addHandler(self.main_handler)
    # load general configuration YAML/JSON
    self.__config = self.__load_general_config(config_file)
    # change main config if Gateway running with docker env variables
    self.__modify_main_config()
    log.info("Gateway starting...")
    self.__updater = TBUpdater()
    self.version = self.__updater.get_version()
    log.info("ThingsBoard IoT gateway version: %s", self.version["current_version"])
    self.name = ''.join(choice(ascii_lowercase) for _ in range(64))
    connection_logger = logging.getLogger('tb_connection')
    self.tb_client = TBClient(self.__config["thingsboard"], self._config_dir, connection_logger)
    try:
        self.tb_client.disconnect()
    except Exception as e:
        log.exception(e)
    self.tb_client.register_service_subscription_callback(self.subscribe_to_required_topics)
    self.tb_client.connect()
    # The SIGINT handler may have stopped the gateway while connecting.
    if self.stopped:
        return
    if logging_error is not None:
        self.tb_client.client.send_telemetry({"ts": time() * 1000, "values": {
            "LOGS": "Logging loading exception, logs.json is wrong: %s" % (str(logging_error),)}})
        TBLoggerHandler.set_default_handler()
    self.remote_handler = TBLoggerHandler(self)
    log.addHandler(self.remote_handler)
    # self.main_handler.setTarget(self.remote_handler)
    self._default_connectors = DEFAULT_CONNECTORS
    self.__save_converted_data_thread = Thread(name="Storage fill thread", daemon=True,
                                               target=self.__send_to_storage)
    self.__save_converted_data_thread.start()
    self.init_remote_shell(self.__config["thingsboard"].get("remoteShell"))
    self.__rpc_processing_thread = Thread(target=self.__send_rpc_reply_processing, daemon=True,
                                          name="RPC processing thread")
    self.__rpc_processing_thread.start()
    self.__rpc_to_devices_processing_thread = Thread(target=self.__rpc_to_devices_processing, daemon=True,
                                                     name="RPC to devices processing thread")
    self.__rpc_to_devices_processing_thread.start()
    # The storage class is picked by the "type" key of the "storage" section.
    self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"])
    self.init_grpc_service(self.__config.get('grpc'))
    self.__devices_idle_checker = self.__config['thingsboard'].get('checkingDeviceActivity', {})
    self.__check_devices_idle = self.__devices_idle_checker.get('checkDeviceInactivity', False)
    if self.__check_devices_idle:
        thread = Thread(name='Checking devices idle time', target=self.__check_devices_idle_time, daemon=True)
        thread.start()
        log.info('Start checking devices idle time')
    self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC))
    # Configured in milliseconds, stored in seconds.
    self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 200)
    self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0
    self.__min_pack_size_to_send = self.__config['thingsboard'].get('minPackSizeToSend', 50)
    self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
                               name="Send data to Thingsboard Thread")
    self._send_thread.start()
    self.init_device_filtering(self.__config['thingsboard'].get('deviceFiltering', DEFAULT_DEVICE_FILTER))
    self.__duplicate_detector = DuplicateDetector(self.available_connectors_by_name)
    log.info("Gateway started.")
    self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
    self._watchers_thread.start()
    self._load_connectors()
    self.__connect_with_connectors()
    self.__load_persistent_devices()
    self.__init_remote_configuration()
    if self.__config['thingsboard'].get('managerEnabled', False):
        if path.exists('/tmp/gateway'):
            try:
                # deleting old manager if it was closed incorrectly
                system('rm -rf /tmp/gateway')
            except OSError as e:
                log.exception(e)
        manager_address = '/tmp/gateway'
        if platform_system() == 'Windows':
            manager_address = ('127.0.0.1', 9999)
        self.manager = GatewayManager(address=manager_address, authkey=b'gateway')
        if current_thread() is main_thread():
            GatewayManager.register('get_gateway',
                                    self.get_gateway,
                                    proxytype=AutoProxy,
                                    exposed=self.EXPOSED_GETTERS,
                                    create_method=False)
            self.server = self.manager.get_server()
            self.server.serve_forever()
    # Keep the constructor (and therefore the calling thread) alive until the gateway stops.
    while not self.stopped:
        try:
            sleep(1)
        except KeyboardInterrupt:
            self.__stop_gateway()
            break
def __init_variables(self):
    """Give every attribute used by the service its initial value."""
    # --- lifecycle flag and optional sub-services ---
    self.stopped = False
    self.__device_filter_config = self.__device_filter = None
    self.__grpc_manager = None
    self.__remote_shell = None
    self.__statistics = self.__statistics_service = None
    self.__grpc_config = None
    self.__remote_configurator = None

    # --- state flags ---
    self.__request_config_after_connect = False
    self.__rpc_reply_sent = False
    self.__subscribed_to_rpc_topics = False
    self.__rpc_remote_shell_command_in_progress = None

    # --- connector / device / RPC registries ---
    self.connectors_configs = {}
    self.__scheduled_rpc_calls = []
    self.__rpc_requests_in_progress = {}
    self.available_connectors_by_name: dict[str, Connector] = {}
    self.available_connectors_by_id: dict[str, Connector] = {}
    self.__connector_incoming_messages = {}
    self.__connected_devices = {}
    self.__renamed_devices = {}
    self.__saved_devices = {}
    self.__added_devices = {}
    self.__events = []
    self.__grpc_connectors = {}

    # --- queues ---
    self._published_events = SimpleQueue()
    self.__rpc_processing_queue = SimpleQueue()
    self.__rpc_to_devices_queue = SimpleQueue()
    self.__async_device_actions_queue = SimpleQueue()
    self.__rpc_register_queue = SimpleQueue()
    self.__converted_data_queue = SimpleQueue()

    # --- update check timing (milliseconds) ---
    self.__updates_check_period_ms = 300000
    self.__updates_check_time = 0

    # --- lookup tables ---
    self._implemented_connectors = {}
    self._event_storage_types = dict(
        memory=MemoryEventStorage,
        file=FileEventStorage,
        sqlite=SQLiteEventStorage,
    )
    self.__gateway_rpc_methods = dict(
        ping=self.__rpc_ping,
        stats=self.__form_statistics,
        devices=self.__rpc_devices,
        update=self.__rpc_update,
        version=self.__rpc_version,
        device_renamed=self.__process_renamed_gateway_devices,
        device_deleted=self.__process_deleted_gateway_devices,
    )
    restart_arguments = (executable, [executable.split(pathsep)[-1]] + argv)
    reboot_arguments = (["shutdown", "-r", "-t", "0"],)
    self.__rpc_scheduled_methods_functions = {
        "restart": {"function": execv, "arguments": restart_arguments},
        "reboot": {"function": subprocess.call, "arguments": reboot_arguments},
    }
    self.async_device_actions = {
        DeviceActions.CONNECT: self.add_device,
        DeviceActions.DISCONNECT: self.del_device
    }
@staticmethod
def __load_general_config(config_file):
    """Load the general gateway configuration.

    An existing ``*.json`` file is parsed and returned (None if parsing fails,
    after logging the error). Otherwise the YAML file with the same base name is
    loaded, converted to a ``.json`` file next to it and returned; an empty dict
    is returned when that fails.

    :param config_file: path to the configuration file.
    """
    file_extension = config_file.split('.')[-1]
    if file_extension == 'json' and os.path.exists(config_file):
        with open(config_file) as general_config:
            try:
                return load(general_config)
            except Exception as e:
                log.exception('Failed to load configuration file:\n %s', e)
    else:
        log.warning('YAML configuration is deprecated. '
                    'Please, use JSON configuration instead.')
        log.warning(
            'See default configuration on '
            'https://thingsboard.io/docs/iot-gateway/configuration/')
        config = {}
        try:
            # splitext strips only the final extension. Joining the pieces of
            # split('.') removed every dot in the path, so "/etc/tb.d/gw.yaml"
            # became "/etc/tbd/gw".
            filename = os.path.splitext(config_file)[0]
            with open(filename + '.yaml') as general_config:
                config = safe_load(general_config)
            with open(filename + '.json', 'w') as file:
                file.writelines(dumps(config, indent=' '))
        except Exception as e:
            log.exception('Failed to load configuration file:\n %s', e)
        return config
def init_grpc_service(self, config):
    """Store the gRPC configuration and, if gRPC is importable and enabled, start its manager."""
    self.__grpc_config = config  # noqa
    grpc_requested = GRPC_LOADED and self.__grpc_config is not None and self.__grpc_config.get("enabled")
    if not grpc_requested:
        return
    self.__process_async_actions_thread.start()
    self.__grpc_manager = TBGRPCServerManager(self, self.__grpc_config)  # noqa
    self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.__unregister_connector)
def init_statistics_service(self, config):
    """(Re)configure the statistics service from *config*.

    :param config: dict with an ``enable`` flag, ``statsSendPeriodInSeconds`` and an
        optional ``configuration`` file name relative to the config directory.
    """
    self.__statistics = config  # noqa
    enabled = self.__statistics['enable']
    # Stop a previously created service in BOTH branches; disabling statistics
    # used to drop the reference while leaving the old service running.
    if isinstance(self.__statistics_service, StatisticsService):
        self.__statistics_service.stop()
    if not enabled:
        self.__statistics_service = None  # noqa
        return
    custom_configuration = self.__statistics.get('configuration')
    config_path = self._config_dir + self.__statistics['configuration'] if custom_configuration else None
    self.__statistics_service = StatisticsService(self.__statistics['statsSendPeriodInSeconds'], self, log,
                                                  config_path=config_path)
def init_device_filtering(self, config):
    """Store the filtering configuration and build a DeviceFilter when enabled with a filter file."""
    self.__device_filter_config = config  # noqa
    self.__device_filter = None
    settings = self.__device_filter_config
    if not (settings['enable'] and settings.get('filterFile')):
        return
    filter_path = self._config_dir + settings['filterFile']
    self.__device_filter = DeviceFilter(config_path=filter_path)  # noqa
def init_remote_shell(self, enable):
    """Drop any existing remote shell and create a new one when *enable* is truthy."""
    self.__remote_shell = None
    if not enable:
        return
    log.warning("Remote shell is enabled. Please be carefully with this feature.")
    updater = self.__updater
    self.__remote_shell = RemoteShell(platform=updater.get_platform(),
                                      release=updater.get_release(),
                                      logger=log)  # noqa
@property
def event_storage_types(self):
return self._event_storage_types
@property
def config(self):
return self.__config
@config.setter
def config(self, config):
self.__config.update(config)
def get_gateway(self):
    """Return the gateway held by the manager.

    If the manager has none yet, this service is attached and registered as
    ``gateway``, and None is returned for that first call.
    """
    gateway_manager = self.manager
    if not gateway_manager.has_gateway():
        gateway_manager.add_gateway(self)
        gateway_manager.register('gateway', lambda: self, proxytype=AutoProxy)
        return None
    return gateway_manager.gateway
def _watchers(self):
    """Background loop: resubscribe, run scheduled RPCs, expire RPC requests, send periodic data.

    Runs until ``self.stopped`` is set. Any unexpected exception stops the gateway.
    """
    try:
        gateway_statistic_send = 0
        connectors_configuration_check_time = 0
        while not self.stopped:
            cur_time = time() * 1000
            # Connection lost: forget the subscription so it is re-established below.
            if not self.tb_client.client.is_connected() and self.__subscribed_to_rpc_topics:
                self.__subscribed_to_rpc_topics = False
            if (self.tb_client.is_connected()
                    and not self.tb_client.is_stopped()
                    and not self.__subscribed_to_rpc_topics):
                for device in list(self.__saved_devices.keys()):
                    self.add_device(device,
                                    {CONNECTOR_PARAMETER: self.__saved_devices[device][CONNECTOR_PARAMETER]},
                                    device_type=self.__saved_devices[device][DEVICE_TYPE_PARAMETER])
                self.subscribe_to_required_topics()
            if self.__scheduled_rpc_calls:
                # Iterate over a snapshot: popping by index while walking
                # range(len(...)) skipped entries and raised IndexError, which the
                # outer handler turned into a gateway shutdown.
                for rpc_call in list(self.__scheduled_rpc_calls):
                    if cur_time > rpc_call[0]:
                        self.__scheduled_rpc_calls.remove(rpc_call)
                        result = None
                        try:
                            result = rpc_call[1]["function"](*rpc_call[1]["arguments"])
                        except Exception as e:
                            log.exception(e)
                        if result == 256:
                            log.warning("Error on RPC command: 256. Permission denied.")
            if ((self.__rpc_requests_in_progress or not self.__rpc_register_queue.empty())
                    and self.tb_client.is_connected()):
                # Build the surviving set directly. Previously expired entries were
                # marked "del", and when the register queue was non-empty the marked
                # dict was kept, so the next pass failed on data[1] of a "del" string.
                pending_rpc_requests = {}
                for rpc_in_progress, data in list(self.__rpc_requests_in_progress.items()):
                    if cur_time >= data[1]:
                        data[2](rpc_in_progress)
                        self.cancel_rpc_request(rpc_in_progress)
                    else:
                        pending_rpc_requests[rpc_in_progress] = data
                if not self.__rpc_register_queue.empty():
                    rpc_request_from_queue = self.__rpc_register_queue.get(False)
                    topic = rpc_request_from_queue["topic"]
                    data = rpc_request_from_queue["data"]
                    pending_rpc_requests[topic] = data
                self.__rpc_requests_in_progress = pending_rpc_requests
            else:
                try:
                    sleep(0.2)
                except Exception as e:
                    log.exception(e)
                    break
            if (not self.__request_config_after_connect and self.tb_client.is_connected()
                    and not self.tb_client.client.get_subscriptions_in_progress()):
                self.__request_config_after_connect = True
                self._check_shared_attributes()
            if (cur_time - gateway_statistic_send > self.__statistics['statsSendPeriodInSeconds'] * 1000
                    and self.tb_client.is_connected()):
                summary_messages = self.__form_statistics()
                # with self.__lock:
                self.tb_client.client.send_telemetry(summary_messages)
                gateway_statistic_send = time() * 1000
                # self.__check_shared_attributes()
            if (cur_time - connectors_configuration_check_time > self.__config["thingsboard"].get("checkConnectorsConfigurationInSeconds", 60) * 1000  # noqa
                    and not (self.__remote_configurator is not None and self.__remote_configurator.in_process)):
                self.check_connector_configuration_updates()
                connectors_configuration_check_time = time() * 1000
            if cur_time - self.__updates_check_time >= self.__updates_check_period_ms:
                self.__updates_check_time = time() * 1000
                self.version = self.__updater.get_version()
    except Exception as e:
        log.exception(e)
        self.__stop_gateway()
        self.__close_connectors()
        log.info("The gateway has been stopped.")
        self.tb_client.stop()
def __modify_main_config(self):
    """Overlay service environment variables onto the "thingsboard" config section."""
    env_variables = TBUtility.get_service_environmental_variables()
    thingsboard_section = self.__config['thingsboard']
    # Environment values win over values from the file.
    self.__config['thingsboard'] = {**thingsboard_section, **env_variables}
def __close_connectors(self):
    """Ask every known connector to close, giving each one at most about 5 seconds."""
    for connector_id in self.available_connectors_by_id:
        try:
            started_at = monotonic()
            while True:
                if self.available_connectors_by_id[connector_id].is_stopped():
                    break
                self.available_connectors_by_id[connector_id].close()
                timed_out = monotonic() - started_at > 5
                if timed_out:
                    log.error("Connector %s close timeout", connector_id)
                    break
            log.debug("Connector %s closed connection.", connector_id)
        except Exception as e:
            log.exception("Error while closing connector %s", connector_id, exc_info=e)
def __stop_gateway(self):
    """Mark the gateway as stopped and shut its components down in order."""
    # Setting the flag first lets the loops that poll ``self.stopped`` exit.
    self.stopped = True
    self.__updater.stop()
    log.info("Stopping...")
    if self.__statistics_service is not None:
        self.__statistics_service.stop()
    if self.__grpc_manager is not None:
        self.__grpc_manager.stop()
    # "/tmp/gateway" is the address __init__ gives to GatewayManager on non-Windows hosts.
    if os.path.exists("/tmp/gateway"):
        os.remove("/tmp/gateway")
    self.__close_connectors()
    # _event_storage and manager are only assigned part-way through __init__,
    # so they may be missing if the gateway is stopped early.
    if hasattr(self, "_event_storage"):
        self._event_storage.stop()
    log.info("The gateway has been stopped.")
    self.tb_client.disconnect()
    self.tb_client.stop()
    if hasattr(self, "manager"):
        self.manager.shutdown()
def __init_remote_configuration(self, force=False):
    """Create the remote configurator if configured (or forced) and send the current configuration.

    :param force: create the configurator even when "remoteConfiguration" is not set.
    """
    enabled_in_config = self.__config["thingsboard"].get("remoteConfiguration")
    if force and not enabled_in_config:
        log.info("Remote configuration is enabled forcibly!")
    must_create = (enabled_in_config or force) and self.__remote_configurator is None
    if must_create:
        try:
            self.__remote_configurator = RemoteConfigurator(self, self.__config)
            connected = self.tb_client.is_connected()
            if connected and not self.tb_client.client.get_subscriptions_in_progress():
                self._check_shared_attributes()
        except Exception as e:
            log.exception(e)
    if self.__remote_configurator is None:
        return
    self.__remote_configurator.send_current_configuration()
def _attributes_parse(self, content, *args):
    """Route received attribute data to the response handler or the update handler.

    Content carrying a non-empty "shared" or "client" section goes to the response
    handler; any other non-None content is handled as an attribute update.
    Exceptions are logged, not raised.
    """
    try:
        log.debug("Received data: %s, %s", content, args)
        if content is None:
            return
        shared_attributes = content.get("shared", {})
        client_attributes = content.get("client", {})
        if not (shared_attributes or client_attributes):
            self.__process_attribute_update(content)
        else:
            self.__process_attributes_response(shared_attributes, client_attributes)
        if shared_attributes:
            shared_names = ", ".join(list(shared_attributes.keys()))
            log.debug("Shared attributes received (%s).", shared_names)
        if client_attributes:
            client_names = ", ".join(list(client_attributes.keys()))
            log.debug("Client attributes received (%s).", client_names)
    except Exception as e:
        log.exception(e)
def __process_attribute_update(self, content):
    """Apply an attribute update: logging level, remote configuration, converter configuration."""
    requested_level = content.get("RemoteLoggingLevel")
    self.__process_remote_logging_update(requested_level)
    self.__process_remote_configuration(content)
    self.__process_remote_converter_configuration_update(content)
def __process_attributes_response(self, shared_attributes, client_attributes):
    """Log the received attributes, then apply the shared ones (logging level, remote configuration)."""
    if shared_attributes:
        log.debug("Shared attributes received: %s", shared_attributes)
    if client_attributes:
        log.debug("Client attributes received: %s", client_attributes)
    requested_level = shared_attributes.get('RemoteLoggingLevel')
    self.__process_remote_logging_update(requested_level)
    self.__process_remote_configuration(shared_attributes)
def __process_remote_logging_update(self, remote_logging_level):
    """Deactivate remote logging on 'NONE'; otherwise (re)activate it at the given level.

    None means "no change requested" and is ignored.
    """
    if remote_logging_level is None:
        return
    if remote_logging_level == 'NONE':
        self.remote_handler.deactivate()
        log.info('Remote logging has being deactivated.')
        return
    # Nothing to do when the handler is already active at the requested level.
    if self.remote_handler.current_log_level == remote_logging_level and self.remote_handler.activated:
        return
    self.main_handler.setLevel(remote_logging_level)
    self.remote_handler.activate(remote_logging_level)
    log.info('Remote logging has being updated. Current logging level is: %s ',
             remote_logging_level)
def __process_remote_converter_configuration_update(self, content: dict):
    """Forward a converter configuration update to the connector named in the first key.

    The first key of *content* is expected to look like "<connector>:<converter>".
    Malformed keys and unknown connectors are logged at debug level and ignored.
    """
    try:
        first_key = list(content.keys())[0]
        connector_name, converter_name = first_key.split(':')
        log.info('Got remote converter configuration update')
        target_known = self.available_connectors_by_name.get(connector_name)
        if not target_known:
            raise ValueError
        target_connector = self.available_connectors_by_name[connector_name]
        target_connector.update_converter_config(converter_name, content[first_key])  # noqa
    except (ValueError, AttributeError, IndexError) as e:
        log.debug('Failed to process remote converter update: %s', e)
def update_connector_config_file(self, connector_name, config):
    """Write *config* as JSON to the configuration file of the first connector named *connector_name*."""
    for connector_entry in self.__config['connectors']:
        if connector_entry['name'] != connector_name:
            continue
        target_path = self._config_dir + connector_entry['configuration']
        with open(target_path, 'w', encoding='UTF-8') as config_file:
            config_file.writelines(dumps(config))
        log.info('Updated %s configuration file', connector_name)
        return
def __process_deleted_gateway_devices(self, deleted_device_name: str):
    """Forget a device that was deleted and persist the result.

    :param deleted_device_name: name from the notification; if it is a renamed
        device, the key it is stored under in the rename table is used instead.
    :return: always True.
    """
    log.info("Received deleted gateway device notification: %s", deleted_device_name)
    if deleted_device_name in list(self.__renamed_devices.values()):
        first_device_name = TBUtility.get_dict_key_by_value(self.__renamed_devices, deleted_device_name)
        del self.__renamed_devices[first_device_name]
        deleted_device_name = first_device_name
        log.debug("Current renamed_devices dict: %s", self.__renamed_devices)
    if deleted_device_name in self.__connected_devices:
        del self.__connected_devices[deleted_device_name]
        log.debug("Device %s - was removed from __connected_devices", deleted_device_name)
    if deleted_device_name in self.__saved_devices:
        del self.__saved_devices[deleted_device_name]
        log.debug("Device %s - was removed from __saved_devices", deleted_device_name)
    # Name mangling does not apply inside string literals, so
    # hasattr(self, "__duplicate_detector") was always False and the detector was
    # never updated. Look the attribute up under its mangled name instead; it is
    # absent until __init__ has created the detector.
    duplicate_detector = getattr(self, "_TBGatewayService__duplicate_detector", None)
    if duplicate_detector is not None:
        duplicate_detector.delete_device(deleted_device_name)
    self.__save_persistent_devices()
    self.__load_persistent_devices()
    return True
def __process_renamed_gateway_devices(self, renamed_device: dict):
    """Record a device rename and persist it, unless 'handleDeviceRenaming' is switched off.

    :param renamed_device: dict from which one item is popped and unpacked as
        (old name, new name).
    :return: always True.
    """
    if not self.__config.get('handleDeviceRenaming', True):
        log.debug("Received renamed device notification %r, but device renaming handle is disabled",
                  renamed_device)
        return True
    log.info("Received renamed gateway device notification: %s", renamed_device)
    old_device_name, new_device_name = renamed_device.popitem()
    already_renamed = old_device_name in list(self.__renamed_devices.values())
    if already_renamed:
        # Keep the existing key when the device had been renamed before.
        device_name_key = TBUtility.get_dict_key_by_value(self.__renamed_devices, old_device_name)
    else:
        device_name_key = new_device_name
    self.__renamed_devices[device_name_key] = new_device_name
    self.__duplicate_detector.rename_device(old_device_name, new_device_name)
    self.__save_persistent_devices()
    self.__load_persistent_devices()
    log.debug("Current renamed_devices dict: %s", self.__renamed_devices)
    return True
def __process_remote_configuration(self, new_configuration):
    """Pass *new_configuration* to the remote configurator, if both are present; errors are logged."""
    if new_configuration is None or self.__remote_configurator is None:
        return
    try:
        self.__remote_configurator.process_config_request(new_configuration)
    except Exception as e:
        log.exception(e)
def get_config_path(self):
return self._config_dir
def subscribe_to_required_topics(self):
    """Install RPC handlers and attribute subscriptions once per connection."""
    if self.__subscribed_to_rpc_topics or not self.tb_client.is_connected():
        return
    self.tb_client.client.clean_device_sub_dict()
    # Register both the gateway-level (gw_*) and the plain variants.
    self.tb_client.client.gw_set_server_side_rpc_request_handler(self._rpc_request_handler)
    self.tb_client.client.set_server_side_rpc_request_handler(self._rpc_request_handler)
    self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback)
    self.tb_client.client.gw_subscribe_to_all_attributes(self._attribute_update_callback)
    self.__subscribed_to_rpc_topics = True  # noqa
def request_device_attributes(self, device_name, shared_keys, client_keys, callback):
    """Request client and/or shared attributes of *device_name*; a None key list skips that request."""
    gateway_client = self.tb_client.client if client_keys is not None else None
    if gateway_client is not None:
        gateway_client.gw_request_client_attributes(device_name, client_keys, callback)
    if shared_keys is None:
        return
    self.tb_client.client.gw_request_shared_attributes(device_name, shared_keys, callback)
def _check_shared_attributes(self, shared_keys=None, client_keys=None):
    """Request attributes, with the reply handled by ``_attributes_parse``.

    :param shared_keys: shared keys to request; ATTRIBUTES_FOR_REQUEST when None.
    :param client_keys: client keys to request, passed through unchanged.
    """
    requested_shared_keys = ATTRIBUTES_FOR_REQUEST if shared_keys is None else shared_keys
    self.tb_client.client.request_attributes(callback=self._attributes_parse,
                                             shared_keys=requested_shared_keys,
                                             client_keys=client_keys)
def __register_connector(self, session_id, connector_key):
    """Register the gRPC connector configured under *connector_key* and report the outcome.

    Reports NOT_FOUND for an unknown key, FAILURE if the connector is already
    registered, SUCCESS otherwise.
    """
    target_connector = self.__grpc_connectors.get(connector_key)
    if target_connector is None:
        self.__grpc_manager.registration_finished(Status.NOT_FOUND, session_id, None)
        log.error("[%r] GRPC configuration for connector with key: %s - not found", session_id,
                  connector_key)
        return
    if target_connector['id'] in self.available_connectors_by_id:
        self.__grpc_manager.registration_finished(Status.FAILURE, session_id, None)
        log.error("[%r] GRPC connector with key: %s - already registered!", session_id, connector_key)
        return
    connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager, session_id)
    connector.setName(target_connector['name'])
    self.available_connectors_by_name[connector.get_name()] = connector
    self.available_connectors_by_id[connector.get_id()] = connector
    self.__grpc_manager.registration_finished(Status.SUCCESS, session_id, target_connector)
    log.info("[%r][%r] GRPC connector with key %s registered with name %s", session_id,
             connector.get_id(), connector_key, connector.get_name())
def __unregister_connector(self, session_id, connector_key):
    """Unregister the gRPC connector configured under *connector_key* and report the outcome.

    Reports FAILURE for an unknown key, NOT_FOUND if the connector is not
    registered, SUCCESS otherwise.
    """
    connector_config = self.__grpc_connectors.get(connector_key)
    if connector_config is None:
        self.__grpc_manager.unregister(Status.FAILURE, session_id, None)
        log.error("[%r] GRPC configuration for connector with key: %s - not found and not registered",
                  session_id, connector_key)
        return
    connector_id = connector_config['id']
    if connector_id not in self.available_connectors_by_id:
        self.__grpc_manager.unregister(Status.NOT_FOUND, session_id, None)
        log.error("[%r] GRPC connector with key: %s - is not registered!", session_id, connector_key)
        return
    target_connector = self.available_connectors_by_id.pop(connector_id)
    # Registration adds the connector to both lookup tables, so remove it from the
    # by-name table too; otherwise a stale entry keeps pointing at the dead
    # connector. The identity check avoids removing a different connector that
    # now owns the same name.
    connector_name = target_connector.get_name()
    if self.available_connectors_by_name.get(connector_name) is target_connector:
        del self.available_connectors_by_name[connector_name]
    self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
    log.info("[%r] GRPC connector with key %s and name %s - unregistered", session_id, connector_key,
             target_connector.get_name())
@staticmethod
def _generate_persistent_key(connector, connectors_persistent_keys):
if connectors_persistent_keys and connectors_persistent_keys.get(connector['name']) is not None:
connector_persistent_key = connectors_persistent_keys[connector['name']]
else:
connector_persistent_key = "".join(choice(hexdigits) for _ in range(10))
connectors_persistent_keys[connector['name']] = connector_persistent_key
return connector_persistent_key
def load_connectors(self, config=None):
self._load_connectors(config=config)
def _load_connectors(self, config=None):
    """Read the connector list and rebuild ``self.connectors_configs``.

    For every listed connector the implementing class is imported (non-gRPC
    types), its configuration file is read and an entry is appended under its
    type. A failure in one connector is logged and does not stop the others.

    :param config: configuration dict to take "connectors" from; the service's
        own configuration is used when this is empty or None.
    """
    global log
    self.connectors_configs = {}
    connectors_persistent_keys = self.__load_persistent_connector_keys()
    if config:
        configuration = config.get('connectors')
    else:
        configuration = self.__config.get('connectors')
    if configuration:
        for connector in configuration:
            try:
                connector_persistent_key = None
                connector_type = connector["type"].lower() if connector.get("type") is not None else None
                # can be removed in future releases
                if connector_type == 'opcua_asyncio':
                    connector_type = 'opcua'
                if connector_type is None:
                    log.error("Connector type is not defined!")
                    continue
                if connector_type == "grpc" and self.__grpc_manager is None:
                    log.error("Cannot load connector with name: %s and type grpc. GRPC server is disabled!",
                              connector['name'])
                    continue
                if connector_type != "grpc":
                    connector_class = None
                    if connector.get('useGRPC', False):
                        module_name = f'Grpc{self._default_connectors.get(connector_type, connector.get("class"))}'
                        connector_class = TBModuleLoader.import_module(connector_type, module_name)
                    if self.__grpc_manager and self.__grpc_manager.is_alive() and connector_class:
                        connector_persistent_key = self._generate_persistent_key(connector,
                                                                                 connectors_persistent_keys)
                    else:
                        # Fall back to the regular (non-gRPC) implementation.
                        connector_class = TBModuleLoader.import_module(connector_type,
                                                                       self._default_connectors.get(
                                                                           connector_type,
                                                                           connector.get('class')))
                    if connector_class is None:
                        log.warning("Connector implementation not found for %s", connector['name'])
                    else:
                        self._implemented_connectors[connector_type] = connector_class
                elif connector_type == "grpc":
                    if connector.get('key') == "auto":
                        # Keep the generated key: it used to be discarded, so "auto"
                        # connectors were logged and stored with a key of None.
                        connector_persistent_key = self._generate_persistent_key(connector,
                                                                                 connectors_persistent_keys)
                    else:
                        connector_persistent_key = connector['key']
                    log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'],
                             connector_persistent_key)
                config_file_path = self._config_dir + connector['configuration']
                if not path.exists(config_file_path):
                    log.error("Configuration file for connector with name: %s not found!", connector['name'])
                    continue
                with open(config_file_path, 'r', encoding="UTF-8") as conf_file:
                    connector_conf_file_data = conf_file.read()
                # Keep the raw text as a fallback when the file is not valid JSON.
                connector_conf = connector_conf_file_data
                try:
                    connector_conf = loads(connector_conf_file_data)
                except JSONDecodeError as e:
                    log.debug(e)
                    log.warning("Cannot parse connector configuration as a JSON, it will be passed as a string.")
                connector_id = TBUtility.get_or_create_connector_id(connector_conf)
                if isinstance(connector_conf, dict):
                    if connector_conf.get('id') is None:
                        # Write the id back so it is still there on the next load.
                        connector_conf['id'] = connector_id
                        with open(config_file_path, 'w', encoding="UTF-8") as conf_file:
                            conf_file.write(dumps(connector_conf, indent=2))
                elif isinstance(connector_conf, str) and not connector_conf:
                    raise ValueError("Connector configuration is empty!")
                elif isinstance(connector_conf, str):
                    # Non-JSON configuration: carry the id as a marked prefix unless already present.
                    start_find = connector_conf.find("{id_var_start}")
                    end_find = connector_conf.find("{id_var_end}")
                    if not (start_find > -1 and end_find > -1):
                        connector_conf = "{id_var_start}" + str(connector_id) + "{id_var_end}" + connector_conf
                if not self.connectors_configs.get(connector_type):
                    self.connectors_configs[connector_type] = []
                if connector_type != 'grpc' and isinstance(connector_conf, dict):
                    connector_conf["name"] = connector['name']
                if connector_type != 'grpc':
                    connector_configuration = {connector['configuration']: connector_conf}
                else:
                    connector_configuration = connector_conf
                self.connectors_configs[connector_type].append({"name": connector['name'],
                                                                "id": connector_id,
                                                                "config": connector_configuration,
                                                                "config_updated": stat(config_file_path),
                                                                "config_file_path": config_file_path,
                                                                "grpc_key": connector_persistent_key})
            except Exception as e:
                log.exception("Error on loading connector: %r", e)
        if connectors_persistent_keys:
            self.__save_persistent_keys(connectors_persistent_keys)
    else:
        log.warning("Connectors - not found!")
        self.__init_remote_configuration(force=True)
def connect_with_connectors(self):
self.__connect_with_connectors()
def __connect_with_connectors(self):
    """Create and open every connector described in ``self.connectors_configs``.

    Connector types without an implementation in ``self._implemented_connectors``
    are skipped. Non-GRPC implementations are instantiated in this process, one
    per entry of the connector's ``"config"`` mapping, and registered both by id
    and by name. GRPC implementations are recorded in the GRPC connector registry
    and, when the connector type itself is not ``'grpc'``, a separate connector
    script is started through ``_run_connector`` on a daemon thread.
    """
    global log
    for connector_type in self.connectors_configs:
        connector_type = connector_type.lower()
        for connector_config in self.connectors_configs[connector_type]:
            connector_class = self._implemented_connectors.get(connector_type)
            if connector_class is None:
                continue

            if connector_type == 'grpc' or 'Grpc' in connector_class.__name__:
                # GRPC connectors are tracked by their persistent key.
                self.__grpc_connectors.update({connector_config['grpc_key']: connector_config})
                if connector_type != 'grpc':
                    # A GRPC flavour of a regular connector runs as a separate script.
                    connector_dir_abs = "/".join(self._config_dir.split("/")[:-2])
                    connector_file_name = f'{connector_type}_connector.py'
                    connector_abs_path = f'{connector_dir_abs}/grpc_connectors/{connector_type}/{connector_file_name}'  # noqa
                    gateway_endpoint = {
                        'host': 'localhost',
                        'port': self.__config['grpc']['serverPort']
                    }
                    connector_config_json = dumps({**connector_config, 'gateway': gateway_endpoint})
                    thread = Thread(target=self._run_connector,
                                    args=(connector_abs_path, connector_config_json,),
                                    daemon=True, name='Separated GRPC Connector')
                    thread.start()
                continue

            for config in connector_config["config"]:
                connector = None
                connector_name = None
                connector_id = None
                try:
                    connector_settings = connector_config["config"][config]
                    if connector_settings is None:
                        log.warning("Config is empty for %s", connector_type)
                        continue

                    # With "logLevel" present more than three keys are required,
                    # otherwise a single key is enough.
                    if "logLevel" in connector_settings:
                        config_is_usable = len(connector_settings.keys()) > 3
                    else:
                        config_is_usable = len(connector_settings.keys()) >= 1
                    if not config_is_usable:
                        log.warning("Config incorrect for %s", connector_type)
                        continue

                    connector_name = connector_config["name"]
                    connector_id = connector_config["id"]
                    available_connector = self.available_connectors_by_id.get(connector_id)
                    if available_connector is not None and not available_connector.is_stopped():
                        log.warning("[%r] Connector with name %s already exists and not stopped!",
                                    connector_id, connector_name)
                        continue

                    connector = self._implemented_connectors[connector_type](self,
                                                                             connector_settings,
                                                                             connector_type)
                    connector.name = connector_name
                    self.available_connectors_by_id[connector_id] = connector
                    self.available_connectors_by_name[connector_name] = connector
                    connector.open()
                except Exception as e:
                    log.error("[%r] Error on loading connector %r: %r", connector_id, connector_name, e)
                    log.exception(e, attr_name=connector_name)
                    # Do not leave a half-started connector running.
                    if connector is not None and not connector.is_stopped():
                        connector.close()
def _run_connector(self, connector_abs_path, connector_config_json):
    """Run a separate connector script in a child Python process and wait for it.

    The script receives the JSON-encoded connector configuration and the gateway
    configuration directory as command-line arguments.

    The child is started with the interpreter that runs the gateway
    (``sys.executable``), falling back to ``python3`` only when that is unknown,
    so it does not depend on what ``python3`` resolves to on ``PATH``. This method
    is used as a thread target, so failures are logged instead of being left to
    the default thread exception hook.

    :param connector_abs_path: path of the connector script to execute
    :param connector_config_json: connector configuration serialized as JSON
    """
    interpreter = executable or 'python3'
    try:
        subprocess.run([interpreter, connector_abs_path, connector_config_json, self._config_dir],
                       check=True,
                       universal_newlines=True)
    except (subprocess.CalledProcessError, OSError) as e:
        log.exception("Separated GRPC connector process %s failed: %r", connector_abs_path, e)
def check_connector_configuration_updates(self):
    """Reload all connectors when any connector configuration file has changed.

    A change is detected by comparing the current ``stat`` result of each
    configuration file with the one stored under ``"config_updated"``. On change
    the connectors are closed, reloaded and reconnected, the ``configurationJson``
    entries of the gateway's own connector list are refreshed, and the remote
    configurator (when present) is asked to send the current configuration.
    """
    configuration_changed = any(
        stat(connector_config["config_file_path"]) != connector_config["config_updated"]
        for connector_configs in self.connectors_configs.values()
        for connector_config in connector_configs)
    if not configuration_changed:
        return

    self.__close_connectors()
    self._load_connectors()
    self.__connect_with_connectors()

    # Updating global self.__config['connectors'] configuration for states syncing
    for connector_configs in self.connectors_configs.values():
        for connector_config in connector_configs:
            for index, connector in enumerate(self.__config['connectors']):
                loaded_configs = connector_config['config']
                if loaded_configs.get(connector['configuration']):
                    target = self.__config['connectors'][index]
                    target['configurationJson'] = loaded_configs[connector['configuration']]

    if self.__remote_configurator is not None:
        self.__remote_configurator.send_current_configuration()
def send_to_storage(self, connector_name, connector_id, data=None):
    """Validate converted connector data and queue it for the storage worker.

    :param connector_name: name of the connector that produced the data
    :param connector_id: id of the connector that produced the data
    :param data: converted data to store
    :return: ``Status.SUCCESS`` when the data was queued,
             ``Status.FORBIDDEN_DEVICE`` when the device filter rejected it,
             ``Status.NO_NEW_DATA`` when the duplicate detector left nothing to send,
             ``Status.FAILURE`` when *data* is ``None`` or an error occurred
    """
    if data is None:
        log.error("[%r]Data is empty from connector %r!", connector_id, connector_name)
        # Nothing to filter or queue; previously execution continued with None.
        return Status.FAILURE
    try:
        if self.__device_filter and not self.__device_filter.validate_device(connector_name, data):
            log.warning('Device %s forbidden', data['deviceName'])
            return Status.FORBIDDEN_DEVICE

        filtered_data = self.__duplicate_detector.filter_data(connector_name, data)
        if not filtered_data:
            return Status.NO_NEW_DATA

        self.__converted_data_queue.put((connector_name, connector_id, filtered_data), True, 100)
        return Status.SUCCESS
    except Exception as e:
        log.exception("Cannot put converted data!", exc_info=e)
        return Status.FAILURE
def __send_to_storage(self):
    """Worker loop that moves converted data from the queue into the event storage.

    For every queued item the loop:

    * fills in missing ``telemetry`` / ``attributes`` / ``deviceType`` fields and
      validates the data (data produced under the gateway's own name is instead
      labelled as ``currentThingsBoardGateway`` / ``gateway``);
    * applies device renaming and registers devices that are not connected yet;
    * counts incoming messages per connector;
    * refreshes the device's ``last_receiving_data`` mark when idle checking is on;
    * stores the data, splitting it into several packs when its size reaches
      ``maxPayloadSizeBytes``.

    Runs until ``self.stopped`` becomes true; sleeps 0.2 s while the queue is empty.
    """
    while not self.stopped:
        try:
            if self.__converted_data_queue.empty():
                sleep(0.2)
                continue

            connector_name, connector_id, event = self.__converted_data_queue.get(True, 100)
            data_array = event if isinstance(event, list) else [event]
            for data in data_array:
                if not connector_name == self.name:
                    if 'telemetry' not in data:
                        data['telemetry'] = []
                    if 'attributes' not in data:
                        data['attributes'] = []
                    if not TBUtility.validate_converted_data(data):
                        log.error("[%r] Data from %s connector is invalid.", connector_id, connector_name)
                        continue
                    if data.get('deviceType') is None:
                        device_name = data['deviceName']
                        if self.__connected_devices.get(device_name) is not None:
                            data["deviceType"] = self.__connected_devices[device_name]['device_type']
                        elif self.__saved_devices.get(device_name) is not None:
                            data["deviceType"] = self.__saved_devices[device_name]['device_type']
                        else:
                            data["deviceType"] = "default"
                    if data["deviceName"] in self.__renamed_devices:
                        data["deviceName"] = self.__renamed_devices[data["deviceName"]]
                    if self.tb_client.is_connected() and (data["deviceName"] not in self.get_devices() or
                                                          data["deviceName"] not in self.__connected_devices):
                        self.add_device(data["deviceName"],
                                        {CONNECTOR_PARAMETER: self.available_connectors_by_id[connector_id]},
                                        device_type=data["deviceType"])
                    # Count every message. The previous "if not counter: counter = 0"
                    # form reset the value each time because 0 is falsy, so the
                    # counter never left 0.
                    self.__connector_incoming_messages[connector_id] = \
                        (self.__connector_incoming_messages.get(connector_id) or 0) + 1
                else:
                    data["deviceName"] = "currentThingsBoardGateway"
                    data['deviceType'] = "gateway"

                if self.__check_devices_idle:
                    # The device may be absent from the connected devices (e.g. the
                    # gateway itself, or while the client is disconnected); an
                    # unguarded lookup raised KeyError and dropped the whole batch.
                    device_entry = self.__connected_devices.get(data['deviceName'])
                    if device_entry is not None:
                        device_entry['last_receiving_data'] = time()

                data = self.__convert_telemetry_to_ts(data)

                max_data_size = self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)
                if self.__get_data_size(data) < max_data_size:
                    self.__send_data_pack_to_storage(data, connector_name, connector_id)
                    continue

                # Data is too large, so we will attempt to send in pieces
                adopted_data = {"deviceName": data['deviceName'],
                                "deviceType": data['deviceType'],
                                "attributes": {},
                                "telemetry": []}
                empty_adopted_data_size = self.__get_data_size(adopted_data)
                adopted_data_size = empty_adopted_data_size

                # First, loop through the attributes. A single dict is accepted as
                # well as a list of dicts, mirroring the telemetry handling below.
                attributes = data['attributes'] if isinstance(data['attributes'], list) else [
                    data['attributes']]
                for attribute in attributes:
                    adopted_data['attributes'].update(attribute)
                    adopted_data_size += self.__get_data_size(attribute)
                    if adopted_data_size >= max_data_size:
                        # We have surpassed the max_data_size, so send what we have and clear attributes
                        self.__send_data_pack_to_storage(adopted_data, connector_name, connector_id)
                        adopted_data['attributes'] = {}
                        adopted_data_size = empty_adopted_data_size

                # Now, loop through telemetry. Possibly have some unsent attributes that have been adopted.
                telemetry = data['telemetry'] if isinstance(data['telemetry'], list) else [
                    data['telemetry']]
                # Maps a timestamp to the index of its entry in the current pack.
                ts_to_index = {}
                for ts_kv_list in telemetry:
                    ts = ts_kv_list['ts']
                    for kv in ts_kv_list['values']:
                        if ts in ts_to_index:
                            kv_data = {kv: ts_kv_list['values'][kv]}
                            adopted_data['telemetry'][ts_to_index[ts]]['values'].update(kv_data)
                        else:
                            kv_data = {'ts': ts, 'values': {kv: ts_kv_list['values'][kv]}}
                            adopted_data['telemetry'].append(kv_data)
                            ts_to_index[ts] = len(adopted_data['telemetry']) - 1

                        adopted_data_size += self.__get_data_size(kv_data)
                        if adopted_data_size >= max_data_size:
                            # we have surpassed the max_data_size,
                            # so send what we have and clear attributes and telemetry
                            self.__send_data_pack_to_storage(adopted_data, connector_name, connector_id)
                            adopted_data['telemetry'] = []
                            adopted_data['attributes'] = {}
                            adopted_data_size = empty_adopted_data_size
                            ts_to_index = {}

                # It is possible that we get here and have some telemetry or attributes not yet sent,
                # so check for that.
                if adopted_data['telemetry'] or adopted_data['attributes']:
                    self.__send_data_pack_to_storage(adopted_data, connector_name, connector_id)
        except Exception as e:
            # Keep the traceback: a bare message is rarely enough to find the bad item.
            log.exception(e)
@staticmethod
def __get_data_size(data: dict):
    """Return ``sys.getsizeof`` of the string representation of *data*."""
    rendered = str(data)
    return getsizeof(rendered)
@staticmethod
def get_data_size(data):
    """Public wrapper around the private ``__get_data_size`` static method."""
    size = TBGatewayService.__get_data_size(data)
    return size
@staticmethod
def __convert_telemetry_to_ts(data):
    """Normalize ``data["telemetry"]`` into timestamped form (in place).

    Items that carry a ``"ts"`` become ``{"ts": ..., "values": {...}}`` entries; a
    non-integer ``ts`` is replaced with the current time in milliseconds. Items
    without ``"ts"`` are merged into one key/value mapping.

    If at least one timestamped entry exists, the telemetry becomes the list of
    those entries. Otherwise, if the telemetry is non-empty, it becomes a single
    ``{"ts": now, "values": merged}`` dict. Empty telemetry is left untouched.

    :param data: converted device data containing a ``"telemetry"`` iterable
    :return: the same *data* object
    """
    merged_values = {}
    stamped_entries = []
    for entry in data["telemetry"]:
        if entry.get("ts") is None:
            merged_values = {**merged_values, **entry}
            continue

        if isinstance(entry['ts'], int):
            entry_ts = entry["ts"]
        else:
            log.warning('Data has invalid TS (timestamp) format! Using generated TS instead.')
            entry_ts = int(time() * 1000)
        stamped_entries.append({"ts": entry_ts, "values": {**entry["values"]}})

    if stamped_entries:
        data["telemetry"] = stamped_entries
    elif len(data['telemetry']) > 0:
        data["telemetry"] = {"ts": int(time() * 1000), "values": merged_values}
    return data
def __send_data_pack_to_storage(self, data, connector_name, connector_id=None):
    """Serialize one data pack to JSON and put it into the event storage.

    An error is logged when the storage reports that the pack was not saved.

    :param data: data pack; must contain ``"deviceName"``
    :param connector_name: name of the connector the data came from (for logging)
    :param connector_id: optional connector id used as a log prefix
    """
    json_data = dumps(data)
    save_result = self._event_storage.put(json_data)
    if not save_result:
        # Build the prefix with %s formatting: the former "%r" placeholder wrapped
        # it in quotes and the string concatenation failed for non-str ids.
        connector_prefix = "[%s] " % (connector_id,) if connector_id is not None else ""
        log.error('%sData from the device "%s" cannot be saved, connector name is %s.',
                  connector_prefix,
                  data["deviceName"], connector_name)
def check_size(self, devices_data_in_event_pack):
    """Flush the accumulated event pack once it reaches the payload size limit.

    When the size of *devices_data_in_event_pack* is at least
    ``maxPayloadSizeBytes`` (default 400) the pack is sent and the telemetry and
    attributes of every device in it are emptied.

    :param devices_data_in_event_pack: mapping of device name to its pending data
    """
    size_limit = self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)
    if self.__get_data_size(devices_data_in_event_pack) < size_limit:
        return

    self.__send_data(devices_data_in_event_pack)
    for device_data in devices_data_in_event_pack.values():
        device_data["telemetry"] = []
        device_data["attributes"] = {}
def __read_data_from_storage(self):
    """Worker loop that reads event packs from the storage and publishes them.

    While the client is connected (and no remote configuration is in process) the
    loop takes an event pack from the event storage, groups telemetry and
    attributes per device, sends them through ``__send_data`` and then waits for
    the publish results collected in ``self._published_events``. The pack is
    acknowledged with ``event_pack_processing_done()`` only when sending succeeded
    and the client is still connected. Runs until ``self.stopped`` becomes true.
    """
    # Per-device accumulator: {device_name: {"telemetry": [...], "attributes": {...}}}
    devices_data_in_event_pack = {}
    log.debug("Send data Thread has been started successfully.")
    log.debug("Maximal size of the client message queue is: %r",
              self.tb_client.client._client._max_queued_messages)  # noqa pylint: disable=protected-access
    while not self.stopped:
        try:
            if self.tb_client.is_connected():
                events = []
                # Do not read from the storage while a remote configuration is being applied.
                if self.__remote_configurator is None or not self.__remote_configurator.in_process:
                    events = self._event_storage.get_event_pack()
                if events:
                    for event in events:
                        try:
                            current_event = loads(event)
                        except Exception as e:
                            # Skip events that cannot be parsed.
                            log.exception(e)
                            continue
                        if not devices_data_in_event_pack.get(current_event["deviceName"]):  # noqa
                            devices_data_in_event_pack[current_event["deviceName"]] = {"telemetry": [],
                                                                                       "attributes": {}}
                        # check_size() is called before each append so that a pack that
                        # already reached the size limit is flushed first.
                        if current_event.get("telemetry"):
                            if isinstance(current_event["telemetry"], list):
                                for item in current_event["telemetry"]:
                                    self.check_size(devices_data_in_event_pack)
                                    devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item)  # noqa
                            else:
                                self.check_size(devices_data_in_event_pack)
                                devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"])  # noqa
                        if current_event.get("attributes"):
                            if isinstance(current_event["attributes"], list):
                                for item in current_event["attributes"]:
                                    self.check_size(devices_data_in_event_pack)
                                    devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(item.items())  # noqa
                            else:
                                self.check_size(devices_data_in_event_pack)
                                devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(current_event["attributes"].items())  # noqa
                    if devices_data_in_event_pack:
                        if not self.tb_client.is_connected():
                            continue
                        # Wait while an RPC reply is being sent.
                        while self.__rpc_reply_sent:
                            sleep(.01)
                        self.__send_data(devices_data_in_event_pack)  # noqa
                    if self.tb_client.is_connected() and (
                            self.__remote_configurator is None or not self.__remote_configurator.in_process):
                        success = True
                        # Drain the publish results; any interruption marks the pack as not sent.
                        while not self._published_events.empty():
                            if (self.__remote_configurator is not None
                                and self.__remote_configurator.in_process) or \
                                    not self.tb_client.is_connected() or \
                                    self._published_events.empty() or \
                                    self.__rpc_reply_sent:
                                success = False
                                break
                            events = [self._published_events.get(False, 10) for _ in
                                      range(min(self.__min_pack_size_to_send, self._published_events.qsize()))]
                            for event in events:
                                try:
                                    if (self.tb_client.is_connected()
                                            and (self.__remote_configurator is None
                                                 or not self.__remote_configurator.in_process)):
                                        # With QoS 1 the publish result is checked explicitly.
                                        if self.tb_client.client.quality_of_service == 1:
                                            success = event.get() == event.TB_ERR_SUCCESS
                                        else:
                                            success = True
                                    else:
                                        break
                                except RuntimeError as e:
                                    log.error("Error while sending data to ThingsBoard, it will be resent.",
                                              exc_info=e)
                                    success = False
                                except Exception as e:
                                    log.error("Error while sending data to ThingsBoard, it will be resent.",
                                              exc_info=e)
                                    success = False
                        if success and self.tb_client.is_connected():
                            # Acknowledge the pack and start a fresh accumulator.
                            self._event_storage.event_pack_processing_done()
                            del devices_data_in_event_pack
                            devices_data_in_event_pack = {}
                    else:
                        continue
                else:
                    # NOTE(review): the attribute name suggests milliseconds while
                    # sleep() takes seconds - confirm the unit where it is assigned.
                    sleep(self.__min_pack_send_delay_ms)
            else:
                sleep(1)
        except Exception as e:
            log.exception(e)
            sleep(1)
    log.info("Send data Thread has been stopped successfully.")
@StatisticsService.CollectAllSentTBBytesStatistics(start_stat_type='allBytesSentToTB')
def __send_data(self, devices_data_in_event_pack):
    """Publish accumulated attributes and telemetry of every device in the pack.

    Data of the gateway itself (device name equal to ``self.name`` or
    ``"currentThingsBoardGateway"``) is sent with the plain client calls, data of
    other devices with the ``gw_*`` calls using the renamed device name when one
    exists. Every publish result is put into ``self._published_events`` and the
    device entry is reset to empty telemetry/attributes afterwards.

    :param devices_data_in_event_pack: mapping of device name to its pending data
    """
    try:
        for device in devices_data_in_event_pack:
            renamed_to = self.__renamed_devices.get(device)
            final_device_name = device if renamed_to is None else renamed_to
            device_data = devices_data_in_event_pack[device]

            attributes = device_data.get("attributes")
            if attributes:
                if device == self.name or device == "currentThingsBoardGateway":
                    published = self.tb_client.client.send_attributes(attributes)
                else:
                    published = self.tb_client.client.gw_send_attributes(final_device_name, attributes)
                self._published_events.put(published)

            telemetry = device_data.get("telemetry")
            if telemetry:
                if device == self.name or device == "currentThingsBoardGateway":
                    published = self.tb_client.client.send_telemetry(telemetry)
                else:
                    published = self.tb_client.client.gw_send_telemetry(final_device_name, telemetry)
                self._published_events.put(published)

            devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
    except Exception as e:
        log.exception(e)
def _rpc_request_handler(self, request_id, content):
    """Dispatch an incoming server-side RPC request.

    Requests that name a ``device`` are queued for the device RPC worker. Other
    requests are routed by the prefix of ``content["method"]`` (the text before
    the first ``_``): to all connectors of that type, to the gateway/remote-shell
    handler, or answered with a 404 error. The result is sent back through
    ``send_rpc_reply``; unexpected errors are answered with a 500 error.

    :param request_id: id of the RPC request
    :param content: RPC request body
    """
    try:
        if content.get("device") is not None:
            self.__rpc_to_devices_queue.put((request_id, content, monotonic()))
            return

        try:
            # str.split always returns at least one element.
            module = content["method"].split('_')[0]
            result = None
            if self.connectors_configs.get(module):
                log.debug("Connector \"%s\" for RPC request \"%s\" found", module, content["method"])
                for connector_name in self.available_connectors_by_name:
                    if self.available_connectors_by_name[connector_name]._connector_type == module:  # noqa pylint: disable=protected-access
                        log.debug("Sending command RPC %s to connector %s", content["method"],
                                  connector_name)
                        content['id'] = request_id
                        result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content)  # noqa E501
            elif module == 'gateway' or (self.__remote_shell is not None
                                         and module in self.__remote_shell.shell_commands):
                # The remote shell may be disabled (None); without the guard an
                # unknown module produced an AttributeError and a 500 reply
                # instead of the 404 below.
                result = self.__rpc_gateway_processing(request_id, content)
            else:
                log.error("Connector \"%s\" not found", module)
                result = {"error": "%s - connector not found in available connectors." % module,
                          "code": 404}

            if result is None:
                self.send_rpc_reply(None, request_id, success_sent=False)
            elif isinstance(result, dict) and "qos" in result:
                self.send_rpc_reply(None, request_id,
                                    dumps({k: v for k, v in result.items() if k != "qos"}),
                                    quality_of_service=result["qos"])
            else:
                self.send_rpc_reply(None, request_id, dumps(result))
        except Exception as e:
            # Serialize the error so that quotes in the message cannot break the JSON.
            self.send_rpc_reply(None, request_id, dumps({"error": str(e), "code": 500}))
            log.exception(e)
    except Exception as e:
        log.exception(e)
def __rpc_to_devices_processing(self):
    """Worker loop that delivers queued RPC requests to device connectors.

    A request is handed to the connector of its target device once that device is
    known to the gateway; until then it is put back into the queue. A request
    older than its timeout (``params.timeout`` or ``self.DEFAULT_TIMEOUT``) is
    answered with a 408 error and dropped. Runs until ``self.stopped`` is true.
    """
    while not self.stopped:
        if self.__rpc_to_devices_queue.empty():
            sleep(.01)
            continue

        request_id, content, received_time = self.__rpc_to_devices_queue.get()
        # "params" is not guaranteed to be a mapping (a list or scalar is possible).
        params = content.get("params")
        timeout = params.get("timeout", self.DEFAULT_TIMEOUT) if isinstance(params, dict) \
            else self.DEFAULT_TIMEOUT
        if monotonic() - received_time > timeout:
            log.error("RPC request %s timeout", request_id)
            self.send_rpc_reply(content["device"], request_id, "{\"error\":\"Request timeout\", \"code\": 408}")
            # The request is finished: without this it was dispatched anyway or
            # re-queued forever, sending a timeout reply on every pass.
            continue

        device = content.get("device")
        device_entry = self.get_devices().get(device)
        if device_entry is None:
            # Device is not connected yet: retry later, but do not busy-spin.
            self.__rpc_to_devices_queue.put((request_id, content, received_time))
            sleep(.01)
            continue

        connector = device_entry.get(CONNECTOR_PARAMETER)
        if connector is not None:
            content['id'] = request_id
            connector.server_side_rpc_handler(content)
        else:
            log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
                      content["device"],
                      dumps(content))
def __rpc_gateway_processing(self, request_id, content):
    """Execute an RPC request addressed to the gateway itself.

    The method name is ``content["method"]`` with ``"gateway_"`` removed. It is
    looked up in the remote shell commands (when the shell is enabled) and in the
    gateway RPC methods; scheduled methods are appended to the scheduled calls
    list instead of being run immediately.

    :param request_id: id of the RPC request (used for logging)
    :param content: RPC request body with ``"method"`` and optional ``"params"``
    :return: result of the called method, ``{"success": True}`` for a scheduled
             method, or a 404 error dict when the method is unknown
    """
    log.info("Received RPC request to the gateway, id: %s, method: %s", str(request_id), content["method"])
    arguments = content.get('params', {})
    method_to_call = content["method"].replace("gateway_", "")

    if self.__remote_shell is None:
        log.info("Remote shell is disabled.")
        method_function = self.__gateway_rpc_methods.get(method_to_call)
    else:
        method_function = self.__remote_shell.shell_commands.get(method_to_call,
                                                                 self.__gateway_rpc_methods.get(method_to_call))

    if method_function is not None:
        # A list is unpacked into positional arguments; any other truthy value
        # is passed as a single argument.
        if isinstance(arguments, list):
            return method_function(*arguments)
        if arguments:
            return method_function(arguments)
        return method_function()

    if method_to_call in self.__rpc_scheduled_methods_functions:
        seconds_to_restart = arguments * 1000 if arguments and arguments != '{}' else 0
        scheduled_call = [time() * 1000 + seconds_to_restart,
                          self.__rpc_scheduled_methods_functions[method_to_call]]
        self.__scheduled_rpc_calls.append(scheduled_call)
        log.info("Gateway %s scheduled in %i seconds", method_to_call, seconds_to_restart / 1000)
        return {"success": True}

    log.error("RPC method %s - Not found", content["method"])
    return {"error": "Method not found", "code": 404}
@staticmethod
def __rpc_ping(*args):
    """Answer a ping RPC request with a fixed ``pong`` response."""
    log.debug("Ping RPC request received with arguments %s", args)
    response = {"code": 200, "resp": "pong"}
    return response
def __rpc_devices(self, *args):
    """Return connected devices mapped to the name of their connector.

    Devices whose connector entry is ``None`` are left out.
    """
    log.debug("Devices RPC request received with arguments %s", args)
    data_to_send = {
        device: self.__connected_devices[device][CONNECTOR_PARAMETER].get_name()
        for device in self.__connected_devices
        if self.__connected_devices[device][CONNECTOR_PARAMETER] is not None
    }
    return {"code": 200, "resp": data_to_send}
def __rpc_update(self, *args):
    """Run the updater and report its result.

    :return: ``{"resp": <update result>, "code": 200}`` on success,
             ``{"error": <message>, "code": 500}`` when the updater raised
    """
    log.debug("Update RPC request received with arguments %s", args)
    try:
        return {"resp": self.__updater.update(), "code": 200}
    except Exception as e:
        return {"error": str(e), "code": 500}
def __rpc_version(self, *args):
    """Report the version returned by the updater.

    :return: ``{"resp": <version>, "code": 200}`` on success,
             ``{"error": <message>, "code": 500}`` when the updater raised
    """
    log.debug("Version RPC request received with arguments %s", args)
    try:
        return {"resp": self.__updater.get_version(), "code": 200}
    except Exception as e:
        return {"error": str(e), "code": 500}
def is_rpc_in_progress(self, topic):
    """Return whether *topic* is among the RPC requests currently in progress."""
    in_progress = topic in self.__rpc_requests_in_progress
    return in_progress
def rpc_with_reply_processing(self, topic, content):
    """Send *content* as the reply to the in-progress RPC request bound to *topic*.

    The request id and target device are taken from the first element of the
    entry stored for *topic* in the in-progress requests.
    """
    pending_request = self.__rpc_requests_in_progress[topic][0]
    req_id = pending_request["data"]["id"]
    device = pending_request["device"]
    log.info("Outgoing RPC. Device: %s, ID: %d", device, req_id)
    self.send_rpc_reply(device, req_id, content)
@StatisticsService.CollectRPCReplyStatistics(start_stat_type='allBytesSentToTB')
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
                   quality_of_service=0):
    """Queue an RPC reply; it is sent later by the RPC reply worker.

    The arguments are stored as a tuple in the RPC processing queue in the same
    order as the parameters of this method.
    """
    reply_arguments = (device, req_id, content, success_sent, wait_for_publish, quality_of_service)
    self.__rpc_processing_queue.put(reply_arguments)
def __send_rpc_reply_processing(self):
    """Worker loop that sends queued RPC replies until ``self.stopped`` is true.

    Sleeps 0.1 s whenever the RPC processing queue is empty.
    """
    while not self.stopped:
        if self.__rpc_processing_queue.empty():
            sleep(.1)
            continue
        reply_arguments = self.__rpc_processing_queue.get()
        self.__send_rpc_reply(*reply_arguments)
def __send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
                     quality_of_service=0):
    """Send one RPC reply to ThingsBoard.

    With *device* set the reply goes through the gateway API
    (``gw_send_rpc_reply``), otherwise it is a reply of the gateway itself
    (``send_rpc_reply``). When *success_sent* is given, the body is
    ``{"success": <bool>}``; otherwise *content* is sent as is.

    ``self.__rpc_reply_sent`` is raised for the duration of the call and is always
    lowered again, even when sending fails: code elsewhere in this class waits in
    a loop while the flag is true, so a flag left set by an exception would block
    that loop indefinitely.

    :param device: target device name, or ``None`` for the gateway itself
    :param req_id: id of the RPC request being answered
    :param content: ready-to-send reply body
    :param success_sent: when not ``None``, send a success/failure status instead
    :param wait_for_publish: forwarded to the client for gateway-level replies
    :param quality_of_service: MQTT QoS forwarded to the client
    """
    self.__rpc_reply_sent = True
    try:
        rpc_response = {"success": bool(success_sent)}
        if device is not None and success_sent is not None:
            self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response),
                                                    quality_of_service=quality_of_service)
        elif device is not None and req_id is not None and content is not None:
            self.tb_client.client.gw_send_rpc_reply(device, req_id, content, quality_of_service=quality_of_service)
        elif device is None and success_sent is not None:
            self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=quality_of_service,
                                                 wait_for_publish=wait_for_publish)
        elif device is None and content is not None:
            self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=quality_of_service,
                                                 wait_for_publish=wait_for_publish)
    except Exception as e:
        log.exception(e)
    finally:
        self.__rpc_reply_sent = False
def register_rpc_request_timeout(self, content, timeout, topic, cancel_method):
    """Register an outgoing RPC request together with its timeout.

    The request is put (without blocking) into the RPC register queue as
    ``{"topic": topic, "data": (content, timeout, cancel_method)}``.
    """
    # Put request in outgoing RPC queue. It will be eventually dispatched.
    registration = {"topic": topic, "data": (content, timeout, cancel_method)}
    self.__rpc_register_queue.put(registration, False)
def cancel_rpc_request(self, rpc_request):
    """Answer the in-progress RPC request *rpc_request* with a failure status."""
    pending_request = self.__rpc_requests_in_progress[rpc_request][0]
    target_device = pending_request["device"]
    request_id = pending_request["data"]["id"]
    self.send_rpc_reply(device=target_device, req_id=request_id, success_sent=False)
def _attribute_update_callback(self, content, *args):
    """Route an attribute update either to a device connector or to the gateway.

    Updates that name a ``device`` are passed to ``on_attributes_update`` of the
    connector owning that device (errors are logged); all other updates are
    handled by ``_attributes_parse``.
    """
    log.debug("Attribute request received with content: \"%s\"", content)
    log.debug(args)
    if content.get('device') is None:
        self._attributes_parse(content)
        return

    try:
        device_entry = self.__connected_devices[content["device"]]
        device_entry[CONNECTOR_PARAMETER].on_attributes_update(content)
    except Exception as e:
        log.exception(e)
def __form_statistics(self):
    """Build a telemetry dict with produced/sent event counters.

    For each available connector two entries are added, keyed by the connector
    name with spaces removed plus ``EventsProduced`` / ``EventsSent`` and read
    from the connector statistics (``MessagesReceived`` / ``MessagesSent``,
    default 0). ``eventsProduced`` and ``eventsSent`` hold the totals.
    """
    summary_messages = {"eventsProduced": 0, "eventsSent": 0}
    for connector_name, connector in self.available_connectors_by_name.items():
        key_prefix = connector_name.replace(' ', '')
        produced_key = f"{key_prefix}EventsProduced"
        sent_key = f"{key_prefix}EventsSent"

        produced = connector.statistics.get('MessagesReceived', 0)  # noqa
        sent = connector.statistics.get('MessagesSent', 0)  # noqa

        summary_messages['eventsProduced'] += produced
        summary_messages['eventsSent'] += sent
        summary_messages[produced_key] = produced
        summary_messages[sent_key] = sent
    return summary_messages
def add_device_async(self, data):
    """Queue a device connect action unless the device is already saved.

    :param data: dict with at least ``"deviceName"``
    :return: ``Status.SUCCESS`` when the action was queued, ``Status.FAILURE``
             when the device is already among the saved devices
    """
    if data['deviceName'] in self.__saved_devices:
        return Status.FAILURE
    self.__async_device_actions_queue.put((DeviceActions.CONNECT, data))
    return Status.SUCCESS
def add_device(self, device_name, content, device_type=None):
    """Register a device in the gateway and announce it to ThingsBoard.

    The device is stored in the connected and saved device registries (as two
    separate dicts), the registries are persisted and the device is connected on
    the platform. Its connector type and name are then sent as device attributes.

    :param device_name: name of the device
    :param content: device data; must contain the connector under ``'connector'``
    :param device_type: device type, ``'default'`` when omitted
    """
    if device_type is None:
        device_type = 'default'

    self.__connected_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type}
    self.__saved_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type}
    self.__save_persistent_devices()
    self.tb_client.client.gw_connect_device(device_name, device_type)

    if device_name not in self.__saved_devices:
        return

    device_connector = content['connector']
    device_details = {
        'connectorType': device_connector.get_type(),
        'connectorName': content['connector'].get_name()
    }
    self.__added_devices[device_name] = {"device_details": device_details, "last_send_ts": monotonic()}
    self.tb_client.client.gw_send_attributes(device_name, device_details)
def update_device(self, device_name, event, content):
    """Update one field of a connected device.

    Unknown devices are ignored. When the ``'connector'`` field changes, the
    devices are persisted and the new connector name is sent to the storage as
    the ``connectorName`` attribute of the device.

    :param device_name: name of the device to update
    :param event: key of the device entry to set
    :param content: new value (a connector object when *event* is ``'connector'``)
    """
    if self.__connected_devices.get(device_name) is None:
        return

    connector_changed = event == 'connector' and self.__connected_devices[device_name].get(event) != content
    self.__connected_devices[device_name][event] = content
    if not connector_changed:
        return

    self.__save_persistent_devices()
    connector_attribute_update = {
        "deviceName": device_name,
        "deviceType": self.__connected_devices[device_name][DEVICE_TYPE_PARAMETER],
        "attributes": [{"connectorName": content.name}]}
    self.send_to_storage(connector_name=content.get_name(),
                         connector_id=content.get_id(),
                         data=connector_attribute_update)
def del_device_async(self, data):
    """Queue a device disconnect action when the device is among the saved ones.

    :param data: dict with at least ``"deviceName"``
    :return: ``Status.SUCCESS`` when the action was queued, ``Status.FAILURE``
             when the device is not among the saved devices
    """
    if data['deviceName'] not in self.__saved_devices:
        return Status.FAILURE
    self.__async_device_actions_queue.put((DeviceActions.DISCONNECT, data))
    return Status.SUCCESS
def del_device(self, device_name):
    """Disconnect a device from ThingsBoard and forget it.

    Does nothing for a device that is not connected. A failure to disconnect on
    the platform is logged and the device is removed locally anyway.

    :param device_name: name of the device to remove
    """
    if device_name not in self.__connected_devices:
        return

    try:
        self.tb_client.client.gw_disconnect_device(device_name)
    except Exception as e:
        log.exception("Error on disconnecting device %s", device_name, exc_info=e)
    # Tolerate a device that is missing from either registry (the two are filled
    # independently); a bare pop() raised KeyError and skipped the save below.
    self.__connected_devices.pop(device_name, None)
    self.__saved_devices.pop(device_name, None)
    self.__save_persistent_devices()
def get_devices(self, connector_id: str = None):
    """Return connected devices, optionally restricted to one connector.

    :param connector_id: when given, only devices whose connector has this id
    :return: the connected devices registry itself when *connector_id* is
             ``None``; otherwise a new dict mapping device name to device type
    """
    if connector_id is None:
        return self.__connected_devices

    matching_devices = {}
    for device_name in self.__connected_devices.keys():
        device_entry = self.__connected_devices[device_name]
        connector = device_entry.get(CONNECTOR_PARAMETER)
        if connector is not None and connector.get_id() == connector_id:
            matching_devices[device_name] = device_entry[DEVICE_TYPE_PARAMETER]
    return matching_devices
def __process_async_device_actions(self):
    """Worker loop that applies queued device connect/disconnect actions.

    Runs until ``self.stopped`` is true; sleeps 0.2 s while the queue is empty.
    """
    while not self.stopped:
        if self.__async_device_actions_queue.empty():
            sleep(.2)
            continue

        action, data = self.__async_device_actions_queue.get()
        if action == DeviceActions.CONNECT:
            device_name = data['deviceName']
            # data['name'] holds the name of the connector that owns the device.
            connector = self.available_connectors_by_name[data['name']]
            self.add_device(device_name, {CONNECTOR_PARAMETER: connector}, data.get('deviceType'))
        elif action == DeviceActions.DISCONNECT:
            self.del_device(data['deviceName'])
def __load_persistent_connector_keys(self):
    """Load the persistent GRPC connector keys from the configuration directory.

    :return: the loaded keys, or an empty dict when the keys file is missing,
             empty, or could not be loaded (the error is logged)
    """
    persistent_keys = {}
    file_listed = PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME in listdir(self._config_dir)
    if not (file_listed and path.getsize(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME) > 0):
        log.debug("Persistent keys file not found")
        return persistent_keys

    try:
        persistent_keys = load_file(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME)
    except Exception as e:
        log.exception(e)
    log.debug("Loaded keys: %s", persistent_keys)
    return persistent_keys
def __save_persistent_keys(self, persistent_keys):
    """Write the persistent GRPC connector keys to the configuration directory.

    Errors are logged and not raised.

    :param persistent_keys: JSON-serializable mapping of connector keys
    """
    try:
        # Serialize before opening the file: opening with 'w' truncates it, so a
        # serialization error would otherwise wipe the previously stored keys.
        serialized_keys = dumps(persistent_keys, indent=2, sort_keys=True)
        with open(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME, 'w') as persistent_keys_file:
            persistent_keys_file.write(serialized_keys)
    except Exception as e:
        log.exception(e)
def __load_persistent_devices(self):
    """Restore connected devices from the connected-devices file.

    The file is read from the configuration directory; when it is missing or
    empty an empty file is created instead. Each stored device is bound to a
    currently available connector and re-added through ``add_device``. Three
    stored formats are recognised: a string (old format - the file is reset and
    loading stops), a list ``[connector_name, device_type, (renamed_to)]`` and a
    dict keyed by the ``*_PARAMETER`` constants.
    """
    loaded_connected_devices = None
    if CONNECTED_DEVICES_FILENAME in listdir(self._config_dir) and \
            path.getsize(self._config_dir + CONNECTED_DEVICES_FILENAME) > 0:
        try:
            loaded_connected_devices = load_file(self._config_dir + CONNECTED_DEVICES_FILENAME)
        except Exception as e:
            log.exception(e)
    else:
        # Create (or truncate) the file so that it exists afterwards.
        open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w').close()
    if loaded_connected_devices is not None:
        log.debug("Loaded devices:\n %s", loaded_connected_devices)
        for device_name in loaded_connected_devices:
            try:
                if isinstance(loaded_connected_devices[device_name], str):
                    # Old file format: reset the file and stop loading.
                    open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w').close()
                    log.debug("Old connected_devices file, new file will be created")
                    return
                device_data_to_save = {}
                # List format: [connector_name, device_type, (optional) renamed device name].
                if isinstance(loaded_connected_devices[device_name], list) \
                        and self.available_connectors_by_name.get(loaded_connected_devices[device_name][0]):
                    device_data_to_save = {
                        CONNECTOR_PARAMETER: self.available_connectors_by_name[loaded_connected_devices[device_name][0]],  # noqa
                        DEVICE_TYPE_PARAMETER: loaded_connected_devices[device_name][1]}
                    if len(loaded_connected_devices[device_name]) > 2 and device_name not in self.__renamed_devices:
                        new_device_name = loaded_connected_devices[device_name][2]
                        self.__renamed_devices[device_name] = new_device_name
                elif isinstance(loaded_connected_devices[device_name], dict):
                    connector = None
                    # Prefer the connector found by id, fall back to the connector name.
                    if not self.available_connectors_by_id.get(
                            loaded_connected_devices[device_name][CONNECTOR_ID_PARAMETER]):
                        log.warning("Connector with id %s not found, trying to use connector by name!",
                                    loaded_connected_devices[device_name][CONNECTOR_ID_PARAMETER])
                        connector = self.available_connectors_by_name.get(
                            loaded_connected_devices[device_name][CONNECTOR_NAME_PARAMETER])
                    else:
                        connector = self.available_connectors_by_id.get(
                            loaded_connected_devices[device_name][CONNECTOR_ID_PARAMETER])
                    if connector is None:
                        log.warning("Connector with name %s not found! probably it is disabled, device %s will be "
                                    "removed from the saved devices",
                                    loaded_connected_devices[device_name][CONNECTOR_NAME_PARAMETER], device_name)
                        continue
                    device_data_to_save = {
                        CONNECTOR_PARAMETER: connector,
                        DEVICE_TYPE_PARAMETER: loaded_connected_devices[device_name][DEVICE_TYPE_PARAMETER]}
                    if loaded_connected_devices[device_name].get(RENAMING_PARAMETER) is not None:
                        new_device_name = loaded_connected_devices[device_name][RENAMING_PARAMETER]
                        self.__renamed_devices[device_name] = new_device_name
                self.__connected_devices[device_name] = device_data_to_save
                # NOTE(review): every connected device is re-added for each loaded
                # device, and a list entry whose connector is unknown is stored as an
                # empty dict - confirm both are intended.
                for device in list(self.__connected_devices.keys()):
                    self.add_device(device, self.__connected_devices[device], self.__connected_devices[device][
                        DEVICE_TYPE_PARAMETER])
                self.__saved_devices[device_name] = device_data_to_save
            except Exception as e:
                # A broken entry must not prevent the remaining devices from loading.
                log.exception(e)
                continue
    else:
        log.debug("No device found in connected device file.")
        self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices
def __save_persistent_devices(self):
    """Persist the connected devices to the connected-devices file.

    For every connected device that has a connector, the connector name and id,
    the device type and the renamed device name (if any) are stored. The whole
    operation runs under ``self.__lock``. Errors are logged and not raised.
    """
    with self.__lock:
        data_to_save = {}
        for device in self.__connected_devices:
            if self.__connected_devices[device][CONNECTOR_PARAMETER] is not None:
                data_to_save[device] = {
                    CONNECTOR_NAME_PARAMETER: self.__connected_devices[device][CONNECTOR_PARAMETER].get_name(),
                    DEVICE_TYPE_PARAMETER: self.__connected_devices[device][DEVICE_TYPE_PARAMETER],
                    CONNECTOR_ID_PARAMETER: self.__connected_devices[device][CONNECTOR_PARAMETER].get_id(),
                    RENAMING_PARAMETER: self.__renamed_devices.get(device)
                }
        try:
            # Serialize first: opening with 'w' truncates the file, so a
            # serialization error would otherwise destroy the stored devices.
            serialized_devices = dumps(data_to_save, indent=2, sort_keys=True)
            with open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w') as config_file:
                config_file.write(serialized_devices)
        except Exception as e:
            log.exception(e)
        else:
            # Report success only when the file was actually written.
            log.debug("Saved connected devices.")
def __check_devices_idle_time(self):
    """Worker loop that removes devices which have not sent data for too long.

    Every ``inactivityCheckPeriodSeconds`` (default 1) the loop checks all
    connected devices and deletes those whose ``last_receiving_data`` mark is at
    least ``inactivityTimeoutSeconds`` (default 50) old. A device without a mark
    gets one on its first check. Runs until ``self.stopped`` is true.
    """
    check_devices_idle_every_sec = self.__devices_idle_checker.get('inactivityCheckPeriodSeconds', 1)
    disconnect_device_after_idle = self.__devices_idle_checker.get('inactivityTimeoutSeconds', 50)
    while not self.stopped:
        for_deleting = []
        # Iterate over a snapshot: devices are added and removed by other code
        # paths of this class while this loop runs, and iterating the live dict
        # raises "dictionary changed size during iteration".
        for (device_name, device) in list(self.__connected_devices.items()):
            ts = time()
            if not device.get('last_receiving_data'):
                device['last_receiving_data'] = ts
            last_receiving_data = device['last_receiving_data']
            if ts - last_receiving_data >= disconnect_device_after_idle:
                for_deleting.append(device_name)
        for device_name in for_deleting:
            self.del_device(device_name)
            log.debug('Delete device %s for the reason of idle time > %s.',
                      device_name,
                      disconnect_device_after_idle)
        sleep(check_devices_idle_every_sec)
# GETTERS --------------------
def ping(self):
    """Return the name of this gateway."""
    gateway_name = self.name
    return gateway_name
def get_max_payload_size_bytes(self):
    """Return the configured ``maxPayloadSizeBytes`` (400 when not configured)."""
    thingsboard_config = self.__config["thingsboard"]
    return thingsboard_config.get("maxPayloadSizeBytes", 400)
# ----------------------------
# Storage --------------------
def get_storage_name(self):
    """Return the class name of the event storage in use."""
    storage_class = self._event_storage.__class__
    return storage_class.__name__
def get_storage_events_count(self):
    """Return the value reported by the event storage's ``len()`` method."""
    events_count = self._event_storage.len()
    return events_count
# Connectors -----------------
def get_available_connectors(self):
    """Return the available connector names numbered from 1."""
    return dict(enumerate(self.available_connectors_by_name, start=1))
def get_connector_status(self, name):
    """Return the connection status of the connector called *name*.

    :return: ``{'connected': <bool>}``, or a "not found" message string when a
             ``KeyError`` is raised while looking the connector up
    """
    try:
        status = {'connected': self.available_connectors_by_name[name].is_connected()}
    except KeyError:
        return f'Connector {name} not found!'
    return status
def get_connector_config(self, name):
    """Return the configuration of the connector called *name*.

    :return: the result of the connector's ``get_config()``, or a "not found"
             message string when a ``KeyError`` is raised while looking it up
    """
    try:
        connector_config = self.available_connectors_by_name[name].get_config()
    except KeyError:
        return f'Connector {name} not found!'
    return connector_config
# Gateway ----------------------
def get_status(self):
    """Return whether the gateway client is connected, as ``{'connected': <bool>}``."""
    is_connected = self.tb_client.is_connected()
    return {'connected': is_connected}
if __name__ == '__main__':
    # Start the gateway with the default configuration file located in the
    # "config" directory one level above this module's directory.
    package_root = path.dirname(path.dirname(path.abspath(__file__)))
    default_config_suffix = '/config/tb_gateway.yaml'.replace('/', path.sep)
    TBGatewayService(package_root + default_config_suffix)