1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added connectors RPC

This commit is contained in:
samson0v
2023-09-06 11:58:35 +03:00
parent d57021857e
commit 9b49056e25
7 changed files with 138 additions and 51 deletions

View File

@@ -546,18 +546,31 @@ class ModbusConnector(Connector, Thread):
def server_side_rpc_handler(self, server_rpc_request):
try:
if server_rpc_request.get('data') is None:
server_rpc_request['data'] = {'params': server_rpc_request['params'],
'method': server_rpc_request['method']}
rpc_method = server_rpc_request['data']['method']
# check if RPC type is connector RPC (can be only 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
server_rpc_request['device'] = server_rpc_request['params'].split(' ')[0].split('=')[-1]
except (IndexError, ValueError):
pass
if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None:
self.__log.debug("Modbus connector received rpc request for %s with server_rpc_request: %s",
server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request)
server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request)
device = tuple(
filter(
lambda slave: slave.name == server_rpc_request[DEVICE_SECTION_PARAMETER], self.__slaves
)
)[0]
rpc_method = server_rpc_request[DATA_PARAMETER].get(RPC_METHOD_PARAMETER)
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
@@ -635,8 +648,8 @@ class ModbusConnector(Connector, Thread):
},
data=to_converter)
self.__log.debug("Received %s method: %s, result: %r", request_type,
content[DATA_PARAMETER][RPC_METHOD_PARAMETER],
response)
content[DATA_PARAMETER][RPC_METHOD_PARAMETER],
response)
elif isinstance(response, (WriteMultipleRegistersResponse,
WriteMultipleCoilsResponse,
WriteSingleCoilResponse,
@@ -648,13 +661,14 @@ class ModbusConnector(Connector, Thread):
content.get(DATA_PARAMETER) is not None and content[DATA_PARAMETER].get(
RPC_ID_PARAMETER)) is not None:
if isinstance(response, Exception):
self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER],
content[DATA_PARAMETER][RPC_ID_PARAMETER],
{content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)})
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)})
else:
self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER],
content[DATA_PARAMETER][RPC_ID_PARAMETER],
response)
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
self.__log.debug("%r", response)

View File

@@ -771,10 +771,12 @@ class MqttConnector(Connector, Thread):
# 2-way RPC setup
if expects_response and defines_timeout:
expected_response_topic = rpc_config["responseTopicExpression"] \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))
if content.get('device'):
expected_response_topic.replace("${deviceName}", str(content["device"]))
expected_response_topic = TBUtility.replace_params_tags(expected_response_topic, content)
timeout = time() * 1000 + rpc_config.get("responseTimeout")
@@ -807,10 +809,12 @@ class MqttConnector(Connector, Thread):
# Actually reach out for the device
request_topic: str = rpc_config.get("requestTopicExpression") \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))
if content['data'].get('device'):
request_topic.replace("${deviceName}", str(content["device"]))
request_topic = TBUtility.replace_params_tags(request_topic, content)
data_to_send_tags = TBUtility.get_values(rpc_config.get('valueExpression'), content['data'],
@@ -842,8 +846,19 @@ class MqttConnector(Connector, Thread):
def server_side_rpc_handler(self, content):
self.__log.info("Incoming server-side RPC: %s", content)
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
rpc_method = content['data']['method']
# check if RPC type is connector RPC (can be only 'get' or 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
except ValueError:
pass
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}

View File

@@ -266,12 +266,25 @@ class OpcUaConnector(Thread, Connector):
@StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB')
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
rpc_method = content["data"].get("method")
# check if RPC type is connector RPC (can be only 'get' or 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (ValueError, IndexError):
pass
# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')
try:
args_list = content['data']['params'].split(';')
@@ -281,44 +294,45 @@ class OpcUaConnector(Thread, Connector):
full_path = args_list[0].split('=')[-1]
except IndexError:
self._log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{content['data']['method']: 'Not enough arguments. Expected min 2.',
'code': 400})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})
node_list = []
self.__search_node(current_node=content['device'], fullpath=full_path, result=node_list)
self.__search_node(current_node=device, fullpath=full_path, result=node_list)
node = None
try:
node = node_list[0]
except IndexError:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'],
{content['data']['method']: 'Node didn\'t find!',
'code': 500})
self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'),
content={content['data']['method']: 'Node didn\'t find!',
'code': 500})
if rpc_method == 'get':
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{content['data']['method']: node.get_value(),
'code': 200})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data']['method']: node.get_value(), 'code': 200})
else:
try:
value = args_list[2].split('=')[-1]
node.set_value(value)
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{'success': 'true', 'code': 200})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={'success': 'true', 'code': 200})
except ValueError:
self._log.error('Method SET take three arguments!')
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{'error': 'Method SET take three arguments!', 'code': 400})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={'error': 'Method SET take three arguments!',
'code': 400})
except ua.UaStatusCodeError:
self._log.error('Write method doesn\'t allow!')
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{'error': 'Write method doesn\'t allow!', 'code': 400})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={'error': 'Write method doesn\'t allow!', 'code': 400})
for method in self.__available_object_resources[content["device"]]['methods']:
if rpc_method is not None and method.get(rpc_method) is not None:

