1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Merge branch 'develop/2.4-python' of https://github.com/thingsboard/thingsboard-gateway into feature/http-connector

This commit is contained in:
zbeacon
2020-06-01 13:09:11 +03:00
35 changed files with 2453 additions and 268 deletions

View File

@@ -0,0 +1,58 @@
{
"general": {
"objectName": "TB_gateway",
"address": "192.168.188.181:1052",
"objectIdentifier": 599,
"maxApduLengthAccepted": 1024,
"segmentationSupported": "segmentedBoth",
"vendorIdentifier": 15
},
"devices": [
{
"deviceName": "BACnet Device ${objectName}",
"deviceType": "default",
"address": "192.168.188.181:10520",
"pollPeriod": 10000,
"attributes": [
{
"key": "temperature",
"type": "string",
"objectId": "analogOutput:1",
"propertyId": "presentValue"
}
],
"timeseries": [
{
"key": "state",
"type": "bool",
"objectId": "binaryValue:1",
"propertyId": "presentValue"
}
],
"attributeUpdates": [
{
"key": "brightness",
"requestType": "writeProperty",
"objectId": "analogOutput:1",
"propertyId": "presentValue"
}
],
"serverSideRpc": [
{
"method": "set_state",
"requestType": "writeProperty",
"requestTimeout": 10000,
"objectId": "binaryOutput:1",
"propertyId": "presentValue"
},
{
"method": "get_state",
"requestType": "readProperty",
"requestTimeout": 10000,
"objectId": "binaryOutput:1",
"propertyId": "presentValue"
}
]
}
]
}

View File

@@ -27,19 +27,10 @@
"timeseries": [
{
"key": "rpm",
"nodeId": 3,
"nodeId": 1918,
"isExtendedId": true,
"command": {
"start": 2,
"length": 2,
"byteorder": "little",
"value": 48059
},
"value": {
"start": 4,
"length": 2,
"type": "int"
},
"command": "2:2:big:48059",
"value": "4:2:big:int",
"expression": "value / 4",
"polling": {
"type": "always",
@@ -95,4 +86,4 @@
]
}
]
}
}

View File

