D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib64
/
python3.11
/
site-packages
/
defence360agent
/
subsys
/
features
/
Filename :
kernel_care.py
back
Copy
import logging import os import shutil import json from defence360agent.contracts.config import Core from defence360agent.contracts.license import LicenseCLN from defence360agent.utils import run_cmd_and_log, OsReleaseInfo, os_version from defence360agent.subsys.features.abstract_feature import ( AbstractFeature, FeatureError, FeatureStatus, ) from defence360agent import utils from defence360agent.rpc_tools import exceptions logger = logging.getLogger(__name__) class KernelCare(AbstractFeature): KC_PROPERTIES = "/var/imunify360/plesk-previous-kernelcare-stats.json" KC_SCRIPT_URL = ( "https://repo.cloudlinux.com/kernelcare/kernelcare_install.sh" ) LOG_DIR = "/var/log/%s" % Core.PRODUCT NAME = "KernelCare" BIN_PATH = "/usr/bin/kcarectl" INSTALL_LOG_FILE_MASK = "%s/install-kernelcare.log.*" % LOG_DIR REMOVE_LOG_FILE_MASK = "%s/remove-kernelcare.log.*" % LOG_DIR INSTALL_CMD = "curl -s %s | bash" % KC_SCRIPT_URL REMOVE_CMD_REDHAT = "yum remove -y kernelcare" REMOVE_CMD_DEBIAN = "apt-get -y remove kernelcare" _CMD_LIST = [INSTALL_CMD, REMOVE_CMD_REDHAT, REMOVE_CMD_DEBIAN] STATUS_MESSAGE = { 0: "Host is updated to the latest patch level", 1: "There are no applied patches", 2: "There are new not applied patches", 3: "Kernel is unsupported", } async def _check_installed_impl(self) -> bool: return os.path.exists(self.BIN_PATH) async def status(self): """ :raises FeatureError: if kernelcare returns unexpected error :return: str: feature's current status """ status = await super().status() is_feature_installed = ( status["items"]["status"] == FeatureStatus.INSTALLED ) if not is_feature_installed: return status ret, out, err = await self.get_output_kcarectl("--status") try: # EDF is obsolete since 6.1 status["items"]["edf_supported"] = False status["items"]["message"] = self.STATUS_MESSAGE[ret] except KeyError: raise FeatureError( "Unknown error occured while getting status from kcarectl. " f"stdout: [{out}], stderr: [{err}], return code: [{ret}]" ) return status @AbstractFeature.raise_if_shouldnt_install_now async def install(self): return await run_cmd_and_log( self.INSTALL_CMD, self.INSTALL_LOG_FILE_MASK, env=dict(os.environ, DEBIAN_FRONTEND="noninteractive"), ) @AbstractFeature.raise_if_shouldnt_remove_now async def remove(self): if OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN: command = self.REMOVE_CMD_DEBIAN else: command = self.REMOVE_CMD_REDHAT return await run_cmd_and_log(command, self.REMOVE_LOG_FILE_MASK) async def get_plugin_info(self): try: output = await self.run_kcarectl("--plugin-info", "--json") except FileNotFoundError: raise exceptions.RpcError("kcarectl not found") except utils.CheckRunError as e: if not (e.returncode == 2 and b"--json" in e.stderr): raise # reraise as is else: # unrecognized arguments: --json # use RpcError, to get an error raise exceptions.RpcError( "Your kcarectl version doesn't support --json option." " Please, update to kernelcare-2.15-2 or newer." ) try: results = json.loads(output.decode().partition("--START--")[-1]) except ValueError: raise exceptions.RpcError( "Can't decode kcarectl output as json." " Try updating to the latest kernelcare version." ) return results async def run_kcarectl(self, *options): return await utils.check_run((self.BIN_PATH,) + options) async def get_output_kcarectl(self, *options): ret, out, err = await utils.run([self.BIN_PATH, *options]) return ret, out.decode(), err.decode()