1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Merge pull request #925 from samson0v/master

Added reserved get/set methods for connectors
This commit is contained in:
Illia Barkov
2022-09-19 16:21:21 +03:00
committed by GitHub
4 changed files with 212 additions and 119 deletions

View File

@@ -432,7 +432,7 @@ class ModbusConnector(Connector, Thread):
function_code = config.get('functionCode')
result = None
if function_code == 1:
#why count * 8 ? in my Modbus device one coils is 1bit, tow coils is 2bit,if * 8 can not read right coils
#why count * 8 ? in my Modbus device one coils is 1bit, tow coils is 2bit,if * 8 can not read right coils
# result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER],
# count=config.get(OBJECTS_COUNT_PARAMETER,
# config.get("registersCount",
@@ -511,27 +511,40 @@ class ModbusConnector(Connector, Thread):
)
)[0]
if isinstance(device.config[RPC_SECTION], dict):
rpc_command_config = device.config[RPC_SECTION].get(
server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER])
rpc_method = server_rpc_request[DATA_PARAMETER].get(RPC_METHOD_PARAMETER)
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in server_rpc_request['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue
if key and value:
params[key] = value if key not in ('functionCode', 'objectsCount', 'address') else int(
value)
self.__process_request(server_rpc_request, params)
elif isinstance(device.config[RPC_SECTION], dict):
rpc_command_config = device.config[RPC_SECTION].get(rpc_method)
if rpc_command_config is not None:
self.__process_request(server_rpc_request, rpc_command_config)
elif isinstance(device.config[RPC_SECTION], list):
for rpc_command_config in device.config[RPC_SECTION]:
if rpc_command_config[TAG_PARAMETER] == server_rpc_request[DATA_PARAMETER][
RPC_METHOD_PARAMETER]:
if rpc_command_config[TAG_PARAMETER] == rpc_method:
self.__process_request(server_rpc_request, rpc_command_config)
break
else:
log.error("Received rpc request, but method %s not found in config for %s.",
server_rpc_request[DATA_PARAMETER].get(RPC_METHOD_PARAMETER),
rpc_method,
self.get_name())
self.__gateway.send_rpc_reply(server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request[DATA_PARAMETER][RPC_ID_PARAMETER],
{server_rpc_request[DATA_PARAMETER][
RPC_METHOD_PARAMETER]: "METHOD NOT FOUND!"})
{rpc_method: "METHOD NOT FOUND!"})
else:
log.debug("Received RPC to connector: %r", server_rpc_request)
except Exception as e:
@@ -614,8 +627,9 @@ class ModbusConnector(Connector, Thread):
self.in_progress = True
convert_function, params = self.__msg_queue.get(True, 10)
converted_data = convert_function(params)
log.info(converted_data)
self.__send_result(converted_data)
self.in_progress = False
if converted_data:
log.info(converted_data)
self.__send_result(converted_data)
self.in_progress = False
else:
sleep(.001)

View File