@@ -3,6 +3,7 @@
"devices": [
{
"name": "CustomSerialDevice1",
"type": "default",
"port": "/dev/ttyUSB0",
"baudrate": 9600,
"converter": "CustomSerialUplinkConverter",

View File

@@ -1,94 +1,77 @@
[loggers]
keys=root, service, connector, converter, tb_connection, storage, extension
[handlers]
keys=consoleHandler, serviceHandler, connectorHandler, converterHandler, tb_connectionHandler, storageHandler, extensionHandler
[formatters]
keys=LogFormatter
[logger_root]
level=CRITICAL
level=ERROR
handlers=consoleHandler
[logger_connector]
level=DEBUG
level=INFO
handlers=connectorHandler
formatter=LogFormatter
qualname=connector
[logger_storage]
level=DEBUG
level=INFO
handlers=storageHandler
formatter=LogFormatter
qualname=storage
[logger_tb_connection]
level=DEBUG
level=INFO
handlers=tb_connectionHandler
formatter=LogFormatter
qualname=tb_connection
[logger_service]
level=DEBUG
level=INFO
handlers=serviceHandler
formatter=LogFormatter
qualname=service
[logger_converter]
level=DEBUG
handlers=connectorHandler
level=INFO
handlers=converterHandler
formatter=LogFormatter
qualname=converter
[logger_extension]
level=DEBUG
level=INFO
handlers=connectorHandler
formatter=LogFormatter
qualname=extension
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
level=INFO
formatter=LogFormatter
args=(sys.stdout,)
[handler_connectorHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/connector.log", 'd', 1, 7,)
args=("./logs/connector.log", "d", 1, 7,)
[handler_storageHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/storage.log", 'd', 1, 7,)
args=("./logs/storage.log", "d", 1, 7,)
[handler_serviceHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/service.log", 'd', 1, 7,)
args=("./logs/service.log", "d", 1, 7,)
[handler_converterHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/converter.log", 'd', 1, 3,)
args=("./logs/converter.log", "d", 1, 3,)
[handler_extensionHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/extension.log", 'd', 1, 3,)
args=("./logs/extension.log", "d", 1, 3,)
[handler_tb_connectionHandler]
level=DEBUG
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/tb_connection.log", 'd', 1, 3,)
args=("./logs/tb_connection.log", "d", 1, 3,)
[formatter_LogFormatter]
format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s'
datefmt='%Y-%m-%d %H:%M:%S'
format="%(asctime)s - %(levelname)s - [%(filename)s] - %(module)s - %(lineno)d - %(message)s"
datefmt="%Y-%m-%d %H:%M:%S"

View File

@@ -1,11 +1,11 @@
{
"server": {
"name": "Modbus Default Server",
"type": "tcp",
"host": "127.0.0.1",
"port": 5020,
"timeout": 35,
"rtuOverTcp": false,
"method": "socket",
"byteOrder": "BIG",
"devices": [
{
"unitId": 1,
@@ -15,53 +15,154 @@
"sendDataOnlyOnChange": true,
"attributes": [
{
"byteOrder": "BIG",
"tag": "test",
"type": "long",
"tag": "string_read",
"type": "string",
"functionCode": 4,
"registerCount": 1,
"address": 0
}
],
"timeseries": [
{
"byteOrder": "BIG",
"tag": "test",
"type": "long",
"functionCode": 4,
"registerCount": 1,
"address": 0
"objectsCount": 4,
"address": 1
},
{
"byteOrder": "BIG",
"tag": "test2",
"type": "long",
"tag": "bits_read",
"type": "bits",
"functionCode": 4,
"registerCount": 1,
"address": 2
"objectsCount": 1,
"address": 5
},
{
"tag": "8int_read",
"type": "8int",
"functionCode": 4,
"objectsCount": 1,
"address": 6
},
{
"tag": "16int_read",
"type": "16int",
"functionCode": 4,
"objectsCount": 1,
"address": 7
},
{
"tag": "32int_read_divider",
"type": "32int",
"functionCode": 4,
"objectsCount": 2,
"address": 8,
"divider": 10
},
{
"tag": "8int_read_multiplier",
"type": "8int",
"functionCode": 4,
"objectsCount": 1,
"address": 10,
"multiplier": 10
},
{
"tag": "32int_read",
"type": "32int",
"functionCode": 4,
"objectsCount": 2,
"address": 11
},
{
"tag": "64int_read",
"type": "64int",
"functionCode": 4,
"objectsCount": 4,
"address": 13
}
],
"timeseries": [
{
"tag": "8uint_read",
"type": "8uint",
"functionCode": 4,
"objectsCount": 1,
"address": 17
},
{
"tag": "16uint_read",
"type": "16uint",
"functionCode": 4,
"objectsCount": 2,
"address": 18
},
{
"tag": "32uint_read",
"type": "32uint",
"functionCode": 4,
"objectsCount": 4,
"address": 20
},
{
"tag": "64uint_read",
"type": "64uint",
"functionCode": 4,
"objectsCount": 1,
"address": 24
},
{
"tag": "16float_read",
"type": "16float",
"functionCode": 4,
"objectsCount": 1,
"address": 25
},
{
"tag": "32float_read",
"type": "32float",
"functionCode": 4,
"objectsCount": 2,
"address": 26
},
{
"tag": "64float_read",
"type": "64float",
"functionCode": 4,
"objectsCount": 4,
"address": 28
}
],
"rpc": {
"turnLightOn": {
"address": 4,
"bit": 2,
"value": true
"attributeUpdates": [
{
"tag": "shared_attribute_write",
"type": "32int",
"functionCode": 6,
"objectsCount": 2,
"address": 29
}
],
"rpc": [
{
"tag": "setValue",
"type": "bits",
"functionCode": 5,
"objectsCount": 1,
"address": 31
},
"turnLightOff": {
"address": 4,
"bit": 2,
"value": false
{
"tag": "getValue",
"type": "bits",
"functionCode": 1,
"objectsCount": 1,
"address": 31
},
"getCPULoad": {
"tag": "Integer",
"value": 42,
{
"tag": "setCPUFanSpeed",
"type": "32int",
"functionCode": 16,
"address": 0,
"unitId": 1,
"byteOrder": "BIG",
"registerCount": 1
"objectsCount": 2,
"address": 33
},
{
"tag":"getCPULoad",
"type": "32int",
"functionCode": 4,
"objectsCount": 2,
"address": 35
}
}
]
}
]
}

View File

@@ -22,6 +22,11 @@
"type": "string",
"key": "model",
"value": "${sensorModel}"
},
{
"type": "string",
"key": "${sensorModel}",
"value": "on"
}
],
"timeseries": [

View File

@@ -18,12 +18,12 @@
"pooling": false
},
"polling": {
"query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv_latest WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"period": 10,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM ts_kv_latest",
"save": false
"query": "SELECT MIN(ts) - 1 FROM ts_kv",
"persistent": false
}
},
"mapping": {
@@ -47,7 +47,7 @@
"procedureOne",
{
"name": "procedureTwo",
"params": [ "One", 2, 3.0 ]
"args": [ "One", 2, 3.0 ]
}
]
}

View File

@@ -2,8 +2,11 @@
"server": {
"name": "OPC-UA Default Server",
"url": "localhost:4840/freeopcua/server/",
"scanPeriodInMillis": 10000,
"timeoutInMillis": 5000,
"scanPeriodInMillis": 5000,
"disableSubscriptions":false,
"subCheckPeriodInMillis": 100,
"showMap": false,
"security": "Basic128Rsa15",
"identity": {
"type": "anonymous"
@@ -14,8 +17,8 @@
"deviceNamePattern": "Device ${Root\\.Objects\\.Device1\\.serialNumber}",
"attributes": [
{
"key": "temperature",
"path": "${TemperatureAndHumiditySensor\\.Temperature}"
"key": "temperature °C",
"path": "${ns=2;i=5}"
}
],
"timeseries": [

View File

@@ -0,0 +1,79 @@
{
"host": "127.0.0.1",
"port": "5000",
"mapping":[
{
"endpoint": "/test_device",
"HTTPMethod": [
"POST"
],
"security":
{
"type": "basic",
"username": "user",
"password": "passwd"
},
"converter": {
"type": "json",
"deviceNameExpression": "Device ${name}",
"deviceTypeExpression": "default",
"attributes": [
{
"type": "string",
"key": "model",
"value": "${sensorModel}"
}
],
"timeseries": [
{
"type": "double",
"key": "temperature",
"value": "${temp}"
},
{
"type": "double",
"key": "humidity",
"value": "${hum}",
"converter": "CustomConverter"
}
]
}
},
{
"endpoint": "/test",
"HTTPMethod": [
"GET",
"POST"
],
"security":
{
"type": "anonymous"
},
"converter": {
"type": "custom",
"class": "CustomConverter",
"deviceNameExpression": "Device 2",
"deviceTypeExpression": "default",
"attributes": [
{
"type": "string",
"key": "model",
"value": "Model2"
}
],
"timeseries": [
{
"type": "double",
"key": "temperature",
"value": "${temp}"
},
{
"type": "double",
"key": "humidity",
"value": "${hum}"
}
]
}
}
]
}

View File

@@ -51,6 +51,11 @@ connectors:
# configuration: can.json
#
# -
# name: BACnet Connector
# type: bacnet
# configuration: bacnet.json
#
# -
# name: ODBC Connector
# type: odbc
# configuration: odbc.json

View File

@@ -0,0 +1,405 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
import struct
import unittest
from os import path
from random import choice, randint, uniform
from string import ascii_lowercase
from time import sleep
from unittest.mock import Mock
import thingsboard_gateway
from can import Notifier, BufferedReader, Bus, Message
from simplejson import load
from thingsboard_gateway.connectors.can.can_connector import CanConnector
# Keep test output terse: show only errors, with a uniform timestamped format.
logging.basicConfig(level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
class CanConnectorTestsBase(unittest.TestCase):
    """Shared fixture for CAN connector tests: a virtual CAN bus plus a mocked gateway.

    Subclasses override ``_create_bus`` when they need different bus capabilities
    (e.g. CAN FD support) and call ``_create_connector`` with a JSON config name.
    """

    # Directory holding the JSON test configurations: <tests dir>/data/can/
    CONFIG_PATH = path.join(path.dirname(path.dirname(path.abspath(__file__))),
                            "data" + path.sep + "can" + path.sep)

    def setUp(self):
        # A fresh bus and gateway mock for every test case; the connector itself
        # is created lazily by _create_connector inside each test.
        self.bus = self._create_bus()
        self.gateway = Mock(spec=thingsboard_gateway.TBGatewayService)
        self.connector = None
        self.config = None

    def tearDown(self):
        self.connector.close()
        self.bus.shutdown()

    def _create_bus(self):
        # Default bus: classic (non-FD) virtual bus that does not echo its own frames.
        return Bus(channel="virtual_channel", bustype="virtual", receive_own_messages=False)

    def _create_connector(self, config_file_name):
        """Load the given JSON config from CONFIG_PATH and start the connector."""
        config_file = self.CONFIG_PATH + config_file_name
        with open(config_file, 'r', encoding="UTF-8") as config_stream:
            self.config = load(config_stream)
            self.connector = CanConnector(self.gateway, self.config, "can")
            self.connector.open()
            sleep(1)  # some time to init
class CanConnectorPollingTests(CanConnectorTestsBase):
    """Tests for the connector's polling tasks (frames it periodically sends onto the bus)."""

    def test_polling_once(self):
        """A 'once' polling task must emit exactly one frame and then stay silent."""
        self._create_connector("polling_once.json")
        config = self.config["devices"][0]["attributes"][0]
        message = self.bus.recv(self.connector.DEFAULT_POLL_PERIOD)
        # The single polled frame must carry the configured id, flags and payload.
        self.assertEqual(message.arbitration_id, config["nodeId"])
        self.assertEqual(message.is_extended_id, config["isExtendedId"])
        self.assertEqual(message.is_fd, self.connector.DEFAULT_FD_FLAG)
        self.assertEqual(message.bitrate_switch, self.connector.DEFAULT_BITRATE_SWITCH_FLAG)
        self.assertEqual(message.data, bytearray.fromhex(config["polling"]["dataInHex"]))
        # Some buses may receive their own messages. Remove it from the queue
        self.bus.recv(0)
        # Check if no new polling messages
        sleep(self.connector.DEFAULT_POLL_PERIOD)
        message = self.bus.recv(self.connector.DEFAULT_POLL_PERIOD)
        self.assertIsNone(message)

    def test_polling_always(self):
        """An 'always' polling task must keep emitting the configured frame every period."""
        self._create_connector("polling_always.json")
        config = self.config["devices"][0]["attributes"][0]
        for _ in range(1, 5):
            # Timeout should be greater that polling period to prevent the case
            # when message is received earlier than time is out.
            message = self.bus.recv(config["polling"]["period"] + 0.2)
            self.assertIsNotNone(message)
            # Some buses may receive their own messages. Remove it from the queue
            self.bus.recv(0)
            self.assertEqual(message.arbitration_id, config["nodeId"])
            self.assertEqual(message.is_extended_id, self.connector.DEFAULT_EXTENDED_ID_FLAG)
            self.assertEqual(message.is_fd, self.connector.DEFAULT_FD_FLAG)
            self.assertEqual(message.bitrate_switch, self.connector.DEFAULT_BITRATE_SWITCH_FLAG)
            self.assertEqual(message.data, bytearray.fromhex(config["polling"]["dataInHex"]))

    def test_multiple_polling(self):
        """Several polling tasks with different periods must each fire the expected number of times."""
        reader = BufferedReader()
        bus_notifier = Notifier(self.bus, [reader])
        self._create_connector("multiple_polling.json")
        config1 = self.config["devices"][0]["timeseries"][0]
        config2 = self.config["devices"][0]["timeseries"][1]
        config3 = self.config["devices"][0]["timeseries"][2]
        time_to_wait = config2["polling"]["period"] * 4
        # Expected frames: each periodic task fires (wait / period) + 1 times
        # (the "+ 1" is the initial immediate send), plus one frame from the
        # one-time polling task (config1).
        message_count = int(time_to_wait / config2["polling"]["period"]) + 1 + \
                        int(time_to_wait / config3["polling"]["period"]) + 1 + \
                        1  # one time polling task
        sleep(time_to_wait)
        self.connector.close()
        bus_notifier.stop()
        # Drain everything the notifier captured while the connector was running.
        messages = []
        while True:
            msg = reader.get_message(time_to_wait)
            if msg is None:
                break
            messages.append(msg)
        self.assertEqual(len(messages), message_count)
        # NOTE(review): this expected interleaving assumes config2/config3 periods
        # keep this exact ratio in multiple_polling.json — confirm if config changes.
        expected_message_ids = [config1["nodeId"], config2["nodeId"], config3["nodeId"],
                                config2["nodeId"], config3["nodeId"], config2["nodeId"],
                                config3["nodeId"], config2["nodeId"], config3["nodeId"],
                                config2["nodeId"]]
        for i in range(0, message_count):
            self.assertEqual(messages[i].arbitration_id, expected_message_ids[i])
class CanConnectorTsAndAttrTests(CanConnectorTestsBase):
    """Tests for converting received CAN frames into device attributes and telemetry."""

    def _create_bus(self):
        # CAN FD bus: some test frames carry payloads longer than the classic 8 bytes.
        return Bus(
            channel="virtual_channel",
            bustype="virtual",
            receive_own_messages=False,
            is_fd=True
        )

    def test_string_attribute_and_custom_device_type(self):
        """A frame holding a command marker plus an encoded string must be reported as an attribute."""
        self._create_connector("ts_and_attr.json")
        device_name = self.config["devices"][0]["name"]
        config = self.config["devices"][0]["attributes"][0]
        # Parse the configured value expression; group(2) is taken as the string
        # length and group(5) as its encoding (per the connector's VALUE_REGEX).
        value_matches = re.search(self.connector.VALUE_REGEX, config["value"])
        string_value = ''.join(choice(ascii_lowercase) for _ in range(int(value_matches.group(2))))
        # Payload = command bytes (value/length/byteorder from config) + encoded string.
        can_data = list(config["command"]["value"].to_bytes(config["command"]["length"],
                                                            config["command"]["byteorder"]))
        can_data.extend(string_value.encode(value_matches.group(5)))
        message_count = 5
        for _ in range(message_count):
            self.bus.send(Message(arbitration_id=config["nodeId"],
                                  is_fd=config["isFd"],
                                  data=can_data))
        sleep(1)  # Wait while connector process CAN message
        # Every frame must be forwarded (no send-on-change for this device).
        self.assertEqual(self.gateway.send_to_storage.call_count, message_count)
        self.gateway.send_to_storage.assert_called_with(self.connector.get_name(),
                                                        {"deviceName": device_name,
                                                         "deviceType": self.config["devices"][0]["type"],
                                                         "attributes": [{"serialNumber": string_value}],
                                                         "telemetry": []})

    def test_send_only_on_change_and_default_device_type(self):
        """Repeated identical frames must reach storage only once (send-on-change device)."""
        self._create_connector("ts_and_attr.json")
        config = self.config["devices"][1]["timeseries"][0]
        value_matches = re.search(self.connector.VALUE_REGEX, config["value"])
        value = randint(0, pow(2, int(value_matches.group(2))))
        # Zero padding before the value — group(1) is presumably the byte offset
        # from the value expression; TODO confirm against VALUE_REGEX.
        can_data = list(bytearray.fromhex("0" * 2 * int(value_matches.group(1))))
        can_data.extend(value.to_bytes(int(value_matches.group(2)),
                                       value_matches.group(3) if value_matches.group(
                                           3) else self.connector.DEFAULT_BYTEORDER))
        # Five identical frames in, exactly one storage call expected out.
        for _ in range(5):
            self.bus.send(Message(arbitration_id=config["nodeId"],
                                  data=can_data))
        sleep(1)
        self.gateway.send_to_storage.assert_called_once_with(self.connector.get_name(),
                                                             {"deviceName": self.config["devices"][1]["name"],
                                                              "deviceType": self.connector._CanConnector__connector_type,
                                                              "attributes": [],
                                                              "telemetry": [{config["key"]: value}]})
class CanConnectorAttributeUpdatesTests(CanConnectorTestsBase):
    """Tests for converting shared-attribute updates into outgoing CAN frames."""

    def test_update(self):
        """Each well-configured attribute update must produce one frame; a broken config must not."""
        reader = BufferedReader()
        bus_notifier = Notifier(self.bus, [reader])
        self._create_connector("attribute_updates.json")
        configs = self.config["devices"][0]["attributeUpdates"]
        # One update per supported payload type, plus "wrongConfigAttr" which has
        # no matching payload below and therefore must not produce a frame.
        updates = {"device": self.config["devices"][0]["name"],
                   "data": {
                       "boolAttr": True,
                       "intAttr": randint(-int(pow(2, configs[1]["dataLength"]) / 2),
                                          pow(2, configs[1]["dataLength"] - 1)),
                       "floatAttr": uniform(-3.1415926535, 3.1415926535),
                       "stringAttr": ''.join(choice(ascii_lowercase) for _ in range(8)),
                       "wrongConfigAttr": True
                   }}
        # Expected frame payloads, one entry per attribute config (same order as configs).
        # NOTE(review): the randint bounds treat dataLength as a bit count while
        # to_bytes treats it as a byte count — values are merely small, not invalid.
        data_list = [[int(updates["data"]["boolAttr"])],
                     updates["data"]["intAttr"].to_bytes(configs[1]["dataLength"],
                                                         configs[1]["dataByteorder"],
                                                         signed=(updates["data"]["intAttr"] < 0)),
                     list(struct.pack(">f", updates["data"]["floatAttr"])),
                     list(str("Test" + updates["data"]["stringAttr"]).encode(self.connector.DEFAULT_ENCODING))
                     ]
        self.connector.on_attributes_update(updates)
        sleep(1)
        self.connector.close()
        bus_notifier.stop()
        # Drain all frames captured by the notifier.
        messages = []
        while True:
            msg = reader.get_message(1)
            if msg is None:
                break
            messages.append(msg)
        # Four payloads expected: the wrongConfigAttr update yields no frame.
        self.assertEqual(len(messages), len(data_list))
        # Frame order is not guaranteed; sort by arbitration id so frames line up
        # with configs (assumes node ids ascend in config order — TODO confirm).
        messages = sorted(messages, key=lambda message: message.arbitration_id)
        for i in range(len(messages)):
            self.assertTrue(messages[i].equals(Message(arbitration_id=configs[i]["nodeId"],
                                                       is_extended_id=configs[i].get("isExtendedId",
                                                                                     self.connector.DEFAULT_EXTENDED_ID_FLAG),
                                                       is_fd=configs[i].get("isFd", self.connector.DEFAULT_FD_FLAG),
                                                       bitrate_switch=configs[i].get("bitrateSwitch",
                                                                                     self.connector.DEFAULT_BITRATE_SWITCH_FLAG),
                                                       data=data_list[i],
                                                       timestamp=messages[i].timestamp,
                                                       channel=messages[i].channel)))
class CanConnectorRpcTests(CanConnectorTestsBase):
    """Tests for serving server-side RPC requests by sending frames onto the CAN bus."""

    def _create_bus(self):
        # CAN FD bus: some RPC payloads in these tests exceed 8 bytes.
        return Bus(
            channel="virtual_channel",
            bustype="virtual",
            receive_own_messages=False,
            is_fd=True
        )

    def test_rpc_with_hex_data_in_config(self):
        """An RPC without params must send the hex payload taken from the connector config."""
        self._create_connector("rpc.json")
        config = self.config["devices"][0]["serverSideRpc"][0]
        self.connector.server_side_rpc_handler({"device": self.config["devices"][0]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": config["method"]
                                                }})
        actual_message = self.bus.recv(1)
        # timestamp/channel are copied from the received frame so equals() only
        # effectively compares id, flags and payload.
        self.assertTrue(actual_message.equals(Message(arbitration_id=config["nodeId"],
                                                      is_fd=config["isFd"],
                                                      bitrate_switch=config["bitrateSwitch"],
                                                      data=bytearray.fromhex(config["dataInHex"]),
                                                      timestamp=actual_message.timestamp,
                                                      channel=actual_message.channel)))

    def test_rpc_with_hex_data_in_params(self):
        """A 'dataInHex' RPC param must override the payload configured in rpc.json."""
        self._create_connector("rpc.json")
        config = self.config["devices"][1]["serverSideRpc"][0]
        hex_data = "1234 abcd"
        # Sanity check: the override really differs from the configured payload.
        self.assertNotEqual(hex_data, config["dataInHex"])
        self.connector.server_side_rpc_handler({"device": self.config["devices"][1]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": config["method"],
                                                    "params": {
                                                        "dataInHex": hex_data
                                                    }
                                                }})
        actual_message = self.bus.recv(1)
        self.assertTrue(actual_message.equals(Message(arbitration_id=config["nodeId"],
                                                      is_fd=config["isFd"],
                                                      bitrate_switch=config["bitrateSwitch"],
                                                      data=bytearray.fromhex(hex_data),
                                                      timestamp=actual_message.timestamp,
                                                      channel=actual_message.channel)))

    def test_rpc_expression_in_config(self):
        """The configured data expression must be evaluated against the RPC params."""
        self._create_connector("rpc.json")
        config = self.config["devices"][0]["serverSideRpc"][1]
        max_allowed_speed = randint(100, 200)
        user_speed = randint(150, 250)
        self.connector.server_side_rpc_handler({"device": self.config["devices"][0]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": config["method"],
                                                    "params": {
                                                        "userSpeed": user_speed,
                                                        "maxAllowedSpeed": max_allowed_speed
                                                    }
                                                }})
        # Expected payload mirrors the configured expression: the requested speed
        # capped at maxAllowedSpeed, little-endian, dataLength bytes.
        can_data = int(user_speed if max_allowed_speed > user_speed else max_allowed_speed)\
            .to_bytes(config["dataLength"], "little")
        actual_message = self.bus.recv(1)
        self.assertTrue(actual_message.equals(Message(arbitration_id=config["nodeId"],
                                                      is_extended_id=config.get("isExtendedId",
                                                                                self.connector.DEFAULT_EXTENDED_ID_FLAG),
                                                      data=can_data,
                                                      timestamp=actual_message.timestamp,
                                                      channel=actual_message.channel)))

    def test_deny_unknown_rpc(self):
        """Device 0 does not allow unknown RPCs: a random method name must send nothing."""
        self._create_connector("rpc.json")
        self.connector.server_side_rpc_handler({"device": self.config["devices"][0]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": ''.join(choice(ascii_lowercase) for _ in range(8))
                                                }})
        self.assertIsNone(self.bus.recv(5))

    def test_enable_unknown_rpc(self):
        """Device 2 allows unknown RPCs: the frame is built entirely from the params."""
        self._create_connector("rpc.json")
        # Standard (11-bit) ids end below 0x800; anything larger needs the extended flag.
        max_not_extended_node_id = 0x800
        node_id = randint(0, 0x20000000)
        data_before = "aa bb"
        data_after = "cc dd ee ff"
        data_length = 4
        integer_value = randint(-int(pow(2, 8 * data_length) / 2), pow(2, 8 * data_length) - 1)
        # Expected payload: prefix bytes + big-endian integer + suffix bytes.
        can_data = list(bytearray.fromhex(data_before))
        can_data.extend(integer_value.to_bytes(data_length, "big", signed=(integer_value < 0)))
        can_data.extend(bytearray.fromhex(data_after))
        self.connector.server_side_rpc_handler({"device": self.config["devices"][2]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": ''.join(choice(ascii_lowercase) for _ in range(8)),
                                                    "params": {
                                                        "value": integer_value,
                                                        "nodeId": node_id,
                                                        "isExtendedId": (node_id > max_not_extended_node_id),
                                                        "isFd": (len(can_data) > 8),
                                                        "dataLength": data_length,
                                                        # Actually value may be either signed or unsigned,
                                                        # connector should process this case correctly
                                                        "dataSigned": False,
                                                        "dataBefore": data_before,
                                                        "dataAfter": data_after,
                                                        "response": True
                                                    }
                                                }})
        actual_message = self.bus.recv(1)
        self.assertTrue(actual_message.equals(Message(arbitration_id=node_id,
                                                      is_extended_id=(node_id > max_not_extended_node_id),
                                                      is_fd=(len(can_data) > 8),
                                                      data=can_data,
                                                      timestamp=actual_message.timestamp,
                                                      channel=actual_message.channel)))
        # "response": True requests an RPC reply; sending succeeded, so success=True.
        self.gateway.send_rpc_reply.assert_called_once_with(self.config["devices"][2]["name"],
                                                            1,
                                                            {"success": True})

    def test_rpc_response_failed(self):
        """A configured RPC that cannot be executed must reply with success=False."""
        self._create_connector("rpc.json")
        config = self.config["devices"][3]["serverSideRpc"][0]
        self.connector.server_side_rpc_handler({"device": self.config["devices"][3]["name"],
                                                "data": {
                                                    "id": 1,
                                                    "method": config["method"]
                                                }})
        sleep(1)
        self.gateway.send_rpc_reply.assert_called_once_with(self.config["devices"][3]["name"],
                                                            1,
                                                            {"success": False})
# Allow running this test module directly (outside a test runner).
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,528 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import unittest
from os import path
from pathlib import Path
from time import sleep
from unittest.mock import Mock, call
import pyodbc
from simplejson import load
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
import thingsboard_gateway
from thingsboard_gateway.connectors.odbc.odbc_connector import OdbcConnector
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# Driver-name substrings used to decide which ODBC backend the tests can run on.
ODBC_DRIVER_WITH_STORED_PROCEDURE = "postgres"
ODBC_DRIVER_WITHOUT_STORED_PROCEDURE = "sqlite"
IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED = False
IS_ODBC_DRIVER_INSTALLED = False
# Scan the installed ODBC drivers once, accumulating matches.
for driver_name in pyodbc.drivers():
    driver_name_lower = driver_name.lower()
    IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED = IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED or \
                                                     ODBC_DRIVER_WITH_STORED_PROCEDURE in driver_name_lower
    # Bug fix: accumulate across iterations. Previously a later driver matching
    # neither backend reset IS_ODBC_DRIVER_INSTALLED to False, discarding an
    # earlier sqlite match.
    IS_ODBC_DRIVER_INSTALLED = IS_ODBC_DRIVER_INSTALLED or \
                               IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED or \
                               ODBC_DRIVER_WITHOUT_STORED_PROCEDURE in driver_name_lower
if IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED:
    # testing.postgresql spins up a throw-away PostgreSQL instance for the tests;
    # install it on demand if missing.
    try:
        import testing.postgresql
    except ImportError:
        print("ODBC library not found - installing...")
        TBUtility.install_package("testing.postgresql")
        import testing.postgresql
@unittest.skipIf(not IS_ODBC_DRIVER_INSTALLED,
"To run ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE + "or" +
ODBC_DRIVER_WITHOUT_STORED_PROCEDURE + " ODBC driver")
class OdbcConnectorTests(unittest.TestCase):
CONFIG_PATH = path.join(path.dirname(path.dirname(path.abspath(__file__))),
"data" + path.sep + "odbc" + path.sep)
POSTGRES_PORT = 12345
def setUp(self):
self.gateway = Mock(spec=thingsboard_gateway.TBGatewayService)
self.gateway.get_config_path.return_value = self.CONFIG_PATH
self.connector = None
self.config = None
self.db_connection = None
self.db_cursor = None
self.db_connection_str = self._get_connection_string()
if IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED:
self.postgresql = testing.postgresql.Postgresql(port=self.POSTGRES_PORT)
# To prevent database overlapping start each test server on different port
self.POSTGRES_PORT += 1
def tearDown(self):
self.connector.close()
if self.config["polling"]["iterator"].get("persistent", OdbcConnector.DEFAULT_SAVE_ITERATOR):
Path(self.CONFIG_PATH + "odbc" + path.sep + self.connector._OdbcConnector__iterator_file_name).unlink()
Path(self.CONFIG_PATH + "odbc").rmdir()
if self.db_connection:
self.db_cursor.close()
self.db_connection.close()
if IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED:
self.postgresql.stop()
def _create_connector(self,
config_file_name,
send_on_change=OdbcConnector.DEFAULT_SEND_IF_CHANGED,
unknown_rpc=OdbcConnector.DEFAULT_ENABLE_UNKNOWN_RPC,
override_rpc_params=OdbcConnector.DEFAULT_OVERRIDE_RPC_PARAMS,
connection_str="",
reconnect=OdbcConnector.DEFAULT_RECONNECT_STATE):
with open(self.CONFIG_PATH + config_file_name, 'r', encoding="UTF-8") as file:
self.config = load(file)
if "sendDataOnlyOnChange" not in self.config["mapping"]:
self.config["mapping"]["sendDataOnlyOnChange"] = send_on_change
if "serverSideRpc" not in self.config:
self.config["serverSideRpc"] = {}
if "enableUnknownRpc" not in self.config["serverSideRpc"]:
self.config["serverSideRpc"]["enableUnknownRpc"] = unknown_rpc
if "overrideRpcConfig" not in self.config["serverSideRpc"]:
self.config["serverSideRpc"]["overrideRpcConfig"] = override_rpc_params
self.config["connection"]["reconnect"] = reconnect
if connection_str:
self.config["connection"]["str"] = connection_str
else:
self.config["connection"]["str"] = self.db_connection_str
self._init_test_db()
self.connector = OdbcConnector(self.gateway, self.config, "odbc")
self.connector.open()
sleep(1) # some time to init
def _get_connection_string(self):
return "Driver={PostgreSQL};" \
"Server=localhost;" \
"Port=" + str(self.POSTGRES_PORT) + ";" \
"Database=test;" \
"Uid=postgres;" \
"Pwd=postgres;" if IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED else \
"Driver={SQLITE3};" \
"Database=" + self.CONFIG_PATH + "sqlite3.db;"
def _init_test_db(self):
self.db_connection = pyodbc.connect(self.config["connection"]["str"])
self.db_cursor = self.db_connection.cursor()
if IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED:
with open(self.CONFIG_PATH + 'postgres.sql', 'r') as f:
self.db_cursor.execute(f.read())
self.db_cursor.commit()
def test_override_default_pyodbc_attributes(self):
self._create_connector("odbc_timeseries.json")
self.assertEqual(pyodbc.pooling, self.config["pyodbc"]["pooling"])
self.assertEqual(pyodbc.native_uuid, self.config["pyodbc"]["native_uuid"])
def test_override_default_connection_attributes(self):
self._create_connector("odbc_timeseries.json")
self.assertEqual(self.connector._OdbcConnector__connection.autocommit,
self.config["connection"]["attributes"]["autocommit"])
self.assertEqual(self.connector._OdbcConnector__connection.timeout,
self.config["connection"]["attributes"]["timeout"])
def test_timeseries(self):
self._create_connector("odbc_timeseries.json")
record_count, device_id = self.db_cursor.execute("SELECT COUNT(*), device_id "
"FROM timeseries "
"GROUP BY device_id").fetchone()
self.gateway.add_device.assert_called_once_with(eval(self.config["mapping"]["device"]["name"]),
{"connector": self.connector})
self.assertEqual(self.gateway.send_to_storage.call_count, record_count)
def test_timeseries_send_on_change(self):
self._create_connector("odbc_timeseries.json", True)
rows = self.db_cursor.execute("SELECT DISTINCT long_v "
"FROM timeseries "
"ORDER BY long_v ASC").fetchall()
self.assertEqual(self.gateway.send_to_storage.call_count, len(rows))
calls = []
device_id = 1 # for eval
for row in rows:
calls.append(call(self.connector.get_name(),
{"deviceName": eval(self.config["mapping"]["device"]["name"]),
"deviceType": self.config["mapping"]["device"]["type"],
"attributes": [],
"telemetry": [{"value": row.long_v}]}))
self.gateway.send_to_storage.assert_has_calls(calls)
def test_attributes(self):
self._create_connector("odbc_attributes.json")
rows = self.db_cursor.execute("SELECT key, device_id, bool_v, str_v, long_v, dbl_v "
"FROM attributes "
"ORDER BY ts ASC").fetchall()
self.assertEqual(self.gateway.send_to_storage.call_count, len(rows))
calls = []
device_id = 1 # for eval
for row in rows:
data = {
"bool_v": row[2],
"str_v": row[3],
"long_v": row[4],
"dbl_v": row[5]}
device_id = row.device_id # for eval
calls.append(call(self.connector.get_name(),
{"deviceName": eval(self.config["mapping"]["device"]["name"]),
"deviceType": self.connector._OdbcConnector__connector_type,
"attributes": [
{row[0]: eval(self.config["mapping"]["attributes"][0]["value"],
globals(),
data)}],
"telemetry": []}))
self.gateway.send_to_storage.assert_has_calls(calls)
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
"To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
" database and add it to PATH")
def test_rpc_without_param(self):
self._create_connector("odbc_rpc.json")
value_before = self.db_cursor.execute("SELECT get_integer_value(?)", "inc_value").fetchval()
self.connector.server_side_rpc_handler({"device": "someDevice",
"data": {
"id": 1,
"method": "decrement_value"
}})
sleep(1)
# Cursor from test connection doesn't see changes made through connector's cursor,
# so use latter to get up to date data
value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
"inc_value").fetchval()
self.assertEqual(value_before - 1, value_after)
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
"To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
" database and add it to PATH")
def test_rpc_without_param_with_config_query(self):
self._create_connector("odbc_rpc.json")
value_before = self.db_cursor.execute("SELECT get_integer_value(?)", "inc_value").fetchval()
self.connector.server_side_rpc_handler({"device": "someDevice",
"data": {
"id": 1,
"method": "increment_value"
}})
sleep(1)
# Cursor from test connection doesn't see changes made through connector's cursor,
# so use latter to get up to date data
value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
"inc_value").fetchval()
self.assertEqual(value_before + 1, value_after)
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
"To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
" database and add it to PATH")
def test_rpc_with_param(self):
self._create_connector("odbc_rpc.json")
values_before = self.config["serverSideRpc"]["methods"]["reset_values"]["params"]
self.connector.server_side_rpc_handler({"device": "someDevice",
"data": {
"id": 1,
"method": "reset_values"
}})
sleep(1)
# Cursor from test connection doesn't see changes made through connector's cursor,
# so use latter to get up to date data
str_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_string_value(?)",
"str_value").fetchval()
int_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
"int_value").fetchval()
self.assertEqual(values_before[0], str_value_after)
self.assertEqual(values_before[1], int_value_after)
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_rpc_with_param_with_config_query(self):
    """RPC method with both default arguments and an explicit query in the
    config ("update_values"): both must be used when handling the request."""
    self._create_connector("odbc_rpc.json")
    values_before = self.config["serverSideRpc"]["methods"]["update_values"]["params"]
    self.connector.server_side_rpc_handler({"device": "someDevice",
                                            "data": {
                                                "id": 1,
                                                "method": "update_values"
                                            }})
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    # Cursor from test connection doesn't see changes made through connector's cursor,
    # so use latter to get up to date data
    str_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_string_value(?)",
                                                                        "str_value").fetchval()
    int_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
                                                                        "int_value").fetchval()
    self.assertEqual(values_before[0], str_value_after)
    self.assertEqual(values_before[1], int_value_after)
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_rpc_with_param_override(self):
    """With override_rpc_params enabled, args and query supplied in the RPC
    request itself take precedence over the configured defaults, and a
    {"success": True} reply is sent back to the gateway."""
    self._create_connector("odbc_rpc.json", override_rpc_params=True)
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "reset_values",
                "params": {
                    "args": ["override_value", 12345],
                    "query": "SELECT reset_values(?,?)"
                }
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    # Cursor from test connection doesn't see changes made through connector's cursor,
    # so use latter to get up to date data
    str_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_string_value(?)",
                                                                        "str_value").fetchval()
    int_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
                                                                        "int_value").fetchval()
    self.assertEqual(data["data"]["params"]["args"][0], str_value_after)
    self.assertEqual(data["data"]["params"]["args"][1], int_value_after)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       {"success": True})])
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_unknown_rpc_disabled(self):
    """A method that is not declared in the config must be rejected with a
    {"success": False} reply when unknown-RPC handling is disabled."""
    self._create_connector("odbc_rpc.json")
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "unknown_function"
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       {"success": False})])
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_unknown_rpc_enabled_without_query_and_result(self):
    """With unknown_rpc enabled and no query in the request, the connector
    builds the call from the method name and args, executes it, and replies
    with a single-field result keyed by the method name."""
    self._create_connector("odbc_rpc.json", unknown_rpc=True)
    inc_value_before = self.db_cursor.execute("SELECT get_integer_value(?)", "inc_value").fetchval()
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "secret_function",
                "params": {
                    "args": ["secret_string"],
                    "result": True
                }
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    # Cursor from test connection doesn't see changes made through connector's cursor,
    # so use latter to get up to date data
    str_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_string_value(?)",
                                                                        "str_value").fetchval()
    inc_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
                                                                        "inc_value").fetchval()
    # secret_function() both stores the string argument and bumps inc_value.
    self.assertEqual(data["data"]["params"]["args"][0], str_value_after)
    self.assertEqual(inc_value_before + 1, inc_value_after)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       {data["data"]["method"]: 1})])
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_unknown_rpc_enabled_with_query_and_result(self):
    """With unknown_rpc enabled and an explicit query in the request, the
    query is executed as-is and the reply is keyed by the query's column
    alias ("return_value") instead of the method name."""
    self._create_connector("odbc_rpc.json", unknown_rpc=True)
    inc_value_before = self.db_cursor.execute("SELECT get_integer_value(?)", "inc_value").fetchval()
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "secret_function",
                "params": {
                    "args": ["secret_string"],
                    "query": "SELECT secret_function(?) as return_value",
                    "result": True
                }
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    # Cursor from test connection doesn't see changes made through connector's cursor,
    # so use latter to get up to date data
    str_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_string_value(?)",
                                                                        "str_value").fetchval()
    inc_value_after = self.connector._OdbcConnector__rpc_cursor.execute("SELECT get_integer_value(?)",
                                                                        "inc_value").fetchval()
    # secret_function() both stores the string argument and bumps inc_value.
    self.assertEqual(data["data"]["params"]["args"][0], str_value_after)
    self.assertEqual(inc_value_before + 1, inc_value_after)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       {"return_value": 1})])
