1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Improvements for performance.

This commit is contained in:
zbeacon
2019-12-12 12:45:12 +02:00
parent dada6430bb
commit d60b0329a8
6 changed files with 63 additions and 46 deletions

View File

@@ -99,5 +99,5 @@ class TBClient(threading.Thread):
time.sleep(self.__min_reconnect_delay)
time.sleep(.1)
else:
time.sleep(.1)
time.sleep(1)

View File

@@ -19,6 +19,7 @@ import yaml
from simplejson import load, loads, dumps
from os import listdir, path
from threading import Thread
from queue import Queue
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
@@ -67,7 +68,7 @@ class TBGatewayService:
self.__load_connectors(config)
self.__connect_with_connectors()
self.__load_persistent_devices()
self.__published_events = []
self.__published_events = Queue(0)
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread")
self.__event_storage = self.__event_storage_types[config["storage"]["type"]](config["storage"])
self.tb_client.connect()
@@ -80,12 +81,17 @@ class TBGatewayService:
try:
gateway_statistic_send = 0
while True:
for rpc_in_progress in self.__rpc_requests_in_progress:
if time.time() >= self.__rpc_requests_in_progress[rpc_in_progress][1]:
self.__rpc_requests_in_progress[rpc_in_progress][2](rpc_in_progress)
self.cancel_rpc_request(rpc_in_progress)
cur_time = time.time()
if self.__rpc_requests_in_progress:
for rpc_in_progress in self.__rpc_requests_in_progress:
if cur_time >= self.__rpc_requests_in_progress[rpc_in_progress][1]:
self.__rpc_requests_in_progress[rpc_in_progress][2](rpc_in_progress)
self.cancel_rpc_request(rpc_in_progress)
time.sleep(0.1)
else:
time.sleep(1)
if time.time()*1000 - gateway_statistic_send > 60000.0:
if cur_time*1000 - gateway_statistic_send > 60000.0:
summary_messages = {"SummaryReceived": 0, "SummarySent": 0}
telemetry = {}
for connector in self.available_connectors:
@@ -164,6 +170,12 @@ class TBGatewayService:
self.__connector_incoming_messages[connector_name] = 0
else:
self.__connector_incoming_messages[connector_name] += 1
telemetry = {}
for item in data["telemetry"]:
telemetry = {**telemetry, **item}
data["telemetry"] = {"ts": int(time.time()*1000), "values": telemetry}
json_data = dumps(data)
save_result = self.__event_storage.put(json_data)
if not save_result:
@@ -176,44 +188,46 @@ class TBGatewayService:
try:
events = self.__event_storage.get_event_pack()
if events:
devices_data_in_event_pack = {}
for event in events:
current_event = loads(event)
try:
current_event = loads(event)
except Exception as e:
log.exception(e)
continue
if not devices_data_in_event_pack.get(current_event["deviceName"]):
devices_data_in_event_pack[current_event["deviceName"]] = {"telemetry": [],
"attributes": {}}
if current_event.get("telemetry"):
telemetry = {}
if type(current_event["telemetry"]) == list:
for item in current_event["telemetry"]:
telemetry.update(item.items())
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item)
else:
telemetry = current_event["telemetry"]
filtered_telemetry = {}
for telemetry_key in telemetry:
if telemetry[telemetry_key] is not None:
filtered_telemetry[telemetry_key] = telemetry[telemetry_key]
data_to_send = {"ts": int(time.time()*1000), "values": filtered_telemetry}
if filtered_telemetry != {}:
self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"],
data_to_send))
if current_event.get("attributes"):
attributes = {}
if type(current_event["attributes"]) == list:
for item in current_event["attributes"]:
attributes.update(item.items())
else:
attributes = current_event["attributes"]
filtered_attributes = {}
for attribute_key in attributes:
if attributes[attribute_key] is not None:
filtered_attributes[attribute_key] = attributes[attribute_key]
if filtered_attributes:
self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"],
filtered_attributes))
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"])
if current_event.get("attributes"):
if type(current_event["attributes"]) == list:
for item in current_event["attributes"]:
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
item.items())
else:
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
current_event["attributes"].items())
for device in devices_data_in_event_pack:
self.__published_events.put(self.tb_client.client.gw_send_attributes(device,
devices_data_in_event_pack[device]["attributes"]))
self.__published_events.put(self.tb_client.client.gw_send_telemetry(device,
devices_data_in_event_pack[device]["telemetry"]))
success = True
for event in self.__published_events:
while not self.__published_events.empty():
event = self.__published_events.get()
success = event.get() == event.TB_ERR_SUCCESS
if success:
self.__event_storage.event_pack_processing_done()
except Exception as e:
log.exception(e)
time.sleep(1)
def __rpc_request_handler(self, _, content):
try: