mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Fixed and improved default JSON MQTT uplink converter.
This commit is contained in:
@@ -78,6 +78,39 @@ class JsonMqttUplinkConverterTests(unittest.TestCase):
|
||||
converted_array_data = converter.convert(topic, data)
|
||||
self.assertTrue(converted_array_data.get(SEND_ON_CHANGE_PARAMETER))
|
||||
|
||||
def test_parse_device_name_from_spaced_key_name(self):
|
||||
device_key_name = "device name"
|
||||
|
||||
topic, config, data = self._get_device_test_data_with_spaced_key_and_different_out_type(device_key_name)
|
||||
converter = JsonMqttUplinkConverter(config, logger=logging.getLogger('converter'))
|
||||
converted_data = converter.convert(topic, data)
|
||||
|
||||
self.assertEqual(data[device_key_name], converted_data["deviceName"])
|
||||
|
||||
def test_convert_data_from_string_to_int_without_eval(self):
|
||||
use_eval = False
|
||||
device_key_name = "device name"
|
||||
attr_key_name = "test_key"
|
||||
|
||||
topic, config, data = self._get_device_test_data_with_spaced_key_and_different_out_type(
|
||||
device_key_name, attr_key_name, use_eval)
|
||||
converter = JsonMqttUplinkConverter(config, logger=logging.getLogger('converter'))
|
||||
converted_data = converter.convert(topic, data)
|
||||
|
||||
self.assertEqual(converted_data[ATTRIBUTES_PARAMETER][0][attr_key_name], int(float(data[attr_key_name])))
|
||||
|
||||
def test_convert_data_from_string_to_int_with_eval(self):
|
||||
use_eval = True
|
||||
device_key_name = "device name"
|
||||
attr_key_name = "test_key"
|
||||
|
||||
topic, config, data = self._get_device_test_data_with_spaced_key_and_different_out_type(
|
||||
device_key_name, attr_key_name, use_eval)
|
||||
converter = JsonMqttUplinkConverter(config, logger=logging.getLogger('converter'))
|
||||
converted_data = converter.convert(topic, data)
|
||||
|
||||
self.assertEqual(converted_data[ATTRIBUTES_PARAMETER][0][attr_key_name], 2 * int(float(data[attr_key_name])))
|
||||
|
||||
@staticmethod
|
||||
def _convert_to_dict(data_array):
|
||||
data_dict = {}
|
||||
@@ -128,6 +161,32 @@ class JsonMqttUplinkConverterTests(unittest.TestCase):
|
||||
}
|
||||
return topic, config, data
|
||||
|
||||
def _get_device_test_data_with_spaced_key_and_different_out_type(self, device_name_key, attr_key_name="test_key", use_eval=False):
|
||||
topic = "topic"
|
||||
value_expression = "${" + attr_key_name + "}"
|
||||
config = {
|
||||
"topicFilter": topic,
|
||||
"converter": {
|
||||
"type": "json",
|
||||
"useEval": use_eval,
|
||||
"deviceNameJsonExpression": "${" + device_name_key + "}",
|
||||
"deviceTypeJsonExpression": self.DEVICE_TYPE,
|
||||
"attributes": [
|
||||
{
|
||||
"type": "int",
|
||||
"key": attr_key_name,
|
||||
"value": f"{value_expression} + {value_expression}" if use_eval else value_expression
|
||||
}
|
||||
],
|
||||
"timeseries": []
|
||||
}
|
||||
}
|
||||
data = {
|
||||
device_name_key: self.DEVICE_NAME,
|
||||
attr_key_name: "21.420000"
|
||||
}
|
||||
return topic, config, data
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -23,10 +23,13 @@ from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
|
||||
|
||||
class JsonMqttUplinkConverter(MqttUplinkConverter):
|
||||
CONFIGURATION_OPTION_USE_EVAL = "useEval"
|
||||
|
||||
def __init__(self, config, logger):
|
||||
self._log = logger
|
||||
self.__config = config.get('converter')
|
||||
self.__send_data_on_change = self.__config.get(SEND_ON_CHANGE_PARAMETER)
|
||||
self.__use_eval = self.__config.get(self.CONFIGURATION_OPTION_USE_EVAL, False)
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
@@ -82,22 +85,18 @@ class JsonMqttUplinkConverter(MqttUplinkConverter):
|
||||
|
||||
full_key = datatype_config["key"]
|
||||
for (key, key_tag) in zip(keys, keys_tags):
|
||||
is_valid_key = "${" in datatype_config["key"] and "}" in \
|
||||
datatype_config["key"]
|
||||
full_key = full_key.replace('${' + str(key_tag) + '}',
|
||||
str(key)) if is_valid_key else key_tag
|
||||
is_valid_key = "${" in datatype_config["key"] and "}" in datatype_config["key"]
|
||||
full_key = full_key.replace('${' + str(key_tag) + '}', str(key)) if is_valid_key else key_tag
|
||||
|
||||
full_value = datatype_config["value"]
|
||||
for (value, value_tag) in zip(values, values_tags):
|
||||
is_valid_value = "${" in datatype_config["value"] and "}" in \
|
||||
datatype_config["value"]
|
||||
|
||||
full_value = full_value.replace('${' + str(value_tag) + '}',
|
||||
str(value)) if is_valid_value else value
|
||||
is_valid_value = "${" in datatype_config["value"] and "}" in datatype_config["value"]
|
||||
full_value = full_value.replace('${' + str(value_tag) + '}', str(value)) if is_valid_value else value
|
||||
|
||||
if full_key != 'None' and full_value != 'None':
|
||||
dict_result[datatypes[datatype]].append(
|
||||
self.create_timeseries_record(full_key, full_value, timestamp))
|
||||
self.create_timeseries_record(full_key, TBUtility.convert_data_type(
|
||||
full_value, datatype_config["type"], self.__use_eval), timestamp))
|
||||
except Exception as e:
|
||||
self._log.error('Error in converter, for config: \n%s\n and message: \n%s\n %s', dumps(self.__config),
|
||||
str(data), e)
|
||||
|
||||
@@ -108,7 +108,8 @@ class TBUtility:
|
||||
full_value = body.get(target_str.split()[0])
|
||||
elif isinstance(body, (dict, list)):
|
||||
try:
|
||||
jsonpath_expression = parse(target_str)
|
||||
# Wrap in quotes to support key name with spaces
|
||||
jsonpath_expression = parse('"' + target_str + '"')
|
||||
jsonpath_match = jsonpath_expression.find(body)
|
||||
if jsonpath_match:
|
||||
full_value = jsonpath_match[0].value
|
||||
@@ -126,7 +127,7 @@ class TBUtility:
|
||||
|
||||
@staticmethod
|
||||
def get_values(expression, body=None, value_type="string", get_tag=False, expression_instead_none=False):
|
||||
expression_arr = findall(r'\$\{[${A-Za-z0-9.^\]\[*_:]*\}', expression)
|
||||
expression_arr = findall(r'\$\{[${A-Za-z0-9. ^\]\[*_:]*\}', expression)
|
||||
|
||||
values = [TBUtility.get_value(exp, body, value_type=value_type, get_tag=get_tag,
|
||||
expression_instead_none=expression_instead_none) for exp in expression_arr]
|
||||
@@ -219,3 +220,20 @@ class TBUtility:
|
||||
return TBUtility.generate_certificate(certificate, key, cert_detail)
|
||||
else:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def convert_data_type(data, new_type, use_eval=False):
|
||||
current_type = type(data)
|
||||
# use 'in' check instead of equality for such case like 'str' and 'string'
|
||||
if current_type.__name__ in new_type:
|
||||
return data
|
||||
|
||||
evaluated_data = eval(data, globals(), {}) if use_eval else data
|
||||
if 'int' in new_type or 'long' in new_type:
|
||||
return int(float(evaluated_data))
|
||||
elif 'float' in new_type or 'double' in new_type:
|
||||
return float(evaluated_data)
|
||||
elif 'bool' in new_type:
|
||||
return bool(evaluated_data)
|
||||
else:
|
||||
return str(evaluated_data)
|
||||
Reference in New Issue
Block a user