From 317b1b1e0368ea0b4de060b300dd4f70da2dcc00 Mon Sep 17 00:00:00 2001 From: Vitalii <43861229+samson0v@users.noreply.github.com> Date: Mon, 29 Nov 2021 14:29:01 +0200 Subject: [PATCH 1/3] [WIP] Added Modbus Master Connector (#633) * Fixed stopping SQLite storage * Changed the way of data reading and writing in SQLite Storage * Changed stopping way for SQLite Storage * Added Modbus Master Connector * Created Modbus Server Connector * Refactored Modbus connector * Added backward compability adapter for modbus connector * Fixed adapter * Added license * Added ASCII Framer support --- setup.py | 2 +- thingsboard_gateway/config/modbus.json | 2 +- thingsboard_gateway/config/tb_gateway.yaml | 1 + .../modbus/backward_compability_adapter.py | 54 ++ .../connectors/modbus/modbus_connector.py | 505 ++++++++---------- .../connectors/modbus/slave.py | 115 ++++ 6 files changed, 395 insertions(+), 284 deletions(-) create mode 100644 thingsboard_gateway/connectors/modbus/backward_compability_adapter.py create mode 100644 thingsboard_gateway/connectors/modbus/slave.py diff --git a/setup.py b/setup.py index c63249d1..95645a97 100644 --- a/setup.py +++ b/setup.py @@ -46,7 +46,7 @@ setup( 'thingsboard_gateway.extensions.mqtt', 'thingsboard_gateway.extensions.modbus', 'thingsboard_gateway.extensions.opcua', 'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', - 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp' + 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp', ], install_requires=[ 'jsonpath-rw', diff --git a/thingsboard_gateway/config/modbus.json b/thingsboard_gateway/config/modbus.json index 67489c78..368da9a5 100644 --- a/thingsboard_gateway/config/modbus.json +++ b/thingsboard_gateway/config/modbus.json @@ -172,4 +172,4 @@ } ] } -} \ No newline at end of file +} diff --git a/thingsboard_gateway/config/tb_gateway.yaml b/thingsboard_gateway/config/tb_gateway.yaml index 0d91ef90..9131ad7e 100644 --- a/thingsboard_gateway/config/tb_gateway.yaml +++ b/thingsboard_gateway/config/tb_gateway.yaml @@ -37,6 +37,7 @@ connectors: # type: modbus # configuration: modbus_serial.json # +# # - # name: OPC-UA Connector # type: opcua diff --git a/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py b/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py new file mode 100644 index 00000000..b50395a7 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py @@ -0,0 +1,54 @@ +# Copyright 2021. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from simplejson import dumps + +from thingsboard_gateway.connectors.connector import log + + +class BackwardCompatibilityAdapter: + def __init__(self, config): + self.__config = config + self.__keys = ['host', 'port', 'type', 'method', 'timeout', 'byteOrder', 'wordOrder', 'retries', 'retryOnEmpty', + 'retryOnInvalid', 'baudrate'] + + @staticmethod + def __save_json_config_file(config): + with open('config/modbus_new.json', 'w') as file: + file.writelines(dumps(config, sort_keys=True, indent=' ', separators=(',', ': '))) + + def convert(self): + if not self.__config.get('server'): + return self.__config + + log.warning( + 'You are using old configuration structure for Modbus connector. It will be DEPRECATED in the future ' + 'version! New config file "modbus_new.json" was generated in config/ folder. Please, use it.') + + slaves = [] + for device in self.__config['server'].get('devices', []): + slave = {**device} + + for key in self.__keys: + if not device.get(key): + slave[key] = self.__config['server'].get(key) + + slave['pollPeriod'] = slave['timeseriesPollPeriod'] + + slaves.append(slave) + + result_dict = {'master': {'slaves': slaves}} + self.__save_json_config_file(result_dict) + + return result_dict diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index c53f15e1..3ebee661 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading -import time +from threading import Thread +from time import sleep, time from queue import Queue from random import choice from string import ascii_lowercase -from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_utility import TBUtility # Try import Pymodbus library or install it and import @@ -30,23 +29,29 @@ except ImportError: TBUtility.install_package('pyserial') from pymodbus.constants import Defaults -from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient, ModbusRtuFramer, \ - ModbusSocketFramer from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse from pymodbus.register_read_message import ReadRegistersResponseBase from pymodbus.bit_read_message import ReadBitsResponseBase +from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient +from pymodbus.client.sync import ModbusRtuFramer, ModbusSocketFramer, ModbusAsciiFramer from pymodbus.exceptions import ConnectionException from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.modbus.constants import * -from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter -from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter +from thingsboard_gateway.connectors.modbus.slave import Slave +from thingsboard_gateway.connectors.modbus.backward_compability_adapter import BackwardCompatibilityAdapter CONVERTED_DATA_SECTIONS = [ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER] +FRAMER_TYPE = { + 'rtu': ModbusRtuFramer, + 'socket': ModbusSocketFramer, + 'ascii': ModbusAsciiFramer +} -class ModbusConnector(Connector, threading.Thread): +class ModbusConnector(Connector, Thread): + process_requests = Queue(-1) def __init__(self, gateway, config, connector_type): self.statistics = {STATISTIC_MESSAGE_RECEIVED_PARAMETER: 0, @@ -54,28 +59,15 @@ class ModbusConnector(Connector, threading.Thread): super().__init__() self.__gateway = gateway self._connector_type = connector_type - self.__config = config.get(CONFIG_SERVER_SECTION_PARAMETER) - self.__previous_master_config = dict() - self.__current_master, self.__available_functions = self.__configure_master() - self.__default_config_parameters = [HOST_PARAMETER, - PORT_PARAMETER, - BAUDRATE_PARAMETER, - TIMEOUT_PARAMETER, - METHOD_PARAMETER, - STOPBITS_PARAMETER, - BYTESIZE_PARAMETER, - PARITY_PARAMETER, - STRICT_PARAMETER, - TYPE_PARAMETER] - self.__byte_order = self.__config.get(BYTE_ORDER_PARAMETER) - self.__word_order = self.__config.get(WORD_ORDER_PARAMETER) - self.__devices = {} + self.__backward_compatibility_adapter = BackwardCompatibilityAdapter(config) + self.__config = self.__backward_compatibility_adapter.convert() self.setName(self.__config.get("name", 'Modbus Default ' + ''.join(choice(ascii_lowercase) for _ in range(5)))) - self.__load_converters() self.__connected = False self.__stopped = False self.daemon = True - self.__data_to_convert_queue = Queue() + + self.__slaves = [] + self.__load_slaves() def is_connected(self): return self.__connected @@ -83,29 +75,39 @@ class ModbusConnector(Connector, threading.Thread): def open(self): self.__stopped = False self.start() - log.info("Starting Modbus connector") def run(self): self.__connected = True while True: - time.sleep(.2) - self.__process_devices() + if not self.__stopped and not ModbusConnector.process_requests.empty(): + thread = Thread(target=self.__process_slaves, daemon=True) + thread.start() - if not self.__data_to_convert_queue.empty(): - for _ in range(self.__data_to_convert_queue.qsize()): - thread = threading.Thread(target=self.__convert_and_save_data, args=(self.__data_to_convert_queue,), - daemon=True) - thread.start() if self.__stopped: break - def __convert_and_save_data(self, queue): - device, current_device_config, config, device_responses = queue.get() + sleep(.2) + + def __load_slaves(self): + self.__slaves = [ + Slave(**{**device, 'connector': self, 'gateway': self.__gateway, 'callback': ModbusConnector.callback}) for + device in self.__config.get('master', {'slaves': []}).get('slaves', [])] + + @classmethod + def callback(cls, slave): + cls.process_requests.put(slave) + + @property + def connector_type(self): + return self._connector_type + + def __convert_and_save_data(self, config_tuple): + device, current_device_config, config, device_responses = config_tuple converted_data = {} try: - converted_data = self.__devices[device][UPLINK_PREFIX + CONVERTER_PARAMETER].convert( + converted_data = device.config[UPLINK_PREFIX + CONVERTER_PARAMETER].convert( config=config, data=device_responses) except Exception as e: @@ -117,20 +119,22 @@ class ModbusConnector(Connector, threading.Thread): ATTRIBUTES_PARAMETER: [] } - if current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER): + if current_device_config.get('sendDataOnlyOnChange'): self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 + for converted_data_section in CONVERTED_DATA_SECTIONS: for current_section_dict in converted_data[converted_data_section]: for key, value in current_section_dict.items(): - if self.__devices[device][LAST_PREFIX + converted_data_section].get(key) is None or \ - self.__devices[device][LAST_PREFIX + converted_data_section][key] != value: - self.__devices[device][LAST_PREFIX + converted_data_section][key] = value + if device.config[LAST_PREFIX + converted_data_section].get(key) is None or \ + device.config[LAST_PREFIX + converted_data_section][key] != value: + device.config[LAST_PREFIX + converted_data_section][key] = value to_send[converted_data_section].append({key: value}) - elif converted_data and current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER) is None or \ - not current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER): + elif converted_data and current_device_config.get('sendDataOnlyOnChange') is None or \ + not current_device_config.get('sendDataOnlyOnChange'): self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 + for converted_data_section in CONVERTED_DATA_SECTIONS: - self.__devices[device][LAST_PREFIX + converted_data_section] = converted_data[ + device.config[LAST_PREFIX + converted_data_section] = converted_data[ converted_data_section] to_send[converted_data_section] = converted_data[converted_data_section] @@ -138,36 +142,6 @@ class ModbusConnector(Connector, threading.Thread): self.__gateway.send_to_storage(self.get_name(), to_send) self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 - def __load_converters(self): - try: - for device in self.__config[CONFIG_DEVICES_SECTION_PARAMETER]: - if self.__config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: - converter = TBModuleLoader.import_module(self._connector_type, - self.__config[UPLINK_PREFIX + CONVERTER_PARAMETER])(device) - else: - converter = BytesModbusUplinkConverter(device) - if self.__config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None: - downlink_converter = TBModuleLoader.import_module(self._connector_type, self.__config[ - DOWNLINK_PREFIX + CONVERTER_PARAMETER])(device) - else: - downlink_converter = BytesModbusDownlinkConverter(device) - if device.get(DEVICE_NAME_PARAMETER) not in self.__gateway.get_devices(): - self.__gateway.add_device(device.get(DEVICE_NAME_PARAMETER), {CONNECTOR_PARAMETER: self}, - device_type=device.get(DEVICE_TYPE_PARAMETER)) - self.__devices[device[DEVICE_NAME_PARAMETER]] = {CONFIG_SECTION_PARAMETER: device, - UPLINK_PREFIX + CONVERTER_PARAMETER: converter, - DOWNLINK_PREFIX + CONVERTER_PARAMETER: downlink_converter, - NEXT_PREFIX + ATTRIBUTES_PARAMETER + CHECK_POSTFIX: 0, - NEXT_PREFIX + TIMESERIES_PARAMETER + CHECK_POSTFIX: 0, - TELEMETRY_PARAMETER: {}, - ATTRIBUTES_PARAMETER: {}, - LAST_PREFIX + TELEMETRY_PARAMETER: {}, - LAST_PREFIX + ATTRIBUTES_PARAMETER: {}, - CONNECTION_ATTEMPT_PARAMETER: 0 - } - except Exception as e: - log.exception(e) - def close(self): self.__stopped = True self.__stop_connections_to_masters() @@ -176,187 +150,133 @@ class ModbusConnector(Connector, threading.Thread): def get_name(self): return self.name - def __process_devices(self): - for device in self.__devices: - current_time = time.time() - device_responses = {TIMESERIES_PARAMETER: {}, - ATTRIBUTES_PARAMETER: {}, - } - current_device_config = {} - try: - for config_section in device_responses: - if self.__devices[device][CONFIG_SECTION_PARAMETER].get(config_section) is not None: - current_device_config = self.__devices[device][CONFIG_SECTION_PARAMETER] - unit_id = current_device_config[UNIT_ID_PARAMETER] - if self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] < current_time: - self.__connect_to_current_master(device) - if not self.__current_master.is_socket_open() or not len( - current_device_config[config_section]): - continue - # Reading data from device - for interested_data in range(len(current_device_config[config_section])): - current_data = current_device_config[config_section][interested_data] - current_data[DEVICE_NAME_PARAMETER] = device - input_data = self.__function_to_device(current_data, unit_id) - device_responses[config_section][current_data[TAG_PARAMETER]] = { - "data_sent": current_data, - "input_data": input_data} + def __process_slaves(self): + # TODO: write documentation + device = ModbusConnector.process_requests.get() - log.debug("Checking %s for device %s", config_section, device) - self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] = current_time + \ - current_device_config[ - config_section + POLL_PERIOD_POSTFIX] / 1000 - log.debug('Device response: ', device_responses) - - if device_responses.get('timeseries') or device_responses.get('attributes'): - self.__data_to_convert_queue.put((device, current_device_config, { - **current_device_config, - BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, - self.__byte_order), - WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, - self.__word_order) - }, device_responses)) - except ConnectionException: - time.sleep(5) - log.error("Connection lost! Reconnecting...") - except Exception as e: - log.exception(e) - - def on_attributes_update(self, content): + device_responses = {'timeseries': {}, 'attributes': {}} + current_device_config = {} try: - for attribute_updates_command_config in \ - self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER]["attributeUpdates"]: - for attribute_updated in content[DATA_PARAMETER]: - if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated: - to_process = { - DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER], - DATA_PARAMETER: { - RPC_METHOD_PARAMETER: attribute_updated, - RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated] - } - } - self.__process_rpc_request(to_process, attribute_updates_command_config) + for config_section in device_responses: + if device.config.get(config_section) is not None: + current_device_config = device.config + + self.__connect_to_current_master(device) + + if not device.config['master'].is_socket_open() or not len( + current_device_config[config_section]): + continue + + # Reading data from device + for interested_data in range(len(current_device_config[config_section])): + current_data = current_device_config[config_section][interested_data] + current_data[DEVICE_NAME_PARAMETER] = device + input_data = self.__function_to_device(device, current_data) + device_responses[config_section][current_data[TAG_PARAMETER]] = { + "data_sent": current_data, + "input_data": input_data} + + log.debug("Checking %s for device %s", config_section, device) + log.debug('Device response: ', device_responses) + + if device_responses.get('timeseries') or device_responses.get('attributes'): + self.__convert_and_save_data((device, current_device_config, { + **current_device_config, + BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, + device.byte_order), + WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, + device.word_order) + }, device_responses)) + + except ConnectionException: + sleep(5) + log.error("Connection lost! Reconnecting...") except Exception as e: log.exception(e) def __connect_to_current_master(self, device=None): + # TODO: write documentation connect_attempt_count = 5 connect_attempt_time_ms = 100 wait_after_failed_attempts_ms = 300000 - # if device is None: - # device = list(self.__devices.keys())[0] - if self.__devices[device].get(MASTER_PARAMETER) is None: - self.__devices[device][MASTER_PARAMETER], self.__devices[device][ - AVAILABLE_FUNCTIONS_PARAMETER] = self.__configure_master( - self.__devices[device][CONFIG_SECTION_PARAMETER]) - if self.__devices[device][MASTER_PARAMETER] != self.__current_master: - self.__current_master = self.__devices[device][MASTER_PARAMETER] - self.__available_functions = self.__devices[device][AVAILABLE_FUNCTIONS_PARAMETER] - connect_attempt_count = self.__devices[device][CONFIG_SECTION_PARAMETER].get(CONNECT_ATTEMPT_COUNT_PARAMETER, - connect_attempt_count) + + if device.config.get('master') is None: + device.config['master'], device.config['available_functions'] = self.__configure_master(device.config) + if connect_attempt_count < 1: connect_attempt_count = 1 - connect_attempt_time_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get( - CONNECT_ATTEMPT_TIME_MS_PARAMETER, connect_attempt_time_ms) + + connect_attempt_time_ms = device.config.get('connectAttemptTimeMs', connect_attempt_time_ms) + if connect_attempt_time_ms < 500: connect_attempt_time_ms = 500 - wait_after_failed_attempts_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get( - WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, wait_after_failed_attempts_ms) + + wait_after_failed_attempts_ms = device.config.get('waitAfterFailedAttemptsMs', wait_after_failed_attempts_ms) + if wait_after_failed_attempts_ms < 1000: wait_after_failed_attempts_ms = 1000 - current_time = time.time() * 1000 - if not self.__current_master.is_socket_open(): - if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= connect_attempt_count and \ - current_time - self.__devices[device][ - LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] >= wait_after_failed_attempts_ms: - self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0 - while not self.__current_master.is_socket_open() \ - and self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] < connect_attempt_count \ - and current_time - self.__devices[device].get(LAST_CONNECTION_ATTEMPT_TIME_PARAMETER, - 0) >= connect_attempt_time_ms: - self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = self.__devices[device][ - CONNECTION_ATTEMPT_PARAMETER] + 1 - self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time + + current_time = time() * 1000 + + if not device.config['master'].is_socket_open(): + if device.config['connection_attempt'] >= connect_attempt_count and current_time - device.config[ + 'last_connection_attempt_time'] >= wait_after_failed_attempts_ms: + device.config['connection_attempt'] = 0 + + while not device.config['master'].is_socket_open() \ + and device.config['connection_attempt'] < connect_attempt_count \ + and current_time - device.config.get('last_connection_attempt_time', + 0) >= connect_attempt_time_ms: + device.config['connection_attempt'] = device.config[ + 'connection_attempt'] + 1 + device.config['last_connection_attempt_time'] = current_time log.debug("Modbus trying connect to %s", device) - self.__current_master.connect() - if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] == connect_attempt_count: + device.config['master'].connect() + + if device.config['connection_attempt'] == connect_attempt_count: log.warn("Maximum attempt count (%i) for device \"%s\" - encountered.", connect_attempt_count, device) - # time.sleep(connect_attempt_time_ms / 1000) - # if not self.__current_master.is_socket_open(): - if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= 0 and self.__current_master.is_socket_open(): - self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0 - self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time - # log.debug("Modbus connected to device %s.", device) - def __configure_master(self, config=None): - current_config = self.__config if config is None else config + if device.config['connection_attempt'] >= 0 and device.config['master'].is_socket_open(): + device.config['connection_attempt'] = 0 + device.config['last_connection_attempt_time'] = current_time - master_config = dict() - master_config["host"] = current_config[HOST_PARAMETER] if current_config.get( - HOST_PARAMETER) is not None else self.__config.get(HOST_PARAMETER, "localhost") - try: - master_config["port"] = int(current_config[PORT_PARAMETER]) if current_config.get( - PORT_PARAMETER) is not None else self.__config.get(int(PORT_PARAMETER), 502) - except ValueError: - master_config["port"] = current_config[PORT_PARAMETER] if current_config.get( - PORT_PARAMETER) is not None else self.__config.get(PORT_PARAMETER, 502) - master_config["baudrate"] = current_config[BAUDRATE_PARAMETER] if current_config.get( - BAUDRATE_PARAMETER) is not None else self.__config.get(BAUDRATE_PARAMETER, 19200) - master_config["timeout"] = current_config[TIMEOUT_PARAMETER] if current_config.get( - TIMEOUT_PARAMETER) is not None else self.__config.get(TIMEOUT_PARAMETER, 35) - master_config["method"] = current_config[METHOD_PARAMETER] if current_config.get( - METHOD_PARAMETER) is not None else self.__config.get(METHOD_PARAMETER, "rtu") - master_config["stopbits"] = current_config[STOPBITS_PARAMETER] if current_config.get( - STOPBITS_PARAMETER) is not None else self.__config.get(STOPBITS_PARAMETER, Defaults.Stopbits) - master_config["bytesize"] = current_config[BYTESIZE_PARAMETER] if current_config.get( - BYTESIZE_PARAMETER) is not None else self.__config.get(BYTESIZE_PARAMETER, Defaults.Bytesize) - master_config["parity"] = current_config[PARITY_PARAMETER] if current_config.get( - PARITY_PARAMETER) is not None else self.__config.get(PARITY_PARAMETER, Defaults.Parity) - master_config["strict"] = current_config[STRICT_PARAMETER] if current_config.get( - STRICT_PARAMETER) is not None else self.__config.get(STRICT_PARAMETER, True) - master_config["retries"] = current_config[RETRIES_PARAMETER] if current_config.get( - RETRIES_PARAMETER) is not None else self.__config.get(RETRIES_PARAMETER, 3) - master_config["retry_on_empty"] = current_config[RETRY_ON_EMPTY_PARAMETER] if current_config.get( - RETRY_ON_EMPTY_PARAMETER) is not None else self.__config.get(RETRY_ON_EMPTY_PARAMETER, True) - master_config["retry_on_invalid"] = current_config[RETRY_ON_INVALID_PARAMETER] if current_config.get( - RETRY_ON_INVALID_PARAMETER) is not None else self.__config.get(RETRY_ON_INVALID_PARAMETER, True) - master_config["rtu"] = ModbusRtuFramer if current_config.get(METHOD_PARAMETER) == "rtu" or ( - current_config.get(METHOD_PARAMETER) is None and self.__config.get( - METHOD_PARAMETER) == "rtu") else ModbusSocketFramer - if self.__previous_master_config != master_config: - self.__previous_master_config = master_config - if current_config.get(TYPE_PARAMETER) == 'tcp' or ( - current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "tcp"): - master = ModbusTcpClient(master_config["host"], master_config["port"], master_config["rtu"], - timeout=master_config["timeout"], - retry_on_empty=master_config["retry_on_empty"], - retry_on_invalid=master_config["retry_on_invalid"], - retries=master_config["retries"]) - elif current_config.get(TYPE_PARAMETER) == 'udp' or ( - current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "udp"): - master = ModbusUdpClient(master_config["host"], master_config["port"], master_config["rtu"], - timeout=master_config["timeout"], - retry_on_empty=master_config["retry_on_empty"], - retry_on_invalid=master_config["retry_on_invalid"], - retries=master_config["retries"]) - elif current_config.get(TYPE_PARAMETER) == 'serial' or ( - current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "serial"): - master = ModbusSerialClient(method=master_config["method"], - port=master_config["port"], - timeout=master_config["timeout"], - retry_on_empty=master_config["retry_on_empty"], - retry_on_invalid=master_config["retry_on_invalid"], - retries=master_config["retries"], - baudrate=master_config["baudrate"], - stopbits=master_config["stopbits"], - bytesize=master_config["bytesize"], - parity=master_config["parity"], - strict=master_config["strict"]) - else: - raise Exception("Invalid Modbus transport type.") + @staticmethod + def __configure_master(config): + current_config = config + current_config["rtu"] = FRAMER_TYPE[current_config['method']] + + if current_config.get('type') == 'tcp': + master = ModbusTcpClient(current_config["host"], + current_config["port"], + current_config["rtu"], + timeout=current_config["timeout"], + retry_on_empty=current_config["retry_on_empty"], + retry_on_invalid=current_config["retry_on_invalid"], + retries=current_config["retries"]) + elif current_config.get(TYPE_PARAMETER) == 'udp': + master = ModbusUdpClient(current_config["host"], + current_config["port"], + current_config["rtu"], + timeout=current_config["timeout"], + retry_on_empty=current_config["retry_on_empty"], + retry_on_invalid=current_config["retry_on_invalid"], + retries=current_config["retries"]) + elif current_config.get(TYPE_PARAMETER) == 'serial': + master = ModbusSerialClient(method=current_config["method"], + port=current_config["port"], + timeout=current_config["timeout"], + retry_on_empty=current_config["retry_on_empty"], + retry_on_invalid=current_config["retry_on_invalid"], + retries=current_config["retries"], + baudrate=current_config["baudrate"], + stopbits=current_config["stopbits"], + bytesize=current_config["bytesize"], + parity=current_config["parity"], + strict=current_config["strict"]) else: - master = self.__current_master + raise Exception("Invalid Modbus transport type.") + available_functions = { 1: master.read_coils, 2: master.read_discrete_inputs, @@ -366,59 +286,83 @@ class ModbusConnector(Connector, threading.Thread): 6: master.write_register, 15: master.write_coils, 16: master.write_registers, - } + } return master, available_functions def __stop_connections_to_masters(self): - for device in self.__devices: - if self.__devices[device].get(MASTER_PARAMETER) is not None: - self.__devices[device][MASTER_PARAMETER].close() + for slave in self.__slaves: + if slave.config.get('master') is not None and slave.config.get('master').is_socket_open(): + slave.config['master'].close() - def __function_to_device(self, config, unit_id): - function_code = config.get(FUNCTION_CODE_PARAMETER) + @staticmethod + def __function_to_device(device, config): + function_code = config.get('functionCode') result = None if function_code in (1, 2, 3, 4): - result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], - count=config.get(OBJECTS_COUNT_PARAMETER, - config.get("registersCount", - config.get("registerCount", - 1))), - unit=unit_id) + result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER], + count=config.get(OBJECTS_COUNT_PARAMETER, + config.get("registersCount", + config.get( + "registerCount", + 1))), + unit=device.config['unitId']) elif function_code in (5, 6): - result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], - value=config[PAYLOAD_PARAMETER], - unit=unit_id) + result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER], + value=config[PAYLOAD_PARAMETER], + unit=device.config['unitId']) elif function_code in (15, 16): - result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER], - values=config[PAYLOAD_PARAMETER], - unit=unit_id) + result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER], + values=config[PAYLOAD_PARAMETER], + unit=device.config['unitId']) else: - log.error("Unknown Modbus function with code: %i", function_code) + log.error("Unknown Modbus function with code: %s", function_code) + log.debug("With result %s", str(result)) + if "Exception" in str(result): log.exception(result) + return result + def on_attributes_update(self, content): + try: + device = tuple(filter(lambda slave: slave.name == content[DEVICE_SECTION_PARAMETER], self.__slaves))[0] + + for attribute_updates_command_config in device.config['attributeUpdates']: + for attribute_updated in content[DATA_PARAMETER]: + if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated: + to_process = { + DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER], + DATA_PARAMETER: { + RPC_METHOD_PARAMETER: attribute_updated, + RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated] + } + } + self.__process_rpc_request(to_process, attribute_updates_command_config) + except Exception as e: + log.exception(e) + def server_side_rpc_handler(self, server_rpc_request): try: if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None: - log.debug("Modbus connector received rpc request for %s with server_rpc_request: %s", server_rpc_request[DEVICE_SECTION_PARAMETER], server_rpc_request) - if isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][ - RPC_SECTION], dict): - rpc_command_config = \ - self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][ - RPC_SECTION].get( - server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER]) + device = tuple( + filter( + lambda slave: slave.name == server_rpc_request[DEVICE_SECTION_PARAMETER], self.__slaves + ) + )[0] + + if isinstance(device.config[RPC_SECTION], dict): + rpc_command_config = device.config[RPC_SECTION].get( + server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER]) + if rpc_command_config is not None: self.__process_rpc_request(server_rpc_request, rpc_command_config) - elif isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][ - RPC_SECTION], list): - for rpc_command_config in \ - self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][ - RPC_SECTION]: + + elif isinstance(device.config[RPC_SECTION], list): + for rpc_command_config in device.config[RPC_SECTION]: if rpc_command_config[TAG_PARAMETER] == server_rpc_request[DATA_PARAMETER][ RPC_METHOD_PARAMETER]: self.__process_rpc_request(server_rpc_request, rpc_command_config) @@ -438,45 +382,41 @@ class ModbusConnector(Connector, threading.Thread): def __process_rpc_request(self, content, rpc_command_config): if rpc_command_config is not None: - rpc_command_config[UNIT_ID_PARAMETER] = \ - self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][UNIT_ID_PARAMETER] - self.__connect_to_current_master(content[DEVICE_SECTION_PARAMETER]) - # if rpc_command_config.get('bit') is not None: - # rpc_command_config[FUNCTION_CODE_PARAMETER] = 6 + device = tuple(filter(lambda slave: slave.name == content[DEVICE_SECTION_PARAMETER], self.__slaves))[0] + rpc_command_config[UNIT_ID_PARAMETER] = device.config['unitId'] + self.__connect_to_current_master(device) + if rpc_command_config.get(FUNCTION_CODE_PARAMETER) in (5, 6, 15, 16): - rpc_command_config[PAYLOAD_PARAMETER] = self.__devices[content[DEVICE_SECTION_PARAMETER]][ + rpc_command_config[PAYLOAD_PARAMETER] = device.config[ DOWNLINK_PREFIX + CONVERTER_PARAMETER].convert( rpc_command_config, content) - response = None + try: - response = self.__function_to_device(rpc_command_config, rpc_command_config[UNIT_ID_PARAMETER]) + response = self.__function_to_device(device, rpc_command_config) except Exception as e: log.exception(e) response = e + if isinstance(response, (ReadRegistersResponseBase, ReadBitsResponseBase)): to_converter = { RPC_SECTION: {content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: {"data_sent": rpc_command_config, "input_data": response}}} - response = self.__devices[content[DEVICE_SECTION_PARAMETER]][ + response = device.config[ UPLINK_PREFIX + CONVERTER_PARAMETER].convert( - config={**self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER], - BYTE_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][ - CONFIG_SECTION_PARAMETER].get(BYTE_ORDER_PARAMETER, - self.__byte_order), - WORD_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][ - CONFIG_SECTION_PARAMETER].get(WORD_ORDER_PARAMETER, - self.__word_order) + config={**device.config, + BYTE_ORDER_PARAMETER: device.byte_order, + WORD_ORDER_PARAMETER: device.word_order }, data=to_converter) log.debug("Received RPC method: %s, result: %r", content[DATA_PARAMETER][RPC_METHOD_PARAMETER], response) - # response = {"success": response} elif isinstance(response, (WriteMultipleRegistersResponse, WriteMultipleCoilsResponse, WriteSingleCoilResponse, WriteSingleRegisterResponse)): log.debug("Write %r", str(response)) response = {"success": True} + if content.get(RPC_ID_PARAMETER) or ( content.get(DATA_PARAMETER) is not None and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)): if isinstance(response, Exception): @@ -487,4 +427,5 @@ class ModbusConnector(Connector, threading.Thread): self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER], content[DATA_PARAMETER][RPC_ID_PARAMETER], response) + log.debug("%r", response) diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py new file mode 100644 index 00000000..3f27b226 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -0,0 +1,115 @@ +# Copyright 2021. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from threading import Thread +from time import time, sleep + +from pymodbus.constants import Defaults + +from thingsboard_gateway.connectors.modbus.constants import * +from thingsboard_gateway.connectors.connector import log +from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter +from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter +from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader + + +class Slave(Thread): + def __init__(self, **kwargs): + super().__init__() + self.timeout = kwargs.get('timeout') + self.name = kwargs['deviceName'] + self.poll_period = kwargs['pollPeriod'] / 1000 + + self.byte_order = kwargs['byteOrder'] + self.word_order = kwargs.get('wordOrder') + self.config = { + 'unitId': kwargs['unitId'], + 'deviceType': kwargs.get('deviceType', 'default'), + 'type': kwargs['type'], + 'host': kwargs.get('host'), + 'port': kwargs['port'], + 'timeout': kwargs.get('timeout', 35), + 'stopbits': kwargs.get('stopbits', Defaults.Stopbits), + 'bytesize': kwargs.get('bytesize', Defaults.Bytesize), + 'parity': kwargs.get('parity', Defaults.Parity), + 'strict': kwargs.get('strict', True), + 'retries': kwargs.get('retries', 3), + 'connection_attempt': 0, + 'last_connection_attempt_time': 0, + 'sendDataOnlyOnChange': kwargs.get('sendDataOnlyOnChange', False), + 'waitAfterFailedAttemptsMs': kwargs.get('waitAfterFailedAttemptsMs', 0), + 'connectAttemptTimeMs': kwargs.get('connectAttemptTimeMs', 0), + 'retry_on_empty': kwargs.get('retry_on_empty', True), + 'retry_on_invalid': kwargs.get('retry_on_invalid', True), + 'method': kwargs.get('method', 'rtu'), + 'baudrate': kwargs.get('baudrate', 19200), + 'attributes': kwargs.get('attributes', []), + 'timeseries': kwargs.get('timeseries', []), + 'attributeUpdates': kwargs.get('attributeUpdates', []), + 'rpc': kwargs.get('rpc', []), + 'last_attributes': {}, + 'last_telemetry': {} + } + + self.__load_converters(kwargs['connector'], kwargs['gateway']) + + self.callback = kwargs['callback'] + + self.last_polled_time = None + self.daemon = True + + self.start() + + def timer(self): + self.callback(self) + self.last_polled_time = time() + + while True: + if time() - self.last_polled_time >= self.poll_period: + self.callback(self) + self.last_polled_time = time() + + sleep(.2) + + def run(self): + self.timer() + + def get_name(self): + return self.name + + def __load_converters(self, connector, gateway): + try: + if self.config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: + converter = TBModuleLoader.import_module(connector.connector_type, + self.config[UPLINK_PREFIX + CONVERTER_PARAMETER])(self) + else: + converter = BytesModbusUplinkConverter({**self.config, 'deviceName': self.name}) + + if self.config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None: + downlink_converter = TBModuleLoader.import_module(connector.connector_type, self.config[ + DOWNLINK_PREFIX + CONVERTER_PARAMETER])(self) + else: + downlink_converter = BytesModbusDownlinkConverter(self.config) + + if self.name not in gateway.get_devices(): + gateway.add_device(self.name, {CONNECTOR_PARAMETER: connector}, + device_type=self.config.get(DEVICE_TYPE_PARAMETER)) + + self.config[UPLINK_PREFIX + CONVERTER_PARAMETER] = converter + self.config[DOWNLINK_PREFIX + CONVERTER_PARAMETER] = downlink_converter + except Exception as e: + log.exception(e) + + def __str__(self): + return f'{self.name}' From e9f5950f4367b9194e6745051699445b62a9d602 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Tue, 30 Nov 2021 15:24:25 +0200 Subject: [PATCH 2/3] Fix for saving and loading for persistent devices with device type --- .../gateway/tb_gateway_service.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 873847ba..8885ba1e 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -138,7 +138,6 @@ class TBGatewayService: self.connectors_configs = {} self.__remote_configurator = None self.__request_config_after_connect = False - self.__connected_devices = {} self.__load_persistent_devices() self.__init_remote_configuration() self._load_connectors() @@ -724,7 +723,7 @@ class TBGatewayService: summary_messages.update(**telemetry) return summary_messages - def add_device(self, device_name, content, device_type=None): + def add_device(self, device_name, content, device_type): if device_name not in self.__saved_devices: device_type = device_type if device_type is not None else 'default' self.__connected_devices[device_name] = {**content, "device_type": device_type} @@ -763,9 +762,17 @@ class TBGatewayService: log.debug("Loaded devices:\n %s", devices) for device_name in devices: try: - if self.available_connectors.get(devices[device_name]): + if not isinstance(devices[device_name], tuple): + open(self._config_dir + self.__connected_devices_file, 'w').close() + log.debug("Old connected_devices file, new file will be created") + return + if self.available_connectors.get(devices[device_name][0]): self.__connected_devices[device_name] = { - "connector": self.available_connectors[devices[device_name]]} + "connector": self.available_connectors[devices[device_name][0]], + "device_type": devices[device_name][1]} + self.__saved_devices[device_name] = { + "connector": self.available_connectors[devices[device_name][0]], + "device_type": devices[device_name][1]} except Exception as e: log.exception(e) continue @@ -779,7 +786,7 @@ class TBGatewayService: data_to_save = {} for device in self.__connected_devices: if self.__connected_devices[device]["connector"] is not None: - data_to_save[device] = self.__connected_devices[device]["connector"].get_name() + data_to_save[device] = (self.__connected_devices[device]["connector"].get_name(), self.__connected_devices[device]["device_type"]) config_file.write(dumps(data_to_save, indent=2, sort_keys=True)) except Exception as e: log.exception(e) From 95fcc14b48d9f8ecb0cae91116c092730f135859 Mon Sep 17 00:00:00 2001 From: Vitalii <43861229+samson0v@users.noreply.github.com> Date: Fri, 3 Dec 2021 12:13:00 +0200 Subject: [PATCH 3/3] Added Modbus as a slave (#640) * Fixed stopping SQLite storage * Changed the way of data reading and writing in SQLite Storage * Changed stopping way for SQLite Storage * Added Modbus Master Connector * Created Modbus Server Connector * Refactored Modbus connector * Added backward compability adapter for modbus connector * Fixed adapter * Added license * Added ASCII Framer support * Added Modbus as a slave * Added sub configuration to modbus slave * Fixed modbus config file * Fixed rpc and attributeUpdates for modbus slave * Changed modbus config file * Changed modbus config file * Changed modbus config file * Changed modbus config file --- thingsboard_gateway/config/modbus.json | 95 ++++++++++++++++--- .../modbus/backward_compability_adapter.py | 4 +- .../connectors/modbus/modbus_connector.py | 92 ++++++++++++++++++ 3 files changed, 178 insertions(+), 13 deletions(-) diff --git a/thingsboard_gateway/config/modbus.json b/thingsboard_gateway/config/modbus.json index 368da9a5..e85d1738 100644 --- a/thingsboard_gateway/config/modbus.json +++ b/thingsboard_gateway/config/modbus.json @@ -1,16 +1,17 @@ { - "server": { - "type": "tcp", - "host": "127.0.0.1", - "port": 5020, - "timeout": 35, - "method": "socket", - "byteOrder": "BIG", - "retries": true, - "retryOnEmpty": true, - "retryOnInvalid": true, - "devices": [ + "master": { + "slaves": [ { + "host": "127.0.0.1", + "port": 5021, + "type": "tcp", + "method": "socket", + "timeout": 35, + "byteOrder": "BIG", + "retries": true, + "retryOnEmpty": true, + "retryOnInvalid": true, + "pollPeriod": 5000, "unitId": 1, "deviceName": "Temp Sensor", "attributesPollPeriod": 5000, @@ -171,5 +172,77 @@ ] } ] + }, + "slave": { + "type": "tcp", + "host": "127.0.0.1", + "port": 5026, + "method": "socket", + "deviceName": "Gateway", + "deviceType": "default", + "pollPeriod": 5000, + "sendDataToThingsBoard": false, + "byteOrder": "BIG", + "unitId": 0, + "values": { + "holding_registers": [ + { + "attributes": [ + { + "address": 1, + "type": "string", + "tag": "sm", + "objectsCount": 1, + "value": "ON" + } + ], + "timeseries": [ + { + "address": 2, + "type": "int", + "tag": "smm", + "objectsCount": 1, + "value": "12334" + } + ], + "attributeUpdates": [ + { + "tag": "shared_attribute_write", + "type": "32int", + "functionCode": 6, + "objectsCount": 2, + "address": 29, + "value": 1243 + } + ], + "rpc": [ + { + "tag": "setValue", + "type": "bits", + "functionCode": 5, + "objectsCount": 1, + "address": 31, + "value": 22 + } + ] + } + ], + "coils_initializer": [ + { + "attributes": [ + { + "address": 5, + "type": "string", + "tag": "sm", + "objectsCount": 1, + "value": "12" + } + ], + "timeseries": [], + "attributeUpdates": [], + "rpc": [] + } + ] + } } } diff --git a/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py b/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py index b50395a7..cd5f5fcb 100644 --- a/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py +++ b/thingsboard_gateway/connectors/modbus/backward_compability_adapter.py @@ -26,7 +26,7 @@ class BackwardCompatibilityAdapter: @staticmethod def __save_json_config_file(config): with open('config/modbus_new.json', 'w') as file: - file.writelines(dumps(config, sort_keys=True, indent=' ', separators=(',', ': '))) + file.writelines(dumps(config, sort_keys=False, indent=' ', separators=(',', ': '))) def convert(self): if not self.__config.get('server'): @@ -48,7 +48,7 @@ class BackwardCompatibilityAdapter: slaves.append(slave) - result_dict = {'master': {'slaves': slaves}} + result_dict = {'master': {'slaves': slaves}, 'slave': self.__config.get('slave')} self.__save_json_config_file(result_dict) return result_dict diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 3ebee661..0e8573a2 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -36,11 +36,17 @@ from pymodbus.bit_read_message import ReadBitsResponseBase from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient from pymodbus.client.sync import ModbusRtuFramer, ModbusSocketFramer, ModbusAsciiFramer from pymodbus.exceptions import ConnectionException +from pymodbus.server.asynchronous import StartTcpServer, StartUdpServer, StartSerialServer, StopServer +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.version import version +from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext +from pymodbus.datastore import ModbusSparseDataBlock from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.modbus.constants import * from thingsboard_gateway.connectors.modbus.slave import Slave from thingsboard_gateway.connectors.modbus.backward_compability_adapter import BackwardCompatibilityAdapter +from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter CONVERTED_DATA_SECTIONS = [ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER] FRAMER_TYPE = { @@ -48,6 +54,27 @@ FRAMER_TYPE = { 'socket': ModbusSocketFramer, 'ascii': ModbusAsciiFramer } +SLAVE_TYPE = { + 'tcp': StartTcpServer, + 'udp': StartUdpServer, + 'serial': StartSerialServer +} +FUNCTION_TYPE = { + 'coils_initializer': 'ci', + 'holding_registers': 'hr', + 'input_registers': 'ir', + 'discrete_inputs': 'di' +} +FUNCTION_CODE_WRITE = { + 'holding_registers': (6, 16), + 'coils_initializer': (5, 15) +} +FUNCTION_CODE_READ = { + 'holding_registers': 3, + 'coils_initializer': 1, + 'input_registers': 4, + 'discrete_inputs': 2 +} class ModbusConnector(Connector, Thread): @@ -59,13 +86,23 @@ class ModbusConnector(Connector, Thread): super().__init__() self.__gateway = gateway self._connector_type = connector_type + self.__backward_compatibility_adapter = BackwardCompatibilityAdapter(config) self.__config = self.__backward_compatibility_adapter.convert() + self.setName(self.__config.get("name", 'Modbus Default ' + ''.join(choice(ascii_lowercase) for _ in range(5)))) self.__connected = False self.__stopped = False self.daemon = True + if self.__config.get('slave'): + self.__slave_thread = Thread(target=self.__configure_and_run_slave, args=(self.__config['slave'],), + daemon=True, name='Gateway as a slave') + self.__slave_thread.start() + + if config['slave'].get('sendDataToThingsBoard', False): + self.__modify_main_config() + self.__slaves = [] self.__load_slaves() @@ -89,6 +126,60 @@ class ModbusConnector(Connector, Thread): sleep(.2) + @staticmethod + def __configure_and_run_slave(config): + identity = None + if config.get('identity'): + identity = ModbusDeviceIdentification() + identity.VendorName = config['identity'].get('vendorName', '') + identity.ProductCode = config['identity'].get('productCode', '') + identity.VendorUrl = config['identity'].get('vendorUrl', '') + identity.ProductName = config['identity'].get('productName', '') + identity.ModelName = config['identity'].get('ModelName', '') + identity.MajorMinorRevision = version.short() + + blocks = {} + for (key, value) in config.get('values').items(): + values = {} + converter = BytesModbusDownlinkConverter({}) + for item in value: + for section in ('attributes', 'timeseries', 'attributeUpdates', 'rpc'): + for val in item[section]: + function_code = FUNCTION_CODE_WRITE[key][0] if val['objectsCount'] <= 1 else \ + FUNCTION_CODE_WRITE[key][1] + converted_value = converter.convert( + {**val, + 'device': config.get('deviceName', 'Gateway'), 'functionCode': function_code, + 'byteOrder': config['byteOrder']}, + {'data': {'params': val['value']}}) + values[val['address']] = converted_value + + blocks[FUNCTION_TYPE[key]] = ModbusSparseDataBlock(values) + + context = ModbusServerContext(slaves=ModbusSlaveContext(**blocks), single=True) + SLAVE_TYPE[config['type']](context, identity=identity, + address=(config.get('host'), config.get('port')) if ( + config['type'] == 'tcp' or 'udp') else None, + port=config.get('port') if config['type'] == 'serial' else None, + framer=FRAMER_TYPE[config['method']]) + + def __modify_main_config(self): + config = self.__config['slave'] + + values = config.pop('values') + device = config + + for (register, reg_values) in values.items(): + for value in reg_values: + for section in ('attributes', 'timeseries', 'attributeUpdates', 'rpc'): + if not device.get(section): + device[section] = [] + + for item in value.get(section, []): + device[section].append({**item, 'functionCode': FUNCTION_CODE_READ[register]}) + + self.__config['master']['slaves'].append(device) + def __load_slaves(self): self.__slaves = [ Slave(**{**device, 'connector': self, 'gateway': self.__gateway, 'callback': ModbusConnector.callback}) for @@ -145,6 +236,7 @@ class ModbusConnector(Connector, Thread): def close(self): self.__stopped = True self.__stop_connections_to_masters() + StopServer() log.info('%s has been stopped.', self.get_name()) def get_name(self):