From 12a88a9693c1ad3ddcbf681cd67409258ab839d8 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Tue, 13 Apr 2021 11:37:24 +0300 Subject: [PATCH] Refactoring in Modbus Connector --- .../connectors/modbus/constants.py | 63 ++++ .../connectors/modbus/modbus_connector.py | 320 ++++++++++-------- thingsboard_gateway/gateway/constants.py | 45 +++ 3 files changed, 284 insertions(+), 144 deletions(-) create mode 100644 thingsboard_gateway/connectors/modbus/constants.py create mode 100644 thingsboard_gateway/gateway/constants.py diff --git a/thingsboard_gateway/connectors/modbus/constants.py b/thingsboard_gateway/connectors/modbus/constants.py new file mode 100644 index 00000000..86b2d675 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus/constants.py @@ -0,0 +1,63 @@ +# Copyright 2021. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from thingsboard_gateway.gateway.constants import * + +# Connector constants + +LAST_PREFIX = "last_" +NEXT_PREFIX = "next_" + +CHECK_POSTFIX = "_check" +POLL_PERIOD_POSTFIX = "PollPeriod" + +TIMESERIES_PARAMETER = "timeseries" + +MASTER_PARAMETER = "master" +AVAILABLE_FUNCTIONS_PARAMETER = "available_functions" + +CONNECTION_ATTEMPT_PARAMETER = "connection_attempt" +LAST_CONNECTION_ATTEMPT_TIME_PARAMETER = "last_connection_attempt_time" + +# Configuration parameters + +RPC_SECTION = "rpc" + +BYTE_ORDER_PARAMETER = "byteOrder" +WORD_ORDER_PARAMETER = "wordOrder" +SEND_DATA_ONLY_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange" +CONNECT_ATTEMPT_COUNT_PARAMETER = "connectAttemptCount" +CONNECT_ATTEMPT_TIME_MS_PARAMETER = "connectAttemptTimeMs" +WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER = "waitAfterFailedAttemptsMs" + +FUNCTION_CODE_PARAMETER = "functionCode" + +ADDRESS_PARAMETER = "address" +OBJECTS_COUNT_PARAMETER = "objectsCount" + +UNIT_ID_PARAMETER = "unitId" + +HOST_PARAMETER = "host" +PORT_PARAMETER = "port" +BAUDRATE_PARAMETER = "baudrate" +TIMEOUT_PARAMETER = "timeout" +METHOD_PARAMETER = "method" +STOPBITS_PARAMETER = "stopbits" +BYTESIZE_PARAMETER = "bytesize" +PARITY_PARAMETER = "parity" +STRICT_PARAMETER = "strict" +TYPE_PARAMETER = "type" + +PAYLOAD_PARAMETER = "payload" +TAG_PARAMETER = "tag" diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 88ceb77e..d229fee1 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -19,6 +19,7 @@ from string import ascii_lowercase from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_utility import TBUtility + # Try import Pymodbus library or install it and import try: from pymodbus.constants import Defaults @@ -30,37 +31,44 @@ except ImportError: from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient, ModbusRtuFramer, ModbusSocketFramer from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse -from pymodbus.register_write_message import WriteMultipleRegistersResponse, \ - WriteSingleRegisterResponse +from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse from pymodbus.register_read_message import ReadRegistersResponseBase from pymodbus.bit_read_message import ReadBitsResponseBase from pymodbus.exceptions import ConnectionException from thingsboard_gateway.connectors.connector import Connector, log +from thingsboard_gateway.connectors.modbus.constants import * from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter - -CONVERTED_DATA_SECTIONS = ["attributes", "telemetry"] +CONVERTED_DATA_SECTIONS = [ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER] class ModbusConnector(Connector, threading.Thread): def __init__(self, gateway, config, connector_type): - self.statistics = {'MessagesReceived': 0, - 'MessagesSent': 0} + self.statistics = {STATISTIC_MESSAGE_RECEIVED_PARAMETER: 0, + STATISTIC_MESSAGE_SENT_PARAMETER: 0} super().__init__() self.__gateway = gateway self._connector_type = connector_type - self.__config = config.get("server") + self.__config = config.get(CONFIG_SERVER_SECTION_PARAMETER) self.__current_master, self.__available_functions = self.__configure_master() - self.__default_config_parameters = ['host', 'port', 'baudrate', 'timeout', 'method', 'stopbits', 'bytesize', 'parity', 'strict', 'type'] - self.__byte_order = self.__config.get("byteOrder") - self.__word_order = self.__config.get("wordOrder") + self.__default_config_parameters = [HOST_PARAMETER, + PORT_PARAMETER, + BAUDRATE_PARAMETER, + TIMEOUT_PARAMETER, + METHOD_PARAMETER, + STOPBITS_PARAMETER, + BYTESIZE_PARAMETER, + PARITY_PARAMETER, + STRICT_PARAMETER, + TYPE_PARAMETER] + self.__byte_order = self.__config.get(BYTE_ORDER_PARAMETER) + self.__word_order = self.__config.get(WORD_ORDER_PARAMETER) self.__configure_master() self.__devices = {} - self.setName(self.__config.get("name", - 'Modbus Default ' + ''.join(choice(ascii_lowercase) for _ in range(5)))) + self.setName(self.__config.get("name", 'Modbus Default ' + ''.join(choice(ascii_lowercase) for _ in range(5)))) self.__load_converters() self.__connected = False self.__stopped = False @@ -86,28 +94,28 @@ class ModbusConnector(Connector, threading.Thread): def __load_converters(self): try: - for device in self.__config["devices"]: - if self.__config.get("converter") is not None: - converter = TBModuleLoader.import_module(self._connector_type, self.__config["converter"])(device) + for device in self.__config[CONFIG_DEVICES_SECTION_PARAMETER]: + if self.__config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: + converter = TBModuleLoader.import_module(self._connector_type, self.__config[UPLINK_PREFIX + CONVERTER_PARAMETER])(device) else: converter = BytesModbusUplinkConverter(device) - if self.__config.get("downlink_converter") is not None: - downlink_converter = TBModuleLoader.import_module(self._connector_type, self.__config["downlink_converter"])(device) + if self.__config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None: + downlink_converter = TBModuleLoader.import_module(self._connector_type, self.__config[DOWNLINK_PREFIX + CONVERTER_PARAMETER])(device) else: downlink_converter = BytesModbusDownlinkConverter(device) - if device.get('deviceName') not in self.__gateway.get_devices(): - self.__gateway.add_device(device.get('deviceName'), {"connector": self}, device_type=device.get("deviceType")) - self.__devices[device["deviceName"]] = {"config": device, - "converter": converter, - "downlink_converter": downlink_converter, - "next_attributes_check": 0, - "next_timeseries_check": 0, - "telemetry": {}, - "attributes": {}, - "last_telemetry": {}, - "last_attributes": {}, - "connection_attempt": 0 - } + if device.get(DEVICE_NAME_PARAMETER) not in self.__gateway.get_devices(): + self.__gateway.add_device(device.get(DEVICE_NAME_PARAMETER), {CONNECTOR_PARAMETER: self}, device_type=device.get(DEVICE_TYPE_PARAMETER)) + self.__devices[device[DEVICE_NAME_PARAMETER]] = {CONFIG_SECTION_PARAMETER: device, + UPLINK_PREFIX + CONVERTER_PARAMETER: converter, + DOWNLINK_PREFIX + CONVERTER_PARAMETER: downlink_converter, + NEXT_PREFIX + ATTRIBUTES_PARAMETER + CHECK_POSTFIX: 0, + NEXT_PREFIX + TIMESERIES_PARAMETER + CHECK_POSTFIX: 0, + TELEMETRY_PARAMETER: {}, + ATTRIBUTES_PARAMETER: {}, + LAST_PREFIX + TELEMETRY_PARAMETER: {}, + LAST_PREFIX + ATTRIBUTES_PARAMETER: {}, + CONNECTION_ATTEMPT_PARAMETER: 0 + } except Exception as e: log.exception(e) @@ -122,63 +130,69 @@ class ModbusConnector(Connector, threading.Thread): def __process_devices(self): for device in self.__devices: current_time = time.time() - device_responses = {"timeseries": {}, - "attributes": {}, + device_responses = {TIMESERIES_PARAMETER: {}, + ATTRIBUTES_PARAMETER: {}, } to_send = {} try: for config_section in device_responses: - if self.__devices[device]["config"].get(config_section) is not None: - current_device_config = self.__devices[device]["config"] - unit_id = current_device_config["unitId"] - if self.__devices[device]["next_"+config_section+"_check"] < current_time: + if self.__devices[device][CONFIG_SECTION_PARAMETER].get(config_section) is not None: + current_device_config = self.__devices[device][CONFIG_SECTION_PARAMETER] + unit_id = current_device_config[UNIT_ID_PARAMETER] + if self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] < current_time: self.__connect_to_current_master(device) if not self.__current_master.is_socket_open() or not len(current_device_config[config_section]): continue # Reading data from device for interested_data in range(len(current_device_config[config_section])): current_data = current_device_config[config_section][interested_data] - current_data["deviceName"] = device + current_data[DEVICE_NAME_PARAMETER] = device input_data = self.__function_to_device(current_data, unit_id) - device_responses[config_section][current_data["tag"]] = {"data_sent": current_data, - "input_data": input_data} + device_responses[config_section][current_data[TAG_PARAMETER]] = {"data_sent": current_data, + "input_data": input_data} log.debug("Checking %s for device %s", config_section, device) - self.__devices[device]["next_"+config_section+"_check"] = current_time + current_device_config[config_section+"PollPeriod"]/1000 + self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] = current_time + current_device_config[ + config_section + POLL_PERIOD_POSTFIX] / 1000 log.debug(device_responses) converted_data = {} try: - converted_data = self.__devices[device]["converter"].convert(config={**current_device_config, - "byteOrder": current_device_config.get("byteOrder", self.__byte_order), - "wordOrder": current_device_config.get("wordOrder", self.__word_order)}, - data=device_responses) + converted_data = self.__devices[device][UPLINK_PREFIX + CONVERTER_PARAMETER].convert(config={ + **current_device_config, + BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, self.__byte_order), + WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, self.__word_order) + }, + data=device_responses) except Exception as e: log.error(e) - to_send = {"deviceName": converted_data["deviceName"], "deviceType": converted_data["deviceType"], - "telemetry": [], "attributes": []} - if converted_data and current_device_config.get("sendDataOnlyOnChange"): - self.statistics['MessagesReceived'] += 1 + to_send = {DEVICE_NAME_PARAMETER: converted_data[DEVICE_NAME_PARAMETER], + DEVICE_TYPE_PARAMETER: converted_data[DEVICE_TYPE_PARAMETER], + TELEMETRY_PARAMETER: [], + ATTRIBUTES_PARAMETER: [] + } + if converted_data and current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER): + self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 for converted_data_section in CONVERTED_DATA_SECTIONS: for current_section_dict in converted_data[converted_data_section]: for key, value in current_section_dict.items(): - if self.__devices[device]["last_" + converted_data_section].get(key) is None or \ - self.__devices[device]["last_" + converted_data_section][key] != value: - self.__devices[device]["last_" + converted_data_section][key] = value + if self.__devices[device][LAST_PREFIX + converted_data_section].get(key) is None or \ + self.__devices[device][LAST_PREFIX + converted_data_section][key] != value: + self.__devices[device][LAST_PREFIX + converted_data_section][key] = value to_send[converted_data_section].append({key: value}) - if not to_send.get("attributes") and not to_send.get("telemetry"): + if not to_send.get(ATTRIBUTES_PARAMETER) and not to_send.get(TELEMETRY_PARAMETER): log.debug("Data has not been changed.") continue - elif converted_data and current_device_config.get("sendDataOnlyOnChange") is None or \ - not current_device_config.get("sendDataOnlyOnChange"): - self.statistics['MessagesReceived'] += 1 + elif converted_data and current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER) is None or \ + not current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER): + self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 for converted_data_section in CONVERTED_DATA_SECTIONS: - self.__devices[device]["last_" + converted_data_section] = converted_data[converted_data_section] + self.__devices[device][LAST_PREFIX + converted_data_section] = converted_data[converted_data_section] to_send[converted_data_section] = converted_data[converted_data_section] - if to_send.get("attributes") or to_send.get("telemetry"): + if to_send.get(ATTRIBUTES_PARAMETER) or to_send.get(TELEMETRY_PARAMETER): self.__gateway.send_to_storage(self.get_name(), to_send) - self.statistics['MessagesSent'] += 1 + self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 except ConnectionException: time.sleep(5) log.error("Connection lost! Reconnecting...") @@ -187,16 +201,16 @@ class ModbusConnector(Connector, threading.Thread): def on_attributes_update(self, content): try: - for attribute_updates_command_config in self.__devices[content["device"]]["config"]["attributeUpdates"]: - for attribute_updated in content["data"]: - if attribute_updates_command_config["tag"] == attribute_updated: + for attribute_updates_command_config in self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER]["attributeUpdates"]: + for attribute_updated in content[DATA_PARAMETER]: + if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated: to_process = { - "device": content["device"], - "data": { - "method": attribute_updated, - "params": content["data"][attribute_updated] + DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER], + DATA_PARAMETER: { + RPC_METHOD_PARAMETER: attribute_updated, + RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated] + } } - } self.__process_rpc_request(to_process, attribute_updates_command_config) except Exception as e: log.exception(e) @@ -207,52 +221,64 @@ class ModbusConnector(Connector, threading.Thread): wait_after_failed_attempts_ms = 300000 if device is None: device = list(self.__devices.keys())[0] - if self.__devices[device].get('master') is None: - self.__devices[device]['master'], self.__devices[device]['available_functions'] = self.__configure_master( - self.__devices[device]["config"]) - if self.__devices[device]['master'] != self.__current_master: - self.__current_master = self.__devices[device]['master'] - self.__available_functions = self.__devices[device]['available_functions'] - connect_attempt_count = self.__devices[device]["config"].get("connectAttemptCount", connect_attempt_count) - connect_attempt_time_ms = self.__devices[device]["config"].get("connectAttemptTimeMs", connect_attempt_time_ms) - wait_after_failed_attempts_ms = self.__devices[device]["config"].get("waitAfterFailedAttemptsMs", wait_after_failed_attempts_ms) + if self.__devices[device].get(MASTER_PARAMETER) is None: + self.__devices[device][MASTER_PARAMETER], self.__devices[device][AVAILABLE_FUNCTIONS_PARAMETER] = self.__configure_master( + self.__devices[device][CONFIG_SECTION_PARAMETER]) + if self.__devices[device][MASTER_PARAMETER] != self.__current_master: + self.__current_master = self.__devices[device][MASTER_PARAMETER] + self.__available_functions = self.__devices[device][AVAILABLE_FUNCTIONS_PARAMETER] + connect_attempt_count = self.__devices[device][CONFIG_SECTION_PARAMETER].get(CONNECT_ATTEMPT_COUNT_PARAMETER, connect_attempt_count) + if connect_attempt_count < 1: + connect_attempt_count = 1 + connect_attempt_time_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get(CONNECT_ATTEMPT_TIME_MS_PARAMETER, connect_attempt_time_ms) + if connect_attempt_time_ms < 500: + connect_attempt_time_ms = 500 + wait_after_failed_attempts_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get(WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, wait_after_failed_attempts_ms) + if wait_after_failed_attempts_ms < 1000: + wait_after_failed_attempts_ms = 1000 current_time = time.time() * 1000 if not self.__current_master.is_socket_open(): - if self.__devices[device]["connection_attempt"] >= connect_attempt_count and \ - self.__devices[device]["last_connection_attempt_time"] - current_time >= wait_after_failed_attempts_ms: - self.__devices[device]["connection_attempt"] = 0 - while not self.__current_master.is_socket_open() and self.__devices[device]["connection_attempt"] < connect_attempt_count: - self.__devices[device]["connection_attempt"] = self.__devices[device]["connection_attempt"] + 1 - self.__devices[device]["last_connection_attempt_time"] = current_time - self.__current_master.connect() - if not self.__current_master.is_socket_open(): - time.sleep(connect_attempt_time_ms / 1000) + if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= connect_attempt_count and \ + current_time - self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] >= wait_after_failed_attempts_ms: + self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0 + while not self.__current_master.is_socket_open() \ + and self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] < connect_attempt_count \ + and current_time - self.__devices[device].get(LAST_CONNECTION_ATTEMPT_TIME_PARAMETER, 0) >= connect_attempt_time_ms: + self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] + 1 + self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time log.debug("Modbus trying connect to %s", device) - if self.__devices[device]["connection_attempt"] >= 0 and self.__current_master.is_socket_open(): - self.__devices[device]["connection_attempt"] = 0 + self.__current_master.connect() + if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] == connect_attempt_count: + log.warn("Maximum attempt count (%i) for device \"%s\" - encountered.", connect_attempt_count, device) + # time.sleep(connect_attempt_time_ms / 1000) + # if not self.__current_master.is_socket_open(): + if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= 0 and self.__current_master.is_socket_open(): + self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0 + self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time log.debug("Modbus connected to device %s.", device) def __configure_master(self, config=None): current_config = self.__config if config is None else config - host = current_config['host'] if current_config.get("host") is not None else self.__config.get("host", "localhost") + host = current_config[HOST_PARAMETER] if current_config.get(HOST_PARAMETER) is not None else self.__config.get(HOST_PARAMETER, "localhost") try: - port = int(current_config['port']) if current_config.get("port") is not None else self.__config.get(int("port"), 502) + port = int(current_config[PORT_PARAMETER]) if current_config.get(PORT_PARAMETER) is not None else self.__config.get(int(PORT_PARAMETER), 502) except ValueError: - port = current_config['port'] if current_config.get("port") is not None else self.__config.get("port", 502) - baudrate = current_config['baudrate'] if current_config.get('baudrate') is not None else self.__config.get('baudrate', 19200) - timeout = current_config['timeout'] if current_config.get("timeout") is not None else self.__config.get("timeout", 35) - method = current_config['method'] if current_config.get('method') is not None else self.__config.get('method', 'rtu') - stopbits = current_config['stopbits'] if current_config.get('stopbits') is not None else self.__config.get('stopbits', Defaults.Stopbits) - bytesize = current_config['bytesize'] if current_config.get('bytesize') is not None else self.__config.get('bytesize', Defaults.Bytesize) - parity = current_config['parity'] if current_config.get('parity') is not None else self.__config.get('parity', Defaults.Parity) - strict = current_config["strict"] if current_config.get("strict") is not None else self.__config.get("strict", True) - rtu = ModbusRtuFramer if current_config.get("method") == "rtu" or (current_config.get("method") is None and self.__config.get("method") == "rtu") else ModbusSocketFramer - if current_config.get('type') == 'tcp' or (current_config.get("type") is None and self.__config.get("type") == "tcp"): + port = current_config[PORT_PARAMETER] if current_config.get(PORT_PARAMETER) is not None else self.__config.get(PORT_PARAMETER, 502) + baudrate = current_config[BAUDRATE_PARAMETER] if current_config.get(BAUDRATE_PARAMETER) is not None else self.__config.get(BAUDRATE_PARAMETER, 19200) + timeout = current_config[TIMEOUT_PARAMETER] if current_config.get(TIMEOUT_PARAMETER) is not None else self.__config.get(TIMEOUT_PARAMETER, 35) + method = current_config[METHOD_PARAMETER] if current_config.get(METHOD_PARAMETER) is not None else self.__config.get(METHOD_PARAMETER, "rtu") + stopbits = current_config[STOPBITS_PARAMETER] if current_config.get(STOPBITS_PARAMETER) is not None else self.__config.get(STOPBITS_PARAMETER, Defaults.Stopbits) + bytesize = current_config[BYTESIZE_PARAMETER] if current_config.get(BYTESIZE_PARAMETER) is not None else self.__config.get(BYTESIZE_PARAMETER, Defaults.Bytesize) + parity = current_config[PARITY_PARAMETER] if current_config.get(PARITY_PARAMETER) is not None else self.__config.get(PARITY_PARAMETER, Defaults.Parity) + strict = current_config[STRICT_PARAMETER] if current_config.get(STRICT_PARAMETER) is not None else self.__config.get(STRICT_PARAMETER, True) + rtu = ModbusRtuFramer if current_config.get(METHOD_PARAMETER) == "rtu" or ( + current_config.get(METHOD_PARAMETER) is None and self.__config.get(METHOD_PARAMETER) == "rtu") else ModbusSocketFramer + if current_config.get(TYPE_PARAMETER) == 'tcp' or (current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "tcp"): master = ModbusTcpClient(host, port, rtu, timeout=timeout) - elif current_config.get('type') == 'udp' or (current_config.get("type") is None and self.__config.get("type") == "udp"): + elif current_config.get(TYPE_PARAMETER) == 'udp' or (current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "udp"): master = ModbusUdpClient(host, port, rtu, timeout=timeout) - elif current_config.get('type') == 'serial' or (current_config.get("type") is None and self.__config.get("type") == "serial"): + elif current_config.get(TYPE_PARAMETER) == 'serial' or (current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "serial"): master = ModbusSerialClient(method=method, port=port, timeout=timeout, @@ -272,27 +298,27 @@ class ModbusConnector(Connector, threading.Thread): 6: master.write_register, 15: master.write_coils, 16: master.write_registers, - } + } return master, available_functions def __stop_connections_to_masters(self): for device in self.__devices: - self.__devices[device]['master'].close() + self.__devices[device][MASTER_PARAMETER].close() def __function_to_device(self, config, unit_id): - function_code = config.get('functionCode') + function_code = config.get(FUNCTION_CODE_PARAMETER) result = None if function_code in (1, 2, 3, 4): - result = self.__available_functions[function_code](address=config["address"], - count=config.get("objectsCount", config.get("registersCount", config.get("registerCount", 1))), + result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], + count=config.get(OBJECTS_COUNT_PARAMETER, config.get("registersCount", config.get("registerCount", 1))), unit=unit_id) elif function_code in (5, 6): - result = self.__available_functions[function_code](address=config["address"], - value=config["payload"], + result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], + value=config[PAYLOAD_PARAMETER], unit=unit_id) elif function_code in (15, 16): - result = self.__available_functions[function_code](address=config["address"], - values=config["payload"], + result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], + values=config[PAYLOAD_PARAMETER], unit=unit_id) else: log.error("Unknown Modbus function with code: %i", function_code) @@ -301,56 +327,62 @@ class ModbusConnector(Connector, threading.Thread): log.exception(result) return result - def server_side_rpc_handler(self, content): + def server_side_rpc_handler(self, server_rpc_request): try: - if content.get("device") is not None: + if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None: - log.debug("Modbus connector received rpc request for %s with content: %s", content["device"], content) - if isinstance(self.__devices[content["device"]]["config"]["rpc"], dict): - rpc_command_config = self.__devices[content["device"]]["config"]["rpc"].get(content["data"]["method"]) + log.debug("Modbus connector received rpc request for %s with server_rpc_request: %s", + server_rpc_request[DEVICE_SECTION_PARAMETER], + server_rpc_request) + if isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][RPC_SECTION], dict): + rpc_command_config = self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][RPC_SECTION].get( + server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER]) if rpc_command_config is not None: - self.__process_rpc_request(content, rpc_command_config) - elif isinstance(self.__devices[content["device"]]["config"]["rpc"], list): - for rpc_command_config in self.__devices[content["device"]]["config"]["rpc"]: - if rpc_command_config["tag"] == content["data"]["method"]: - self.__process_rpc_request(content, rpc_command_config) + self.__process_rpc_request(server_rpc_request, rpc_command_config) + elif isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][RPC_SECTION], list): + for rpc_command_config in self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][RPC_SECTION]: + if rpc_command_config[TAG_PARAMETER] == server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER]: + self.__process_rpc_request(server_rpc_request, rpc_command_config) break else: log.error("Received rpc request, but method %s not found in config for %s.", - content["data"].get("method"), + server_rpc_request[DATA_PARAMETER].get(RPC_METHOD_PARAMETER), self.get_name()) - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], - {content["data"]["method"]: "METHOD NOT FOUND!"}) + self.__gateway.send_rpc_reply(server_rpc_request[DEVICE_SECTION_PARAMETER], + server_rpc_request[DATA_PARAMETER][RPC_ID_PARAMETER], + {server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER]: "METHOD NOT FOUND!"}) else: - log.debug("Received RPC to connector: %r", content) + log.debug("Received RPC to connector: %r", server_rpc_request) except Exception as e: log.exception(e) def __process_rpc_request(self, content, rpc_command_config): if rpc_command_config is not None: - rpc_command_config["unitId"] = self.__devices[content["device"]]["config"]["unitId"] - self.__connect_to_current_master(content["device"]) + rpc_command_config[UNIT_ID_PARAMETER] = self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][UNIT_ID_PARAMETER] + self.__connect_to_current_master(content[DEVICE_SECTION_PARAMETER]) # if rpc_command_config.get('bit') is not None: - # rpc_command_config["functionCode"] = 6 - if rpc_command_config.get("functionCode") in (5, 6, 15, 16): - rpc_command_config["payload"] = self.__devices[content["device"]]["downlink_converter"].convert( + # rpc_command_config[FUNCTION_CODE_PARAMETER] = 6 + if rpc_command_config.get(FUNCTION_CODE_PARAMETER) in (5, 6, 15, 16): + rpc_command_config[PAYLOAD_PARAMETER] = self.__devices[content[DEVICE_SECTION_PARAMETER]][DOWNLINK_PREFIX + CONVERTER_PARAMETER].convert( rpc_command_config, content) response = None try: - response = self.__function_to_device(rpc_command_config, rpc_command_config["unitId"]) + response = self.__function_to_device(rpc_command_config, rpc_command_config[UNIT_ID_PARAMETER]) except Exception as e: log.exception(e) response = e if isinstance(response, (ReadRegistersResponseBase, ReadBitsResponseBase)): - to_converter = {"rpc": {content["data"]["method"]: {"data_sent": rpc_command_config, - "input_data": response}}} - response = self.__devices[content["device"]]["converter"].convert(config={**self.__devices[content["device"]]["config"], - "byteOrder": self.__devices[content["device"]]["config"].get("byteOrder", self.__byte_order), - "wordOrder": self.__devices[content["device"]]["config"].get("wordOrder", self.__word_order) - }, - data=to_converter) - log.debug("Received RPC method: %s, result: %r", content["data"]["method"], response) + to_converter = {RPC_SECTION: {content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: {"data_sent": rpc_command_config, + "input_data": response}}} + response = self.__devices[content[DEVICE_SECTION_PARAMETER]][UPLINK_PREFIX + CONVERTER_PARAMETER].convert( + config={**self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER], + BYTE_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER].get(BYTE_ORDER_PARAMETER, + self.__byte_order), + WORD_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER].get(WORD_ORDER_PARAMETER, + self.__word_order) + }, + data=to_converter) + log.debug("Received RPC method: %s, result: %r", content[DATA_PARAMETER][RPC_METHOD_PARAMETER], response) # response = {"success": response} elif isinstance(response, (WriteMultipleRegistersResponse, WriteMultipleCoilsResponse, @@ -358,13 +390,13 @@ class ModbusConnector(Connector, threading.Thread): WriteSingleRegisterResponse)): log.debug("Write %r", str(response)) response = {"success": True} - if content.get("id") or (content.get("data") is not None and content["data"].get("id")): + if content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)): if isinstance(response, Exception): - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], - {content["data"]["method"]: str(response)}) + self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER], + content[DATA_PARAMETER][RPC_ID_PARAMETER], + {content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)}) else: - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], + self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER], + content[DATA_PARAMETER][RPC_ID_PARAMETER], response) log.debug("%r", response) diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py new file mode 100644 index 00000000..c38c9aed --- /dev/null +++ b/thingsboard_gateway/gateway/constants.py @@ -0,0 +1,45 @@ +# Copyright 2021. ThingsBoard +# # +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# # +# http://www.apache.org/licenses/LICENSE-2.0 +# # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Service constants + +STATISTIC_MESSAGE_RECEIVED_PARAMETER = "MessagesReceived" +STATISTIC_MESSAGE_SENT_PARAMETER = "MessagesSent" + +CONNECTOR_PARAMETER = "connector" +CONVERTER_PARAMETER = "converter" +UPLINK_PREFIX = "uplink_" +DOWNLINK_PREFIX = "downlink_" + +CONFIG_SECTION_PARAMETER = "config" +CONFIG_SERVER_SECTION_PARAMETER = "server" +CONFIG_DEVICES_SECTION_PARAMETER = "devices" + +# Data parameter constants + +DEVICE_SECTION_PARAMETER = "device" + +DEVICE_NAME_PARAMETER = "deviceName" +DEVICE_TYPE_PARAMETER = "deviceType" + +ATTRIBUTES_PARAMETER = "attributes" +TELEMETRY_PARAMETER = "telemetry" + +# RPC parameter constants + +RPC_ID_PARAMETER = "id" +RPC_METHOD_PARAMETER = "method" +RPC_PARAMS_PARAMETER = "params" +DATA_PARAMETER = "data" \ No newline at end of file