"""Notification helpers: email, pushover, matrix, mattermost, signal, sms and dispatcher. Channel types supported: pushover - Pushover app notifications email - SMTP email matrix - Matrix (via matrix-nio) mattermost - Mattermost webhook signal - Signal via signal-cli subprocess sms_voipms - SMS via voip.ms REST API Each channel can specify ``min_level: WARNING|CRITICAL`` (default: WARNING). Notifications are dispatched to the owner + managers of the host, each via their own ``notification_channels`` list. When no users are configured the server runs silently (no notifications sent). """ import asyncio import logging import smtplib import subprocess import time import sys from dataclasses import dataclass, field from typing import Optional from . import data from . import ws as ws_mod logger = logging.getLogger(__name__) msg_to_websockets = ws_mod.broadcast # Module-level state set via setup() _config: dict = {} # Tracks which channels fired a WARNING/CRITICAL per host. # {host_name: set of channel_names} — used to route RECOVER to the same channels. _alerted_channels: dict = {} logf = None # --------------------------------------------------------------------------- # Level ordering # --------------------------------------------------------------------------- _LEVEL_ORDER = {"RECOVER": 0, "INFO": 0, "WARNING": 1, "CRITICAL": 2} def _level_value(level: str) -> int: return _LEVEL_ORDER.get(level.upper(), 0) # --------------------------------------------------------------------------- # Notification dataclass # --------------------------------------------------------------------------- @dataclass class Notification: """Structured notification payload.""" title: str # e.g. "[CRITICAL] webserver01" body: str # detail message level: str # RECOVER | WARNING | CRITICAL | INFO url: str = "" # link to plugin metrics page # --------------------------------------------------------------------------- # Module setup # --------------------------------------------------------------------------- def setup(cfg: dict, loop: Optional[asyncio.AbstractEventLoop] = None): """Initialize notifier from configuration dict.""" global _config _config = dict(cfg) def reload_config(cfg: dict): """Reload notification configuration on SIGHUP.""" global _config _config = dict(cfg) logger.info("Notification configuration reloaded") # --------------------------------------------------------------------------- # Event log (websocket + file + in-memory) # --------------------------------------------------------------------------- def initlog(logfile): global logf try: logf = open(logfile, "a+") except Exception as e: print("cannot open logfile %s, using STDERR: %s" % (logfile, e)) logf = sys.stderr return logf def closelog(): global logf if logf and logf != sys.stderr: try: logf.close() except Exception: pass def eventlog(host, lvl, m, service=None): ts = time.time() msg = { "ts": ts, "host": host or None, "level": lvl, "service": service, "message": m, } data.msgs.append(msg) s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {lvl} " if host: s += f"{host} " s += m logger.info(s) if logf: try: logf.write(s + "\n") logf.flush() except Exception as e: logger.warning("failed to write to logfile: %s", e) msg_to_websockets("message", msg) # --------------------------------------------------------------------------- # Low-level channel drivers # --------------------------------------------------------------------------- def _send_pushover(channel_cfg: dict, notif: Notification) -> bool: import http.client import urllib.parse token = channel_cfg.get("token", "") user = channel_cfg.get("user", "") if not token or not user: logger.warning("pushover: missing token or user") return False params: dict = {"token": token, "user": user, "title": notif.title, "message": notif.body} if notif.url: params["url"] = notif.url params["url_title"] = "Plugin metrics" conn = http.client.HTTPSConnection("api.pushover.net:443") try: conn.request( "POST", "/1/messages.json", urllib.parse.urlencode(params), {"Content-type": "application/x-www-form-urlencoded"}, ) r = conn.getresponse() logger.debug("pushover response: %s %s", r.status, r.reason) return r.status == 200 except Exception as e: logger.error("pushover error: %s", e) return False def _send_email(channel_cfg: dict, notif: Notification) -> bool: recipients = channel_cfg.get("recipients", []) sender = channel_cfg.get("sender", "") smtp_server = channel_cfg.get("smtp_server", "") smtp_port = channel_cfg.get("smtp_port", 587) smtp_user = channel_cfg.get("smtp_user") smtp_password = channel_cfg.get("smtp_password") if not recipients or not sender or not smtp_server: logger.warning("email: missing recipients, sender, or smtp_server") return False date = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime()) body_text = notif.body if notif.url: body_text += f"\n\n{notif.url}" raw = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % ( recipients[0] if isinstance(recipients, list) else recipients, sender, notif.title, date, body_text, ) try: server = smtplib.SMTP(smtp_server, smtp_port) if smtp_port == 587: server.starttls() server.ehlo() if smtp_user and smtp_password: server.login(smtp_user, smtp_password) server.sendmail(sender, recipients, raw) server.quit() return True except Exception as e: logger.warning("email send failed: %s", e) try: server.quit() except Exception: pass return False def _send_mattermost(channel_cfg: dict, notif: Notification) -> bool: try: from mattermostdriver import Driver except ImportError: logger.error("mattermostdriver not installed") return False host = channel_cfg.get("host", "") token = channel_cfg.get("token", "") channel = channel_cfg.get("channel", "") if not host or not token or not channel: logger.warning("mattermost: missing host, token, or channel") return False text = f"**{notif.title}**\n{notif.body}" if notif.url: text += f"\n[Plugin metrics] {notif.url}" ses = {"url": host, "scheme": "http", "basepath": "/api/v4", "port": 8065} mm = Driver(ses) payload: dict = {"text": text, "channel": channel, "username": channel_cfg.get("username", "hbd")} icon = channel_cfg.get("icon") if icon: payload["icon_url"] = icon try: rc = mm.webhooks.call_webhook(token, payload) return bool(rc is None or rc == "") except Exception as e: logger.error("mattermost error: %s", e) return False def _send_signal(channel_cfg: dict, notif: Notification) -> bool: cli = channel_cfg.get("cli_path", "/usr/local/bin/signal-cli") user = channel_cfg.get("user", "") recipient = channel_cfg.get("recipient", "") if not user or not recipient: logger.warning("signal: missing user or recipient") return False msg = f"{notif.title}\n{notif.body}" if notif.url: msg += f"\n{notif.url}" try: res = subprocess.run([cli, "-u", user, "send", "-m", msg, recipient], capture_output=True) if res.returncode != 0: logger.error("signal failed: %s", res.stderr.decode()) return False return True except Exception as e: logger.exception("signal exception: %s", e) return False async def _send_sms_voipms_async(channel_cfg: dict, notif: Notification) -> bool: """Send SMS via voip.ms REST API using multipart form-data POST.""" import json import aiohttp api_user = channel_cfg.get("api_user", "") api_password = channel_cfg.get("api_password", "") did = channel_cfg.get("did", "") dst = channel_cfg.get("dst", "") if not api_user or not api_password or not did or not dst: logger.warning("sms_voipms: missing api_user, api_password, did, or dst") return False # SMS body: title + body, truncated to 160 chars text = f"{notif.title}: {notif.body}" if len(text) > 160: text = text[:157] + "..." form_data = { "api_username": api_user, "api_password": api_password, "method": "sendSMS", "did": did, "dst": dst, "message": text, } try: async with aiohttp.ClientSession() as session: with aiohttp.MultipartWriter("form-data") as mp: for key, value in form_data.items(): part = mp.append(value) part.set_content_disposition("form-data", name=key) async with session.post("https://voip.ms/api/v1/rest.php", data=mp) as resp: body = await resp.text() if resp.status != 200: logger.error("sms_voipms HTTP %s: %s", resp.status, body) return False result = json.loads(body) if result.get("status") == "success": return True logger.error("sms_voipms error: %s", result.get("status")) return False except Exception as e: logger.error("sms_voipms exception: %s", e) return False async def _send_matrix_async(channel_cfg: dict, notif: Notification) -> bool: """Send a Matrix message using matrix-nio.""" try: from nio import AsyncClient, RoomMessageText # noqa: F401 except ImportError: logger.error("matrix-nio not installed; pip install matrix-nio") return False from nio import AsyncClient homeserver = channel_cfg.get("homeserver", "") access_token = channel_cfg.get("access_token", "") room_id = channel_cfg.get("room_id", "") if not homeserver or not access_token or not room_id: logger.warning("matrix: missing homeserver, access_token, or room_id") return False text = f"{notif.title}\n{notif.body}" if notif.url: text += f"\n{notif.url}" html = f"{notif.title}
{notif.body}" if notif.url: html += f'
Plugin metrics' client = AsyncClient(homeserver) client.access_token = access_token try: from nio import RoomSendResponse content = { "msgtype": "m.text", "body": text, "format": "org.matrix.custom.html", "formatted_body": html, } resp = await client.room_send(room_id, "m.room.message", content) if hasattr(resp, "event_id"): return True logger.error("matrix send failed: %s", resp) return False except Exception as e: logger.error("matrix exception: %s", e) return False finally: await client.close() # --------------------------------------------------------------------------- # Channel dispatcher (all async — sync drivers run in a thread executor) # --------------------------------------------------------------------------- # Sync drivers kept for `hbd notify` CLI usage (asyncio.run wraps them there). _DRIVERS = { "pushover": _send_pushover, "email": _send_email, "mattermost": _send_mattermost, "signal": _send_signal, } _TIMEOUT = 15 # seconds per channel send async def _dispatch_to_channel(channel_name: str, channel_cfg: dict, notif: Notification) -> bool: """Send *notif* to a single named channel, honouring min_level.""" level = notif.level.upper() if level != "RECOVER": min_level = channel_cfg.get("min_level", "WARNING").upper() if _level_value(level) < _level_value(min_level): logger.debug( "channel '%s': skipping level %s (min_level=%s)", channel_name, level, min_level ) return True # filtered intentionally ch_type = channel_cfg.get("type", "") try: if ch_type == "matrix": return await asyncio.wait_for(_send_matrix_async(channel_cfg, notif), timeout=_TIMEOUT) if ch_type == "sms_voipms": return await asyncio.wait_for(_send_sms_voipms_async(channel_cfg, notif), timeout=_TIMEOUT) sync_driver = _DRIVERS.get(ch_type) if sync_driver is None: logger.warning("unknown channel type '%s' for channel '%s'", ch_type, channel_name) return False return await asyncio.wait_for( asyncio.to_thread(sync_driver, channel_cfg, notif), timeout=_TIMEOUT ) except asyncio.TimeoutError: logger.error("channel '%s' timed out after %ds", channel_name, _TIMEOUT) return False # --------------------------------------------------------------------------- # Central dispatch function # --------------------------------------------------------------------------- def _build_url(host_name: str) -> str: base_url = _config.get("base_url", "").rstrip("/") if not base_url: return "" return f"{base_url}/plugins#{host_name}" async def send_notification(host_name: str, notif: Notification) -> dict: """Dispatch *notif* to all managers/owner of *host_name*. Looks up the host's owner + managers, resolves each user's notification_channels, and dispatches. Silently does nothing if no users are configured. Returns a dict of {channel_name: bool} results. """ from . import users as users_mod from . import hbdclass if not users_mod.users_enabled(): return {} # Collect recipient usernames: owner + managers host = hbdclass.Host.hosts.get(host_name) if host is None: logger.debug("send_notification: host '%s' not found", host_name) return {} recipients: set[str] = set() owner = getattr(host, "owner", None) if owner: recipients.add(owner) for m in getattr(host, "managers", []): recipients.add(m) if not recipients: logger.debug("send_notification: no owner/managers for '%s'", host_name) return {} # Fill url if not already set if not notif.url: notif.url = _build_url(host_name) global_channels: dict = _config.get("notification_channels", {}) results: dict = {} level = notif.level.upper() is_alert = level in ("WARNING", "CRITICAL") is_recover = level in ("RECOVER",) # For RECOVER: send to every channel that previously fired an alert for this host, # regardless of that channel's min_level. if is_recover and host_name in _alerted_channels: for channel_name in list(_alerted_channels[host_name]): channel_cfg = global_channels.get(channel_name) if not channel_cfg: continue try: ok = await _dispatch_to_channel(channel_name, channel_cfg, notif) results[channel_name] = ok if ok: logger.info("recover sent to channel '%s': %s", channel_name, notif.title) except Exception as e: logger.error("error sending recover to channel '%s': %s", channel_name, e) del _alerted_channels[host_name] return results for username in recipients: user = users_mod.get_user(username) if user is None: logger.debug("send_notification: user '%s' not found", username) continue for channel_name in user.notification_channels: if channel_name in results: continue channel_cfg = global_channels.get(channel_name) if not channel_cfg: logger.warning("channel '%s' not defined in notification_channels", channel_name) results[channel_name] = False continue try: ok = await _dispatch_to_channel(channel_name, channel_cfg, notif) results[channel_name] = ok if ok: logger.info("notification sent to channel '%s': %s", channel_name, notif.title) if is_alert: _alerted_channels.setdefault(host_name, set()).add(channel_name) else: logger.warning("failed to send notification to channel '%s'", channel_name) except Exception as e: logger.error("error sending to channel '%s': %s", channel_name, e) results[channel_name] = False return results