@@ -636,93 +636,111 @@ class MqttConnector(Connector, Thread):
else:
self.__log.error("Attribute updates config not found.")
def __process_rpc_request(self, content, rpc_config):
# This handler seems able to handle the request
self.__log.info("Candidate RPC handler found")
expects_response = rpc_config.get("responseTopicExpression")
defines_timeout = rpc_config.get("responseTimeout")
# 2-way RPC setup
if expects_response and defines_timeout:
expected_response_topic = rpc_config["responseTopicExpression"] \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))
expected_response_topic = TBUtility.replace_params_tags(expected_response_topic, content)
timeout = time() * 1000 + rpc_config.get("responseTimeout")
# Start listenting on the response topic
self.__log.info("Subscribing to: %s", expected_response_topic)
self.__subscribe(expected_response_topic, rpc_config.get("responseTopicQoS", 1))
# Wait for subscription to be carried out
sub_response_timeout = 10
while expected_response_topic in self.__subscribes_sent.values():
sub_response_timeout -= 1
sleep(0.1)
if sub_response_timeout == 0:
break
# Ask the gateway to enqueue this as an RPC response
self.__gateway.register_rpc_request_timeout(content,
timeout,
expected_response_topic,
self.rpc_cancel_processing)
# Wait for RPC to be successfully enqueued, which never fails.
while self.__gateway.is_rpc_in_progress(expected_response_topic):
sleep(0.1)
elif expects_response and not defines_timeout:
self.__log.info("2-way RPC without timeout: treating as 1-way")
# Actually reach out for the device
request_topic: str = rpc_config.get("requestTopicExpression") \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))
request_topic = TBUtility.replace_params_tags(request_topic, content)
data_to_send_tags = TBUtility.get_values(rpc_config.get('valueExpression'), content['data'],
'params',
get_tag=True)
data_to_send_values = TBUtility.get_values(rpc_config.get('valueExpression'), content['data'],
'params',
expression_instead_none=True)
data_to_send = rpc_config.get('valueExpression')
for (tag, value) in zip(data_to_send_tags, data_to_send_values):
data_to_send = data_to_send.replace('${' + tag + '}', simplejson.dumps(value))
try:
self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send)
self._publish(request_topic, data_to_send, rpc_config.get('retain', False))
if not expects_response or not defines_timeout:
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=True)
# Everything went out smoothly: RPC is served
return
except Exception as e:
self.__log.exception(e)
@StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB')
def server_side_rpc_handler(self, content):
self.__log.info("Incoming server-side RPC: %s", content)
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], content["data"]["method"]) is not None:
# This handler seems able to handle the request
self.__log.info("Candidate RPC handler found")
expects_response = rpc_config.get("responseTopicExpression")
defines_timeout = rpc_config.get("responseTimeout")
# 2-way RPC setup
if expects_response and defines_timeout:
expected_response_topic = rpc_config["responseTopicExpression"] \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content["data"]["method"])) \
.replace("${requestId}", str(content["data"]["id"]))
expected_response_topic = TBUtility.replace_params_tags(expected_response_topic, content)
timeout = time() * 1000 + rpc_config.get("responseTimeout")
# Start listenting on the response topic
self.__log.info("Subscribing to: %s", expected_response_topic)
self.__subscribe(expected_response_topic, rpc_config.get("responseTopicQoS", 1))
# Wait for subscription to be carried out
sub_response_timeout = 10
while expected_response_topic in self.__subscribes_sent.values():
sub_response_timeout -= 1
sleep(0.1)
if sub_response_timeout == 0:
break
# Ask the gateway to enqueue this as an RPC response
self.__gateway.register_rpc_request_timeout(content,
timeout,
expected_response_topic,
self.rpc_cancel_processing)
# Wait for RPC to be successfully enqueued, which never fails.
while self.__gateway.is_rpc_in_progress(expected_response_topic):
sleep(0.1)
elif expects_response and not defines_timeout:
self.__log.info("2-way RPC without timeout: treating as 1-way")
# Actually reach out for the device
request_topic: str = rpc_config.get("requestTopicExpression") \
.replace("${deviceName}", str(content["device"])) \
.replace("${methodName}", str(content["data"]["method"])) \
.replace("${requestId}", str(content["data"]["id"]))
request_topic = TBUtility.replace_params_tags(request_topic, content)
data_to_send_tags = TBUtility.get_values(rpc_config.get('valueExpression'), content['data'],
'params',
get_tag=True)
data_to_send_values = TBUtility.get_values(rpc_config.get('valueExpression'), content['data'],
'params',
expression_instead_none=True)
data_to_send = rpc_config.get('valueExpression')
for (tag, value) in zip(data_to_send_tags, data_to_send_values):
data_to_send = data_to_send.replace('${' + tag + '}', simplejson.dumps(value))
rpc_method = content['data']['method']
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send)
self._publish(request_topic, data_to_send, rpc_config.get('retain', False))
(key, value) = param.split('=')
except ValueError:
continue
if not expects_response or not defines_timeout:
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
success_sent=True)
if key and value:
params[key] = value
# Everything went out smoothly: RPC is served
return
return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], rpc_method) is not None:
except Exception as e:
self.__log.exception(e)
return self.__process_rpc_request(content, rpc_config)
self.__log.error("RPC not handled: %s", content)
self.__log.error("RPC not handled: %s", content)
@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
def _publish(self, request_topic, data_to_send, retain):

View File

