1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Merge branch 'master' of https://github.com/thingsboard/thingsboard-gateway into feature/grpc-api

This commit is contained in:
zbeacon
2021-12-06 13:58:16 +02:00
7 changed files with 581 additions and 298 deletions

View File

@@ -47,7 +47,7 @@ setup(
'thingsboard_gateway.extensions.mqtt', 'thingsboard_gateway.extensions.modbus', 'thingsboard_gateway.extensions.opcua',
'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request',
'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc',
'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp'
'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp',
],
install_requires=[
'jsonpath-rw',

View File

@@ -1,16 +1,17 @@
{
"server": {
"type": "tcp",
"host": "127.0.0.1",
"port": 5020,
"timeout": 35,
"method": "socket",
"byteOrder": "BIG",
"retries": true,
"retryOnEmpty": true,
"retryOnInvalid": true,
"devices": [
"master": {
"slaves": [
{
"host": "127.0.0.1",
"port": 5021,
"type": "tcp",
"method": "socket",
"timeout": 35,
"byteOrder": "BIG",
"retries": true,
"retryOnEmpty": true,
"retryOnInvalid": true,
"pollPeriod": 5000,
"unitId": 1,
"deviceName": "Temp Sensor",
"attributesPollPeriod": 5000,
@@ -171,5 +172,77 @@
]
}
]
},
"slave": {
"type": "tcp",
"host": "127.0.0.1",
"port": 5026,
"method": "socket",
"deviceName": "Gateway",
"deviceType": "default",
"pollPeriod": 5000,
"sendDataToThingsBoard": false,
"byteOrder": "BIG",
"unitId": 0,
"values": {
"holding_registers": [
{
"attributes": [
{
"address": 1,
"type": "string",
"tag": "sm",
"objectsCount": 1,
"value": "ON"
}
],
"timeseries": [
{
"address": 2,
"type": "int",
"tag": "smm",
"objectsCount": 1,
"value": "12334"
}
],
"attributeUpdates": [
{
"tag": "shared_attribute_write",
"type": "32int",
"functionCode": 6,
"objectsCount": 2,
"address": 29,
"value": 1243
}
],
"rpc": [
{
"tag": "setValue",
"type": "bits",
"functionCode": 5,
"objectsCount": 1,
"address": 31,
"value": 22
}
]
}
],
"coils_initializer": [
{
"attributes": [
{
"address": 5,
"type": "string",
"tag": "sm",
"objectsCount": 1,
"value": "12"
}
],
"timeseries": [],
"attributeUpdates": [],
"rpc": []
}
]
}
}
}

View File

@@ -47,6 +47,7 @@ connectors:
# type: modbus
# configuration: modbus_serial.json
#
#
# -
# name: OPC-UA Connector
# type: opcua

View File

@@ -0,0 +1,54 @@
# Copyright 2021. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from simplejson import dumps
from thingsboard_gateway.connectors.connector import log
class BackwardCompatibilityAdapter:
def __init__(self, config):
self.__config = config
self.__keys = ['host', 'port', 'type', 'method', 'timeout', 'byteOrder', 'wordOrder', 'retries', 'retryOnEmpty',
'retryOnInvalid', 'baudrate']
@staticmethod
def __save_json_config_file(config):
with open('config/modbus_new.json', 'w') as file:
file.writelines(dumps(config, sort_keys=False, indent=' ', separators=(',', ': ')))
def convert(self):
if not self.__config.get('server'):
return self.__config
log.warning(
'You are using old configuration structure for Modbus connector. It will be DEPRECATED in the future '
'version! New config file "modbus_new.json" was generated in config/ folder. Please, use it.')
slaves = []
for device in self.__config['server'].get('devices', []):
slave = {**device}
for key in self.__keys:
if not device.get(key):
slave[key] = self.__config['server'].get(key)
slave['pollPeriod'] = slave['timeseriesPollPeriod']
slaves.append(slave)
result_dict = {'master': {'slaves': slaves}, 'slave': self.__config.get('slave')}
self.__save_json_config_file(result_dict)
return result_dict

View File

@@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
from threading import Thread
from time import sleep, time
from queue import Queue
from random import choice
from string import ascii_lowercase
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
# Try import Pymodbus library or install it and import
@@ -30,23 +29,56 @@ except ImportError:
TBUtility.install_package('pyserial')
from pymodbus.constants import Defaults
from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient, ModbusRtuFramer, \
ModbusSocketFramer
from pymodbus.bit_write_message import WriteSingleCoilResponse, WriteMultipleCoilsResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse
from pymodbus.register_read_message import ReadRegistersResponseBase
from pymodbus.bit_read_message import ReadBitsResponseBase
from pymodbus.client.sync import ModbusTcpClient, ModbusUdpClient, ModbusSerialClient
from pymodbus.client.sync import ModbusRtuFramer, ModbusSocketFramer, ModbusAsciiFramer
from pymodbus.exceptions import ConnectionException
from pymodbus.server.asynchronous import StartTcpServer, StartUdpServer, StartSerialServer, StopServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.version import version
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.datastore import ModbusSparseDataBlock
from thingsboard_gateway.connectors.connector import Connector, log
from thingsboard_gateway.connectors.modbus.constants import *
from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter
from thingsboard_gateway.connectors.modbus.slave import Slave
from thingsboard_gateway.connectors.modbus.backward_compability_adapter import BackwardCompatibilityAdapter
from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter
CONVERTED_DATA_SECTIONS = [ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER]
FRAMER_TYPE = {
'rtu': ModbusRtuFramer,
'socket': ModbusSocketFramer,
'ascii': ModbusAsciiFramer
}
SLAVE_TYPE = {
'tcp': StartTcpServer,
'udp': StartUdpServer,
'serial': StartSerialServer
}
FUNCTION_TYPE = {
'coils_initializer': 'ci',
'holding_registers': 'hr',
'input_registers': 'ir',
'discrete_inputs': 'di'
}
FUNCTION_CODE_WRITE = {
'holding_registers': (6, 16),
'coils_initializer': (5, 15)
}
FUNCTION_CODE_READ = {
'holding_registers': 3,
'coils_initializer': 1,
'input_registers': 4,
'discrete_inputs': 2
}
class ModbusConnector(Connector, threading.Thread):
class ModbusConnector(Connector, Thread):
process_requests = Queue(-1)
def __init__(self, gateway, config, connector_type):
self.statistics = {STATISTIC_MESSAGE_RECEIVED_PARAMETER: 0,
@@ -54,28 +86,25 @@ class ModbusConnector(Connector, threading.Thread):
super().__init__()
self.__gateway = gateway
self._connector_type = connector_type
self.__config = config.get(CONFIG_SERVER_SECTION_PARAMETER)
self.__previous_master_config = dict()
self.__current_master, self.__available_functions = self.__configure_master()
self.__default_config_parameters = [HOST_PARAMETER,
PORT_PARAMETER,
BAUDRATE_PARAMETER,
TIMEOUT_PARAMETER,
METHOD_PARAMETER,
STOPBITS_PARAMETER,
BYTESIZE_PARAMETER,
PARITY_PARAMETER,
STRICT_PARAMETER,
TYPE_PARAMETER]
self.__byte_order = self.__config.get(BYTE_ORDER_PARAMETER)
self.__word_order = self.__config.get(WORD_ORDER_PARAMETER)
self.__devices = {}
self.__backward_compatibility_adapter = BackwardCompatibilityAdapter(config)
self.__config = self.__backward_compatibility_adapter.convert()
self.setName(self.__config.get("name", 'Modbus Default ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
self.__load_converters()
self.__connected = False
self.__stopped = False
self.daemon = True
self.__data_to_convert_queue = Queue()
if self.__config.get('slave'):
self.__slave_thread = Thread(target=self.__configure_and_run_slave, args=(self.__config['slave'],),
daemon=True, name='Gateway as a slave')
self.__slave_thread.start()
if config['slave'].get('sendDataToThingsBoard', False):
self.__modify_main_config()
self.__slaves = []
self.__load_slaves()
def is_connected(self):
return self.__connected
@@ -83,29 +112,93 @@ class ModbusConnector(Connector, threading.Thread):
def open(self):
self.__stopped = False
self.start()
log.info("Starting Modbus connector")
def run(self):
self.__connected = True
while True:
time.sleep(.2)
self.__process_devices()
if not self.__stopped and not ModbusConnector.process_requests.empty():
thread = Thread(target=self.__process_slaves, daemon=True)
thread.start()
if not self.__data_to_convert_queue.empty():
for _ in range(self.__data_to_convert_queue.qsize()):
thread = threading.Thread(target=self.__convert_and_save_data, args=(self.__data_to_convert_queue,),
daemon=True)
thread.start()
if self.__stopped:
break
def __convert_and_save_data(self, queue):
device, current_device_config, config, device_responses = queue.get()
sleep(.2)
@staticmethod
def __configure_and_run_slave(config):
identity = None
if config.get('identity'):
identity = ModbusDeviceIdentification()
identity.VendorName = config['identity'].get('vendorName', '')
identity.ProductCode = config['identity'].get('productCode', '')
identity.VendorUrl = config['identity'].get('vendorUrl', '')
identity.ProductName = config['identity'].get('productName', '')
identity.ModelName = config['identity'].get('ModelName', '')
identity.MajorMinorRevision = version.short()
blocks = {}
for (key, value) in config.get('values').items():
values = {}
converter = BytesModbusDownlinkConverter({})
for item in value:
for section in ('attributes', 'timeseries', 'attributeUpdates', 'rpc'):
for val in item[section]:
function_code = FUNCTION_CODE_WRITE[key][0] if val['objectsCount'] <= 1 else \
FUNCTION_CODE_WRITE[key][1]
converted_value = converter.convert(
{**val,
'device': config.get('deviceName', 'Gateway'), 'functionCode': function_code,
'byteOrder': config['byteOrder']},
{'data': {'params': val['value']}})
values[val['address']] = converted_value
blocks[FUNCTION_TYPE[key]] = ModbusSparseDataBlock(values)
context = ModbusServerContext(slaves=ModbusSlaveContext(**blocks), single=True)
SLAVE_TYPE[config['type']](context, identity=identity,
address=(config.get('host'), config.get('port')) if (
config['type'] == 'tcp' or 'udp') else None,
port=config.get('port') if config['type'] == 'serial' else None,
framer=FRAMER_TYPE[config['method']])
def __modify_main_config(self):
config = self.__config['slave']
values = config.pop('values')
device = config
for (register, reg_values) in values.items():
for value in reg_values:
for section in ('attributes', 'timeseries', 'attributeUpdates', 'rpc'):
if not device.get(section):
device[section] = []
for item in value.get(section, []):
device[section].append({**item, 'functionCode': FUNCTION_CODE_READ[register]})
self.__config['master']['slaves'].append(device)
def __load_slaves(self):
self.__slaves = [
Slave(**{**device, 'connector': self, 'gateway': self.__gateway, 'callback': ModbusConnector.callback}) for
device in self.__config.get('master', {'slaves': []}).get('slaves', [])]
@classmethod
def callback(cls, slave):
cls.process_requests.put(slave)
@property
def connector_type(self):
return self._connector_type
def __convert_and_save_data(self, config_tuple):
device, current_device_config, config, device_responses = config_tuple
converted_data = {}
try:
converted_data = self.__devices[device][UPLINK_PREFIX + CONVERTER_PARAMETER].convert(
converted_data = device.config[UPLINK_PREFIX + CONVERTER_PARAMETER].convert(
config=config,
data=device_responses)
except Exception as e:
@@ -117,20 +210,22 @@ class ModbusConnector(Connector, threading.Thread):
ATTRIBUTES_PARAMETER: []
}
if current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER):
if current_device_config.get('sendDataOnlyOnChange'):
self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1
for converted_data_section in CONVERTED_DATA_SECTIONS:
for current_section_dict in converted_data[converted_data_section]:
for key, value in current_section_dict.items():
if self.__devices[device][LAST_PREFIX + converted_data_section].get(key) is None or \
self.__devices[device][LAST_PREFIX + converted_data_section][key] != value:
self.__devices[device][LAST_PREFIX + converted_data_section][key] = value
if device.config[LAST_PREFIX + converted_data_section].get(key) is None or \
device.config[LAST_PREFIX + converted_data_section][key] != value:
device.config[LAST_PREFIX + converted_data_section][key] = value
to_send[converted_data_section].append({key: value})
elif converted_data and current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER) is None or \
not current_device_config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER):
elif converted_data and current_device_config.get('sendDataOnlyOnChange') is None or \
not current_device_config.get('sendDataOnlyOnChange'):
self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1
for converted_data_section in CONVERTED_DATA_SECTIONS:
self.__devices[device][LAST_PREFIX + converted_data_section] = converted_data[
device.config[LAST_PREFIX + converted_data_section] = converted_data[
converted_data_section]
to_send[converted_data_section] = converted_data[converted_data_section]
@@ -138,225 +233,142 @@ class ModbusConnector(Connector, threading.Thread):
self.__gateway.send_to_storage(self.get_name(), to_send)
self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1
def __load_converters(self):
try:
for device in self.__config[CONFIG_DEVICES_SECTION_PARAMETER]:
if self.__config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None:
converter = TBModuleLoader.import_module(self._connector_type,
self.__config[UPLINK_PREFIX + CONVERTER_PARAMETER])(device)
else:
converter = BytesModbusUplinkConverter(device)
if self.__config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None:
downlink_converter = TBModuleLoader.import_module(self._connector_type, self.__config[
DOWNLINK_PREFIX + CONVERTER_PARAMETER])(device)
else:
downlink_converter = BytesModbusDownlinkConverter(device)
if device.get(DEVICE_NAME_PARAMETER) not in self.__gateway.get_devices():
self.__gateway.add_device(device.get(DEVICE_NAME_PARAMETER), {CONNECTOR_PARAMETER: self},
device_type=device.get(DEVICE_TYPE_PARAMETER))
self.__devices[device[DEVICE_NAME_PARAMETER]] = {CONFIG_SECTION_PARAMETER: device,
UPLINK_PREFIX + CONVERTER_PARAMETER: converter,
DOWNLINK_PREFIX + CONVERTER_PARAMETER: downlink_converter,
NEXT_PREFIX + ATTRIBUTES_PARAMETER + CHECK_POSTFIX: 0,
NEXT_PREFIX + TIMESERIES_PARAMETER + CHECK_POSTFIX: 0,
TELEMETRY_PARAMETER: {},
ATTRIBUTES_PARAMETER: {},
LAST_PREFIX + TELEMETRY_PARAMETER: {},
LAST_PREFIX + ATTRIBUTES_PARAMETER: {},
CONNECTION_ATTEMPT_PARAMETER: 0
}
except Exception as e:
log.exception(e)
def close(self):
self.__stopped = True
self.__stop_connections_to_masters()
StopServer()
log.info('%s has been stopped.', self.get_name())
def get_name(self):
return self.name
def __process_devices(self):
for device in self.__devices:
current_time = time.time()
device_responses = {TIMESERIES_PARAMETER: {},
ATTRIBUTES_PARAMETER: {},
}
current_device_config = {}
try:
for config_section in device_responses:
if self.__devices[device][CONFIG_SECTION_PARAMETER].get(config_section) is not None:
current_device_config = self.__devices[device][CONFIG_SECTION_PARAMETER]
unit_id = current_device_config[UNIT_ID_PARAMETER]
if self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] < current_time:
self.__connect_to_current_master(device)
if not self.__current_master.is_socket_open() or not len(
current_device_config[config_section]):
continue
# Reading data from device
for interested_data in range(len(current_device_config[config_section])):
current_data = current_device_config[config_section][interested_data]
current_data[DEVICE_NAME_PARAMETER] = device
input_data = self.__function_to_device(current_data, unit_id)
device_responses[config_section][current_data[TAG_PARAMETER]] = {
"data_sent": current_data,
"input_data": input_data}
def __process_slaves(self):
# TODO: write documentation
device = ModbusConnector.process_requests.get()
log.debug("Checking %s for device %s", config_section, device)
self.__devices[device][NEXT_PREFIX + config_section + CHECK_POSTFIX] = current_time + \
current_device_config[
config_section + POLL_PERIOD_POSTFIX] / 1000
log.debug('Device response: ', device_responses)
if device_responses.get('timeseries') or device_responses.get('attributes'):
self.__data_to_convert_queue.put((device, current_device_config, {
**current_device_config,
BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER,
self.__byte_order),
WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER,
self.__word_order)
}, device_responses))
except ConnectionException:
time.sleep(5)
log.error("Connection lost! Reconnecting...")
except Exception as e:
log.exception(e)
def on_attributes_update(self, content):
device_responses = {'timeseries': {}, 'attributes': {}}
current_device_config = {}
try:
for attribute_updates_command_config in \
self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER]["attributeUpdates"]:
for attribute_updated in content[DATA_PARAMETER]:
if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated:
to_process = {
DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER],
DATA_PARAMETER: {
RPC_METHOD_PARAMETER: attribute_updated,
RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated]
}
}
self.__process_rpc_request(to_process, attribute_updates_command_config)
for config_section in device_responses:
if device.config.get(config_section) is not None:
current_device_config = device.config
self.__connect_to_current_master(device)
if not device.config['master'].is_socket_open() or not len(
current_device_config[config_section]):
continue
# Reading data from device
for interested_data in range(len(current_device_config[config_section])):
current_data = current_device_config[config_section][interested_data]
current_data[DEVICE_NAME_PARAMETER] = device
input_data = self.__function_to_device(device, current_data)
device_responses[config_section][current_data[TAG_PARAMETER]] = {
"data_sent": current_data,
"input_data": input_data}
log.debug("Checking %s for device %s", config_section, device)
log.debug('Device response: ', device_responses)
if device_responses.get('timeseries') or device_responses.get('attributes'):
self.__convert_and_save_data((device, current_device_config, {
**current_device_config,
BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER,
device.byte_order),
WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER,
device.word_order)
}, device_responses))
except ConnectionException:
sleep(5)
log.error("Connection lost! Reconnecting...")
except Exception as e:
log.exception(e)
def __connect_to_current_master(self, device=None):
# TODO: write documentation
connect_attempt_count = 5
connect_attempt_time_ms = 100
wait_after_failed_attempts_ms = 300000
# if device is None:
# device = list(self.__devices.keys())[0]
if self.__devices[device].get(MASTER_PARAMETER) is None:
self.__devices[device][MASTER_PARAMETER], self.__devices[device][
AVAILABLE_FUNCTIONS_PARAMETER] = self.__configure_master(
self.__devices[device][CONFIG_SECTION_PARAMETER])
if self.__devices[device][MASTER_PARAMETER] != self.__current_master:
self.__current_master = self.__devices[device][MASTER_PARAMETER]
self.__available_functions = self.__devices[device][AVAILABLE_FUNCTIONS_PARAMETER]
connect_attempt_count = self.__devices[device][CONFIG_SECTION_PARAMETER].get(CONNECT_ATTEMPT_COUNT_PARAMETER,
connect_attempt_count)
if device.config.get('master') is None:
device.config['master'], device.config['available_functions'] = self.__configure_master(device.config)
if connect_attempt_count < 1:
connect_attempt_count = 1
connect_attempt_time_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get(
CONNECT_ATTEMPT_TIME_MS_PARAMETER, connect_attempt_time_ms)
connect_attempt_time_ms = device.config.get('connectAttemptTimeMs', connect_attempt_time_ms)
if connect_attempt_time_ms < 500:
connect_attempt_time_ms = 500
wait_after_failed_attempts_ms = self.__devices[device][CONFIG_SECTION_PARAMETER].get(
WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER, wait_after_failed_attempts_ms)
wait_after_failed_attempts_ms = device.config.get('waitAfterFailedAttemptsMs', wait_after_failed_attempts_ms)
if wait_after_failed_attempts_ms < 1000:
wait_after_failed_attempts_ms = 1000
current_time = time.time() * 1000
if not self.__current_master.is_socket_open():
if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= connect_attempt_count and \
current_time - self.__devices[device][
LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] >= wait_after_failed_attempts_ms:
self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0
while not self.__current_master.is_socket_open() \
and self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] < connect_attempt_count \
and current_time - self.__devices[device].get(LAST_CONNECTION_ATTEMPT_TIME_PARAMETER,
0) >= connect_attempt_time_ms:
self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = self.__devices[device][
CONNECTION_ATTEMPT_PARAMETER] + 1
self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time
current_time = time() * 1000
if not device.config['master'].is_socket_open():
if device.config['connection_attempt'] >= connect_attempt_count and current_time - device.config[
'last_connection_attempt_time'] >= wait_after_failed_attempts_ms:
device.config['connection_attempt'] = 0
while not device.config['master'].is_socket_open() \
and device.config['connection_attempt'] < connect_attempt_count \
and current_time - device.config.get('last_connection_attempt_time',
0) >= connect_attempt_time_ms:
device.config['connection_attempt'] = device.config[
'connection_attempt'] + 1
device.config['last_connection_attempt_time'] = current_time
log.debug("Modbus trying connect to %s", device)
self.__current_master.connect()
if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] == connect_attempt_count:
device.config['master'].connect()
if device.config['connection_attempt'] == connect_attempt_count:
log.warn("Maximum attempt count (%i) for device \"%s\" - encountered.", connect_attempt_count,
device)
# time.sleep(connect_attempt_time_ms / 1000)
# if not self.__current_master.is_socket_open():
if self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] >= 0 and self.__current_master.is_socket_open():
self.__devices[device][CONNECTION_ATTEMPT_PARAMETER] = 0
self.__devices[device][LAST_CONNECTION_ATTEMPT_TIME_PARAMETER] = current_time
# log.debug("Modbus connected to device %s.", device)
def __configure_master(self, config=None):
current_config = self.__config if config is None else config
if device.config['connection_attempt'] >= 0 and device.config['master'].is_socket_open():
device.config['connection_attempt'] = 0
device.config['last_connection_attempt_time'] = current_time
master_config = dict()
master_config["host"] = current_config[HOST_PARAMETER] if current_config.get(
HOST_PARAMETER) is not None else self.__config.get(HOST_PARAMETER, "localhost")
try:
master_config["port"] = int(current_config[PORT_PARAMETER]) if current_config.get(
PORT_PARAMETER) is not None else self.__config.get(int(PORT_PARAMETER), 502)
except ValueError:
master_config["port"] = current_config[PORT_PARAMETER] if current_config.get(
PORT_PARAMETER) is not None else self.__config.get(PORT_PARAMETER, 502)
master_config["baudrate"] = current_config[BAUDRATE_PARAMETER] if current_config.get(
BAUDRATE_PARAMETER) is not None else self.__config.get(BAUDRATE_PARAMETER, 19200)
master_config["timeout"] = current_config[TIMEOUT_PARAMETER] if current_config.get(
TIMEOUT_PARAMETER) is not None else self.__config.get(TIMEOUT_PARAMETER, 35)
master_config["method"] = current_config[METHOD_PARAMETER] if current_config.get(
METHOD_PARAMETER) is not None else self.__config.get(METHOD_PARAMETER, "rtu")
master_config["stopbits"] = current_config[STOPBITS_PARAMETER] if current_config.get(
STOPBITS_PARAMETER) is not None else self.__config.get(STOPBITS_PARAMETER, Defaults.Stopbits)
master_config["bytesize"] = current_config[BYTESIZE_PARAMETER] if current_config.get(
BYTESIZE_PARAMETER) is not None else self.__config.get(BYTESIZE_PARAMETER, Defaults.Bytesize)
master_config["parity"] = current_config[PARITY_PARAMETER] if current_config.get(
PARITY_PARAMETER) is not None else self.__config.get(PARITY_PARAMETER, Defaults.Parity)
master_config["strict"] = current_config[STRICT_PARAMETER] if current_config.get(
STRICT_PARAMETER) is not None else self.__config.get(STRICT_PARAMETER, True)
master_config["retries"] = current_config[RETRIES_PARAMETER] if current_config.get(
RETRIES_PARAMETER) is not None else self.__config.get(RETRIES_PARAMETER, 3)
master_config["retry_on_empty"] = current_config[RETRY_ON_EMPTY_PARAMETER] if current_config.get(
RETRY_ON_EMPTY_PARAMETER) is not None else self.__config.get(RETRY_ON_EMPTY_PARAMETER, True)
master_config["retry_on_invalid"] = current_config[RETRY_ON_INVALID_PARAMETER] if current_config.get(
RETRY_ON_INVALID_PARAMETER) is not None else self.__config.get(RETRY_ON_INVALID_PARAMETER, True)
master_config["rtu"] = ModbusRtuFramer if current_config.get(METHOD_PARAMETER) == "rtu" or (
current_config.get(METHOD_PARAMETER) is None and self.__config.get(
METHOD_PARAMETER) == "rtu") else ModbusSocketFramer
if self.__previous_master_config != master_config:
self.__previous_master_config = master_config
if current_config.get(TYPE_PARAMETER) == 'tcp' or (
current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "tcp"):
master = ModbusTcpClient(master_config["host"], master_config["port"], master_config["rtu"],
timeout=master_config["timeout"],
retry_on_empty=master_config["retry_on_empty"],
retry_on_invalid=master_config["retry_on_invalid"],
retries=master_config["retries"])
elif current_config.get(TYPE_PARAMETER) == 'udp' or (
current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "udp"):
master = ModbusUdpClient(master_config["host"], master_config["port"], master_config["rtu"],
timeout=master_config["timeout"],
retry_on_empty=master_config["retry_on_empty"],
retry_on_invalid=master_config["retry_on_invalid"],
retries=master_config["retries"])
elif current_config.get(TYPE_PARAMETER) == 'serial' or (
current_config.get(TYPE_PARAMETER) is None and self.__config.get(TYPE_PARAMETER) == "serial"):
master = ModbusSerialClient(method=master_config["method"],
port=master_config["port"],
timeout=master_config["timeout"],
retry_on_empty=master_config["retry_on_empty"],
retry_on_invalid=master_config["retry_on_invalid"],
retries=master_config["retries"],
baudrate=master_config["baudrate"],
stopbits=master_config["stopbits"],
bytesize=master_config["bytesize"],
parity=master_config["parity"],
strict=master_config["strict"])
else:
raise Exception("Invalid Modbus transport type.")
@staticmethod
def __configure_master(config):
current_config = config
current_config["rtu"] = FRAMER_TYPE[current_config['method']]
if current_config.get('type') == 'tcp':
master = ModbusTcpClient(current_config["host"],
current_config["port"],
current_config["rtu"],
timeout=current_config["timeout"],
retry_on_empty=current_config["retry_on_empty"],
retry_on_invalid=current_config["retry_on_invalid"],
retries=current_config["retries"])
elif current_config.get(TYPE_PARAMETER) == 'udp':
master = ModbusUdpClient(current_config["host"],
current_config["port"],
current_config["rtu"],
timeout=current_config["timeout"],
retry_on_empty=current_config["retry_on_empty"],
retry_on_invalid=current_config["retry_on_invalid"],
retries=current_config["retries"])
elif current_config.get(TYPE_PARAMETER) == 'serial':
master = ModbusSerialClient(method=current_config["method"],
port=current_config["port"],
timeout=current_config["timeout"],
retry_on_empty=current_config["retry_on_empty"],
retry_on_invalid=current_config["retry_on_invalid"],
retries=current_config["retries"],
baudrate=current_config["baudrate"],
stopbits=current_config["stopbits"],
bytesize=current_config["bytesize"],
parity=current_config["parity"],
strict=current_config["strict"])
else:
master = self.__current_master
raise Exception("Invalid Modbus transport type.")
available_functions = {
1: master.read_coils,
2: master.read_discrete_inputs,
@@ -366,59 +378,83 @@ class ModbusConnector(Connector, threading.Thread):
6: master.write_register,
15: master.write_coils,
16: master.write_registers,
}
}
return master, available_functions
def __stop_connections_to_masters(self):
for device in self.__devices:
if self.__devices[device].get(MASTER_PARAMETER) is not None:
self.__devices[device][MASTER_PARAMETER].close()
for slave in self.__slaves:
if slave.config.get('master') is not None and slave.config.get('master').is_socket_open():
slave.config['master'].close()
def __function_to_device(self, config, unit_id):
function_code = config.get(FUNCTION_CODE_PARAMETER)
@staticmethod
def __function_to_device(device, config):
function_code = config.get('functionCode')
result = None
if function_code in (1, 2, 3, 4):
result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER],
count=config.get(OBJECTS_COUNT_PARAMETER,
config.get("registersCount",
config.get("registerCount",
1))),
unit=unit_id)
result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER],
count=config.get(OBJECTS_COUNT_PARAMETER,
config.get("registersCount",
config.get(
"registerCount",
1))),
unit=device.config['unitId'])
elif function_code in (5, 6):
result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER],
value=config[PAYLOAD_PARAMETER],
unit=unit_id)
result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER],
value=config[PAYLOAD_PARAMETER],
unit=device.config['unitId'])
elif function_code in (15, 16):
result = self.__available_functions[function_code](address=config[ADDRESS_PARAMETER],
values=config[PAYLOAD_PARAMETER],
unit=unit_id)
result = device.config['available_functions'][function_code](address=config[ADDRESS_PARAMETER],
values=config[PAYLOAD_PARAMETER],
unit=device.config['unitId'])
else:
log.error("Unknown Modbus function with code: %i", function_code)
log.error("Unknown Modbus function with code: %s", function_code)
log.debug("With result %s", str(result))
if "Exception" in str(result):
log.exception(result)
return result
def on_attributes_update(self, content):
try:
device = tuple(filter(lambda slave: slave.name == content[DEVICE_SECTION_PARAMETER], self.__slaves))[0]
for attribute_updates_command_config in device.config['attributeUpdates']:
for attribute_updated in content[DATA_PARAMETER]:
if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated:
to_process = {
DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER],
DATA_PARAMETER: {
RPC_METHOD_PARAMETER: attribute_updated,
RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated]
}
}
self.__process_rpc_request(to_process, attribute_updates_command_config)
except Exception as e:
log.exception(e)
def server_side_rpc_handler(self, server_rpc_request):
try:
if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None:
log.debug("Modbus connector received rpc request for %s with server_rpc_request: %s",
server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request)
if isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][
RPC_SECTION], dict):
rpc_command_config = \
self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][
RPC_SECTION].get(
server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER])
device = tuple(
filter(
lambda slave: slave.name == server_rpc_request[DEVICE_SECTION_PARAMETER], self.__slaves
)
)[0]
if isinstance(device.config[RPC_SECTION], dict):
rpc_command_config = device.config[RPC_SECTION].get(
server_rpc_request[DATA_PARAMETER][RPC_METHOD_PARAMETER])
if rpc_command_config is not None:
self.__process_rpc_request(server_rpc_request, rpc_command_config)
elif isinstance(self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][
RPC_SECTION], list):
for rpc_command_config in \
self.__devices[server_rpc_request[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][
RPC_SECTION]:
elif isinstance(device.config[RPC_SECTION], list):
for rpc_command_config in device.config[RPC_SECTION]:
if rpc_command_config[TAG_PARAMETER] == server_rpc_request[DATA_PARAMETER][
RPC_METHOD_PARAMETER]:
self.__process_rpc_request(server_rpc_request, rpc_command_config)
@@ -438,45 +474,41 @@ class ModbusConnector(Connector, threading.Thread):
def __process_rpc_request(self, content, rpc_command_config):
if rpc_command_config is not None:
rpc_command_config[UNIT_ID_PARAMETER] = \
self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER][UNIT_ID_PARAMETER]
self.__connect_to_current_master(content[DEVICE_SECTION_PARAMETER])
# if rpc_command_config.get('bit') is not None:
# rpc_command_config[FUNCTION_CODE_PARAMETER] = 6
device = tuple(filter(lambda slave: slave.name == content[DEVICE_SECTION_PARAMETER], self.__slaves))[0]
rpc_command_config[UNIT_ID_PARAMETER] = device.config['unitId']
self.__connect_to_current_master(device)
if rpc_command_config.get(FUNCTION_CODE_PARAMETER) in (5, 6, 15, 16):
rpc_command_config[PAYLOAD_PARAMETER] = self.__devices[content[DEVICE_SECTION_PARAMETER]][
rpc_command_config[PAYLOAD_PARAMETER] = device.config[
DOWNLINK_PREFIX + CONVERTER_PARAMETER].convert(
rpc_command_config, content)
response = None
try:
response = self.__function_to_device(rpc_command_config, rpc_command_config[UNIT_ID_PARAMETER])
response = self.__function_to_device(device, rpc_command_config)
except Exception as e:
log.exception(e)
response = e
if isinstance(response, (ReadRegistersResponseBase, ReadBitsResponseBase)):
to_converter = {
RPC_SECTION: {content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: {"data_sent": rpc_command_config,
"input_data": response}}}
response = self.__devices[content[DEVICE_SECTION_PARAMETER]][
response = device.config[
UPLINK_PREFIX + CONVERTER_PARAMETER].convert(
config={**self.__devices[content[DEVICE_SECTION_PARAMETER]][CONFIG_SECTION_PARAMETER],
BYTE_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][
CONFIG_SECTION_PARAMETER].get(BYTE_ORDER_PARAMETER,
self.__byte_order),
WORD_ORDER_PARAMETER: self.__devices[content[DEVICE_SECTION_PARAMETER]][
CONFIG_SECTION_PARAMETER].get(WORD_ORDER_PARAMETER,
self.__word_order)
config={**device.config,
BYTE_ORDER_PARAMETER: device.byte_order,
WORD_ORDER_PARAMETER: device.word_order
},
data=to_converter)
log.debug("Received RPC method: %s, result: %r", content[DATA_PARAMETER][RPC_METHOD_PARAMETER],
response)
# response = {"success": response}
elif isinstance(response, (WriteMultipleRegistersResponse,
WriteMultipleCoilsResponse,
WriteSingleCoilResponse,
WriteSingleRegisterResponse)):
log.debug("Write %r", str(response))
response = {"success": True}
if content.get(RPC_ID_PARAMETER) or (
content.get(DATA_PARAMETER) is not None and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)):
if isinstance(response, Exception):
@@ -487,4 +519,5 @@ class ModbusConnector(Connector, threading.Thread):
self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER],
content[DATA_PARAMETER][RPC_ID_PARAMETER],
response)
log.debug("%r", response)

