D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
ssa
/
modules
/
Filename :
stat_sender.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains SSA classes for sending e-mails with report and sending reports to ClickHouse """ __package__ = 'ssa.modules' import json import logging import subprocess from email.message import EmailMessage from functools import partial from typing import Optional, Any from socket import gethostname from clcommon.cpapi import get_admin_email from requests import Session, Response from requests.adapters import HTTPAdapter from requests.exceptions import RequestException from requests.packages.urllib3.util.retry import Retry from .common import Common from .decision_maker import DecisionMaker from ..internal.constants import stat_server from ..internal.exceptions import SSAError from ..internal.mailer import Mailer, render_report_table from ..internal.utils import read_sys_id, sentry_init, duration_cast, \ format_date class StatisticsSender(Common): """ Send report to ClickHouse and over e-mails """ def __init__(self, ch_endpoint: str = f'https://{stat_server}/api/clos-ssa'): super().__init__() self.logger = logging.getLogger('stat_sender') self.logger.info('StatisticsSender enabled: %s', __package__) self.sys_id = read_sys_id() self.ch_endpoint = ch_endpoint self.mail_sender = Mailer() retry_conf = Retry(total=3, allowed_methods=frozenset(['POST']), status_forcelist=frozenset([502, 503, 504]), backoff_factor=3) # sleeps 0s, 6s, 18s adapter = HTTPAdapter(max_retries=retry_conf) self.session = Session() self.session.mount('https://', adapter) self.session.request = partial(self.session.request, timeout=10) def send(self, report: dict = None) -> None: """ Send given report over e-mail If no report given, get it from DecisionMaker API """ if report is None: report = DecisionMaker().get_json_report() if self.summary_notification_enabled: self.email_report(report) self.clickhouse_report(report) def email_report(self, report_view: dict) -> Optional[EmailMessage]: """ Create and send e-mail with report """ mail_to = self.get_mail_recipient() if mail_to and report_view.get('domains'): report_table, mail = render_report_table(report_view) msg = self.mail_sender._message( recipient=mail_to, template='ssa_report', date=format_date(report_view['date']), hostname=gethostname(), html=mail ) msg.add_attachment(json.dumps(report_view), subtype='json', filename=f"report_{report_view['date']}.json") msg.add_attachment(report_table, subtype='html', filename=f"report_{report_view['date']}.html") self.mail_sender._send(msg) return msg def get_mail_recipient(self) -> Optional[str]: """ Retrieve a recipient's e-mail: 1. get address from a wmt-api utility 2. if command failed or address is empty, get address of server admin """ try: # get_admin_email could return '', None, or throw unexpected errors return self.wmt_api_report_email() or get_admin_email() except Exception as e: self.logger.error('get_admin_email failed with: %s', str(e)) def wmt_api_report_email(self) -> Optional[str]: """ Retrieve a recipient's e-mail address from WMT API """ _util = 'wmt-api' try: api_response = subprocess.run( [f'/usr/share/web-monitoring-tool/wmtbin/{_util}', '--config-get'], check=True, text=True, capture_output=True).stdout.strip() except (subprocess.CalledProcessError, AttributeError, OSError, ValueError) as e: self.logger.error('wmt-api utility failed: %s', str(e)) return try: return json.loads(api_response).get('config').get('report_email') except json.JSONDecodeError as e: self.logger.error('wmt-api returned invalid json: %s', str(e)) except AttributeError: self.logger.error('wmt-api returned unexpected response: %s', api_response) def clickhouse_report(self, report_view: dict) -> bool: """ Send report to ClickHouse """ if report_view.get('domains'): self.logger.info('Sending POST request to %s', self.ch_endpoint) try: resp = self.session.post(self.ch_endpoint, json=self._ch_pack( self.clickhouse_format(report_view))) except RequestException as e: self.logger.error('POST failed with %s', e, extra={'endpoint': self.ch_endpoint}) raise SSAError( f'Failed to POST data to SSA API server: {str(e)}') from e return self._process_response(resp) else: self.logger.info('Report is empty, not sending to ClickHouse') return False @staticmethod def _ch_pack(value: Any) -> dict: """ Pack given value into data field of a dict """ return dict(data=value) def clickhouse_format(self, original_report: dict) -> list: """ Format local report for sending to ClickHouse (required structures differ) """ ch_report = list() for domain in original_report.get('domains'): ch_report.append({ 'system_id': self.sys_id, 'domain': domain.get('name'), 'count_slow_urls': domain.get('slow_urls'), 'count_slow_requests': domain.get('slow_reqs'), 'total_requests': domain.get('total_reqs'), 'details': [ { "url": u.get('name'), "count_requests": u.get('reqs_num'), "avg_duration": duration_cast( u.get('average_duration')), "correlation": float(u.get('correlation', 0)) } for u in domain.get('urls') ] }) return ch_report def _process_response(self, response: Response) -> bool: """ Check received response :param response: a requests.Response object :return: True in case of success, False otherwise """ if not response.ok: self.logger.error('Unable to connect to server with %s:%s', response.status_code, response.reason, extra={'resp_text': response.text}) return False else: self.logger.info('[%s:%s] Response received %s', response.status_code, response.reason, response.url) result = response.json() if result['status'] != 'ok': self.logger.error('Received response with status %s', result['status'], extra={'response': result}) return False self.logger.info('Sent to ClickHouse successfully') return True if __name__ == "__main__": sentry_init() logging.basicConfig(filename='stat_sender_standalone.log', level=logging.INFO) try: StatisticsSender().send() except SSAError as exc: print(exc) raise SystemExit(1)