mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Moved RPC to devicesprocessing to separated thread, to avoid hidden locking and refactored device connector addition
This commit is contained in:
@@ -173,6 +173,9 @@ class GatewayManager(multiprocessing.managers.BaseManager):
|
||||
|
||||
|
||||
class TBGatewayService:
|
||||
|
||||
DEFAULT_TIMEOUT = 5
|
||||
|
||||
EXPOSED_GETTERS = [
|
||||
'ping',
|
||||
'get_status',
|
||||
@@ -217,10 +220,6 @@ class TBGatewayService:
|
||||
# change main config if Gateway running with docker env variables
|
||||
self.__modify_main_config()
|
||||
|
||||
self.__test_env = False
|
||||
if environ.get('TB_BASE_URL') == 'http://127.0.0.1:9090':
|
||||
self.__test_env = True
|
||||
|
||||
log.info("Gateway starting...")
|
||||
self.__updater = TBUpdater()
|
||||
self.version = self.__updater.get_version()
|
||||
@@ -252,6 +251,9 @@ class TBGatewayService:
|
||||
self.__rpc_processing_thread = Thread(target=self.__send_rpc_reply_processing, daemon=True,
|
||||
name="RPC processing thread")
|
||||
self.__rpc_processing_thread.start()
|
||||
self.__rpc_to_devices_processing_thread = Thread(target=self.__rpc_to_devices_processing, daemon=True,
|
||||
name="RPC to devices processing thread")
|
||||
self.__rpc_to_devices_processing_thread.start()
|
||||
self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"])
|
||||
|
||||
self.init_grpc_service(self.__config.get('grpc'))
|
||||
@@ -340,6 +342,7 @@ class TBGatewayService:
|
||||
|
||||
self._published_events = SimpleQueue()
|
||||
self.__rpc_processing_queue = SimpleQueue()
|
||||
self.__rpc_to_devices_queue = SimpleQueue()
|
||||
self.__async_device_actions_queue = SimpleQueue()
|
||||
self.__rpc_register_queue = SimpleQueue()
|
||||
self.__converted_data_queue = SimpleQueue()
|
||||
@@ -1261,14 +1264,7 @@ class TBGatewayService:
|
||||
try:
|
||||
device = content.get("device")
|
||||
if device is not None:
|
||||
connector = self.get_devices()[device].get(CONNECTOR_PARAMETER)
|
||||
if connector is not None:
|
||||
content['id'] = request_id
|
||||
connector.server_side_rpc_handler(content)
|
||||
else:
|
||||
log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
|
||||
content["device"],
|
||||
dumps(content))
|
||||
self.__rpc_to_devices_queue.put((request_id, content, monotonic()))
|
||||
else:
|
||||
try:
|
||||
method_split = content["method"].split('_')
|
||||
@@ -1305,6 +1301,30 @@ class TBGatewayService:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def __rpc_to_devices_processing(self):
|
||||
while not self.stopped:
|
||||
if not self.__rpc_to_devices_queue.empty():
|
||||
request_id, content, received_time = self.__rpc_to_devices_queue.get()
|
||||
timeout = content.get("params", {}).get("timeout", self.DEFAULT_TIMEOUT)
|
||||
if monotonic() - received_time > timeout:
|
||||
log.error("RPC request %s timeout", request_id)
|
||||
self.send_rpc_reply(content["device"], request_id, "{\"error\":\"Request timeout\", \"code\": 408}")
|
||||
device = content.get("device")
|
||||
if device in self.get_devices():
|
||||
connector = self.get_devices()[content['device']].get(CONNECTOR_PARAMETER)
|
||||
if connector is not None:
|
||||
content['id'] = request_id
|
||||
connector.server_side_rpc_handler(content)
|
||||
else:
|
||||
log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
|
||||
content["device"],
|
||||
dumps(content))
|
||||
else:
|
||||
self.__rpc_to_devices_queue.put((request_id, content, received_time))
|
||||
sleep(.001)
|
||||
else:
|
||||
sleep(.01)
|
||||
|
||||
def __rpc_gateway_processing(self, request_id, content):
|
||||
log.info("Received RPC request to the gateway, id: %s, method: %s", str(request_id), content["method"])
|
||||
arguments = content.get('params', {})
|
||||
@@ -1454,10 +1474,12 @@ class TBGatewayService:
|
||||
return Status.FAILURE
|
||||
|
||||
def add_device(self, device_name, content, device_type=None):
|
||||
if device_name not in self.__added_devices or self.__test_env \
|
||||
or monotonic() - self.__added_devices[device_name]["last_send_ts"] > 60 \
|
||||
if (device_name not in self.__added_devices
|
||||
or device_name not in self.__connected_devices
|
||||
or device_name not in self.__saved_devices
|
||||
or monotonic() - self.__added_devices[device_name]["last_send_ts"] > 60
|
||||
or (self.__added_devices[device_name]["device_details"]["connectorName"] != content['connector'].get_name() # noqa E501
|
||||
or self.__added_devices[device_name]["device_details"]["connectorType"] != content['connector'].get_type()): # noqa E501
|
||||
or self.__added_devices[device_name]["device_details"]["connectorType"] != content['connector'].get_type())): # noqa E501
|
||||
|
||||
device_type = device_type if device_type is not None else 'default'
|
||||
self.__connected_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type}
|
||||
|
||||
Reference in New Issue
Block a user