1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added Socket Connector

This commit is contained in:
samson0v
2022-01-12 16:03:48 +02:00
parent c284dcf64b
commit 5dc80247ca
8 changed files with 382 additions and 2 deletions

View File

@@ -38,7 +38,7 @@ setup(
packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', 'thingsboard_gateway.gateway.grpc_service', packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', 'thingsboard_gateway.gateway.grpc_service',
'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory',
'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.tb_client', 'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.tb_client',
'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket',
'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request',
'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet', 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet',
'thingsboard_gateway.connectors.bacnet.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', 'thingsboard_gateway.connectors.bacnet.bacnet_utilities', 'thingsboard_gateway.connectors.odbc',
@@ -48,6 +48,7 @@ setup(
'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', 'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request',
'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc',
'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp', 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp',
'thingsboard_gateway.extensions.socket',
], ],
install_requires=[ install_requires=[
'jsonpath-rw', 'jsonpath-rw',

View File

@@ -0,0 +1,50 @@
{
"name": "TCP Connector Example",
"type": "TCP",
"address": "127.0.0.1",
"port": 50000,
"bufferSize": 1024,
"devices": [
{
"address": "127.0.0.1:50001",
"deviceName": "Device Example",
"deviceType": "default",
"telemetry": [
{
"key": "temp",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "hum",
"byteFrom": 0,
"byteTo": 2
}
],
"attributes": [
{
"key": "name",
"byteFrom": 0,
"byteTo": -1
},
{
"key": "num",
"byteFrom": 2,
"byteTo": 4
}
],
"attributeUpdates": [
{
"attributeOnThingsBoard": "sharedName"
}
],
"serverSideRpc": [
{
"methodRPC": "rpcMethod1",
"withResponse": true,
"methodProcessing": "write"
}
]
}
]
}

View File

@@ -93,6 +93,10 @@ connectors:
# type: ftp # type: ftp
# configuration: ftp.json # configuration: ftp.json
# #
#-
# name: Socket TCP/UDP Connector
# type: socket
# configuration: socket.json
# #
# ========= Customization ========== # ========= Customization ==========
# #

View File

@@ -0,0 +1,13 @@
# Copyright 2022. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,59 @@
# Copyright 2022. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from thingsboard_gateway.connectors.socket.socket_uplink_converter import SocketUplinkConverter, log
class BytesSocketUplinkConverter(SocketUplinkConverter):
def __init__(self, config):
self.__config = config
self.dict_result = {
"deviceName": config['deviceName'],
"deviceType": config['deviceType']
}
def convert(self, config, data):
if data is None:
return {}
try:
self.dict_result["telemetry"] = []
self.dict_result["attributes"] = []
for section in ('telemetry', 'attributes'):
for item in config[section]:
try:
byte_from = item.get('byteFrom')
byte_to = item.get('byteTo')
byte_to = byte_to if byte_to != -1 else len(data)
converted_data = data[byte_from:byte_to]
try:
converted_data = converted_data.replace(b"\x00", b'').decode('UTF-8')
except UnicodeDecodeError:
converted_data = str(converted_data)
if item.get('key') is not None:
self.dict_result[section].append(
{item['key']: converted_data})
else:
log.error('Key for %s not found in config: %s', config['type'], config['section_config'])
except Exception as e:
log.exception(e)
except Exception as e:
log.exception(e)
log.debug(self.dict_result)
return self.dict_result

View File

@@ -0,0 +1,230 @@
# Copyright 2022. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from queue import Queue
from random import choice
from string import ascii_lowercase
from threading import Thread
from time import sleep
from thingsboard_gateway.connectors.connector import Connector, log
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
SOCKET_TYPE = {
'TCP': socket.SOCK_STREAM,
'UDP': socket.SOCK_DGRAM
}
DEFAULT_UPLINK_CONVERTER = 'BytesSocketUplinkConverter'
class SocketConnector(Connector, Thread):
def __init__(self, gateway, config, connector_type):
super().__init__()
self.__log = log
self.__config = config
self._connector_type = connector_type
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
self.__gateway = gateway
self.setName(config.get("name", 'TCP Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))))
self.daemon = True
self.__stopped = False
self._connected = False
self.__socket_type = config['type'].upper()
self.__socket_address = config['address']
self.__socket_port = config['port']
self.__socket_buff_size = config['bufferSize']
self.__socket = socket.socket(socket.AF_INET, SOCKET_TYPE[self.__socket_type])
self.__converting_requests = Queue(-1)
self.__devices = self.__convert_devices_list()
self.__connections = {}
def __convert_devices_list(self):
devices = self.__config.get('devices', [])
converted_devices = {}
for device in devices:
address = device.get('address')
module = self.__load_converter(device)
converter = module(
{'deviceName': device['deviceName'],
'deviceType': device.get('deviceType', 'default')}) if module else None
device['converter'] = converter
converted_devices[address] = device
return converted_devices
def __load_converter(self, device):
converter_class_name = device.get('converter', DEFAULT_UPLINK_CONVERTER)
module = TBModuleLoader.import_module(self._connector_type, converter_class_name)
if module:
log.debug('Converter %s for device %s - found!', converter_class_name, self.name)
return module
log.error("Cannot find converter for %s device", self.name)
return None
def open(self):
self.__stopped = False
self.start()
def run(self):
self._connected = True
converting_thread = Thread(target=self.__convert_data, daemon=True, name='Converter Thread')
converting_thread.start()
self.__socket.bind((self.__socket_address, self.__socket_port))
if self.__socket_type == 'TCP':
self.__socket.listen(5)
self.__log.info('%s socket is up', self.__socket_type)
while not self.__stopped:
try:
if self.__socket_type == 'TCP':
conn, address = self.__socket.accept()
self.__connections[address] = conn
self.__log.debug('New connection %s established', address)
thread = Thread(target=self.__process_tcp_connection, daemon=True,
name=f'Processing {address} connection',
args=(conn, address))
thread.start()
else:
data, client_address = self.__socket.recvfrom(self.__socket_buff_size)
self.__converting_requests.put((client_address, data))
except ConnectionAbortedError:
self.__socket.close()
def __process_tcp_connection(self, connection, address):
while not self.__stopped:
data = connection.recv(self.__socket_buff_size)
if data:
self.__converting_requests.put((address, data))
else:
break
connection.close()
self.__connections.pop(address)
self.__log.debug('Connection %s closed', address)
def __convert_data(self):
while not self.__stopped:
if not self.__converting_requests.empty():
(address, port), data = self.__converting_requests.get()
device = self.__devices.get(f'{address}:{port}', None)
if not device:
self.__log.error('Can\'t convert data from %s:%s - not in config file', address, port)
converter = device['converter']
if not converter:
self.__log.error('Converter not found for %s:%s', address, port)
try:
device_config = {
'telemetry': device.get('telemetry', []),
'attributes': device.get('attributes', [])
}
converted_data = converter.convert(device_config, data)
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
if converted_data is not None:
self.__gateway.send_to_storage(self.get_name(), converted_data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
log.info('Data to ThingsBoard %s', converted_data)
except Exception as e:
self.__log.exception(e)
sleep(.2)
def close(self):
self.__stopped = True
self._connected = False
self.__connections = {}
def get_name(self):
return self.name
def is_connected(self):
return self._connected
def __write_value_via_tcp(self, address, port, value):
try:
self.__connections[(address, int(port))].sendall(value)
return 'ok'
except KeyError:
try:
new_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
new_socket.connect((address, int(port)))
new_socket.sendall(value)
new_socket.close()
return 'ok'
except ConnectionRefusedError as e:
self.__log.error('Can\'t connect to %s:%s', address, port)
self.__log.exception(e)
return e
@staticmethod
def __write_value_via_udp(address, port, value):
new_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
new_socket.sendto(value, (address, int(port)))
new_socket.close()
def on_attributes_update(self, content):
try:
device = tuple(filter(lambda item: item['deviceName'] == content['device'], self.__config['devices']))[0]
for attribute_update_config in device['attributeUpdates']:
for attribute_update in content['data']:
if attribute_update_config['attributeOnThingsBoard'] == attribute_update:
address, port = device['address'].split(':')
converted_data = bytes(str(content['data'][attribute_update]), encoding='utf8')
self.__write_value_via_tcp(address, port, converted_data)
except IndexError:
self.__log.error('Device not found')
def server_side_rpc_handler(self, content):
try:
device = tuple(filter(lambda item: item['deviceName'] == content['device'], self.__config['devices']))[0]
for rpc_config in device['serverSideRpc']:
for (key, value) in content['data'].items():
if value == rpc_config['methodRPC']:
rpc_method = rpc_config['methodProcessing']
return_result = rpc_config['withResponse']
result = None
address, port = device['address'].split(':')
converted_data = bytes(str(content['data']['params']), encoding='utf8')
if rpc_method.upper() == 'WRITE':
if self.__socket_type == 'TCP':
result = self.__write_value_via_tcp(address, port, converted_data)
else:
self.__write_value_via_udp(address, port, converted_data)
if return_result and self.__socket_type == 'TCP':
self.__gateway.send_rpc_reply(content['device'], content['data']['id'], str(result))
return
except IndexError:
self.__log.error('Device not found')

View File

@@ -0,0 +1,22 @@
# Copyright 2022. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from thingsboard_gateway.connectors.converter import Converter, abstractmethod, log
class SocketUplinkConverter(Converter):
@abstractmethod
def convert(self, config, data):
pass

View File

@@ -56,7 +56,8 @@ DEFAULT_CONNECTORS = {
"odbc": "OdbcConnector", "odbc": "OdbcConnector",
"rest": "RESTConnector", "rest": "RESTConnector",
"snmp": "SNMPConnector", "snmp": "SNMPConnector",
"ftp": "FTPConnector" "ftp": "FTPConnector",
"socket": "SocketConnector"
} }