@unittest.skipIf(not IS_ODBC_DRIVER_WITH_STORED_PROCEDURE_INSTALLED,
                 "To run RPC ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE +
                 " database and add it to PATH")
def test_rpc_multiple_field_response(self):
    """An RPC returning a multi-column row must be replied as a dict with
    one entry per column, matching a direct call to the procedure."""
    self._create_connector("odbc_rpc.json", override_rpc_params=True)
    # Build the expected reply from a direct ODBC call: column name -> value.
    row = self.db_cursor.execute("{CALL get_values}").fetchone()
    expected_values = {}
    for d in self.db_cursor.description:
        expected_values[d[0]] = getattr(row, d[0])
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "get_values",
                "params": {
                    "result": True
                }
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       expected_values)])
def test_rpc_no_connection(self):
    """An RPC request while the connector has no database connection must be
    answered with {"success": False} instead of raising."""
    self._create_connector("odbc_rpc.json", connection_str="fake_connection_string")
    self.assertFalse(self.connector.is_connected())
    data = {"device": "someDevice",
            "data": {
                "id": 1,
                "method": "get_values",
                "params": {
                    "result": True
                }
            }}
    self.connector.server_side_rpc_handler(data)
    # Give the connector's worker thread time to process the RPC request.
    sleep(1)
    self.gateway.send_rpc_reply.assert_has_calls([call(data["device"],
                                                       data["data"]["id"],
                                                       {"success": False})])