View File

@@ -0,0 +1,115 @@
# Copyright 2021. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Thread
from time import time, sleep
from pymodbus.constants import Defaults
from thingsboard_gateway.connectors.modbus.constants import *
from thingsboard_gateway.connectors.connector import log
from thingsboard_gateway.connectors.modbus.bytes_modbus_uplink_converter import BytesModbusUplinkConverter
from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
class Slave(Thread):
def __init__(self, **kwargs):
super().__init__()
self.timeout = kwargs.get('timeout')
self.name = kwargs['deviceName']
self.poll_period = kwargs['pollPeriod'] / 1000
self.byte_order = kwargs['byteOrder']
self.word_order = kwargs.get('wordOrder')
self.config = {
'unitId': kwargs['unitId'],
'deviceType': kwargs.get('deviceType', 'default'),
'type': kwargs['type'],
'host': kwargs.get('host'),
'port': kwargs['port'],
'timeout': kwargs.get('timeout', 35),
'stopbits': kwargs.get('stopbits', Defaults.Stopbits),
'bytesize': kwargs.get('bytesize', Defaults.Bytesize),
'parity': kwargs.get('parity', Defaults.Parity),
'strict': kwargs.get('strict', True),
'retries': kwargs.get('retries', 3),
'connection_attempt': 0,
'last_connection_attempt_time': 0,
'sendDataOnlyOnChange': kwargs.get('sendDataOnlyOnChange', False),
'waitAfterFailedAttemptsMs': kwargs.get('waitAfterFailedAttemptsMs', 0),
'connectAttemptTimeMs': kwargs.get('connectAttemptTimeMs', 0),
'retry_on_empty': kwargs.get('retry_on_empty', True),
'retry_on_invalid': kwargs.get('retry_on_invalid', True),
'method': kwargs.get('method', 'rtu'),
'baudrate': kwargs.get('baudrate', 19200),
'attributes': kwargs.get('attributes', []),
'timeseries': kwargs.get('timeseries', []),
'attributeUpdates': kwargs.get('attributeUpdates', []),
'rpc': kwargs.get('rpc', []),
'last_attributes': {},
'last_telemetry': {}
}
self.__load_converters(kwargs['connector'], kwargs['gateway'])
self.callback = kwargs['callback']
self.last_polled_time = None
self.daemon = True
self.start()
def timer(self):
self.callback(self)
self.last_polled_time = time()
while True:
if time() - self.last_polled_time >= self.poll_period:
self.callback(self)
self.last_polled_time = time()
sleep(.2)
def run(self):
self.timer()
def get_name(self):
return self.name
def __load_converters(self, connector, gateway):
try:
if self.config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None:
converter = TBModuleLoader.import_module(connector.connector_type,
self.config[UPLINK_PREFIX + CONVERTER_PARAMETER])(self)
else:
converter = BytesModbusUplinkConverter({**self.config, 'deviceName': self.name})
if self.config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None:
downlink_converter = TBModuleLoader.import_module(connector.connector_type, self.config[
DOWNLINK_PREFIX + CONVERTER_PARAMETER])(self)
else:
downlink_converter = BytesModbusDownlinkConverter(self.config)
if self.name not in gateway.get_devices():
gateway.add_device(self.name, {CONNECTOR_PARAMETER: connector},
device_type=self.config.get(DEVICE_TYPE_PARAMETER))
self.config[UPLINK_PREFIX + CONVERTER_PARAMETER] = converter
self.config[DOWNLINK_PREFIX + CONVERTER_PARAMETER] = downlink_converter
except Exception as e:
log.exception(e)
def __str__(self):
return f'{self.name}'

