mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Improvements for statistic collect and remote configuration.
This commit is contained in:
@@ -33,7 +33,7 @@ class ModbusConnector(Connector, threading.Thread):
|
||||
'MessagesSent': 0}
|
||||
super().__init__()
|
||||
self.__gateway = gateway
|
||||
self.__connector_type = connector_type
|
||||
self._connector_type = connector_type
|
||||
self.__master = None
|
||||
self.__config = config.get("server")
|
||||
self.__configure_master()
|
||||
@@ -70,11 +70,11 @@ class ModbusConnector(Connector, threading.Thread):
|
||||
try:
|
||||
for device in self.__config["devices"]:
|
||||
if self.__config.get("converter") is not None:
|
||||
converter = TBUtility.check_and_import(self.__connector_type, self.__config["converter"])(device)
|
||||
converter = TBUtility.check_and_import(self._connector_type, self.__config["converter"])(device)
|
||||
else:
|
||||
converter = BytesModbusUplinkConverter(device)
|
||||
if self.__config.get("downlink_converter") is not None:
|
||||
downlink_converter = TBUtility.check_and_import(self.__connector_type, self.__config["downlink_converter"])(device)
|
||||
downlink_converter = TBUtility.check_and_import(self._connector_type, self.__config["downlink_converter"])(device)
|
||||
else:
|
||||
downlink_converter = BytesModbusDownlinkConverter(device)
|
||||
if device.get('deviceName') not in self.__gateway.get_devices():
|
||||
|
||||
@@ -30,7 +30,7 @@ class MqttConnector(Connector, Thread):
|
||||
super().__init__()
|
||||
self.__log = log
|
||||
self.config = config
|
||||
self.__connector_type = connector_type
|
||||
self._connector_type = connector_type
|
||||
self.statistics = {'MessagesReceived': 0,
|
||||
'MessagesSent': 0}
|
||||
self.__gateway = gateway
|
||||
@@ -147,7 +147,7 @@ class MqttConnector(Connector, Thread):
|
||||
converter = None
|
||||
if mapping["converter"]["type"] == "custom":
|
||||
try:
|
||||
module = TBUtility.check_and_import(self.__connector_type, mapping["converter"]["extension"])
|
||||
module = TBUtility.check_and_import(self._connector_type, mapping["converter"]["extension"])
|
||||
if module is not None:
|
||||
self.__log.debug('Custom converter for topic %s - found!', mapping["topicFilter"])
|
||||
converter = module(mapping)
|
||||
@@ -246,7 +246,7 @@ class MqttConnector(Connector, Thread):
|
||||
if message.topic in request.get("topicFilter") or\
|
||||
(request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)):
|
||||
founded_device_name = None
|
||||
founded_device_type
|
||||
founded_device_type = 'default'
|
||||
if request.get("deviceNameJsonExpression"):
|
||||
founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content)
|
||||
if request.get("deviceNameTopicExpression"):
|
||||
|
||||
@@ -22,6 +22,7 @@ from logging.handlers import MemoryHandler
|
||||
from os import remove
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
|
||||
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
|
||||
|
||||
log = getLogger("service")
|
||||
|
||||
@@ -129,6 +130,8 @@ class RemoteConfigurator:
|
||||
self.__gateway.connectors_configs[connector['type']] = []
|
||||
self.__gateway.connectors_configs[connector['type']].append(
|
||||
{"name": connector["name"], "config": {connector['configuration']: input_connector["config"]}})
|
||||
connector_class = TBUtility.check_and_import(connector["type"], self.__gateway._default_connectors.get(connector["type"], connector.get("class")))
|
||||
self.__gateway._implemented_connectors[connector["type"]] = connector_class
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -148,6 +151,7 @@ class RemoteConfigurator:
|
||||
self.__gateway.connectors_configs = self.__old_connectors_configs
|
||||
for connector_name in self.__gateway.available_connectors:
|
||||
self.__gateway.available_connectors[connector_name].close()
|
||||
self.__gateway._load_connectors(self.__old_general_configuration_file)
|
||||
self.__gateway._connect_with_connectors()
|
||||
log.exception(e)
|
||||
return False
|
||||
|
||||
@@ -20,7 +20,7 @@ from yaml import safe_dump, safe_load
|
||||
from simplejson import load, loads, dumps
|
||||
from os import listdir, path
|
||||
from sys import getsizeof
|
||||
from threading import Thread
|
||||
from threading import Thread, RLock
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
from queue import Queue
|
||||
@@ -37,6 +37,7 @@ main_handler = logging.handlers.MemoryHandler(-1)
|
||||
|
||||
class TBGatewayService:
|
||||
def __init__(self, config_file=None):
|
||||
self.__lock = RLock()
|
||||
if config_file is None:
|
||||
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/', path.sep)
|
||||
with open(config_file) as general_config:
|
||||
@@ -87,9 +88,9 @@ class TBGatewayService:
|
||||
self.__remote_configurator.send_current_configuration()
|
||||
self.__load_persistent_devices()
|
||||
self.__published_events = Queue(0)
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
self.__send_thread.start()
|
||||
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
self._send_thread.start()
|
||||
|
||||
try:
|
||||
gateway_statistic_send = 0
|
||||
@@ -118,7 +119,8 @@ class TBGatewayService:
|
||||
|
||||
if cur_time - gateway_statistic_send > 5000.0 and self.tb_client.is_connected():
|
||||
summary_messages = self.__form_statistics()
|
||||
self.tb_client.client.send_telemetry(summary_messages)
|
||||
with self.__lock:
|
||||
self.tb_client.client.send_telemetry(summary_messages)
|
||||
gateway_statistic_send = time.time()*1000
|
||||
# self.__check_shared_attributes()
|
||||
except KeyboardInterrupt:
|
||||
@@ -154,10 +156,10 @@ class TBGatewayService:
|
||||
if new_configuration is not None and self.__remote_configurator is not None:
|
||||
try:
|
||||
confirmed = self.__remote_configurator.process_configuration(new_configuration)
|
||||
if confirmed:
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
self.__send_thread.start()
|
||||
# if confirmed:
|
||||
# self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
# name="Send data to Thingsboard Thread")
|
||||
# self._send_thread.start()
|
||||
self.__remote_configurator.send_current_configuration()
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
@@ -267,7 +269,8 @@ class TBGatewayService:
|
||||
try:
|
||||
if self.tb_client.is_connected():
|
||||
size = getsizeof(devices_data_in_event_pack)
|
||||
events = self._event_storage.get_event_pack()
|
||||
with self.__lock:
|
||||
events = self._event_storage.get_event_pack()
|
||||
if events:
|
||||
for event in events:
|
||||
try:
|
||||
@@ -330,24 +333,25 @@ class TBGatewayService:
|
||||
|
||||
def __send_data(self, devices_data_in_event_pack):
|
||||
try:
|
||||
for device in devices_data_in_event_pack:
|
||||
if devices_data_in_event_pack[device].get("attributes"):
|
||||
if device == self.name:
|
||||
self.__published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"]))
|
||||
else:
|
||||
self.__published_events.put(self.tb_client.client.gw_send_attributes(device,
|
||||
devices_data_in_event_pack[
|
||||
device][
|
||||
"attributes"]))
|
||||
if devices_data_in_event_pack[device].get("telemetry"):
|
||||
if device == self.name:
|
||||
self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"]))
|
||||
else:
|
||||
self.__published_events.put(self.tb_client.client.gw_send_telemetry(device,
|
||||
devices_data_in_event_pack[
|
||||
device][
|
||||
"telemetry"]))
|
||||
devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
|
||||
with self.__lock:
|
||||
for device in devices_data_in_event_pack:
|
||||
if devices_data_in_event_pack[device].get("attributes"):
|
||||
if device == self.name:
|
||||
self.__published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"]))
|
||||
else:
|
||||
self.__published_events.put(self.tb_client.client.gw_send_attributes(device,
|
||||
devices_data_in_event_pack[
|
||||
device][
|
||||
"attributes"]))
|
||||
if devices_data_in_event_pack[device].get("telemetry"):
|
||||
if device == self.name:
|
||||
self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"]))
|
||||
else:
|
||||
self.__published_events.put(self.tb_client.client.gw_send_telemetry(device,
|
||||
devices_data_in_event_pack[
|
||||
device][
|
||||
"telemetry"]))
|
||||
devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -373,8 +377,9 @@ class TBGatewayService:
|
||||
if self.connectors_configs.get(module):
|
||||
log.debug("Connector \"%s\" for RPC request \"%s\" found", module, content["method"])
|
||||
for connector_name in self.available_connectors:
|
||||
log.debug("Sending command RPC %s to connector %s", content["method"], connector_name)
|
||||
result = self.available_connectors[connector_name].server_side_rpc_handler(content)
|
||||
if self.available_connectors[connector_name]._connector_type == module:
|
||||
log.debug("Sending command RPC %s to connector %s", content["method"], connector_name)
|
||||
result = self.available_connectors[connector_name].server_side_rpc_handler(content)
|
||||
else:
|
||||
log.error("Connector \"%s\" not found", module)
|
||||
result = {"error": "%s - connector not found in available connectors." % module, "code": 404}
|
||||
@@ -402,11 +407,11 @@ class TBGatewayService:
|
||||
if success_sent:
|
||||
rpc_response["success"] = True
|
||||
if device is not None and success_sent is not None:
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, rpc_response)
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response))
|
||||
elif device is not None and req_id is not None and content is not None:
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, content)
|
||||
elif device is None and success_sent is not None:
|
||||
self.tb_client.client.send_rpc_reply(req_id, rpc_response, quality_of_service=1)
|
||||
self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=1)
|
||||
elif device is None and content is not None:
|
||||
self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=1)
|
||||
except Exception as e:
|
||||
@@ -441,7 +446,6 @@ class TBGatewayService:
|
||||
self.available_connectors[connector].statistics['MessagesReceived']
|
||||
telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \
|
||||
self.available_connectors[connector].statistics['MessagesSent']
|
||||
self.tb_client.client.send_telemetry(telemetry)
|
||||
summary_messages['eventsProduced'] += telemetry[
|
||||
str(connector_camel_case + ' EventsProduced').replace(' ', '')]
|
||||
summary_messages['eventsSent'] += telemetry[
|
||||
|
||||
@@ -24,6 +24,7 @@ GATEWAY_ATTRIBUTES_REQUEST_TOPIC = "v1/gateway/attributes/request"
|
||||
GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = "v1/gateway/attributes/response"
|
||||
GATEWAY_MAIN_TOPIC = "v1/gateway/"
|
||||
GATEWAY_RPC_TOPIC = "v1/gateway/rpc"
|
||||
GATEWAY_RPC_RESPONSE_TOPIC = "v1/gateway/rpc/response"
|
||||
|
||||
log = logging.getLogger("tb_connection")
|
||||
log.setLevel(logging.DEBUG)
|
||||
@@ -43,6 +44,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
self._client.on_connect = self._on_connect
|
||||
self._client.on_message = self._on_message
|
||||
self._client.on_subscribe = self._on_subscribe
|
||||
self._client._on_unsubscribe = self._on_unsubscribe
|
||||
self._gw_subscriptions = {}
|
||||
self.gateway = gateway
|
||||
|
||||
@@ -52,6 +54,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])] = GATEWAY_ATTRIBUTES_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")[1])] = GATEWAY_RPC_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_RPC_RESPONSE_TOPIC
|
||||
|
||||
def _on_subscribe(self, client, userdata, mid, granted_qos):
|
||||
subscription = self._gw_subscriptions.get(mid)
|
||||
@@ -63,6 +66,9 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
log.debug("Service subscription to topic %s - successfully completed.", subscription)
|
||||
del(self._gw_subscriptions[mid])
|
||||
|
||||
def _on_unsubscribe(self, *args):
|
||||
log.debug(args)
|
||||
|
||||
def get_subscriptions_in_progress(self):
|
||||
return True if self._gw_subscriptions else False
|
||||
|
||||
|
||||
Reference in New Issue
Block a user