mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Improvements for the tb_client and RPC method for restarting the gateway (gateway_restart) and rebooting the device with gateway (gateway_reboot)
This commit is contained in:
@@ -15,11 +15,10 @@
|
||||
import logging.config
|
||||
import logging.handlers
|
||||
import time
|
||||
# import yaml
|
||||
from yaml import safe_dump, safe_load
|
||||
from yaml import safe_load
|
||||
from simplejson import load, loads, dumps
|
||||
from os import listdir, path
|
||||
from sys import getsizeof
|
||||
from os import listdir, path, execv, pathsep, system
|
||||
from sys import getsizeof, executable, argv
|
||||
from threading import Thread, RLock
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
@@ -46,6 +45,7 @@ class TBGatewayService:
|
||||
logging.config.fileConfig(self._config_dir + "logs.conf")
|
||||
global log
|
||||
log = logging.getLogger('service')
|
||||
log.info("Gateway starting...")
|
||||
self.available_connectors = {}
|
||||
self.__connector_incoming_messages = {}
|
||||
self.__connected_devices = {}
|
||||
@@ -74,6 +74,14 @@ class TBGatewayService:
|
||||
"memory": MemoryEventStorage,
|
||||
"file": FileEventStorage,
|
||||
}
|
||||
self.__gateway_rpc_methods = {
|
||||
"ping": self.__rpc_ping,
|
||||
}
|
||||
self.__sheduled_rpc_calls = []
|
||||
self.__self_rpc_sheduled_methods_functions = {
|
||||
"restart": {"function": execv, "arguments": (executable, [executable.split(pathsep)[-1]] + argv)},
|
||||
"reboot": {"function": system, "arguments": ("reboot 0",)},
|
||||
}
|
||||
self._event_storage = self._event_storage_types[config["storage"]["type"]](config["storage"])
|
||||
self.connectors_configs = {}
|
||||
self._load_connectors(config)
|
||||
@@ -92,21 +100,33 @@ class TBGatewayService:
|
||||
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
self._send_thread.start()
|
||||
log.info("Gateway started.")
|
||||
|
||||
try:
|
||||
gateway_statistic_send = 0
|
||||
while True:
|
||||
cur_time = time.time()*1000
|
||||
if self.__sheduled_rpc_calls:
|
||||
for rpc_call_index in range(len(self.__sheduled_rpc_calls)):
|
||||
rpc_call = self.__sheduled_rpc_calls.pop(rpc_call_index)
|
||||
if rpc_call != 'del' and cur_time > rpc_call[0]:
|
||||
result = None
|
||||
try:
|
||||
result = rpc_call[1]["function"](*rpc_call[1]["arguments"])
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
log.info(result)
|
||||
else:
|
||||
del rpc_call
|
||||
rpc_call = "del"
|
||||
if self.__rpc_requests_in_progress and self.tb_client.is_connected():
|
||||
for rpc_in_progress, data in self.__rpc_requests_in_progress.items():
|
||||
if cur_time >= data[1]:
|
||||
data[2](rpc_in_progress)
|
||||
self.cancel_rpc_request(rpc_in_progress)
|
||||
self.__rpc_requests_in_progress[rpc_in_progress] = "del"
|
||||
new_rpc_request_in_progress = {key:value for key, value in self.__rpc_requests_in_progress.items() if value != 'del'}
|
||||
new_rpc_request_in_progress = {key: value for key, value in self.__rpc_requests_in_progress.items() if value != 'del'}
|
||||
self.__rpc_requests_in_progress = new_rpc_request_in_progress
|
||||
|
||||
|
||||
else:
|
||||
try:
|
||||
time.sleep(1)
|
||||
@@ -365,7 +385,7 @@ class TBGatewayService:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def _rpc_request_handler(self, id, content):
|
||||
def _rpc_request_handler(self, request_id, content):
|
||||
try:
|
||||
device = content.get("device")
|
||||
if device is not None:
|
||||
@@ -390,27 +410,49 @@ class TBGatewayService:
|
||||
if self.available_connectors[connector_name]._connector_type == module:
|
||||
log.debug("Sending command RPC %s to connector %s", content["method"], connector_name)
|
||||
result = self.available_connectors[connector_name].server_side_rpc_handler(content)
|
||||
elif module == 'gateway':
|
||||
result = self.__rpc_gateway_processing(request_id, content)
|
||||
else:
|
||||
log.error("Connector \"%s\" not found", module)
|
||||
result = {"error": "%s - connector not found in available connectors." % module, "code": 404}
|
||||
if result is None:
|
||||
self.send_rpc_reply(None, id, success_sent=False)
|
||||
self.send_rpc_reply(None, request_id, success_sent=False)
|
||||
else:
|
||||
self.send_rpc_reply(None, id, dumps(result))
|
||||
log.debug(content)
|
||||
self.send_rpc_reply(None, request_id, dumps(result))
|
||||
except Exception as e:
|
||||
self.send_rpc_reply(None, id, "{\"error\":\"%s\", \"code\": 500}" % str(e))
|
||||
self.send_rpc_reply(None, request_id, "{\"error\":\"%s\", \"code\": 500}" % str(e))
|
||||
log.exception(e)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def __rpc_gateway_processing(self, request_id, content):
|
||||
log.info("Received RPC request to the gateway, id: %s, method: %s", str(request_id), content["method"])
|
||||
arguments = content.get('params')
|
||||
method_to_call = content["method"].replace("gateway_", "")
|
||||
result = None
|
||||
if isinstance(arguments, list):
|
||||
result = self.__gateway_rpc_methods[method_to_call](*arguments)
|
||||
elif method_to_call in self.__self_rpc_sheduled_methods_functions:
|
||||
seconds_to_restart = arguments*1000 if arguments else 0
|
||||
self.__sheduled_rpc_calls.append([time.time()*1000 + seconds_to_restart, self.__self_rpc_sheduled_methods_functions[method_to_call]])
|
||||
log.info("Gateway %s sheduled in %i seconds", method_to_call, seconds_to_restart/1000)
|
||||
result = {"success": True}
|
||||
elif arguments is not None:
|
||||
result = self.__gateway_rpc_methods[method_to_call]()
|
||||
else:
|
||||
result = self.__gateway_rpc_methods[method_to_call]()
|
||||
return result
|
||||
|
||||
def __rpc_ping(self, *args):
|
||||
return {"code": 200, "resp": "pong"}
|
||||
|
||||
def rpc_with_reply_processing(self, topic, content):
|
||||
req_id = self.__rpc_requests_in_progress[topic][0]["data"]["id"]
|
||||
device = self.__rpc_requests_in_progress[topic][0]["device"]
|
||||
self.send_rpc_reply(device, req_id, content)
|
||||
self.cancel_rpc_request(topic)
|
||||
|
||||
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None):
|
||||
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None):
|
||||
try:
|
||||
rpc_response = {"success": False}
|
||||
if success_sent is not None:
|
||||
@@ -421,13 +463,12 @@ class TBGatewayService:
|
||||
elif device is not None and req_id is not None and content is not None:
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, content)
|
||||
elif device is None and success_sent is not None:
|
||||
self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1)
|
||||
self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1, wait_for_publish=wait_for_publish)
|
||||
elif device is None and content is not None:
|
||||
self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1)
|
||||
self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1, wait_for_publish=wait_for_publish)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
|
||||
def register_rpc_request_timeout(self, content, timeout, topic, cancel_method):
|
||||
self.__rpc_requests_in_progress[topic] = (content, timeout, cancel_method)
|
||||
|
||||
|
||||
@@ -52,9 +52,9 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
super()._on_connect(client, userdata, flags, rc, *extra_params)
|
||||
if rc == 0:
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])] = GATEWAY_ATTRIBUTES_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")[1])] = GATEWAY_RPC_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_RPC_RESPONSE_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC)[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC)[1])] = GATEWAY_RPC_TOPIC
|
||||
# self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1])] = GATEWAY_RPC_RESPONSE_TOPIC
|
||||
|
||||
def _on_subscribe(self, client, userdata, mid, granted_qos):
|
||||
subscription = self._gw_subscriptions.get(mid)
|
||||
|
||||
Reference in New Issue
Block a user