View File

@@ -153,7 +153,6 @@ class TBGatewayService:
self.connectors_configs = {}
self.__remote_configurator = None
self.__request_config_after_connect = False
self.__connected_devices = {}
self.__load_persistent_devices()
self.__init_remote_configuration()
self.__grpc_config = self.__config.get('grpc')
@@ -898,9 +897,17 @@ class TBGatewayService:
log.debug("Loaded devices:\n %s", devices)
for device_name in devices:
try:
if self.available_connectors.get(devices[device_name]):
if not isinstance(devices[device_name], tuple):
open(self._config_dir + self.__connected_devices_file, 'w').close()
log.debug("Old connected_devices file, new file will be created")
return
if self.available_connectors.get(devices[device_name][0]):
self.__connected_devices[device_name] = {
"connector": self.available_connectors[devices[device_name]]}
"connector": self.available_connectors[devices[device_name][0]],
"device_type": devices[device_name][1]}
self.__saved_devices[device_name] = {
"connector": self.available_connectors[devices[device_name][0]],
"device_type": devices[device_name][1]}
except Exception as e:
log.exception(e)
continue
@@ -914,7 +921,7 @@ class TBGatewayService:
data_to_save = {}
for device in self.__connected_devices:
if self.__connected_devices[device]["connector"] is not None:
data_to_save[device] = self.__connected_devices[device]["connector"].get_name()
data_to_save[device] = (self.__connected_devices[device]["connector"].get_name(), self.__connected_devices[device]["device_type"])
config_file.write(dumps(data_to_save, indent=2, sort_keys=True))
except Exception as e:
log.exception(e)