View File

@@ -440,12 +440,25 @@ class OpcUaConnectorAsyncIO(Connector, Thread):
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
rpc_method = content["data"].get("method")
# check if RPC type is connector RPC (can be only 'get' or 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (ValueError, IndexError):
pass
# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')
try:
args_list = content['data']['params'].split(';')
@@ -456,10 +469,11 @@ class OpcUaConnectorAsyncIO(Connector, Thread):
full_path = args_list[0].split('=')[-1]
except IndexError:
self.__log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{content['data']['method']: 'Not enough arguments. Expected min 2.',
'code': 400})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})
result = {}
if rpc_method == 'get':
@@ -474,9 +488,9 @@ class OpcUaConnectorAsyncIO(Connector, Thread):
while not task.done():
sleep(.2)
self.__gateway.send_rpc_reply(content['device'],
content['data']['id'],
{content['data']['method']: result})
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data']['method']: result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

View File

@@ -221,10 +221,23 @@ class RESTConnector(Connector, Thread):
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
rpc_method = content['data']['method']
# check if RPC type is connector RPC (can be only 'get' or 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (IndexError, ValueError):
pass
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
device = content.get('device')
params = {}
for param in content['data']['params'].split(';'):
try:
@@ -244,8 +257,8 @@ class RESTConnector(Connector, Thread):
response = self.__send_request(request_dict, Queue(1), self.__log, with_queue=False)
self.__log.debug('Response from RPC request: %s', response)
self.__gateway.send_rpc_reply(device=content["device"],
req_id=content["data"]["id"],
self.__gateway.send_rpc_reply(device=device,
req_id=content["data"].get('id'),
content=response[2] if response and len(
response) >= 3 else response)
else:

View File

@@ -342,10 +342,24 @@ class SocketConnector(Connector, Thread):
@StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB')
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
rpc_method = content['data']['method']
# check if RPC type is connector RPC (can be only 'set')
try:
(connector_type, rpc_method_name) = rpc_method.split('_')
if connector_type == self._connector_type:
rpc_method = rpc_method_name
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (IndexError, ValueError):
pass
device = tuple(filter(lambda item: item['deviceName'] == content['device'], self.__config['devices']))[0]
# check if RPC method is reserved set
if content['data']['method'] == 'set':
if rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
@@ -363,12 +377,13 @@ class SocketConnector(Connector, Thread):
else:
self.__write_value_via_udp(params['address'], int(params['port']), params['value'])
except KeyError:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], 'Not enough params')
self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'),
content='Not enough params')
except ValueError:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'],
'Param "port" have to be int type')
self.__gateway.send_rpc_reply(device=device, req_id=content['data']['id'],
content='Param "port" have to be int type')
else:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(result))
self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), content=str(result))
else:
for rpc_config in device['serverSideRpc']:
for (key, value) in content['data'].items():

View File

@@ -83,12 +83,14 @@ class TbLogger(logging.Logger):
self._is_on_init_state = False
def error(self, msg, *args, **kwargs):
super(TbLogger, self).error(msg, *args, **kwargs, stacklevel=2)
kwargs['stacklevel'] = 2
super(TbLogger, self).error(msg, *args, **kwargs)
self._send_error_count()
def exception(self, msg, *args, **kwargs) -> None:
attr_name = kwargs.pop('attr_name', None)
super(TbLogger, self).exception(msg, *args, **kwargs, stacklevel=2)
kwargs['stacklevel'] = 2
super(TbLogger, self).exception(msg, *args, **kwargs)
self._send_error_count(error_attr_name=attr_name)
def _send_error_count(self, error_attr_name=None):