@@ -64,6 +64,10 @@ class RESTConnector(Connector, Thread):
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
self.__gateway = gateway
self._default_downlink_converter = TBModuleLoader.import_module(self._connector_type,
self._default_converters['downlink'])
self._default_uplink_converter = TBModuleLoader.import_module(self._connector_type,
self._default_converters['uplink'])
self.__USER_DATA = {}
self.setName(config.get("name", 'REST Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
self._app = None
@@ -209,22 +213,51 @@ class RESTConnector(Connector, Thread):
def server_side_rpc_handler(self, content):
try:
for rpc_request in self.__rpc_requests:
if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \
fullmatch(rpc_request["methodFilter"], content["data"]["method"]):
converted_data = rpc_request["downlink_converter"].convert(rpc_request, content)
rpc_method = content['data']['method']
request_dict = {"config": {**rpc_request,
**converted_data},
"request": regular_request}
request_dict["converter"] = request_dict["config"].get("uplink_converter")
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue
response = self.__send_request(request_dict, Queue(1), log, with_queue=False)
if key and value:
params[key] = value
log.debug('Response from RPC request: %s', response)
self.__gateway.send_rpc_reply(device=content["device"],
req_id=content["data"]["id"],
content=response[2] if response and len(response) >= 3 else response)
uplink_converter = self._default_uplink_converter
downlink_converter = self._default_downlink_converter
converted_data = downlink_converter.convert(params, content)
request_dict = {'config': {**params, **converted_data}, 'request': regular_request,
'converter': uplink_converter}
response = self.__send_request(request_dict, Queue(1), log, with_queue=False)
log.debug('Response from RPC request: %s', response)
self.__gateway.send_rpc_reply(device=content["device"],
req_id=content["data"]["id"],
content=response[2] if response and len(
response) >= 3 else response)
else:
for rpc_request in self.__rpc_requests:
if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \
fullmatch(rpc_request["methodFilter"], rpc_method):
converted_data = rpc_request["downlink_converter"].convert(rpc_request, content)
request_dict = {"config": {**rpc_request,
**converted_data},
"request": regular_request}
request_dict["converter"] = request_dict["config"].get("uplink_converter")
response = self.__send_request(request_dict, Queue(1), log, with_queue=False)
log.debug('Response from RPC request: %s', response)
self.__gateway.send_rpc_reply(device=content["device"],
req_id=content["data"]["id"],
content=response[2] if response and len(
response) >= 3 else response)
except Exception as e:
log.exception(e)

View File

@@ -337,26 +337,54 @@ class SocketConnector(Connector, Thread):
try:
device = tuple(filter(lambda item: item['deviceName'] == content['device'], self.__config['devices']))[0]
for rpc_config in device['serverSideRpc']:
for (key, value) in content['data'].items():
if value == rpc_config['methodRPC']:
rpc_method = rpc_config['methodProcessing']
return_result = rpc_config['withResponse']
result = None
# check if RPC method is reserved set
if content['data']['method'] == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue
address, port = device['address'].split(':')
encoding = device.get('encoding', 'utf-8').lower()
converted_data = bytes(str(content['data']['params']), encoding=encoding)
if key and value:
params[key] = value
if rpc_method.upper() == 'WRITE':
if self.__socket_type == 'TCP':
result = self.__write_value_via_tcp(address, port, converted_data)
else:
self.__write_value_via_udp(address, port, converted_data)
result = None
try:
if self.__socket_type == 'TCP':
result = self.__write_value_via_tcp(params['address'], int(params['port']), params['value'])
else:
self.__write_value_via_udp(params['address'], int(params['port']), params['value'])
except KeyError:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], 'Not enough params')
except ValueError:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'],
'Param "port" have to be int type')
else:
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(result))
else:
for rpc_config in device['serverSideRpc']:
for (key, value) in content['data'].items():
if value == rpc_config['methodRPC']:
rpc_method = rpc_config['methodProcessing']
return_result = rpc_config['withResponse']
result = None
if return_result and self.__socket_type == 'TCP':
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(result))
address, port = device['address'].split(':')
encoding = device.get('encoding', 'utf-8').lower()
converted_data = bytes(str(content['data']['params']), encoding=encoding)
return
if rpc_method.upper() == 'WRITE':
if self.__socket_type == 'TCP':
result = self.__write_value_via_tcp(address, port, converted_data)
else:
self.__write_value_via_udp(address, port, converted_data)
if return_result and self.__socket_type == 'TCP':
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(result))
return
except IndexError:
self.__log.error('Device not found')
except Exception as e:
self.__log.exception(e)