D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
plugins
/
Filename :
backup_info_sender.py
back
Copy
import asyncio import time from contextlib import suppress from datetime import timedelta from logging import getLogger from typing import Union from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSource from defence360agent.subsys.backup_systems import ( get_current_backend, get_last_backup_timestamp, ) from defence360agent.subsys.persistent_state import load_state, save_state from defence360agent.utils import Scope, recurring_check logger = getLogger(__name__) SEND_INTERVAL = int(timedelta(hours=24).total_seconds()) RECURRING_CHECK_INTERVAL = 5 class BackupInfoSender(MessageSource): """Send user backup statistics to CH periodically""" SCOPE = Scope.IM360 async def create_source(self, loop, sink): self._loop = loop self._sink = sink self._send_event = asyncio.Event() self._last_send_timestamp = self.load_last_send_timestamp() self._check_task = self._loop.create_task( self._recurring_check_data_to_send() ) self._send_stat_task = self._loop.create_task( self._recurring_send_stat() ) async def shutdown(self): for task in [self._check_task, self._send_stat_task]: task.cancel() with suppress(asyncio.CancelledError): await task self.save_last_send_timestamp() @staticmethod def is_valid_timestamp(timestamp: Union[int, float]) -> bool: return isinstance(timestamp, (int, float)) and timestamp > 0 def save_last_send_timestamp(self, ts: Union[int, float] = None): timestamp = self._last_send_timestamp if ts is None else ts if not self.is_valid_timestamp(timestamp): logger.warning("Invalid timestamp: %s", timestamp) return save_state("BackupInfoSender", {"last_send_timestamp": timestamp}) def load_last_send_timestamp(self): timestamp = load_state("BackupInfoSender").get("last_send_timestamp") if not self.is_valid_timestamp(timestamp): logger.warning("Invalid timestamp loaded, resetting to 0") timestamp = 0 return timestamp @recurring_check(RECURRING_CHECK_INTERVAL) async def _recurring_check_data_to_send(self): if time.time() - self._last_send_timestamp >= SEND_INTERVAL: self._send_event.set() @recurring_check(0) async def _recurring_send_stat(self): await self._send_event.wait() try: await self._send_server_config() except Exception as e: logger.exception("Failed to collect backup info: %s", e) finally: # Ensure backup info is not sent too frequently, even after an error self._last_send_timestamp = time.time() self._send_event.clear() async def _send_server_config(self): confg_msg = MessageType.BackupInfo( backup_provider_type=get_current_backend(), last_backup_timestamp=await get_last_backup_timestamp(), ) await self._sink.process_message(confg_msg)