D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clselect
/
Filename :
cluseroptselect.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import from __future__ import print_function from __future__ import division import os import base64 import re import configparser from builtins import map from future.utils import iteritems from .cluserextselect import ClUserExtSelect from .clselectexcept import ClSelectExcept from clcommon import clcaptain from . import utils from xml.sax.saxutils import unescape from clcommon.utils import ExternalProgramFailed from clcommon.php_conf_reader import PhpConfReader, PhpConfBaseException,\ PhpConfReadError, PhpConfLoadException, PhpConfNoSuchAlternativeException class ClUserOptSelect(ClUserExtSelect): """ Class for processing user options """ OPTIONS_PATH = '/etc/cl.selector.conf.d/php.conf' if utils.in_cagefs() else '/etc/cl.selector/php.conf' def __init__(self, item='php', exclude_pid_list=None): ClUserExtSelect.__init__(self, item, exclude_pid_list) self._whitelist = {} self._user_excludes = set() self._html_escape_table = {" ": " ", '"': """, "'": "'", ">": ">", "<": "<", "&": "&"} self._html_unescape_table = {v: k for k, v in iteritems(self._html_escape_table)} def insert_options(self, user, version, optset, decoder, append=False, quiet=True, create=True): """ Inserts supplied options into current ones @param optset: string @param decoder: string @param """ options = {} if optset != '': options = self._process_option_string( optset=optset, decoder=decoder, expect_separator=True) options = self._remove_forbidden_options(options, version, quiet) return utils.apply_for_at_least_one_user( self.insert_json_options, self._clpwd.get_names(self._clpwd.get_uid(user)), ClSelectExcept.UnableToSaveData, version, options, append, create ) def insert_json_options(self, user, version, options, append=False, create=True): """ Inserts supplied options into current ones @param user: string @param version: string @param options: object """ self._check_user_in_cagefs(user) user_ini_path = self._compose_user_ini_path(user, version) (contents, extensions, extensions_data) = self._load_ini_contents(user_ini_path) contents = self._prepare_options_data(contents) if append: contents.update(options) else: contents = options options_set = self._compose_options_set(contents) if options_set: options_set = self._wrap_options(options_set) data = self._compose_output_data( options_set, extensions, extensions_data) # Convert 'no value' values of directives for idx in range(0, len(data)): line = data[idx] line_parts = line.split('=') if len(line_parts) != 2: continue if line_parts[1] == 'no value': # put empty string instead 'no value' to directive value data[idx] = line_parts[0] + '=' self._write_to_file( user, '\n'.join(data).rstrip()+'\n', user_ini_path, create) self._reload_processes(user) self._backup_settings(user, version, options_set, create) def bulk_insert_options(self, user, version, options, append=False, create=True): """ Handles multiple users with same uids """ return utils.apply_for_at_least_one_user( self.insert_json_options, self._clpwd.get_names(self._clpwd.get_uid(user)), ClSelectExcept.UnableToSaveData, version, options, append, create ) def delete_options(self, user, version, optset, decoder, quiet=True): """ Deletes supplied options from current ones """ return utils.apply_for_at_least_one_user( self._delete_user, self._clpwd.get_names(self._clpwd.get_uid(user)), ClSelectExcept.UnableToSaveData, optset, decoder, version ) def _delete_user(self, user, optset, decoder, version): options = self._process_option_string( optset=optset, decoder=decoder, expect_separator=False) self._check_user_in_cagefs(user) user_ini_path = self._compose_user_ini_path(user, version) (contents, extensions, extensions_data) = self._load_ini_contents(user_ini_path) contents = self._prepare_options_data(contents) for opt in options.keys(): contents.pop(opt, None) options_set = self._compose_options_set(contents) options_set = self._wrap_options(options_set) data = self._compose_output_data( options_set, extensions, extensions_data) self._write_to_file( user, '\n'.join(data).rstrip()+'\n', user_ini_path) self._reload_processes(user) self._backup_settings(user, version, options_set) def get_options(self, user, version=None): """ Returns options summary for a user @param user: string @param version: string return: dict """ if not version: version = self.get_version(user)[0] if version == 'native': raise ClSelectExcept.UnableToGetExtensions(version) self._get_ini_defaults(version) self._get_user_ini(user, version) return self._get_whitelist(version) def reset_options(self, users=None, versions=None): """ Deletes all custom options settings @param users: list @param versions: list """ all_users = self.list_all_users() alternatives = self.get_all_alternatives_data() for version in alternatives.keys(): if versions and version not in versions: continue for user in all_users: if users and user not in users: continue try: self.insert_options(user=user, version=version, optset='', decoder='plain', append=False, quiet=True, create=False) except ClSelectExcept.NotCageFSUser: continue def _prepare_options_data(self, contents): options = {} for item in contents: if item.strip() == "": continue if item.startswith(';>===') or item.startswith(';<==='): continue key, value = list(map((lambda x:x.strip()), item.split('=', 1))) if value == '': value = 'no value' options.update({key: value}) return options def _get_whitelist(self, version): """ Returns whitelist data """ if not self._whitelist: self._load_whitelist(version) return self._whitelist def _load_whitelist(self, version): """ Parses php config file (not php.ini!) and updates structure """ # Get short_php_version_to_full map alternatives = self.get_all_alternatives_data() self._check_alternative(version, alternatives) if '.' not in version: raise ClSelectExcept.UnableToGetExtensions(version) # Short to full PHP version map. Example: {'4.4', '4.4.9'} php_versions = dict() for short_ver, ver_data in iteritems(alternatives): php_versions[short_ver] = ver_data['version'] try: # Read config conf_reader = PhpConfReader(self.OPTIONS_PATH) php_conf_dict = conf_reader.get_config_for_selectorctl(version, php_versions) self._whitelist.update(php_conf_dict) except PhpConfNoSuchAlternativeException as e: raise ClSelectExcept.UnableToGetExtensions(e.php_version) except (PhpConfReadError, PhpConfLoadException, PhpConfBaseException) as e: raise ClSelectExcept.UnableToLoadData(self.OPTIONS_PATH, str(e)) def _handle_option_item(option_item, expect_separator=True): """ Splits options data into key-value pair and returns it @param option_item: string @param expect_separator: bool @return: dict """ if ':' in option_item: option_name, option_value = option_item.split(':', 1) else: if not expect_separator: option_name, option_value = option_item, '' else: raise ClSelectExcept.WrongData( "Colon as a separator expected (%s)!" % (option_item,)) return {option_name: option_value} _handle_option_item = staticmethod(_handle_option_item) def _decoder(data, decoder='plain'): """ Decodes option item @param data: string @param decoder: string @return: string """ dispatcher = { 'plain': (lambda x: x), 'base64': (lambda x: base64.b64decode(x).decode())} try: return dispatcher[decoder](data) except KeyError: return dispatcher['plain'](data) _decoder = staticmethod(_decoder) def _process_option_string(cls, optset, decoder='plain', expect_separator=True): """ Wrapper around options parsing routines @param optset: string @param decoder: callback name @expect_separator: bool @return: dict """ options = {} if optset: for option_item in optset.split(','): option_item = cls._decoder(option_item, decoder) options.update( cls._handle_option_item( option_item, expect_separator)) return options _process_option_string = classmethod(_process_option_string) def _remove_forbidden_options(self, options, version, quiet=True): """ Check if all options to process are present in white list and removes forbidden ones or raise an exception @param options: dict @param quiet: bool @return: dict """ whitelist = self._get_whitelist(version) if not set(options.keys()).issubset(set(whitelist.keys())): white_list_options = {} for opt_name, opt_value in iteritems(options): if opt_name not in whitelist: if quiet: continue else: raise ClSelectExcept.UnableToProcessOption(opt_name) white_list_options[opt_name] = opt_value options = white_list_options return options def _compose_options_set(options): """ Construct option item from key and value pair @param options: dict return: list """ options_set = [] for opt_name, opt_value in iteritems(options): options_set.append("%s=%s" % (opt_name, opt_value)) return options_set _compose_options_set = staticmethod(_compose_options_set) def _wrap_options(self, contents): """ Adds identifying string before and after dataset @param contents: list """ data = [';>=== Start of PHP Selector Custom Options ==='] data.extend(contents) data.append(';<=== End of PHP Selector Custom Options =====') return data def _compose_output_data(contents, extensions, extensions_data): """ Construct output @param contents: list @param extensions: list @param extensions_data: dict return: list """ data = [] for item in extensions: data.extend(extensions_data[item]) # Add two spacelines between each extension data.extend(["", ""]) data.extend(contents) return data _compose_output_data = staticmethod(_compose_output_data) def _check_version(self, test, version): """ Compares version in use and version required by PHP feature and return true if PHP feature satisfies """ alternatives = self.get_all_alternatives_data() self._check_alternative(version, alternatives) if '.' not in version: raise ClSelectExcept.UnableToGetExtensions(version) v_array = list(map((lambda x: int(x)), alternatives[version]['version'].split('.'))) # if test has 2 section, add third if len(test.split('.')) == 2: test += '.0' patt = re.compile(r'([<>=]{1,2})?(\d+\.\d+\.\d+)\.?') m = patt.match(test) if not m: raise ClSelectExcept.NoSuchAlternativeVersion(test) action = m.group(1) test = list(map((lambda x: int(x)), m.group(2).split('.'))) version_int = v_array[0] << 11 | v_array[1] << 7 | v_array[2] test_int = test[0] << 11 | test[1] << 7 | test[2] if action == r'<' and version_int < test_int: return True if action == r'<=' and version_int <= test_int: return True if action == r'>' and version_int > test_int: return True if action == r'>=' and version_int >= test_int: return True if not action or action == r'=': version_int = v_array[0] << 11 | v_array[1] << 7 test_int = test[0] << 11 | test[1] << 7 if version_int == test_int: return True return False def _get_php_error_tbl(self, php_ver): # http://php.net/manual/en/errorfunc.constants.php php_error_table = { 1: 'E_ERROR', 2: 'E_WARNING', 4: 'E_PARSE', 8: 'E_NOTICE', 16: 'E_CORE_ERROR', 32: 'E_CORE_WARNING', 64: 'E_COMPILE_ERROR', 128: 'E_COMPILE_WARNING', 256: 'E_USER_ERROR', 512: 'E_USER_WARNING', 1024: 'E_USER_NOTICE', 2048: 'E_STRICT' # E_STRICT since PHP 5 but not included in E_ALL until PHP 5.4.0 } if self._check_version('<5.2.0', php_ver): php_error_table[2047] = 'E_ALL' if self._check_version('>=5.2.0', php_ver): php_error_table[4096] = 'E_RECOVERABLE_ERROR' # E_RECOVERABLE_ERROR since PHP 5.2.0 if self._check_version('<5.3.0', php_ver): php_error_table[6143] = 'E_ALL' # E_ALL 6143 in PHP 5.2.x if self._check_version('>=5.3.0', php_ver): php_error_table[8192] = 'E_DEPRECATED' # E_DEPRECATED since PHP 5.3.0 php_error_table[16384] = 'E_USER_DEPRECATED' # E_USER_DEPRECATED since PHP 5.3.0 if self._check_version('<5.4.0', php_ver): php_error_table[30719] = 'E_ALL' # E_ALL 30719 in PHP 5.3.x if self._check_version('>=5.4.0', php_ver): php_error_table[32767] = 'E_ALL' # E_ALL 32767 in PHP >= 5.4.x return php_error_table def _php_string2error(self, str_, php_ver): """ Convert php error level 'error-reporting' from string to code http://php.net/manual/ru/function.error-reporting.php #>>> ClUserOptSelect(item='php')._php_string2error('E_ALL & ~E_NOTICE', '5.4') 32759 #>>> ClUserOptSelect(item='php')._php_string2error('E_USER_ERROR | E_NOTICE', '5.4') 264 #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | E_WARNING | E_PARSE | E_COMPILE_ERROR', '5.4') 71 #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | INCORRECT', '5.4') # incorrect variable 'INCORRECT' None #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR + E_WARNING', '5.4') # incorrect operator '+' None :param str: error_reporting variable :return None|int: error_reporting error code; return None if can't convert """ VALID_SYMBOLS = '0123456789|&~!^ ' # http://php.net/manual/en/errorfunc.constants.php php_error_table = self._get_php_error_tbl(php_ver) # replacing all constants to the numbers for code, name in iteritems(php_error_table): str_ = str_.replace(name, str(code)) # check if str_ has only valid symbols if set(str_).difference(set(VALID_SYMBOLS)): return None try: error_code = int(eval(str_)) except (SyntaxError, ValueError, TypeError): return None return error_code def _get_error_desc(self, value, version, range_): if not re.match(r'^-?\d{1,5}$', value): # error-reporting code must be from 32767 to -32767 return '' desc = [] value = int(value) for error_string in range_: if self._php_string2error(error_string, php_ver=version) == value: return error_string php_error_table = self._get_php_error_tbl(php_ver=version) for error in php_error_table: if (error & value) == error: desc.append(php_error_table[error]) return r' | '.join(desc) def _get_ini_defaults(self, version): """ Gets PHP defaults (calls php -i) @param version: string """ alternatives = self.get_all_alternatives_data() self._check_alternative(version, alternatives) whitelist = self._get_whitelist(version) if not os.path.isfile(alternatives[version]['data'][self._item]): raise ClSelectExcept.NoSuchAlternativeVersion(version) env_data = os.environ if ('SCRIPT_FILENAME' in env_data): script_path = '/usr/share/l.v.e-manager/utils/clinfo.php' if os.path.exists(script_path): env_data['SCRIPT_FILENAME'] = script_path cmd = [alternatives[version]['data'][self._item]] else: cmd = [alternatives[version]['data'][self._item], '-qi'] env_data.pop('SERVER_SOFTWARE', None) env_data['PHP_FCGI_MAX_REQUESTS'] = '1' env_data['PHP_FCGI_CHILDREN'] = '0' env_data['ACCEPT_ENCODING'] = '' env_data['HTTP_ACCEPT_ENCODING'] = '' tag_pattern = re.compile( r'<tr[^>]*?><td[^>]*>(.*?)</td><td[^>]*>(.*?)</td>(?:<td[^>]*>(.*?)</td>)?</tr>') strip_pattern = re.compile(r'<[^>]*?>') cmd[1:1] = ['-d', 'opcache.enable_cli=0', '-d', 'zlib.output_compression=Off', '-d', 'auto_append_file=none', '-d', 'extension=mbstring.so', '-d', 'auto_prepend_file=none', '-d', 'disable_functions=none'] output = utils.run_command(cmd, env_data) lines = tag_pattern.findall(output) # Directives which values are rewritten while execute CMD rewritten_directives = ['opcache.enable_cli', 'zlib.output_compression', 'auto_append_file', 'extension', 'auto_prepend_file', 'disable_functions'] configuration_file = None for l in lines: directive = re.sub(strip_pattern, '', l[0]) if 'Loaded Configuration File' in directive: s = re.sub(strip_pattern, '', (l[2] or l[1])) configuration_file = unescape(s, self._html_unescape_table).strip() if directive in whitelist: # convert html entries to string s = re.sub(strip_pattern, '', (l[2] or l[1])) value = unescape(s, self._html_unescape_table) if value == 'no value': if ('default' in whitelist[directive] and whitelist[directive]['default'] != ""): continue else: whitelist[directive]['default'] = "" else: if directive == 'error_reporting': error_range = whitelist[directive]['range'].split(',') value = self._get_error_desc(value, version, error_range) whitelist[directive]['default'] = value # Because we rewrite directives from list above when execute cmd # we need to use default value from php.ini if directive in rewritten_directives and configuration_file: whitelist[directive]['default'] = self._get_value_from_ini_file(configuration_file, directive) self._whitelist.update(whitelist) def _get_user_ini(self, user, version): """ Parses user ini file and updates values of existing data @param user: string """ self._get_whitelist(version) user_ini_path = self._compose_user_ini_path(user, version) (contents, extensions, extensions_data) = self._load_ini_contents(user_ini_path) contents = self._prepare_options_data(contents) for key in contents: try: self._whitelist[key]['value'] = contents[key] except KeyError: continue def _backup_settings(self, user, version, data, create=True): """ On saving user settings keep backup on user homedir @param user: string @param version: string @param data: list """ user_backup_path = os.path.join( self._clpwd.get_homedir(user), '.cl.selector') if not os.path.isdir(user_backup_path): try: clcaptain.mkdir(user_backup_path) except (OSError, ExternalProgramFailed) as e: raise ClSelectExcept.UnableToSaveData(user_backup_path, e) user_backup_file = os.path.join( user_backup_path, "alt_php%s.cfg" % version.replace('.', '')) # replace 'no value' in directive value to empty for idx in range(0, len(data)): line = data[idx] line_parts = line.split('=') if len(line_parts) == 2 and line_parts[1] == 'no value': data[idx] = line_parts[0] + '=' self._write_to_file( user, '\n'.join(data), user_backup_file, create) def backup_php_options(self, user): """ rewrite php backup file with php options @param user: string """ self._check_user_in_cagefs(user) alternatives = self.get_all_alternatives_data() for version in alternatives.keys(): user_ini_path = self._compose_user_ini_path(user, version) (contents, extensions, extensions_data) = self._load_ini_contents(user_ini_path) contents = self._prepare_options_data(contents) options_set = self._compose_options_set(contents) if options_set: options_set = self._wrap_options(options_set) self._backup_settings(user, version, options_set) def _get_value_from_ini_file(self, configuration_file, directive): """ get value from ini file Now used for getting default value for some php options, which we cannot get garanted :param configuration_file: ini file for reading :param directive: key name :return: value of key or '' """ config = configparser.ConfigParser(interpolation=None, strict=False) try: config.read(configuration_file) return config['PHP'].get(directive) except (KeyError, PermissionError): raise ClSelectExcept.FileProcessError(configuration_file)