mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Improvements for performance
This commit is contained in:
@@ -49,7 +49,8 @@ class TBClient(threading.Thread):
|
||||
self.client._client._on_log = self._on_log
|
||||
|
||||
def _on_log(self, *args):
|
||||
log.info(args)
|
||||
# log.debug(args)
|
||||
pass
|
||||
|
||||
def is_connected(self):
|
||||
return self.client.is_connected
|
||||
|
||||
@@ -67,6 +67,7 @@ class TBGatewayService:
|
||||
self.__load_connectors(config)
|
||||
self.__connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
self.__published_events = []
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread")
|
||||
self.__event_storage = self.__event_storage_types[config["storage"]["type"]](config["storage"])
|
||||
self.tb_client.connect()
|
||||
@@ -96,7 +97,6 @@ class TBGatewayService:
|
||||
summary_messages['SummarySent'] += telemetry[(connector+' MessagesSent').replace(' ', '')]
|
||||
self.tb_client.client.send_telemetry(summary_messages)
|
||||
gateway_statistic_send = int(time.time()*1000)
|
||||
time.sleep(.1)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
for device in self.__connected_devices:
|
||||
@@ -166,9 +166,7 @@ class TBGatewayService:
|
||||
self.__connector_incoming_messages[connector_name] += 1
|
||||
json_data = dumps(data)
|
||||
save_result = self.__event_storage.put(json_data)
|
||||
if save_result:
|
||||
log.debug('Connector "%s" - Saved information - %s', connector_name, json_data)
|
||||
else:
|
||||
if not save_result:
|
||||
log.error('Data from device "%s" cannot be saved, connector name is %s.',
|
||||
data["deviceName"],
|
||||
connector_name)
|
||||
@@ -176,13 +174,12 @@ class TBGatewayService:
|
||||
def __read_data_from_storage(self):
|
||||
while True:
|
||||
try:
|
||||
self.__published_events = []
|
||||
events = self.__event_storage.get_event_pack()
|
||||
if events:
|
||||
for event in events:
|
||||
current_event = loads(event)
|
||||
time.sleep(.001)
|
||||
if current_event.get("telemetry"):
|
||||
# log.debug(current_event)
|
||||
telemetry = {}
|
||||
if type(current_event["telemetry"]) == list:
|
||||
for item in current_event["telemetry"]:
|
||||
@@ -194,16 +191,11 @@ class TBGatewayService:
|
||||
for telemetry_key in telemetry:
|
||||
if telemetry[telemetry_key] is not None:
|
||||
filtered_telemetry[telemetry_key] = telemetry[telemetry_key]
|
||||
# log.debug(telemetry)
|
||||
data_to_send = loads('{"ts": %f,"values": %s}' % (int(time.time()*1000), dumps(filtered_telemetry)))
|
||||
# data_to_send = loads('{"ts": %f,"values": {%s}}' % (int(time.time()*1000),
|
||||
# ','.join(dumps(param) for param in current_event["telemetry"])))
|
||||
data_to_send = {"ts": int(time.time()*1000), "values": filtered_telemetry}
|
||||
if filtered_telemetry != {}:
|
||||
self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"],
|
||||
data_to_send))
|
||||
time.sleep(.001)
|
||||
if current_event.get("attributes"):
|
||||
# log.debug(current_event)
|
||||
attributes = {}
|
||||
if type(current_event["attributes"]) == list:
|
||||
for item in current_event["attributes"]:
|
||||
@@ -215,24 +207,17 @@ class TBGatewayService:
|
||||
for attribute_key in attributes:
|
||||
if attributes[attribute_key] is not None:
|
||||
filtered_attributes[attribute_key] = attributes[attribute_key]
|
||||
# log.debug(attributes)
|
||||
data_to_send = loads('%s' % dumps(filtered_attributes))
|
||||
if filtered_attributes != {}:
|
||||
self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"],
|
||||
data_to_send))
|
||||
time.sleep(.001)
|
||||
filtered_attributes))
|
||||
success = True
|
||||
for event in range(len(self.__published_events)):
|
||||
result = self.__published_events[event].get()
|
||||
success = result == self.__published_events[event].TB_ERR_SUCCESS
|
||||
if success:
|
||||
self.__event_storage.event_pack_processing_done()
|
||||
log.debug("Run event pack processing done")
|
||||
else:
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
time.sleep(10)
|
||||
|
||||
def __rpc_request_handler(self, _, content):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user