From a80cbf54c9d9b85b825499dd7cdb9701d50115f7 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Fri, 28 Feb 2020 13:33:50 +0200 Subject: [PATCH] Improvements for the tb_client and RPC method for restarting the gateway (gateway_restart) and rebooting the device with gateway (gateway_reboot) --- .../gateway/tb_gateway_service.py | 73 +++++++++++++++---- .../tb_client/tb_gateway_mqtt.py | 6 +- 2 files changed, 60 insertions(+), 19 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index d1bb347c..aa650e99 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -15,11 +15,10 @@ import logging.config import logging.handlers import time -# import yaml -from yaml import safe_dump, safe_load +from yaml import safe_load from simplejson import load, loads, dumps -from os import listdir, path -from sys import getsizeof +from os import listdir, path, execv, pathsep, system +from sys import getsizeof, executable, argv from threading import Thread, RLock from random import choice from string import ascii_lowercase @@ -46,6 +45,7 @@ class TBGatewayService: logging.config.fileConfig(self._config_dir + "logs.conf") global log log = logging.getLogger('service') + log.info("Gateway starting...") self.available_connectors = {} self.__connector_incoming_messages = {} self.__connected_devices = {} @@ -74,6 +74,14 @@ class TBGatewayService: "memory": MemoryEventStorage, "file": FileEventStorage, } + self.__gateway_rpc_methods = { + "ping": self.__rpc_ping, + } + self.__sheduled_rpc_calls = [] + self.__self_rpc_sheduled_methods_functions = { + "restart": {"function": execv, "arguments": (executable, [executable.split(pathsep)[-1]] + argv)}, + "reboot": {"function": system, "arguments": ("reboot 0",)}, + } self._event_storage = self._event_storage_types[config["storage"]["type"]](config["storage"]) self.connectors_configs = {} self._load_connectors(config) @@ -92,21 +100,33 @@ class TBGatewayService: self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") self._send_thread.start() + log.info("Gateway started.") try: gateway_statistic_send = 0 while True: cur_time = time.time()*1000 + if self.__sheduled_rpc_calls: + for rpc_call_index in range(len(self.__sheduled_rpc_calls)): + rpc_call = self.__sheduled_rpc_calls.pop(rpc_call_index) + if rpc_call != 'del' and cur_time > rpc_call[0]: + result = None + try: + result = rpc_call[1]["function"](*rpc_call[1]["arguments"]) + except Exception as e: + log.exception(e) + log.info(result) + else: + del rpc_call + rpc_call = "del" if self.__rpc_requests_in_progress and self.tb_client.is_connected(): for rpc_in_progress, data in self.__rpc_requests_in_progress.items(): if cur_time >= data[1]: data[2](rpc_in_progress) self.cancel_rpc_request(rpc_in_progress) self.__rpc_requests_in_progress[rpc_in_progress] = "del" - new_rpc_request_in_progress = {key:value for key, value in self.__rpc_requests_in_progress.items() if value != 'del'} + new_rpc_request_in_progress = {key: value for key, value in self.__rpc_requests_in_progress.items() if value != 'del'} self.__rpc_requests_in_progress = new_rpc_request_in_progress - - else: try: time.sleep(1) @@ -365,7 +385,7 @@ class TBGatewayService: except Exception as e: log.exception(e) - def _rpc_request_handler(self, id, content): + def _rpc_request_handler(self, request_id, content): try: device = content.get("device") if device is not None: @@ -390,27 +410,49 @@ class TBGatewayService: if self.available_connectors[connector_name]._connector_type == module: log.debug("Sending command RPC %s to connector %s", content["method"], connector_name) result = self.available_connectors[connector_name].server_side_rpc_handler(content) + elif module == 'gateway': + result = self.__rpc_gateway_processing(request_id, content) else: log.error("Connector \"%s\" not found", module) result = {"error": "%s - connector not found in available connectors." % module, "code": 404} if result is None: - self.send_rpc_reply(None, id, success_sent=False) + self.send_rpc_reply(None, request_id, success_sent=False) else: - self.send_rpc_reply(None, id, dumps(result)) - log.debug(content) + self.send_rpc_reply(None, request_id, dumps(result)) except Exception as e: - self.send_rpc_reply(None, id, "{\"error\":\"%s\", \"code\": 500}" % str(e)) + self.send_rpc_reply(None, request_id, "{\"error\":\"%s\", \"code\": 500}" % str(e)) log.exception(e) except Exception as e: log.exception(e) + def __rpc_gateway_processing(self, request_id, content): + log.info("Received RPC request to the gateway, id: %s, method: %s", str(request_id), content["method"]) + arguments = content.get('params') + method_to_call = content["method"].replace("gateway_", "") + result = None + if isinstance(arguments, list): + result = self.__gateway_rpc_methods[method_to_call](*arguments) + elif method_to_call in self.__self_rpc_sheduled_methods_functions: + seconds_to_restart = arguments*1000 if arguments else 0 + self.__sheduled_rpc_calls.append([time.time()*1000 + seconds_to_restart, self.__self_rpc_sheduled_methods_functions[method_to_call]]) + log.info("Gateway %s sheduled in %i seconds", method_to_call, seconds_to_restart/1000) + result = {"success": True} + elif arguments is not None: + result = self.__gateway_rpc_methods[method_to_call]() + else: + result = self.__gateway_rpc_methods[method_to_call]() + return result + + def __rpc_ping(self, *args): + return {"code": 200, "resp": "pong"} + def rpc_with_reply_processing(self, topic, content): req_id = self.__rpc_requests_in_progress[topic][0]["data"]["id"] device = self.__rpc_requests_in_progress[topic][0]["device"] self.send_rpc_reply(device, req_id, content) self.cancel_rpc_request(topic) - def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None): + def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None): try: rpc_response = {"success": False} if success_sent is not None: @@ -421,13 +463,12 @@ class TBGatewayService: elif device is not None and req_id is not None and content is not None: self.tb_client.client.gw_send_rpc_reply(device, req_id, content) elif device is None and success_sent is not None: - self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1) + self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1, wait_for_publish=wait_for_publish) elif device is None and content is not None: - self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1) + self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1, wait_for_publish=wait_for_publish) except Exception as e: log.exception(e) - def register_rpc_request_timeout(self, content, timeout, topic, cancel_method): self.__rpc_requests_in_progress[topic] = (content, timeout, cancel_method) diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index 3518884d..5c42b502 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -52,9 +52,9 @@ class TBGatewayMqttClient(TBDeviceMqttClient): super()._on_connect(client, userdata, flags, rc, *extra_params) if rc == 0: self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])] = GATEWAY_ATTRIBUTES_TOPIC - self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC - self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")[1])] = GATEWAY_RPC_TOPIC - self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_RPC_RESPONSE_TOPIC + self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC)[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC)[1])] = GATEWAY_RPC_TOPIC + # self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1])] = GATEWAY_RPC_RESPONSE_TOPIC def _on_subscribe(self, client, userdata, mid, granted_qos): subscription = self._gw_subscriptions.get(mid)