def test_reconnect_disabled(self):
    """Without a valid connection string the connector must stay disconnected
    even after several reconnect periods (reconnect is not forced here)."""
    self._create_connector("odbc_rpc.json", connection_str="fake_connection_string")
    self.assertFalse(self.connector.is_connected())
    # Wait longer than two reconnect periods to prove no reconnect happened.
    sleep(self.config["connection"]["reconnectPeriod"] * 2 + 1)
    self.assertFalse(self.connector.is_connected())
@unittest.skipIf(not IS_ODBC_DRIVER_INSTALLED,
                 "To run ODBC tests install " + ODBC_DRIVER_WITH_STORED_PROCEDURE + "or" +
                 ODBC_DRIVER_WITHOUT_STORED_PROCEDURE + " ODBC driver")
def test_reconnect_enabled(self):
    """After fixing the connection string at runtime, the connector must
    reconnect on its own within one reconnect period."""
    self._create_connector("odbc_rpc.json", connection_str="fake_connection_string")
    self.assertFalse(self.connector.is_connected())
    # Swap in the real connection string; the connector re-reads self.config.
    self.config["connection"]["str"] = self.db_connection_str
    self._init_test_db()
    sleep(self.config["connection"]["reconnectPeriod"] + 1)
    self.assertTrue(self.connector.is_connected())
def test_restore_connection(self):
    """Stopping and restarting the PostgreSQL backend: the connector must
    notice the outage, reconnect once the server is back on the same port,
    and resume polling from where it left off."""
    self._create_connector("odbc_iterator.json")
    postgres_port = self.postgresql.dsn()["port"]
    device_id = 1  # Referenced by the eval'd device-name expression below
    data = {"deviceName": eval(self.config["mapping"]["device"]["name"]),
            "deviceType": self.connector._OdbcConnector__connector_type,
            "attributes": [],
            "telemetry": [{"value": 0}]}
    self.gateway.send_to_storage.assert_has_calls([call(self.connector.get_name(), data)])
    self.postgresql.stop()
    sleep(self.config["polling"]["period"])
    self.assertFalse(self.connector.is_connected())
    sleep(self.config["connection"]["reconnectPeriod"])
    # Restart the server on the same port so the stored connection string is valid.
    self.postgresql = testing.postgresql.Postgresql(port=postgres_port)
    self._init_test_db()
    sleep(self.config["connection"]["reconnectPeriod"])
    self.assertTrue(self.connector.is_connected())
    # Polling resumed: the next batch of telemetry is delivered.
    data["telemetry"] = [{"value": 5}]
    self.gateway.send_to_storage.assert_has_calls([call(self.connector.get_name(), data)])
def test_iterator_persistence(self):
    """With a persistent iterator, closing the connector must write the
    iterator file to disk, and a freshly created connector must resume
    polling from the persisted position instead of starting over."""
    self._create_connector("odbc_iterator.json")
    iterator_file_name = self.connector._OdbcConnector__iterator_file_name
    device_id = 1  # Referenced by the eval'd device-name expression below
    data = {"deviceName": eval(self.config["mapping"]["device"]["name"]),
            "deviceType": self.connector._OdbcConnector__connector_type,
            "attributes": [],
            "telemetry": [{"value": 0}]}
    self.gateway.send_to_storage.assert_has_calls([call(self.connector.get_name(), data)])
    self.connector.close()
    sleep(1)
    # The iterator state must have been persisted on close.
    self.assertTrue(Path(self.CONFIG_PATH + "odbc" + path.sep + iterator_file_name).exists())
    self.connector = OdbcConnector(self.gateway, self.config, "odbc")
    self.connector.open()
    sleep(1)
    # The new connector picked up after the last delivered row.
    data["telemetry"] = [{"value": 5}]
    self.gateway.send_to_storage.assert_has_calls([call(self.connector.get_name(), data)])
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,162 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import struct
import unittest
from random import randint, uniform, choice
from string import ascii_lowercase
from thingsboard_gateway.connectors.can.bytes_can_downlink_converter import BytesCanDownlinkConverter
class BytesCanDownlinkConverterTests(unittest.TestCase):
    """Unit tests for BytesCanDownlinkConverter, which turns ThingsBoard
    attribute/RPC data plus a config into a CAN frame payload (list of
    byte values)."""

    def setUp(self):
        self.converter = BytesCanDownlinkConverter()

    def test_data_in_hex_in_conf(self):
        """A dataInHex string in the config is decoded directly to bytes."""
        expected_can_data = [0, 1, 2, 3]
        config = {"dataInHex": "00 01 02 03"}
        data = {}
        actual_can_data = self.converter.convert(config, data)
        self.assertListEqual(actual_can_data, expected_can_data)

    def test_data_in_hex_in_data(self):
        """A dataInHex string in the incoming data is decoded to bytes."""
        expected_can_data = [0, 1, 2, 3]
        config = {}
        data = {"dataInHex": "00 01 02 03"}
        actual_can_data = self.converter.convert(config, data)
        self.assertListEqual(actual_can_data, expected_can_data)

    def test_no_data(self):
        """Empty config and data yield no payload."""
        self.assertIsNone(self.converter.convert({}, {}))

    def test_wrong_data_format(self):
        """Non-dict data is rejected."""
        self.assertIsNone(self.converter.convert({}, [1, 2, 3]))

    def test_bool_data(self):
        """A boolean value is encoded as a single 0/1 byte."""
        value = True
        expected_can_data = [int(value)]
        data = {"value": value}
        actual_can_data = self.converter.convert({}, data)
        self.assertListEqual(actual_can_data, expected_can_data)

    def test_unsigned_integer_data(self):
        """Unsigned ints of 1-4 bytes are encoded per dataLength/dataByteorder."""
        for data_length in [1, 2, 3, 4]:
            # Empty byteorder value means default encoding (big)
            for byteorder in ["", "little"]:
                config = {"dataLength": data_length}
                if byteorder:
                    config["dataByteorder"] = byteorder
                else:
                    byteorder = "big"
                # BUGFIX: randint is inclusive on both ends, so the upper bound
                # must be 2**(8*n) - 1; the old bound 2**(8*n) does not fit in
                # data_length bytes and made to_bytes() below raise OverflowError
                # intermittently.
                data = {"value": randint(0, pow(2, 8 * data_length) - 1)}
                actual_can_data = self.converter.convert(config, data)
                self.assertListEqual(actual_can_data,
                                     list(data["value"].to_bytes(data_length, byteorder, signed=False)))

    def test_signed_integer_data(self):
        """Signed ints of 1-4 bytes are encoded per dataLength/dataByteorder."""
        for data_length in [1, 2, 3, 4]:
            # Empty byteorder value means default encoding (big)
            for byteorder in ["", "little"]:
                config = {
                    "dataLength": data_length,
                    "dataSigned": True
                }
                if byteorder:
                    config["dataByteorder"] = byteorder
                else:
                    byteorder = "big"
                data = {"value": randint(-int(pow(2, 8 * data_length) / 2),
                                         int(pow(2, 8 * data_length) / 2) - 1)}
                actual_can_data = self.converter.convert(config, data)
                self.assertListEqual(actual_can_data,
                                     list(data["value"].to_bytes(data_length, byteorder, signed=True)))

    def test_float_data(self):
        """Floats are packed as 4-byte IEEE 754 in the configured byte order."""
        # Empty byteorder value means default encoding (big)
        for byteorder in ["", "little"]:
            data = {"value": uniform(-3.1415926535, 3.1415926535)}
            config = {}
            if byteorder:
                config["dataByteorder"] = byteorder
            else:
                byteorder = "big"
            actual_can_data = self.converter.convert(config, data)
            self.assertListEqual(actual_can_data,
                                 list(struct.pack(">f" if byteorder[0] == "b" else "<f", data["value"])))

    def test_string_data(self):
        """Strings are encoded with the configured (default ascii) encoding."""
        # Empty encoding value means default encoding (ascii)
        for encoding in ["", "utf-8"]:
            value = "".join(choice(ascii_lowercase) for _ in range(8))
            data = {"value": value}
            config = {}
            if encoding:
                config["dataEncoding"] = encoding
            else:
                encoding = "ascii"
            actual_can_data = self.converter.convert(config, data)
            self.assertListEqual(actual_can_data, list(value.encode(encoding)))

    def test_expression_data(self):
        """dataExpression is evaluated over the data fields before encoding."""
        default_data_length = 1
        default_byteorder = "big"
        data = {
            "one": 1,
            "two": 2,
            "three": 3
        }
        config = {"dataExpression": "one + two + three"}
        value = 0
        for i in data.values():
            value += i
        actual_can_data = self.converter.convert(config, data)
        self.assertListEqual(actual_can_data,
                             list(value.to_bytes(default_data_length, default_byteorder)))

    def test_strict_eval_violation(self):
        """With strictEval, expressions using builtins (pow) must be rejected."""
        data = {"value": randint(0, 256)}
        config = {
            "dataExpression": "pow(value, 2)",
            "strictEval": True
        }
        self.assertIsNone(self.converter.convert(config, data))

    def test_data_before(self):
        """dataBefore bytes are prepended to the encoded value."""
        value = True
        expected_can_data = [0, 1, 2, 3, int(value)]
        data = {"value": value}
        config = {"dataBefore": "00 01 02 03"}
        actual_can_data = self.converter.convert(config, data)
        self.assertListEqual(actual_can_data, expected_can_data)

    def test_data_after(self):
        """dataAfter bytes are appended to the encoded value."""
        value = True
        expected_can_data = [int(value), 3, 2, 1, 0]
        data = {"value": value}
        config = {"dataAfter": "03 02 01 00"}
        actual_can_data = self.converter.convert(config, data)
        self.assertListEqual(actual_can_data, expected_can_data)
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,234 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import _struct
import unittest
from math import isclose
from random import randint, uniform, choice
from string import ascii_lowercase
from thingsboard_gateway.connectors.can.bytes_can_uplink_converter import BytesCanUplinkConverter
class BytesCanUplinkConverterTests(unittest.TestCase):
    """Unit tests for BytesCanUplinkConverter, which decodes a CAN frame
    payload into ThingsBoard telemetry/attributes according to per-key
    configs."""

    def setUp(self):
        self.converter = BytesCanUplinkConverter()

    def _has_no_data(self, data):
        """True if the converter produced neither attributes nor telemetry."""
        return bool(data is None or not data.get("attributes", []) and not data.get("telemetry", []))

    def test_wrong_type(self):
        """An unsupported value type yields no converted data."""
        can_data = [0, 1, 0, 0, 0]
        configs = [{
            "key": "var",
            "is_ts": True,
            "type": "wrong_type"
        }]
        tb_data = self.converter.convert(configs, can_data)
        self.assertTrue(self._has_no_data(tb_data))

    def test_bool_true(self):
        can_data = [0, 1, 0, 0, 0]
        configs = [{
            "key": "boolVar",
            "is_ts": True,
            "type": "bool",
            "start": 1
        }]
        tb_data = self.converter.convert(configs, can_data)
        self.assertTrue(tb_data["telemetry"]["boolVar"])

    def test_bool_false(self):
        can_data = [1, 0, 1, 1, 1]
        configs = [{
            "key": "boolVar",
            "is_ts": False,
            "type": "bool",
            "start": 1
        }]
        tb_data = self.converter.convert(configs, can_data)
        self.assertFalse(tb_data["attributes"]["boolVar"])

    def _test_int(self, value_type, byteorder):
        """Round-trip a random 2-byte integer of the given type/byteorder.

        Note: parameter renamed from `type` to avoid shadowing the builtin.
        """
        int_value = randint(-32768, 32767)
        int_size = 2
        can_data = [0, 0]
        configs = [{
            "key": value_type + "Var",
            "is_ts": True,
            "type": value_type,
            "start": len(can_data),
            "length": int_size,
            "byteorder": byteorder,
            "signed": int_value < 0
        }]
        can_data.extend(int_value.to_bytes(int_size, byteorder, signed=(int_value < 0)))
        tb_data = self.converter.convert(configs, can_data)
        self.assertEqual(tb_data["telemetry"][value_type + "Var"], int_value)

    def test_int_big_byteorder(self):
        self._test_int("int", "big")

    def test_int_little_byteorder(self):
        self._test_int("int", "little")

    def test_long_big_byteorder(self):
        self._test_int("long", "big")

    def test_long_little_byteorder(self):
        self._test_int("long", "little")

    def _test_float_point_number(self, value_type, byteorder):
        """Round-trip a random float/double of the given byteorder.

        Note: parameter renamed from `type` to avoid shadowing the builtin.
        """
        float_value = uniform(-3.1415926535, 3.1415926535)
        can_data = [0, 0]
        configs = [{
            "key": value_type + "Var",
            "is_ts": True,
            "type": value_type,
            "start": len(can_data),
            "length": 4 if value_type[0] == "f" else 8,
            "byteorder": byteorder
        }]
        # 'f'/'d' struct codes match the first letter of "float"/"double".
        can_data.extend(_struct.pack((">" if byteorder[0] == "b" else "<") + value_type[0],
                                     float_value))
        tb_data = self.converter.convert(configs, can_data)
        self.assertTrue(isclose(tb_data["telemetry"][value_type + "Var"], float_value, rel_tol=1e-05))

    def test_float_big_byteorder(self):
        self._test_float_point_number("float", "big")

    def test_float_little_byteorder(self):
        self._test_float_point_number("float", "little")

    def test_double_big_byteorder(self):
        self._test_float_point_number("double", "big")

    def test_double_little_byteorder(self):
        self._test_float_point_number("double", "little")

    def _test_string(self, encoding="ascii"):
        """Round-trip a random lowercase string with the given encoding."""
        str_length = randint(1, 8)
        str_value = ''.join(choice(ascii_lowercase) for _ in range(str_length))
        configs = [{
            "key": "stringVar",
            "is_ts": True,
            "type": "string",
            "start": 0,
            "length": str_length,
            "encoding": encoding
        }]
        can_data = str_value.encode(encoding)
        tb_data = self.converter.convert(configs, can_data)
        self.assertEqual(tb_data["telemetry"]["stringVar"], str_value)

    def test_string_default_ascii_encoding(self):
        self._test_string()

    def test_string_utf_8_string(self):
        self._test_string("utf-8")

    def _test_eval_int(self, number, strict_eval, expression):
        """Convert a 1-byte integer through the given expression/strictEval."""
        can_data = number.to_bytes(1, "big", signed=(number < 0))
        # By default the strictEval flag is True
        configs = [{
            "key": "var",
            "is_ts": True,
            "type": "int",
            "start": 0,
            "length": 1,
            "byteorder": "big",
            "signed": number < 0,
            "expression": expression,
            "strictEval": strict_eval
        }]
        return self.converter.convert(configs, can_data)

    def test_strict_eval_violation(self):
        """With strictEval, builtins (pow) in the expression are rejected."""
        # BUGFIX: randint is inclusive, and 256 does not fit in the single byte
        # that _test_eval_int encodes — use 255 as the upper bound.
        number = randint(-128, 255)
        tb_data = self._test_eval_int(number, True, "pow(value, 2)")
        self.assertTrue(self._has_no_data(tb_data))

    def test_strict_eval(self):
        # BUGFIX: upper bound 256 overflowed the 1-byte encoding (see above).
        number = randint(-128, 255)
        tb_data = self._test_eval_int(number, True, "value * value")
        self.assertEqual(tb_data["telemetry"]["var"], number * number)

    def test_no_strict_eval(self):
        """Without strictEval, builtins are available to the expression."""
        # BUGFIX: upper bound 256 overflowed the 1-byte encoding (see above).
        number = randint(-128, 255)
        tb_data = self._test_eval_int(number, False, "pow(value, 2)")
        self.assertEqual(tb_data["telemetry"]["var"], number * number)

    def test_multiple_valid_configs(self):
        """Several configs decode independent slices of one frame.

        NOTE(review): uses type "boolean" while other tests use "bool" —
        presumably the converter accepts both spellings; confirm.
        """
        bool_value = True
        int_value = randint(0, 256)
        can_data = [0, int(bool_value), int_value, 0, 0, 0]
        configs = [
            {
                "key": "boolVar",
                "type": "boolean",
                "is_ts": True,
                "start": 1
            },
            {
                "key": "intVar",
                "type": "int",
                "is_ts": False,
                "start": 2,
                "length": 4,
                "byteorder": "little",
                "signed": False
            }
        ]
        tb_data = self.converter.convert(configs, can_data)
        self.assertEqual(tb_data["telemetry"]["boolVar"], bool_value)
        self.assertEqual(tb_data["attributes"]["intVar"], int_value)

    def test_multiple_configs_one_invalid(self):
        """One invalid config must not prevent the valid ones from decoding."""
        bool_value = True
        invalid_length = 3  # Float requires 4 bytes
        can_data = [0, int(bool_value), randint(0, 256), 0, 0, 0]
        configs = [
            {
                "key": "validVar",
                "type": "boolean",
                "is_ts": True,
                "start": 1
            },
            {
                "key": "invalidVar",
                "type": "float",
                "is_ts": False,
                "start": 2,
                "length": invalid_length
            }
        ]
        tb_data = self.converter.convert(configs, can_data)
        self.assertEqual(tb_data["telemetry"]["validVar"], bool_value)
        self.assertIsNone(tb_data["attributes"].get("invalidVar"))
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,67 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from random import randint, uniform, choice
from string import ascii_lowercase
from thingsboard_gateway.connectors.odbc.odbc_uplink_converter import OdbcUplinkConverter
class OdbcUplinkConverterTests(unittest.TestCase):
    """Unit tests for OdbcUplinkConverter, which maps a polled DB row
    (column name -> value) onto ThingsBoard keys according to a config."""

    def setUp(self):
        self.converter = OdbcUplinkConverter()
        self.db_data = {
            "boolValue": True,
            "intValue": randint(0, 256),
            "floatValue": uniform(-3.1415926535, 3.1415926535),
            "stringValue": "".join(choice(ascii_lowercase) for _ in range(8)),
        }

    def test_glob_matching(self):
        """A "*" config passes every column through unchanged."""
        self.assertDictEqual(self.converter.convert("*", self.db_data), self.db_data)

    def test_data_subset(self):
        """A list of column names selects exactly those columns."""
        config = ["floatValue", "boolValue"]
        result = self.converter.convert(config, self.db_data)
        self.assertDictEqual(result, {name: self.db_data[name] for name in config})

    def test_alias(self):
        """A column/name pair renames the column in the output."""
        config = [{"column": "stringValue", "name": "valueOfString"}]
        result = self.converter.convert(config, self.db_data)
        expected = {config[0]["name"]: self.db_data[config[0]["column"]]}
        self.assertDictEqual(result, expected)

    def test_name_expression(self):
        """nameExpression evaluates the output key from the row itself."""
        attr_name = "someAttribute"
        config = [{"nameExpression": "key", "value": "intValue"}]
        self.db_data["key"] = attr_name
        result = self.converter.convert(config, self.db_data)
        self.assertDictEqual(result, {attr_name: self.db_data[config[0]["value"]]})

    def test_value_config(self):
        """The value field is an expression evaluated over the row columns."""
        config = [{"name": "someValue", "value": "stringValue + str(intValue)"}]
        result = self.converter.convert(config, self.db_data)
        expected_value = self.db_data["stringValue"] + str(self.db_data["intValue"])
        self.assertDictEqual(result, {config[0]["name"]: expected_value})

    def test_one_valid_one_invalid_configs(self):
        """An unknown column is skipped; valid entries still convert."""
        config = ["unkownColumnValue", "stringValue"]
        result = self.converter.convert(config, self.db_data)
        self.assertDictEqual(result, {config[1]: self.db_data[config[1]]})
