mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added functionality for checking devices idle time (#723)
* Added functionality for checking devices idle time * Fixed creating timestamps * Fixed props naming and added lock for saving persistent devices method * Changed props naming in tb_gateway * Fixed dict keys * Changed default value in tb_gateway * Changed default value in tb_gateway
This commit is contained in:
@@ -7,6 +7,10 @@ thingsboard:
|
||||
minPackSendDelayMS: 0
|
||||
checkConnectorsConfigurationInSeconds: 60
|
||||
handleDeviceRenaming: true
|
||||
checkingDeviceActivity:
|
||||
checkDeviceInactivity: false
|
||||
inactivityTimeoutSeconds: 120
|
||||
inactivityCheckPeriodSeconds: 10
|
||||
security:
|
||||
accessToken: PUT_YOUR_GW_ACCESS_TOKEN_HERE
|
||||
qos: 1
|
||||
|
||||
@@ -167,7 +167,6 @@ class TBGatewayService:
|
||||
self.connectors_configs = {}
|
||||
self.__remote_configurator = None
|
||||
self.__request_config_after_connect = False
|
||||
self.__load_persistent_devices()
|
||||
self.__init_remote_configuration()
|
||||
self.__grpc_config = self.__config.get('grpc')
|
||||
self.__grpc_manager = None
|
||||
@@ -179,6 +178,14 @@ class TBGatewayService:
|
||||
self._load_connectors()
|
||||
self._connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
|
||||
self.__devices_idle_checker = self.__config['thingsboard'].get('checkingDeviceActivity', {})
|
||||
self.__check_devices_idle = self.__devices_idle_checker.get('checkDeviceInactivity', False)
|
||||
if self.__check_devices_idle:
|
||||
thread = Thread(name='Checking devices idle time', target=self.__check_devices_idle_time, daemon=True)
|
||||
thread.start()
|
||||
log.info('Start checking devices idle time')
|
||||
|
||||
self._published_events = SimpleQueue()
|
||||
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
|
||||
name="Send data to Thingsboard Thread")
|
||||
@@ -548,6 +555,9 @@ class TBGatewayService:
|
||||
data["deviceName"] = "currentThingsBoardGateway"
|
||||
data['deviceType'] = "gateway"
|
||||
|
||||
if self.__check_devices_idle:
|
||||
self.__connected_devices[data['deviceName']]['last_receiving_data'] = time()
|
||||
|
||||
data = self.__convert_telemetry_to_ts(data)
|
||||
|
||||
max_data_size = self.__config["thingsboard"].get("maxPayloadSizeBytes", 1024)
|
||||
@@ -712,7 +722,9 @@ class TBGatewayService:
|
||||
def __send_data(self, devices_data_in_event_pack):
|
||||
try:
|
||||
for device in devices_data_in_event_pack:
|
||||
final_device_name = device if self.__renamed_devices.get(device) is None else self.__renamed_devices[device]
|
||||
final_device_name = device if self.__renamed_devices.get(device) is None else self.__renamed_devices[
|
||||
device]
|
||||
|
||||
if devices_data_in_event_pack[device].get("attributes"):
|
||||
if device == self.name or device == "currentThingsBoardGateway":
|
||||
self._published_events.put(
|
||||
@@ -948,9 +960,9 @@ class TBGatewayService:
|
||||
return Status.FAILURE
|
||||
|
||||
def del_device(self, device_name):
|
||||
del self.__connected_devices[device_name]
|
||||
del self.__saved_devices[device_name]
|
||||
self.tb_client.client.gw_disconnect_device(device_name)
|
||||
self.__connected_devices.pop(device_name)
|
||||
self.__saved_devices.pop(device_name)
|
||||
self.__save_persistent_devices()
|
||||
|
||||
def get_devices(self):
|
||||
@@ -1023,18 +1035,49 @@ class TBGatewayService:
|
||||
self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices
|
||||
|
||||
def __save_persistent_devices(self):
|
||||
data_to_save = {}
|
||||
for device in self.__connected_devices:
|
||||
if self.__connected_devices[device]["connector"] is not None:
|
||||
data_to_save[device] = [self.__connected_devices[device]["connector"].get_name(), self.__connected_devices[device]["device_type"]]
|
||||
if device in self.__renamed_devices:
|
||||
data_to_save[device].append(self.__renamed_devices.get(device))
|
||||
with open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w') as config_file:
|
||||
try:
|
||||
config_file.write(dumps(data_to_save, indent=2, sort_keys=True))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
log.debug("Saved connected devices.")
|
||||
with self.__lock:
|
||||
data_to_save = {}
|
||||
for device in self.__connected_devices:
|
||||
if self.__connected_devices[device]["connector"] is not None:
|
||||
data_to_save[device] = [self.__connected_devices[device]["connector"].get_name(),
|
||||
self.__connected_devices[device]["device_type"]]
|
||||
|
||||
if device in self.__renamed_devices:
|
||||
data_to_save[device].append(self.__renamed_devices.get(device))
|
||||
|
||||
with open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w') as config_file:
|
||||
try:
|
||||
config_file.write(dumps(data_to_save, indent=2, sort_keys=True))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
log.debug("Saved connected devices.")
|
||||
|
||||
def __check_devices_idle_time(self):
|
||||
check_devices_idle_every_sec = self.__devices_idle_checker.get('inactivityCheckPeriodSeconds', 1)
|
||||
disconnect_device_after_idle = self.__devices_idle_checker.get('inactivityTimeoutSeconds', 50)
|
||||
|
||||
while True:
|
||||
for_deleting = []
|
||||
for (device_name, device) in self.__connected_devices.items():
|
||||
ts = time()
|
||||
|
||||
if not device.get('last_receiving_data'):
|
||||
device['last_receiving_data'] = ts
|
||||
|
||||
last_receiving_data = device['last_receiving_data']
|
||||
|
||||
if ts - last_receiving_data >= disconnect_device_after_idle:
|
||||
for_deleting.append(device_name)
|
||||
|
||||
for device_name in for_deleting:
|
||||
self.del_device(device_name)
|
||||
|
||||
log.debug('Delete device %s for the reason of idle time > %s.',
|
||||
device_name,
|
||||
disconnect_device_after_idle)
|
||||
|
||||
sleep(check_devices_idle_every_sec)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -149,7 +149,8 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
def gw_disconnect_device(self, device_name):
|
||||
info = self._client.publish(topic=GATEWAY_MAIN_TOPIC + "disconnect", payload=dumps({"device": device_name}),
|
||||
qos=self.quality_of_service)
|
||||
self.__connected_devices.remove(device_name)
|
||||
if device_name in self.__connected_devices:
|
||||
self.__connected_devices.remove(device_name)
|
||||
# if self.gateway:
|
||||
# self.gateway.on_device_disconnected(self, device_name)
|
||||
log.debug("Disconnected device %s", device_name)
|
||||
|
||||
Reference in New Issue
Block a user