D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
thread-self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clselect
/
Filename :
clselectctl.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import from __future__ import print_function from __future__ import division import os import pwd import re from collections import namedtuple import logging import traceback import cldetectlib as detect from typing import List # NOQA from future.utils import iteritems from . import clpassenger from clcommon.cpapi import cpusers, getCPName from clcommon.clpwd import drop_privileges, ClPwd from clcommon.cpapi.plugins.cpanel import is_no_php_binaries_on_cpanel from .clselectexcept import ClSelectExcept from .clselectnodejs.node_manager import NodeManager from .clselectpython.python_manager import PythonManager from .utils import get_abs_rel from typing import Optional, List # NOQA DISABLED_DIRECTORY_NAMES = ['public_html', 'rubyvenv', 'virtualenv', '.cl.selector', '.cagefs', 'www', 'nodevenv'] ApplicationSummary = namedtuple('ApplicationSummary', ('user', 'version', 'doc_root', 'interpreter', 'app_status')) InterpreterSummary = namedtuple('InterpreterSummary', ('version', 'version_full', 'enabled', 'installed')) def check_directory(directory): if ' ' in directory: raise ValueError('Directory should not contain spaces') if os.path.isabs(directory): raise ValueError('Directory should be relative to user\'s home') if directory[-1] == '/': raise ValueError('Directory should not contain a trailing slash') if directory in DISABLED_DIRECTORY_NAMES: raise ValueError('Directory "%s" not allowed' % directory) all_valid = re.match(r'[-\.\w\/]+$', directory) if all_valid is None: raise ValueError('Directory name contains invalid characters') def get_alias(alias): for c in ('#', '?', './'): if c in alias: raise ValueError('Alias is not valid') # root aliases if alias in ('.', ''): alias = '/' return alias.strip('/') def get_directory(prefix): return prefix.replace('_', '/').replace('//', '_') def get_prefix(directory): return directory.replace('_', '__').replace('/', '_') def get_user(user): if not user: current_euid = os.geteuid() user = pwd.getpwuid(current_euid).pw_name if user == 'root': raise ValueError('User parameter must be specified if current user is root') return user def _verify_application(interpreter, app_root, binary_path=None): """ Application is valid only if binary and app_root exists :param binary_path: path to binary in virtual environment :param app_root: path to root directory of application :return: result of checking :rtype: bool """ if interpreter in ('nodejs', 'python'): return os.path.isdir(app_root) return os.path.isfile(binary_path) and os.path.isdir(app_root) def server_applications_summary(interpreter): # type: (str) -> List[ApplicationSummary] """Find and return all apps for given interpreter on server""" domains_data = None applications = [] # LVEMAN-1408 euid = os.geteuid() for user in cpusers(): try: if detect.is_da(): # NB: python will import this only 1 time from clcommon.cpapi.plugins.directadmin import userdomains domains_list = userdomains(user, as_root=True), # called as root to save time on right's escalated call domains_data = list( filter( None, map( lambda x: x[0] if x else None, # take only domain from (domain, docroot) tuples domains_list, ) ) ) # drop permissions in order not to with drop_privileges(user): # LVEMAN-1408: Force check user existence after drop user rights pwd.getpwnam(user) applications.extend( _user_applications_short_summary(user, interpreter, domains_data)) except ClPwd.NoSuchUserException: continue except KeyError: # LVEMAN-1408: this may be thrown by pwd.getpwnam in clpassenger.py module. # It uses python pwd library instead ClPwd # Send debug message to sentry logger = logging.getLogger(__name__) logger.error("DEBUG: user %s present in panel but doesn't exist in system. Process euid=%d. Trace: %s at %s" % (user, euid, traceback.format_exc(), str(traceback.extract_stack()))) return applications def _user_applications_short_summary(user, interpreter, domains_docroots_data=None): """ Return generator with all applications for given user and interpreter. To increase performance, only a small part of the information about the applications is given. :param user: name of panel user :param interpreter: name of interpreter (python, ruby, etc) :param domains_docroots_data: total data about users domains :return: Generator[ApplicationSummary] """ userdomains_data = domains_docroots_data if interpreter in ('nodejs', 'python'): try: if interpreter == 'nodejs': from .clselectnodejs.apps_manager import ApplicationsManager else: from .clselectpython.apps_manager import ApplicationsManager config_data = ApplicationsManager().get_user_config_data(user) except ClSelectExcept.WrongData: return for app, data in iteritems(config_data): try: app_root, _ = get_abs_rel(user, app) except ClSelectExcept.WrongData: continue if _verify_application(interpreter, app_root): yield ApplicationSummary( user=user, version=data['%s_version' % interpreter], doc_root=data['domain'], interpreter=interpreter, app_status=data['app_status'] ) else: for dummy, data in iteritems(clpassenger.summary(user, userdomains_data=userdomains_data)): if data['interpreter'] != interpreter: continue # return only really existing applications binary, app_root = data['binary'], data['directory'] if _verify_application(interpreter, app_root, binary): it_version = os.path.basename(os.path.dirname(os.path.dirname(binary))) yield ApplicationSummary(user=user, version=it_version, doc_root=data['docroot'], interpreter=data['interpreter'], app_status='started') def get_default_version(interpreter): """Return default version for given interpreter """ if interpreter == 'nodejs': return NodeManager().default_version if interpreter == 'python': return PythonManager().default_version return None def _iter_interpreters(interpreter): """Return generator for interpreters all versions""" if interpreter == 'nodejs': interpreters = NodeManager().get_summary(installed_interpreters_only=True) elif interpreter == 'python': interpreters = PythonManager().get_summary(installed_interpreters_only=True) else: raise NotImplementedError() for version, version_info in iteritems(interpreters['available_versions']): enabled = version_info['status'] == 'enabled' if interpreter == 'nodejs': # for nodejs 6.16.0 -> 6 major_version = str(int(version.split('.')[0])) else: # for python 2.7.16 -> 2.7 major_version = '.'.join(version.split('.')[:2]) yield InterpreterSummary(version=major_version, version_full=version, enabled=enabled, installed=True) def _iter_php_interpreters(): """ Return generator with all PHP versions on server. :return: Generator[InterpreterSummary] """ from .clselect import ClSelect php = ClSelect() versions = php.list_alternatives() for version, full_version, _ in versions: yield InterpreterSummary( version, full_version, installed=True, enabled=php.is_version_enabled(version)) def interpreter_versions_short_summary(interpreter): # type: (str) -> List[InterpreterSummary] """Find and return all versions for given interpreter on server""" if interpreter == 'php': # if there is no installed php - no sense to collect statistics if getCPName() == 'cPanel' and is_no_php_binaries_on_cpanel(): return [] return list(_iter_php_interpreters()) elif interpreter == 'ruby': from .clselectruby.interpreters import interpreters elif interpreter in ('python', 'nodejs'): return list(_iter_interpreters(interpreter)) else: raise NotImplementedError it_list = [] for it in interpreters(): # for now, both python and ruby interpreters cannot be disabled it_list.append(InterpreterSummary(it.version, it.version_full, installed=True, enabled=True)) return it_list