# Allow running this test module directly.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,40 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"backend": {
"fd": true
},
"devices": [
{
"name": "Car",
"strictEval": false,
"attributeUpdates": [
{
"attributeOnThingsBoard": "boolAttr",
"nodeId": 1
},
{
"attribute": "intAttr",
"nodeId": 2,
"isExtendedId": true,
"dataLength": 4,
"dataByteorder": "little"
},
{
"attributeOnThingsBoard": "floatAttr",
"nodeId": 3
},
{
"attribute": "stringAttr",
"nodeId": 4,
"isFd": true,
"dataExpression": "'Test' + value"
},
{
"attributeOnThingsBoard": "wrongConfigAttr",
"nodeId": 123456
}
]
}
]
}

View File

@@ -0,0 +1,39 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"devices": [
{
"name": "TestDevice",
"timeseries": [
{
"key": "testVar1",
"nodeId": 1,
"value": "4:1:int",
"polling": {
"type": "once",
"dataInHex": "01"
}
},
{
"key": "testVar2",
"nodeId": 2,
"value": "4:1:int",
"polling": {
"dataInHex": "02",
"period": 2.5
}
},
{
"key": "testVar3",
"nodeId": 3,
"value": "4:1:int",
"polling": {
"type": "always",
"dataInHex": "03",
"period": 3
}
}
]
}
]
}

View File

@@ -0,0 +1,20 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"devices": [
{
"name": "TestDevice",
"attributes": [
{
"key": "testVar",
"nodeId": 41,
"value": "4:1:int",
"polling": {
"dataInHex": "CC",
"period": 2
}
}
]
}
]
}

View File

@@ -0,0 +1,21 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"devices": [
{
"name": "TestDevice",
"attributes": [
{
"key": "testVar",
"nodeId": 12345,
"isExtendedId": true,
"value": "4:1:int",
"polling": {
"type": "once",
"dataInHex": "AB CD AB CD"
}
}
]
}
]
}

66
tests/data/can/rpc.json Normal file
View File

@@ -0,0 +1,66 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"backend": {
"fd": true
},
"devices": [
{
"name": "Car1",
"enableUnknownRpc": false,
"serverSideRpc": [
{
"method": "sendSameData",
"nodeId": 4,
"isExtendedId": true,
"isFd": true,
"bitrateSwitch": true,
"dataInHex": "aa bb cc dd ee ff aa bb aa bb cc dd ee ff"
},
{
"method": "setSpeed",
"nodeId": 16,
"dataExpression": "userSpeed if maxAllowedSpeed > userSpeed else maxAllowedSpeed",
"dataByteorder": "little",
"dataLength": 2
}
]
},
{
"name": "Car2",
"overrideRpcConfig": true,
"serverSideRpc": [
{
"method": "sendSameData",
"nodeId": 4,
"isExtendedId": true,
"isFd": true,
"bitrateSwitch": true,
"dataInHex": "aa bb cc dd ee ff aa bb aa bb cc dd ee ff"
}
]
},
{
"name": "Car3",
"enableUnknownRpc": true,
"serverSideRpc": [
{
"method": "someMethod",
"nodeId": 4,
"dataInHex": "010203"
}
]
},
{
"name": "Car4",
"serverSideRpc": [
{
"method": "wrongDataMethod",
"nodeId": 4,
"dataInHex": "123",
"response": true
}
]
}
]
}

View File

@@ -0,0 +1,47 @@
{
"interface": "virtual",
"channel": "virtual_channel",
"backend": {
"fd": true
},
"devices": [
{
"name": "Car1",
"type": "test_car",
"attributes": [
{
"key": "serialNumber",
"nodeId": 1,
"isFd": true,
"command": {
"start": 0,
"length": 2,
"byteorder": "little",
"value": 12345
},
"value": "2:8:string:utf-8"
}
],
"timeseries": [
{
"key": "milliage",
"nodeId": 1918,
"isExtendedId": true,
"value": "4:2:little:int",
"expression": "value * 10"
}
]
},
{
"name": "Car2",
"sendDataOnlyOnChange": true,
"timeseries": [
{
"key": "rpm",
"nodeId": 2,
"value": "4:2:int"
}
]
}
]
}

View File

@@ -0,0 +1,24 @@
{
"connection": {
"str": "DO_NOT_EDIT_THIS_PARAMETER.IT WILL BE OVERRIDDEN BY TESTS."
},
"polling": {
"query": "SELECT key, bool_v, str_v, dbl_v, long_v, device_id, ts FROM attributes WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"period": 1,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM attributes"
}
},
"mapping": {
"device": {
"name": "'ODBC ' + str(device_id)"
},
"attributes": [
{
"nameExpression": "key",
"value": "[i for i in [str_v, long_v, dbl_v,bool_v] if i is not None][0]"
}
]
}
}

View File

@@ -0,0 +1,27 @@
{
"connection": {
"str": "DO_NOT_EDIT_THIS_PARAMETER.IT WILL BE OVERRIDDEN BY TESTS.",
"reconnectPeriod": 3
},
"polling": {
"query": "SELECT bool_v, str_v, dbl_v, long_v, device_id, ts FROM timeseries WHERE ts > ? ORDER BY ts ASC LIMIT 2",
"period": 5,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM timeseries",
"persistent": true
}
},
"mapping": {
"sendDataOnlyOnChange": true,
"device": {
"name": "'ODBC ' + str(device_id)"
},
"timeseries": [
{
"name": "value",
"value": "[i for i in [str_v, long_v, dbl_v,bool_v] if i is not None][0]"
}
]
}
}

View File

@@ -0,0 +1,43 @@
{
"connection": {
"str": "DO_NOT_EDIT_THIS_PARAMETER.IT WILL BE OVERRIDDEN BY TESTS.",
"reconnectPeriod": 5
},
"polling": {
"query": "SELECT key FROM attributes WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"iterator": {
"column": "ts",
"query": "SELECT MAX(ts) FROM attributes"
}
},
"mapping": {
"device": {
"name": "'ODBC ' + entity_id"
},
"timeseries": [
{
"name": "value",
"value": "[i for i in [str_v, long_v, dbl_v,bool_v] if i is not None][0]"
}
]
},
"serverSideRpc": {
"methods": [
"decrement_value",
{
"name": "increment_value",
"query": "CALL increment_value()"
},
{
"name": "reset_values",
"params": [ "test", 25 ]
},
{
"name": "update_values",
"params": [ "hello world", 150 ],
"query": "CALL update_values(?,?)"
},
"get_values"
]
}
}

View File

