D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
thread-self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clwizard
/
modules
/
Filename :
cagefs.py
back
Copy
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import os

from clcommon.const import Feature
from clcommon.utils import ExternalProgramFailed
from clconfig.cagefs_statistics_config import check_cagefs_initialized
from cllimits.cagefs_lib import CageFs, CageFsException
from clwizard.constants import MODULES_LOGS_DIR
from clwizard.exceptions import InstallationFailedException

from .base import WizardInstaller


class CagefsInstaller(WizardInstaller):
    """
    Wizard module that installs, initializes and enables CageFS and then
    applies the user-selected options (see `supported_options`).
    """

    LOG_FILE = os.path.join(MODULES_LOGS_DIR, 'cagefs.log')
    _REQUIRED_CL_COMPONENT_SUPPORT = Feature.CAGEFS

    # Values that `CageFs.get_user_mode()` is compared against in this module.
    _MODE_ENABLE_ALL = 'Enable All'
    _MODE_DISABLE_ALL = 'Disable All'

    def __init__(self):
        super().__init__()
        # CageFS helper shares this installer's logger.
        self.cgfs = CageFs(logger=self.app_logger)

    def _enable_existing_users(self) -> None:
        """
        Enable all users statuses and after it toggle to initial mode.

        The mode is read before `set_enabled_mode()` is called; if it was
        "Disable All", `toggle_user_mode()` is called afterwards.
        NOTE(review): presumably `set_enabled_mode()` switches the global
        mode as a side effect, which is why the toggle restores it - confirm
        against cllimits.cagefs_lib.

        :raises CageFsException: propagated from the CageFs helper
            (handled by `run_installation`)
        """
        initial_mode = self.cgfs.get_user_mode()
        out = self.cgfs.set_enabled_mode()
        self.app_logger.info("Cagefs users status was updated to enabled: %s", out)
        if initial_mode == self._MODE_DISABLE_ALL:
            self.cgfs.toggle_user_mode()

    def _set_enabled_mode(self) -> None:
        """
        Toggle the user mode when it is currently "Disable All".

        Any other current mode is left untouched.

        :raises CageFsException: propagated from the CageFs helper
            (handled by `run_installation`)
        """
        mode = self.cgfs.get_user_mode()
        self.app_logger.info("Current mode is: %s", mode)
        if mode == self._MODE_DISABLE_ALL:
            self.app_logger.info("Try to set mode to Enable all")
            self.cgfs.toggle_user_mode()
            self.app_logger.info("Mode was toggled to Enable all")

    def _install_cagefs_package(self) -> None:
        """
        Install the `cagefs` package unless it is already installed.

        :raises InstallationFailedException: when the package installation
            raises ExternalProgramFailed
        """
        if self._is_package_installed('cagefs'):
            self.app_logger.info("Skip package installation, it is already installed")
            return
        try:
            self._install_package('cagefs')
        except ExternalProgramFailed as err:
            raise InstallationFailedException() from err

    def _initialize_cagefs(self) -> None:
        """
        Initialize CageFS unless `check_cagefs_initialized()` reports that
        it is already initialized.

        :raises InstallationFailedException: when initialization raises
            CageFsException
        """
        if check_cagefs_initialized():
            self.app_logger.info("Initializing was skipped, cagefs was already initialized")
            return
        try:
            out = self.cgfs.initialize_cagefs()
        except CageFsException as err:
            self.app_logger.error("Cagefs initialization failed with error: %s", str(err))
            raise InstallationFailedException() from err
        self.app_logger.info("Cagefs was initialized: %s", out)

    def _enable_cagefs(self) -> None:
        """
        Enable CageFS so that it is ready for configuration.

        :raises InstallationFailedException: when enabling raises
            CageFsException
        """
        try:
            out = self.cgfs.enable_cagefs()
        except CageFsException as err:
            # Log the reason before converting the exception, the same way
            # _initialize_cagefs does, so the module log explains the failure.
            self.app_logger.error("Cagefs enabling failed with error: %s", str(err))
            raise InstallationFailedException() from err
        self.app_logger.info("Cagefs was enabled, ready for configuration\n %s", out)

    def run_installation(self, options):
        """
        Install, initialize and enable CageFS, then apply requested options.

        :param options: mapping of option name to a truthy/falsy value; the
            recognised names are listed in `supported_options`. An option
            that is absent is treated as not requested.
        :raises InstallationFailedException: when any step fails
        """
        self._install_cagefs_package()
        self._initialize_cagefs()
        self._enable_cagefs()
        try:
            # .get() so that a caller passing only a subset of the supported
            # options does not fail with an unrelated KeyError.
            if options.get('enable_for_existing_users'):
                self._enable_existing_users()
            if options.get('enable_for_new_users'):
                self._set_enabled_mode()
        except CageFsException as err:
            self.app_logger.error("Can`t configure options: %s", str(err))
            raise InstallationFailedException() from err

    @classmethod
    def supported_options(cls):
        """
        Return the set of option names understood by `run_installation`.
        """
        return {'enable_for_existing_users', 'enable_for_new_users'}

    def initial_status(self):
        """
        Report the current CageFS state.

        :return: dict with `already_configured` (bool) and `options`, where
            `enable_for_new_users` is True only when the current user mode
            is "Enable All"; if the mode cannot be read it is reported as
            False.
        """
        try:
            enabled_for_new = self.cgfs.get_user_mode() == self._MODE_ENABLE_ALL
        except CageFsException:
            self.app_logger.warning(
                'Unable to detect cagefs mode, assuming that it is "Disable All"'
            )
            enabled_for_new = False
        return {
            # we always allow user to initialize cagefs
            # bool needed to replace None with False
            # None is given when cagefs is not installed
            'already_configured': bool(check_cagefs_initialized()),
            'options': {'enable_for_new_users': enabled_for_new},
        }