@@ -0,0 +1,33 @@
{
"connection": {
"str": "DO_NOT_EDIT_THIS_PARAMETER.IT WILL BE OVERRIDDEN BY TESTS.",
"attributes": {
"autocommit": false,
"timeout": 0
}
},
"pyodbc": {
"pooling": false,
"native_uuid": true
},
"polling": {
"query": "SELECT bool_v, str_v, dbl_v, long_v, device_id, ts FROM timeseries WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"period": 1,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM timeseries"
}
},
"mapping": {
"device": {
"type": "postgres",
"name": "'ODBC ' + str(device_id)"
},
"timeseries": [
{
"name": "value",
"value": "[i for i in [str_v, long_v, dbl_v,bool_v] if i is not None][0]"
}
]
}
}

View File

@@ -0,0 +1,129 @@
-- Test schema and fixtures for the ODBC connector tests (PostgreSQL).

-- Device registry referenced by attribute/timeseries rows via device_id.
CREATE TABLE IF NOT EXISTS public.devices (
    id smallint,
    name character varying(32)
);
-- Polled by the attribute-mapping tests; exactly one of the *_v columns is set per row.
CREATE TABLE IF NOT EXISTS public.attributes (
    device_id smallint,
    ts bigint not null,
    key character varying(32),
    bool_v boolean,
    str_v character varying(32),
    long_v bigint,
    dbl_v double precision
);
-- Polled by the timeseries-mapping tests; exactly one of the *_v columns is set per row.
CREATE TABLE IF NOT EXISTS public.timeseries (
    device_id smallint,
    ts bigint not null,
    bool_v boolean,
    str_v character varying(32),
    long_v bigint,
    dbl_v double precision
);
-- Single-row scratch table mutated by the RPC stored procedures below.
CREATE TABLE IF NOT EXISTS public.rpc (
    inc_value smallint not null,
    str_value character varying(32) not null,
    int_value smallint not null
);
-- RPC "increment_value": bump the counter (procedure, no return value).
CREATE OR REPLACE PROCEDURE public.increment_value()
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE public.rpc SET inc_value = inc_value - 1 + 2;
END
$$;
-- RPC "decrement_value": decrease the counter and return the new value.
CREATE OR REPLACE FUNCTION public.decrement_value()
RETURNS smallint
LANGUAGE plpgsql
AS $$
DECLARE
    value smallint;
BEGIN
    EXECUTE 'SELECT inc_value FROM public.rpc LIMIT 1' INTO value;
    value := value - 1;
    UPDATE public.rpc SET inc_value = value;
    RETURN value;
END
$$;
-- RPC "reset_values": overwrite str_value/int_value with the given args.
CREATE OR REPLACE FUNCTION public.reset_values(string_value character varying, integer_value smallint)
RETURNS smallint
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE public.rpc SET str_value = string_value, int_value = integer_value;
    RETURN 1;
END
$$;
-- Used by the unknown-RPC tests: stores the string and bumps inc_value.
CREATE OR REPLACE FUNCTION public.secret_function(string_value character varying)
RETURNS smallint
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE public.rpc SET str_value = string_value, inc_value = inc_value + 1;
    RETURN 1;
END
$$;
-- RPC "update_values": like reset_values but as a procedure (no return).
CREATE OR REPLACE PROCEDURE public.update_values(string_value character varying, integer_value smallint)
LANGUAGE plpgsql
AS $$
BEGIN
    UPDATE public.rpc SET str_value = string_value, int_value = integer_value;
END
$$;
-- Helper for assertions: read an integer column of public.rpc by name.
CREATE OR REPLACE FUNCTION public.get_integer_value(column_name character varying, OUT return_value smallint)
RETURNS smallint
LANGUAGE plpgsql
AS $$
BEGIN
    EXECUTE format('SELECT %s FROM public.rpc', column_name) into return_value;
END;
$$;
-- Helper for assertions: read a string column of public.rpc by name.
CREATE OR REPLACE FUNCTION public.get_string_value(column_name character varying)
RETURNS character varying
LANGUAGE plpgsql
AS $$
DECLARE
    return_value character varying;
BEGIN
    EXECUTE format('SELECT %s FROM public.rpc', column_name) into return_value;
    RETURN return_value;
END;
$$;
-- RPC "get_values": return the whole rpc row (multi-column RPC response test).
CREATE OR REPLACE FUNCTION public.get_values()
RETURNS SETOF public.rpc
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY SELECT * FROM public.rpc LIMIT 1;
END;
$$;
-- Fixture data: one rpc scratch row, four devices, telemetry and attributes.
INSERT INTO public.rpc VALUES (1, 'test', 25);
INSERT INTO public.devices (name) VALUES ('Sensor 1');
INSERT INTO public.devices (name) VALUES ('Sensor 2');
INSERT INTO public.devices (name) VALUES ('Sensor 3');
INSERT INTO public.devices (name) VALUES ('Sensor 4');
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476731000, NULL, NULL, 0, NULL);
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476732000, NULL, NULL, 0, NULL);
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476733000, NULL, NULL, 5, NULL);
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476734000, NULL, NULL, 5, NULL);
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476735000, NULL, NULL, 9, NULL);
INSERT INTO public.timeseries (device_id, ts, bool_v, str_v, long_v, dbl_v) VALUES (1, 1589476736000, NULL, NULL, 9, NULL);
INSERT INTO public.attributes (device_id, ts, key, bool_v, str_v, long_v, dbl_v) VALUES (2, 1589476731000, 'serialNumber', NULL, '123456789', NULL, NULL);
INSERT INTO public.attributes (device_id, ts, key, bool_v, str_v, long_v, dbl_v) VALUES (2, 1589476732000, 'enableUpgrade', True, NULL, NULL, NULL);
INSERT INTO public.attributes (device_id, ts, key, bool_v, str_v, long_v, dbl_v) VALUES (2, 1589476733000, 'latitude', NULL, NULL, NULL, 51.62);
INSERT INTO public.attributes (device_id, ts, key, bool_v, str_v, long_v, dbl_v) VALUES (2, 1589476734000, 'longitude', NULL, NULL, NULL, 35.63);
INSERT INTO public.attributes (device_id, ts, key, bool_v, str_v, long_v, dbl_v) VALUES (2, 1589476735000, 'softwareVersion', NULL, NULL, 465, NULL);

BIN
tests/data/odbc/sqlite3.db Normal file

Binary file not shown.

View File

@@ -14,6 +14,7 @@
import logging
import unittest
from os import remove, listdir, removedirs
from time import sleep
from random import randint
from pymodbus.constants import Endian
@@ -409,14 +410,15 @@ class TestStorage(unittest.TestCase):
def test_file_storage(self):
test_size = randint(0, 100)
storage_test_config = {"data_folder_path": "storage/data/",
"max_files_count": 100,
"max_file_count": 1000,
"max_records_per_file": 10,
"max_read_records_count": 10,
"no_records_sleep_interval": 5000
}
test_size = randint(0, storage_test_config["max_file_count"]-1)
storage = FileEventStorage(storage_test_config)
for test_value in range(test_size * 10):
@@ -433,6 +435,9 @@ class TestStorage(unittest.TestCase):
print(result)
print(correct_result)
for file in listdir(storage_test_config["data_folder_path"]):
remove(storage_test_config["data_folder_path"]+"/"+file)
removedirs(storage_test_config["data_folder_path"])
self.assertListEqual(result, correct_result)

View File

@@ -18,12 +18,12 @@
"pooling": false
},
"polling": {
"query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv_latest WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"query": "SELECT bool_v, str_v, dbl_v, long_v, entity_id, ts FROM ts_kv WHERE ts > ? ORDER BY ts ASC LIMIT 10",
"period": 10,
"iterator": {
"column": "ts",
"query": "SELECT MIN(ts) - 1 FROM ts_kv_latest",
"save": false
"query": "SELECT MIN(ts) - 1 FROM ts_kv",
"persistent": false
}
},
"mapping": {
@@ -47,7 +47,7 @@
"procedureOne",
{
"name": "procedureTwo",
"params": [ "One", 2, 3.0 ]
"args": [ "One", 2, 3.0 ]
}
]
}

View File

@@ -22,14 +22,15 @@ class BytesCanDownlinkConverter(CanConverter):
def convert(self, config, data):
try:
if config.get("dataInHex", ""):
return bytearray.fromhex(config["dataInHex"])
elif data.get("dataInHex", ""):
return bytearray.fromhex(data["dataInHex"])
return list(bytearray.fromhex(config["dataInHex"]))
if not isinstance(data, dict) or not data:
log.error("Failed to convert TB data to CAN payload: data is empty or not a dictionary")
return
if data.get("dataInHex", ""):
return list(bytearray.fromhex(data["dataInHex"]))
if config.get("dataExpression", ""):
value = eval(config["dataExpression"],
{"__builtins__": {}} if config.get("strictEval", True) else globals(),
@@ -50,7 +51,9 @@ class BytesCanDownlinkConverter(CanConverter):
elif isinstance(value, int) or isinstance(value, float):
byteorder = config["dataByteorder"] if config.get("dataByteorder", "") else "big"
if isinstance(value, int):
can_data.extend(value.to_bytes(config.get("dataLength", 1), byteorder))
can_data.extend(value.to_bytes(config.get("dataLength", 1),
byteorder,
signed=(config.get("dataSigned", False) or value < 0)))
else:
can_data.extend(struct.pack(">f" if byteorder[0] == "b" else "<f", value))
elif isinstance(value, str):

View File

@@ -24,34 +24,36 @@ class BytesCanUplinkConverter(CanConverter):
"telemetry": {}}
for config in configs:
tb_key = config["key"]
tb_item = "telemetry" if config["is_ts"] else "attributes"
try:
tb_key = config["key"]
tb_item = "telemetry" if config["is_ts"] else "attributes"
# The 'value' variable is used in eval
if config["type"][0] == "b":
value = bool(can_data[config["start"]])
elif config["type"][0] == "i" or config["type"][0] == "l":
value = int.from_bytes(can_data[config["start"]:config["start"] + config["length"]],
config["byteorder"])
config["byteorder"],
signed=config["signed"])
elif config["type"][0] == "f" or config["type"][0] == "d":
fmt = ">" + config["type"][0] if config["byteorder"][0] == "b" else "<" + config["type"][0]
value = struct.unpack_from(fmt, can_data[config["start"]:config["start"] + config["length"]])
value = struct.unpack_from(fmt,
bytes(can_data[config["start"]:config["start"] + config["length"]]))[0]
elif config["type"][0] == "s":
value = can_data[config["start"]:config["start"] + config["length"]].decode(config["encoding"])
else:
log.error("Failed to convert CAN data to TB %s '%s': unknown data type '%s'",
"timeseries key" if config["is_ts"] else "attribute", tb_key, config["type"])
return
"time series key" if config["is_ts"] else "attribute", tb_key, config["type"])
continue
if config.get("expression", ""):
result[tb_item][tb_key] = eval(config["expression"],
{"__builtins__": {}} if config.get("strictEval", True) else globals(),
{"__builtins__": {}} if config["strictEval"] else globals(),
{"value": value, "can_data": can_data})
else:
result[tb_item][tb_key] = value
except Exception as e:
log.error("Failed to convert CAN data to TB %s '%s': %s",
"timeseries key" if config["is_ts"] else "attribute", tb_key, str(e))
"time series key" if config["is_ts"] else "attribute", tb_key, str(e))
continue
return result

View File

@@ -21,6 +21,7 @@ from random import choice
from string import ascii_lowercase
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
try:
from can import Notifier, BufferedReader, Message, CanError, ThreadSafeBus
except ImportError:
@@ -40,6 +41,27 @@ class CanConnector(Connector, Thread):
NO_CMD_ID = "no_cmd"
UNKNOWN_ARBITRATION_ID = -1
DEFAULT_RECONNECT_PERIOD = 30.0
DEFAULT_POLL_PERIOD = 1.0
DEFAULT_SEND_IF_CHANGED = False
DEFAULT_RECONNECT_STATE = True
DEFAULT_EXTENDED_ID_FLAG = False
DEFAULT_FD_FLAG = False
DEFAULT_BITRATE_SWITCH_FLAG = False
DEFAULT_BYTEORDER = "big"
DEFAULT_ENCODING = "ascii"
DEFAULT_ENABLE_UNKNOWN_RPC = False
DEFAULT_OVERRIDE_RPC_PARAMS = False
DEFAULT_STRICT_EVAL_FLAG = True
DEFAULT_SIGNED_FLAG = False
DEFAULT_RPC_RESPONSE_SEND_FLAG = False
def __init__(self, gateway, config, connector_type):
self.statistics = {'MessagesReceived': 0,
'MessagesSent': 0}
@@ -84,7 +106,7 @@ class CanConnector(Connector, Thread):
for attr_name, attr_value in content["data"].items():
attr_config = self.__shared_attributes.get(content["device"], {}).get(attr_name)
if attr_config is None:
log.warn("[%s] No configuration for '%s' attribute, ignore its update", self.get_name(), attr_name)
log.warning("[%s] No configuration for '%s' attribute, ignore its update", self.get_name(), attr_name)
return
log.debug("[%s] Processing attribute update for '%s' device: attribute=%s,value=%s",
@@ -97,23 +119,18 @@ class CanConnector(Connector, Thread):
self.get_name(), attr_name, content["device"])
return
done = self.send_data_to_bus(attr_config.get("nodeId", self.UNKNOWN_ARBITRATION_ID),
data,
attr_config.get("isExtendedId", False),
attr_config.get("isFd", False),
attr_config.get("bitrateSwitch", False),
True)
if not done:
done = self.send_data_to_bus(data, attr_config, data_check=True)
if done:
log.debug("[%s] Updated '%s' attribute for '%s' device", self.get_name(), attr_name, content["device"])
else:
log.error("[%s] Failed to update '%s' attribute for '%s' device",
self.get_name(), attr_name, content["device"])
else:
log.debug("[%s] Updated '%s' attribute for '%s' device", self.get_name(), attr_name, content["device"])
def server_side_rpc_handler(self, content):
rpc_config = self.__rpc_calls.get(content["device"], {}).get(content["data"]["method"])
if rpc_config is None:
if not self.__devices[content["device"]]["enableUnknownRpc"]:
log.warn("[%s] No configuration for '%s' RPC request (id=%s), ignore it",
log.warning("[%s] No configuration for '%s' RPC request (id=%s), ignore it",
self.get_name(), content["data"]["method"], content["data"]["id"])
return
else:
@@ -137,25 +154,20 @@ class CanConnector(Connector, Thread):
data = self.__converters[content["device"]]["downlink"].convert(conversion_config,
content["data"].get("params", {}))
if data is None:
if data is not None:
done = self.send_data_to_bus(data, conversion_config, data_check=True)
if done:
log.debug("[%s] Processed '%s' RPC request (id=%s) for '%s' device",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
else:
log.error("[%s] Failed to process '%s' RPC request (id=%s) for '%s' device",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
else:
done = False
log.error("[%s] Failed to process '%s' RPC request (id=%s) for '%s' device: data conversion failure",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
return
done = self.send_data_to_bus(conversion_config.get("nodeId", self.UNKNOWN_ARBITRATION_ID),
data,
conversion_config.get("isExtendedId"),
conversion_config.get("isFd"),
conversion_config.get("bitrateSwitch"),
True)
if not done:
log.error("[%s] Failed to process '%s' RPC request (id=%s) for '%s' device",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
else:
log.debug("[%s] Processed '%s' RPC request (id=%s) for '%s' device",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
if conversion_config.get("response", True):
if conversion_config.get("response", self.DEFAULT_RPC_RESPONSE_SEND_FLAG):
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], {"success": done})
def run(self):
@@ -177,9 +189,7 @@ class CanConnector(Connector, Thread):
poller = Poller(self)
# Poll once to check if network is not down.
# It would be better to have some kind of a ping message to check a bus state.
poller.poll_once()
self.__check_if_error_happened()
poller.start()
poller.poll()
# Initialize the connected flag and reconnect count only after bus creation and sending poll messages.
# It is expected that after these operations most likely the bus is up.
@@ -230,12 +240,12 @@ class CanConnector(Connector, Thread):
def get_polling_messages(self):
return self.__polling_messages
def send_data_to_bus(self, node_id, data, is_extended_id=False, is_fd=False, bitrate_switch=False, data_check=False):
def send_data_to_bus(self, data, config, data_check=True, raise_exception=False):
try:
self.__bus.send(Message(arbitration_id=node_id,
is_extended_id=is_extended_id,
is_fd=is_fd,
bitrate_switch=bitrate_switch,
self.__bus.send(Message(arbitration_id=config["nodeId"],
is_extended_id=config.get("isExtendedId", self.DEFAULT_EXTENDED_ID_FLAG),
is_fd=config.get("isFd", self.DEFAULT_FD_FLAG),
bitrate_switch=config.get("bitrateSwitch", self.DEFAULT_BITRATE_SWITCH_FLAG),
data=data,
check=data_check))
return True
@@ -243,11 +253,14 @@ class CanConnector(Connector, Thread):
log.error("[%s] Wrong CAN message data: %s", self.get_name(), str(e))
except CanError as e:
log.error("[%s] Failed to send CAN message: %s", self.get_name(), str(e))
self.__on_bus_error(e)
if raise_exception:
raise e
else:
self.__on_bus_error(e)
return False
def __on_bus_error(self, e):
log.warn("[%s] Notified about CAN bus error. Store it to later processing", self.get_name())
log.warning("[%s] Notified about CAN bus error. Store it to later processing", self.get_name())
self.__bus_error = e
def __check_if_error_happened(self):
@@ -261,14 +274,15 @@ class CanConnector(Connector, Thread):
config = {}
options = {
"nodeId": self.UNKNOWN_ARBITRATION_ID,
"isExtendedId": False,
"isFd": False,
"bitrateSwitch": False,
"isExtendedId": self.DEFAULT_EXTENDED_ID_FLAG,
"isFd": self.DEFAULT_FD_FLAG,
"bitrateSwitch": self.DEFAULT_BITRATE_SWITCH_FLAG,
"response": True,
"dataLength": 1,
"dataByteorder": "big",
"dataByteorder": self.DEFAULT_BYTEORDER,
"dataSigned": self.DEFAULT_SIGNED_FLAG,
"dataExpression": None,
"dataEncoding": "ascii",
"dataEncoding": self.DEFAULT_ENCODING,
"dataBefore": None,
"dataAfter": None,
"dataInHex": None
@@ -302,9 +316,9 @@ class CanConnector(Connector, Thread):
parsing_conf = self.__nodes[message.arbitration_id][cmd_id]
data = self.__converters[parsing_conf["deviceName"]]["uplink"].convert(parsing_conf["configs"], message.data)
if data is None or data.get("attributes", []) and data.get("telemetry", []):
log.error("[%s] Failed to process CAN message (id=%d,cmd_id=%s): data conversion failure",
self.get_name(), message.arbitration_id, cmd_id)
if data is None or not data.get("attributes", []) and not data.get("telemetry", []):
log.warning("[%s] Failed to process CAN message (id=%d,cmd_id=%s): data conversion failure",
self.get_name(), message.arbitration_id, cmd_id)
return
self.__check_and_send(parsing_conf, data)
@@ -343,8 +357,8 @@ class CanConnector(Connector, Thread):
def __parse_config(self, config):
self.__reconnect_count = 0
self.__reconnect_conf = {
"enabled": config.get("reconnect", True),
"period": config.get("reconnectPeriod", 30.0),
"enabled": config.get("reconnect", self.DEFAULT_RECONNECT_STATE),
"period": config.get("reconnectPeriod", self.DEFAULT_RECONNECT_PERIOD),
"maxCount": config.get("reconnectCount", None)
}
@@ -357,13 +371,14 @@ class CanConnector(Connector, Thread):
for device_config in config.get("devices"):
is_device_config_valid = False
device_name = device_config["name"]
device_type = device_config.get("type", "CAN")
strict_eval = device_config.get("strictEval", True)
device_type = device_config.get("type", self.__connector_type)
strict_eval = device_config.get("strictEval", self.DEFAULT_STRICT_EVAL_FLAG)
self.__devices[device_name] = {}
self.__devices[device_name]["enableUnknownRpc"] = device_config.get("enableUnknownRpc", False)
self.__devices[device_name]["enableUnknownRpc"] = device_config.get("enableUnknownRpc",
self.DEFAULT_ENABLE_UNKNOWN_RPC)
self.__devices[device_name]["overrideRpcConfig"] = True if self.__devices[device_name]["enableUnknownRpc"] \
else device_config.get("overrideRpcConfig", False)
else device_config.get("overrideRpcConfig", self.DEFAULT_OVERRIDE_RPC_PARAMS)
self.__converters[device_name] = {}
@@ -388,13 +403,14 @@ class CanConnector(Connector, Thread):
False)
for attribute_config in device_config["attributeUpdates"]:
attribute_config["strictEval"] = strict_eval
self.__shared_attributes[device_name][attribute_config["attributeOnThingsBoard"]] = attribute_config
attribute_name = attribute_config.get("attributeOnThingsBoard") or attribute_config.get("attribute")
self.__shared_attributes[device_name][attribute_name] = attribute_config
for config_key in ["timeseries", "attributes"]:
if config_key not in device_config or not device_config[config_key]:
continue
is_device_config_valid = False
is_device_config_valid = True
is_ts = (config_key[0] == "t")
tb_item = "telemetry" if is_ts else "attributes"
@@ -419,7 +435,7 @@ class CanConnector(Connector, Thread):
msg_config.update(value_config)
else:
log.warning("[%s] Ignore '%s' %s configuration: no value configuration",
self.get_name(), tb_key, config_key,)
self.get_name(), tb_key, config_key, )
continue
if msg_config.get("command", "") and node_id not in self.__commands:
@@ -442,7 +458,7 @@ class CanConnector(Connector, Thread):
self.__nodes[node_id][cmd_id] = {
"deviceName": device_name,
"deviceType": device_type,
"sendOnChange": device_config.get("sendDataOnlyOnChange", False),
"sendOnChange": device_config.get("sendDataOnlyOnChange", self.DEFAULT_SEND_IF_CHANGED),
"configs": []}
self.__nodes[node_id][cmd_id]["configs"].append(msg_config)
@@ -453,11 +469,13 @@ class CanConnector(Connector, Thread):
polling_config = msg_config.get("polling")
polling_config["key"] = tb_key # Just for logging
polling_config["type"] = polling_config.get("type", "always")
polling_config["period"] = polling_config.get("period", 1.0)
polling_config["period"] = polling_config.get("period", self.DEFAULT_POLL_PERIOD)
polling_config["nodeId"] = node_id
polling_config["isExtendedId"] = msg_config.get("isExtendedId", False)
polling_config["isFd"] = msg_config.get("isFd", False)
polling_config["bitrateSwitch"] = msg_config.get("bitrateSwitch", False)
polling_config["isExtendedId"] = msg_config.get("isExtendedId",
self.DEFAULT_EXTENDED_ID_FLAG)
polling_config["isFd"] = msg_config.get("isFd", self.DEFAULT_FD_FLAG)
polling_config["bitrateSwitch"] = msg_config.get("bitrateSwitch",
self.DEFAULT_BITRATE_SWITCH_FLAG)
# Create CAN message object to validate its data
can_msg = Message(arbitration_id=polling_config["nodeId"],
is_extended_id=polling_config["isExtendedId"],
@@ -488,22 +506,35 @@ class CanConnector(Connector, Thread):
log.warning("[%s] Wrong value configuration: '%s' doesn't match pattern", self.get_name(), config)
return
return {
value_config = {
"start": int(value_matches.group(1)),
"length": int(value_matches.group(2)),
"byteorder": value_matches.group(3) if value_matches.group(3) else "big",
"type": value_matches.group(4),
"encoding": value_matches.group(5) if value_matches.group(5) else "ascii"
"byteorder": value_matches.group(3) if value_matches.group(3) else self.DEFAULT_BYTEORDER,
"type": value_matches.group(4)
}
if value_config["type"][0] == "i" or value_config["type"][0] == "l":
value_config["signed"] = value_matches.group(5) == "signed" if value_matches.group(5) \
else self.DEFAULT_SIGNED_FLAG
elif value_config["type"][0] == "s":
value_config["encoding"] = value_matches.group(5) if value_matches.group(5) else self.DEFAULT_ENCODING
return value_config
elif isinstance(config, dict):
try:
return {
value_config = {
"start": int(config["start"]),
"length": int(config["length"]),
"byteorder": config["byteorder"] if config.get("byteorder", "") else "big",
"type": config["type"],
"encoding": config["encoding"] if config.get("encoding", "") else "ascii"
"byteorder": config["byteorder"] if config.get("byteorder", "") else self.DEFAULT_BYTEORDER,
"type": config["type"]
}
if value_config["type"][0] == "i" or value_config["type"][0] == "l":
value_config["signed"] = config.get("signed", self.DEFAULT_SIGNED_FLAG)
elif value_config["type"][0] == "s":
value_config["encoding"] = config["encoding"] if config.get("encoding", "") else self.DEFAULT_ENCODING
return value_config
except (KeyError, ValueError) as e:
log.warning("[%s] Wrong value configuration: %s", self.get_name(), str(e))
return
@@ -524,7 +555,7 @@ class CanConnector(Connector, Thread):
return {
"start": int(cmd_matches.group(1)),
"length": int(cmd_matches.group(2)),
"byteorder": cmd_matches.group(3) if cmd_matches.group(3) else "big",
"byteorder": cmd_matches.group(3) if cmd_matches.group(3) else self.DEFAULT_BYTEORDER,
"value": int(cmd_matches.group(4))
}
elif isinstance(config, dict):
@@ -532,7 +563,7 @@ class CanConnector(Connector, Thread):
return {
"start": int(config["start"]),
"length": int(config["length"]),
"byteorder": config["byteorder"] if config.get("byteorder", "") else "big",
"byteorder": config["byteorder"] if config.get("byteorder", "") else self.DEFAULT_BYTEORDER,
"value": int(config["value"])
}
except (KeyError, ValueError) as e:
@@ -561,36 +592,28 @@ class Poller(Thread):
self.connector = connector
self.scheduler = sched.scheduler(time.time, time.sleep)
self.events = []
self.first_run = True
self.daemon = True
log.info("[%s] Starting poller", self.connector.get_name())
def poll_once(self):
log.info("[%s] Starting one time poll", self.connector.get_name())
for polling_config in self.connector.get_polling_messages():
self.connector.send_data_to_bus(polling_config["nodeId"],
bytearray.fromhex(polling_config["dataInHex"]),
polling_config["isExtendedId"],
polling_config["isFd"],
polling_config["bitrateSwitch"])
def poll(self):
if self.first_run:
log.info("[%s] Starting poller", self.connector.get_name())
def run(self):
for polling_config in self.connector.get_polling_messages():
key = polling_config["key"]
if polling_config["type"] == "always":
log.info("[%s] Polling '%s' key every %f sec", self.connector.get_name(), key, polling_config["period"])
self.__poll_and_schedule(polling_config["period"],
polling_config["nodeId"],
bytearray.fromhex(polling_config["dataInHex"]),
polling_config["isExtendedId"],
polling_config["isFd"],
polling_config["bitrateSwitch"])
else:
self.__poll_and_schedule(bytearray.fromhex(polling_config["dataInHex"]), polling_config)
elif self.first_run:
log.info("[%s] Polling '%s' key once", self.connector.get_name(), key)
self.connector.send_data_to_bus(polling_config["nodeId"],
bytearray.fromhex(polling_config["dataInHex"]),
polling_config["isExtendedId"],
polling_config["isFd"],
polling_config["bitrateSwitch"])
self.connector.send_data_to_bus(bytearray.fromhex(polling_config["dataInHex"]),
polling_config,
raise_exception=self.first_run)
if self.first_run:
self.first_run = False
self.start()
def run(self):
self.scheduler.run()
log.info("[%s] Poller stopped", self.connector.get_name())
@@ -599,18 +622,15 @@ class Poller(Thread):
for event in self.events:
self.scheduler.cancel(event)
def __poll_and_schedule(self, period, node_id, data, is_extended_id, is_fd, bitrate_switch):
def __poll_and_schedule(self, data, config):
if self.connector.is_stopped():
return
if self.events:
self.events.pop(0)
log.debug("[%s] Sending periodic (%d sec) CAN message (arbitration_id=%d, data=%s)",
self.connector.get_name(), period, node_id, data)
self.connector.send_data_to_bus(node_id, data, is_extended_id, is_fd, bitrate_switch)
log.debug("[%s] Sending periodic (%f sec) CAN message (arbitration_id=%d, data=%s)",
self.connector.get_name(), config["period"], config["nodeId"], data)
self.connector.send_data_to_bus(data, config, raise_exception=self.first_run)
event = self.scheduler.enter(period,
1,
self.__poll_and_schedule,
argument=(period, node_id, data, is_extended_id, is_fd, bitrate_switch))
event = self.scheduler.enter(config["period"], 1, self.__poll_and_schedule, argument=(data, config))
self.events.append(event)

View File

@@ -209,7 +209,10 @@ class ModbusConnector(Connector, threading.Thread):
def __configure_master(self):
host = self.__config.get("host", "localhost")
port = self.__config.get("port", 502)
try:
port = self.__config.get(int("port"), 502)
except ValueError:
port = self.__config.get("port", 502)
baudrate = self.__config.get('baudrate', 19200)
timeout = self.__config.get("timeout", 35)
method = self.__config.get('method', 'rtu')

View File

@@ -11,7 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from hashlib import sha1
from os import path
from pathlib import Path
from time import sleep
from random import choice
@@ -20,6 +22,7 @@ from threading import Thread
from simplejson import dumps, load
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
try:
import pyodbc
except ImportError:
@@ -33,13 +36,14 @@ from thingsboard_gateway.connectors.connector import Connector, log
class OdbcConnector(Connector, Thread):
DEFAULT_SEND_IF_CHANGED = True
DEFAULT_SEND_IF_CHANGED = False
DEFAULT_RECONNECT_STATE = True
DEFAULT_SAVE_ITERATOR = False
DEFAULT_RECONNECT_PERIOD = 60
DEFAULT_POLL_PERIOD = 60
DEFAULT_ENABLE_UNKNOWN_RPC = False
DEFAULT_OVERRIDE_RPC_PARAMS = False
DEFAULT_PROCESS_RPC_RESULT = False
def __init__(self, gateway, config, connector_type):
super().__init__()
@@ -53,7 +57,7 @@ class OdbcConnector(Connector, Thread):
self.__config = config
self.__stopped = False
self.__config_dir = "thingsboard_gateway/config/odbc/"
self.__config_dir = self.__gateway.get_config_path() + "odbc" + path.sep
self.__connection = None
self.__cursor = None
@@ -96,46 +100,64 @@ class OdbcConnector(Connector, Thread):
done = False
try:
if not self.is_connected():
log.warn("[%s] Cannot process RPC request: not connected to database", self.get_name())
log.warning("[%s] Cannot process RPC request: not connected to database", self.get_name())
raise Exception("no connection")
sql_params = self.__config["serverSideRpc"]["methods"].get(content["data"]["method"])
if sql_params is None:
is_rpc_unknown = False
rpc_config = self.__config["serverSideRpc"]["methods"].get(content["data"]["method"])
if rpc_config is None:
if not self.__config["serverSideRpc"]["enableUnknownRpc"]:
log.warn("[%s] Ignore unknown RPC request '%s' (id=%s)",
self.get_name(), content["data"]["method"], content["data"]["id"])
log.warning("[%s] Ignore unknown RPC request '%s' (id=%s)",
self.get_name(), content["data"]["method"], content["data"]["id"])
raise Exception("unknown RPC request")
else:
sql_params = content["data"].get("params", {}).get("args", [])
elif self.__config["serverSideRpc"]["overrideRpcConfig"]:
if content["data"].get("params", {}).get("args", []):
sql_params = content["data"]["params"]["args"]
is_rpc_unknown = True
rpc_config = content["data"].get("params", {})
sql_params = rpc_config.get("args", [])
query = rpc_config.get("query", "")
else:
if self.__config["serverSideRpc"]["overrideRpcConfig"]:
rpc_config = {**rpc_config, **content["data"].get("params", {})}
log.debug("[%s] Processing '%s' RPC request (id=%s) for '%s' device: params=%s",
self.get_name(), content["data"]["method"], content["data"]["id"],
content["device"], content["data"].get("params"))
# The params attribute is obsolete but leave for backward configuration compatibility
sql_params = rpc_config.get("args") or rpc_config.get("params", [])
query = rpc_config.get("query", "")
log.debug("[%s] Processing %s '%s' RPC request (id=%s) for '%s' device: params=%s, query=%s",
self.get_name(), "unknown" if is_rpc_unknown else "", content["data"]["method"],
content["data"]["id"], content["device"], sql_params, query)
if self.__rpc_cursor is None:
self.__rpc_cursor = self.__connection.cursor()
if sql_params:
self.__rpc_cursor.execute("{{CALL {} ({})}}".format(content["data"]["method"],
("?," * len(sql_params))[0:-1]),
sql_params)
if query:
if sql_params:
self.__rpc_cursor.execute(query, sql_params)
else:
self.__rpc_cursor.execute(query)
else:
self.__rpc_cursor.execute("{{CALL {}}}".format(content["data"]["method"]))
if sql_params:
self.__rpc_cursor.execute("{{CALL {} ({})}}".format(content["data"]["method"],
("?," * len(sql_params))[0:-1]),
sql_params)
else:
self.__rpc_cursor.execute("{{CALL {}}}".format(content["data"]["method"]))
done = True
log.debug("[%s] Processed '%s' RPC request (id=%s) for '%s' device",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"])
except pyodbc.Warning as w:
log.warn("[%s] Warning while processing '%s' RPC request (id=%s) for '%s' device: %s",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"], str(w))
log.warning("[%s] Warning while processing '%s' RPC request (id=%s) for '%s' device: %s",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"], str(w))
except Exception as e:
log.error("[%s] Failed to process '%s' RPC request (id=%s) for '%s' device: %s",
self.get_name(), content["data"]["method"], content["data"]["id"], content["device"], str(e))
finally:
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], {"success": done})
if done and rpc_config.get("result", self.DEFAULT_PROCESS_RPC_RESULT):
response = self.row_to_dict(self.__rpc_cursor.fetchone())
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], response)
else:
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], {"success": done})
def run(self):
while not self.__stopped:
@@ -150,7 +172,6 @@ class OdbcConnector(Connector, Thread):
if not self.is_connected():
log.error("[%s] Cannot connect to database so exit from main loop", self.get_name())
self.__stopped = True
break
if not self.__init_iterator():
@@ -160,58 +181,58 @@ class OdbcConnector(Connector, Thread):
# Polling phase
try:
self.__poll()
# self.server_side_rpc_handler({"device": "RPC test",
# "data": {
# "id": 777,
# "method": "usp_NoParameters",
# "params": [ 8, True, "Three" ]
# }})
if not self.__stopped:
polling_period = self.__config["polling"].get("period", self.DEFAULT_POLL_PERIOD)
log.debug("[%s] Next polling iteration will be in %d second(s)", self.get_name(), polling_period)
sleep(polling_period)
except pyodbc.Warning as w:
log.warn("[%s] Warning while polling database: %s", self.get_name(), str(w))
log.warning("[%s] Warning while polling database: %s", self.get_name(), str(w))
except pyodbc.Error as e:
log.error("[%s] Error while polling database: %s", self.get_name(), str(e))
self.__close()
self.__close()
self.__stopped = False
log.info("[%s] Stopped", self.get_name())
def __close(self):
if self.is_connected():
try:
self.__cursor.close()
if self.__rpc_cursor is not None:
self.__rpc_cursor.close()
self.__connection.close()
finally:
log.info("[%s] Connection to database closed", self.get_name())
self.__connection = None
self.__cursor = None
self.__rpc_cursor = None
def __poll(self):
rows = self.__cursor.execute(self.__config["polling"]["query"], self.__iterator["value"])
if not self.__column_names and self.__cursor.rowcount > 0:
if not self.__column_names:
for column in self.__cursor.description:
self.__column_names.append(column[0])
log.info("[%s] Fetch column names: %s", self.get_name(), self.__column_names)
# For some reason pyodbc.Cursor.rowcount may be 0 (sqlite) so use our own row counter
row_count = 0
for row in rows:
# log.debug("[%s] Fetch row: %s", self.get_name(), row)
row_count += 1
self.__process_row(row)
self.__iterator["total"] += self.__cursor.rowcount
self.__iterator["total"] += row_count
log.info("[%s] Polling iteration finished. Processed rows: current %d, total %d",
self.get_name(), self.__cursor.rowcount, self.__iterator["total"])
self.get_name(), row_count, self.__iterator["total"])
if self.__config["polling"]["iterator"].get("persistent",
self.DEFAULT_SAVE_ITERATOR) and self.__cursor.rowcount > 0:
if self.__config["polling"]["iterator"]["persistent"] and row_count > 0:
self.__save_iterator_config()
def __process_row(self, row):
try:
data = {}
for column in self.__column_names:
data[column] = getattr(row, column)
data = self.row_to_dict(row)
to_send = {"attributes": {} if "attributes" not in self.__config["mapping"] else
self.__converter.convert(self.__config["mapping"]["attributes"], data),
@@ -228,7 +249,14 @@ class OdbcConnector(Connector, Thread):
self.__config["mapping"]["device"].get("type", self.__connector_type),
to_send)
except Exception as e:
log.warn("[%s] Failed to process database row: %s", self.get_name(), str(e))
log.warning("[%s] Failed to process database row: %s", self.get_name(), str(e))
@staticmethod
def row_to_dict(row):
data = {}
for column_description in row.cursor_description:
data[column_description[0]] = getattr(row, column_description[0])
return data
def __check_and_send(self, device_name, device_type, new_data):
self.statistics['MessagesReceived'] += 1
@@ -275,8 +303,8 @@ class OdbcConnector(Connector, Thread):
self.get_name(), decoding_config["metadata"])
self.__connection.setdecoding(pyodbc.SQL_WMETADATA, decoding_config["metadata"])
else:
log.warn("[%s] Unknown decoding configuration %s. Read data may be misdecoded", self.get_name(),
decoding_config)
log.warning("[%s] Unknown decoding configuration %s. Read data may be misdecoded", self.get_name(),
decoding_config)
self.__cursor = self.__connection.cursor()
log.info("[%s] Connection to database opened, attributes %s",
@@ -301,11 +329,16 @@ class OdbcConnector(Connector, Thread):
self.__iterator_file_name = sha1(file_name.encode()).hexdigest() + ".json"
log.debug("[%s] Iterator file name resolved to %s", self.get_name(), self.__iterator_file_name)
except Exception as e:
log.warn("[%s] Failed to resolve iterator file name: %s", self.get_name(), str(e))
log.warning("[%s] Failed to resolve iterator file name: %s", self.get_name(), str(e))
return bool(self.__iterator_file_name)
def __init_iterator(self):
save_iterator = self.__config["polling"]["iterator"].get("persistent", self.DEFAULT_SAVE_ITERATOR)
save_iterator = self.DEFAULT_SAVE_ITERATOR
if "persistent" not in self.__config["polling"]["iterator"]:
self.__config["polling"]["iterator"]["persistent"] = save_iterator
else:
save_iterator = self.__config["polling"]["iterator"]["persistent"]
log.info("[%s] Iterator saving %s", self.get_name(), "enabled" if save_iterator else "disabled")
if save_iterator and self.__load_iterator_config():
@@ -328,7 +361,7 @@ class OdbcConnector(Connector, Thread):
log.info("[%s] Init iterator from database: column=%s, start_value=%s",
self.get_name(), self.__iterator["name"], self.__iterator["value"])
except pyodbc.Warning as w:
log.warn("[%s] Warning on init iterator from database: %s", self.get_name(), str(w))
log.warning("[%s] Warning on init iterator from database: %s", self.get_name(), str(w))
except pyodbc.Error as e:
log.error("[%s] Failed to init iterator from database: %s", self.get_name(), str(e))
else:
@@ -378,6 +411,8 @@ class OdbcConnector(Connector, Thread):
log.info("[%s] Set pyodbc attributes: %s", self.get_name(), pyodbc_config)
def __parse_rpc_config(self):
if "serverSideRpc" not in self.__config:
self.__config["serverSideRpc"] = {}
if "enableUnknownRpc" not in self.__config["serverSideRpc"]:
self.__config["serverSideRpc"]["enableUnknownRpc"] = self.DEFAULT_ENABLE_UNKNOWN_RPC
@@ -397,10 +432,11 @@ class OdbcConnector(Connector, Thread):
reformatted_config = {}
for rpc_config in self.__config["serverSideRpc"]["methods"]:
if isinstance(rpc_config, str):
reformatted_config[rpc_config] = []
reformatted_config[rpc_config] = {}
elif isinstance(rpc_config, dict):
reformatted_config[rpc_config["name"]] = rpc_config.get("params", [])
reformatted_config[rpc_config["name"]] = rpc_config
else:
log.warn("[%s] Wrong RPC config format. Expected str or dict, get %s", self.get_name(), type(rpc_config))
log.warning("[%s] Wrong RPC config format. Expected str or dict, get %s", self.get_name(),
type(rpc_config))
self.__config["serverSideRpc"]["methods"] = reformatted_config

View File

@@ -27,10 +27,15 @@ class OdbcUplinkConverter(OdbcConverter):
if isinstance(config_item, str):
converted_data[config_item] = data[config_item]
elif isinstance(config_item, dict):
if "nameExpression" in config_item:
name = eval(config_item["nameExpression"], globals(), data)
else:
name = config_item["name"]
if "column" in config_item:
converted_data[config_item["name"]] = data[config_item["column"]]
converted_data[name] = data[config_item["column"]]
elif "value" in config_item:
converted_data[config_item["name"]] = eval(config_item["value"], globals(), data)
converted_data[name] = eval(config_item["value"], globals(), data)
else:
log.error("Failed to convert SQL data to TB format: no column/value configuration item")
else: