505 lines
17 KiB
Python
505 lines
17 KiB
Python
"""Notification helpers: email, pushover, matrix, mattermost, signal, sms and dispatcher.

Channel types supported:
  pushover    - Pushover app notifications
  email       - SMTP email
  matrix      - Matrix (via matrix-nio)
  mattermost  - Mattermost webhook
  signal      - Signal via signal-cli subprocess
  sms_voipms  - SMS via voip.ms REST API

Each channel can specify ``min_level: WARNING|CRITICAL`` (default: WARNING).

Notifications are dispatched to the owner + managers of the host, each via
their own ``notification_channels`` list.  When no users are configured the
server runs silently (no notifications sent).
"""
|
|
|
|
import asyncio
|
|
import asyncio
|
|
import logging
|
|
import smtplib
|
|
import subprocess
|
|
import time
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
from . import data
|
|
from . import ws as ws_mod
|
|
|
|
logger = logging.getLogger(__name__)

# Alias for the websocket broadcast function; eventlog() sends each event line through it.
msg_to_websockets = ws_mod.broadcast

# Module-level state set via setup()
_config: dict = {}
_loop: Optional[asyncio.AbstractEventLoop] = None

# Tracks which channels fired a WARNING/CRITICAL per host.
# {host_name: set of channel_names} — used to route RECOVER to the same channels.
_alerted_channels: dict = {}

# Event-log file object assigned by initlog() (a real file or sys.stderr);
# None until then.
logf = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Level ordering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Severity rank per level name; RECOVER and INFO share the lowest rank.
_LEVEL_ORDER = {"RECOVER": 0, "INFO": 0, "WARNING": 1, "CRITICAL": 2}


def _level_value(level: str) -> int:
    """Return the numeric rank of *level* (case-insensitive); unknown names rank 0."""
    key = level.upper()
    if key in _LEVEL_ORDER:
        return _LEVEL_ORDER[key]
    return 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notification dataclass
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class Notification:
    """Structured notification payload handed to the channel drivers."""

    # Headline, e.g. "[CRITICAL] webserver01"
    title: str
    # Detail message
    body: str
    # One of RECOVER | WARNING | CRITICAL | INFO
    level: str
    # Link to the plugin metrics page; defaults to the empty string
    url: str = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module setup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def setup(cfg: dict, loop: Optional[asyncio.AbstractEventLoop] = None):
|
|
"""Initialize notifier from configuration dict and event loop."""
|
|
global _config, _loop
|
|
_config = dict(cfg)
|
|
if loop is not None:
|
|
_loop = loop
|
|
|
|
|
|
def reload_config(cfg: dict):
    """Swap in a shallow copy of *cfg* as the active configuration (SIGHUP reload)."""
    global _config
    refreshed = dict(cfg)
    _config = refreshed
    logger.info("Notification configuration reloaded")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Event log (websocket + file + in-memory)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def initlog(logfile):
    """Open *logfile* in append mode as the event log and return the handle.

    When the file cannot be opened the reason is printed and ``sys.stderr``
    is used (and returned) instead.
    """
    global logf
    try:
        handle = open(logfile, "a+")
    except Exception as err:
        print("cannot open logfile %s, using STDERR: %s" % (logfile, err))
        handle = sys.stderr
    logf = handle
    return logf
|
|
|
|
|
|
def closelog():
    """Close the event-log file opened by initlog() and stop file logging.

    ``sys.stderr`` is never closed and stays in place.  After a real file has
    been closed the module-level handle is reset to ``None`` so that later
    eventlog() calls skip the file instead of failing on a closed handle.
    """
    global logf
    if not logf or logf is sys.stderr:
        return
    try:
        logf.close()
    except Exception as e:
        # Best effort: a failing close must not abort the caller.
        logger.warning("failed to close logfile: %s", e)
    finally:
        logf = None
|
|
|
|
|
|
def eventlog(host, lvl, m, service=None):
    """Record one event line in memory, the logger, the log file and websockets.

    The line is ``<local timestamp> <lvl> [<host> ]<m>``.  *service* is
    accepted but not used by this function.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    host_part = f"{host} " if host else ""
    # Plain concatenation keeps the requirement that *m* is a string.
    line = f"{stamp} {lvl} {host_part}" + m

    data.msgs.append(line)
    logger.info(line)
    if logf:
        try:
            logf.write(line + "\n")
            logf.flush()
        except Exception as err:
            logger.warning("failed to write to logfile: %s", err)
    msg_to_websockets("message", line)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Low-level channel drivers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _send_pushover(channel_cfg: dict, notif: Notification) -> bool:
    """Send *notif* through the Pushover messages API.

    Reads ``token`` and ``user`` from *channel_cfg*; an optional ``timeout``
    (seconds, default 15) bounds the HTTPS request.

    Returns True only when Pushover answers with HTTP 200; missing
    credentials or any exception yield False.
    """
    import http.client
    import urllib.parse

    token = channel_cfg.get("token", "")
    user = channel_cfg.get("user", "")
    if not token or not user:
        logger.warning("pushover: missing token or user")
        return False
    params: dict = {"token": token, "user": user, "title": notif.title, "message": notif.body}
    if notif.url:
        params["url"] = notif.url
        params["url_title"] = "Plugin metrics"
    # The timeout keeps a stalled connection from blocking the caller indefinitely.
    conn = http.client.HTTPSConnection(
        "api.pushover.net", 443, timeout=channel_cfg.get("timeout", 15)
    )
    try:
        conn.request(
            "POST",
            "/1/messages.json",
            urllib.parse.urlencode(params),
            {"Content-type": "application/x-www-form-urlencoded"},
        )
        r = conn.getresponse()
        logger.debug("pushover response: %s %s", r.status, r.reason)
        return r.status == 200
    except Exception as e:
        logger.error("pushover error: %s", e)
        return False
    finally:
        # Release the socket on every path (success, non-200, exception).
        conn.close()
|
|
|
|
|
|
def _send_email(channel_cfg: dict, notif: Notification) -> bool:
    """Send *notif* as a plain-text email over SMTP.

    Channel settings read from *channel_cfg*:
      recipients    - list of addresses (a single string is accepted too)
      sender        - envelope/header From address
      smtp_server   - SMTP host
      smtp_port     - port, default 587 (STARTTLS is used on 587)
      smtp_user / smtp_password - optional login credentials
      timeout       - optional socket timeout in seconds, default 30

    Returns True when the message was handed to the server, False on missing
    settings or any error.
    """
    from email.message import EmailMessage
    from email.utils import formatdate

    recipients = channel_cfg.get("recipients", [])
    sender = channel_cfg.get("sender", "")
    smtp_server = channel_cfg.get("smtp_server", "")
    smtp_user = channel_cfg.get("smtp_user")
    smtp_password = channel_cfg.get("smtp_password")

    if not recipients or not sender or not smtp_server:
        logger.warning("email: missing recipients, sender, or smtp_server")
        return False

    # Normalise to a list so the To: header names every recipient.
    if isinstance(recipients, str):
        recipients = [recipients]
    else:
        recipients = list(recipients)

    body_text = notif.body
    if notif.url:
        body_text += f"\n\n{notif.url}"

    try:
        # Config values may arrive as strings; the 587 check needs an int.
        smtp_port = int(channel_cfg.get("smtp_port") or 587)

        # EmailMessage takes care of header encoding and of non-ASCII bodies,
        # which a hand-concatenated string passed to sendmail() cannot carry.
        msg = EmailMessage()
        msg["To"] = ", ".join(recipients)
        msg["From"] = sender
        # Header values must be a single line; fold any line breaks to spaces.
        msg["Subject"] = " ".join(notif.title.splitlines())
        msg["Date"] = formatdate(localtime=True)
        msg.set_content(body_text)

        # The context manager issues QUIT and closes the socket on every path.
        with smtplib.SMTP(smtp_server, smtp_port, timeout=channel_cfg.get("timeout", 30)) as server:
            if smtp_port == 587:
                server.starttls()
                server.ehlo()
            if smtp_user and smtp_password:
                server.login(smtp_user, smtp_password)
            server.send_message(msg, from_addr=sender, to_addrs=recipients)
        return True
    except Exception as e:
        logger.warning("email send failed: %s", e)
        return False
|
|
|
|
|
|
def _send_mattermost(channel_cfg: dict, notif: Notification) -> bool:
    """Post *notif* to a Mattermost incoming webhook via ``mattermostdriver``.

    Required settings in *channel_cfg*: ``host``, ``token`` (webhook id) and
    ``channel``.  Optional: ``username`` (default "hbd"), ``icon``, and the
    connection settings ``scheme`` (default "http") and ``port`` (default
    8065).

    Returns True when the webhook call returns ``None`` or an empty string;
    False when the driver is not installed, settings are missing, or the call
    raises.
    """
    try:
        from mattermostdriver import Driver
    except ImportError:
        logger.error("mattermostdriver not installed")
        return False
    host = channel_cfg.get("host", "")
    token = channel_cfg.get("token", "")
    channel = channel_cfg.get("channel", "")
    if not host or not token or not channel:
        logger.warning("mattermost: missing host, token, or channel")
        return False
    text = f"**{notif.title}**\n{notif.body}"
    if notif.url:
        text += f"\n[Plugin metrics]({notif.url})"
    # Scheme and port default to the values that used to be hard-coded.
    ses = {
        "url": host,
        "scheme": channel_cfg.get("scheme", "http"),
        "basepath": "/api/v4",
        "port": channel_cfg.get("port", 8065),
    }
    payload: dict = {"text": text, "channel": channel, "username": channel_cfg.get("username", "hbd")}
    icon = channel_cfg.get("icon")
    if icon:
        payload["icon_url"] = icon
    try:
        # Driver construction is inside the try so a bad option is reported
        # as a failed send, like every other driver error.
        mm = Driver(ses)
        rc = mm.webhooks.call_webhook(token, payload)
        return bool(rc is None or rc == "")
    except Exception as e:
        logger.error("mattermost error: %s", e)
        return False
|
|
|
|
|
|
def _send_signal(channel_cfg: dict, notif: Notification) -> bool:
    """Send *notif* as a Signal message by running ``signal-cli``.

    Settings read from *channel_cfg*: ``user`` (sending account) and
    ``recipient`` are required; ``cli_path`` defaults to
    ``/usr/local/bin/signal-cli``; ``timeout`` (seconds, default 60) bounds
    the subprocess.

    Returns True when the command exits with status 0, False otherwise.
    """
    cli = channel_cfg.get("cli_path", "/usr/local/bin/signal-cli")
    user = channel_cfg.get("user", "")
    recipient = channel_cfg.get("recipient", "")
    if not user or not recipient:
        logger.warning("signal: missing user or recipient")
        return False
    msg = f"{notif.title}\n{notif.body}"
    if notif.url:
        msg += f"\n{notif.url}"
    try:
        # Without a timeout a hung signal-cli would block the caller forever;
        # on expiry subprocess.run kills the child and raises TimeoutExpired.
        res = subprocess.run(
            [cli, "-u", user, "send", "-m", msg, recipient],
            capture_output=True,
            timeout=channel_cfg.get("timeout", 60),
        )
        if res.returncode != 0:
            # errors="replace": undecodable stderr must not mask the real failure.
            logger.error("signal failed: %s", res.stderr.decode(errors="replace"))
            return False
        return True
    except Exception as e:
        logger.exception("signal exception: %s", e)
        return False
|
|
|
|
|
|
async def _send_sms_voipms_async(channel_cfg: dict, notif: Notification) -> bool:
    """Send *notif* as an SMS through the voip.ms REST API (multipart form POST).

    Needs ``api_user``, ``api_password``, ``did`` and ``dst`` in *channel_cfg*.
    Returns True only for HTTP 200 with a JSON ``status`` of "success".
    """
    import json

    import aiohttp

    settings = {name: channel_cfg.get(name, "") for name in ("api_user", "api_password", "did", "dst")}
    if not all(settings.values()):
        logger.warning("sms_voipms: missing api_user, api_password, did, or dst")
        return False

    # Message is "title: body", cut to 160 characters with a trailing "...".
    sms_text = f"{notif.title}: {notif.body}"
    if len(sms_text) > 160:
        sms_text = sms_text[:157] + "..."

    fields = (
        ("api_username", settings["api_user"]),
        ("api_password", settings["api_password"]),
        ("method", "sendSMS"),
        ("did", settings["did"]),
        ("dst", settings["dst"]),
        ("message", sms_text),
    )

    try:
        async with aiohttp.ClientSession() as session:
            with aiohttp.MultipartWriter("form-data") as writer:
                for name, value in fields:
                    writer.append(value).set_content_disposition("form-data", name=name)
            async with session.post("https://voip.ms/api/v1/rest.php", data=writer) as resp:
                raw = await resp.text()
                if resp.status != 200:
                    logger.error("sms_voipms HTTP %s: %s", resp.status, raw)
                    return False
                status = json.loads(raw).get("status")
                if status == "success":
                    return True
                logger.error("sms_voipms error: %s", status)
                return False
    except Exception as err:
        logger.error("sms_voipms exception: %s", err)
        return False
|
|
|
|
|
|
def _send_sms_voipms(channel_cfg: dict, notif: Notification) -> bool:
    """Run the voip.ms SMS coroutine on the shared event loop and wait for it.

    Blocks the calling thread for up to 15 seconds.  Returns the coroutine's
    result, or False when no loop was registered via setup(), the wait timed
    out, or the coroutine raised.

    NOTE(review): presumably called from a worker thread — waiting on the
    future from the loop's own thread would stall the loop; confirm callers.
    """
    if _loop is None:
        logger.warning("sms_voipms: event loop not available")
        return False
    future = asyncio.run_coroutine_threadsafe(_send_sms_voipms_async(channel_cfg, notif), _loop)
    try:
        return future.result(timeout=15)
    except Exception as e:
        # After a timeout the coroutine is still scheduled on the loop; cancel
        # it so it does not keep running (and possibly send) after we reported
        # failure.  cancel() is a no-op when the future already finished.
        future.cancel()
        logger.error("sms_voipms send timed out or failed: %s", e)
        return False
|
|
|
|
|
|
async def _send_matrix_async(channel_cfg: dict, notif: Notification) -> bool:
    """Send *notif* to a Matrix room using matrix-nio.

    Needs ``homeserver``, ``access_token`` and ``room_id`` in *channel_cfg*.
    The event carries a plain-text body plus an HTML ``formatted_body``.

    Returns True when the send response carries an ``event_id``; False when
    matrix-nio is missing, settings are incomplete, or the send fails.
    """
    from html import escape

    try:
        from nio import AsyncClient
    except ImportError:
        logger.error("matrix-nio not installed; pip install matrix-nio")
        return False

    homeserver = channel_cfg.get("homeserver", "")
    access_token = channel_cfg.get("access_token", "")
    room_id = channel_cfg.get("room_id", "")
    if not homeserver or not access_token or not room_id:
        logger.warning("matrix: missing homeserver, access_token, or room_id")
        return False

    text = f"{notif.title}\n{notif.body}"
    if notif.url:
        text += f"\n{notif.url}"
    # Escape the dynamic parts: a title or body containing "<" or "&" would
    # otherwise be interpreted as markup in the formatted body.
    html = f"<strong>{escape(notif.title)}</strong><br>{escape(notif.body)}"
    if notif.url:
        html += f'<br><a href="{escape(notif.url, quote=True)}">Plugin metrics</a>'

    client = AsyncClient(homeserver)
    client.access_token = access_token
    try:
        content = {
            "msgtype": "m.text",
            "body": text,
            "format": "org.matrix.custom.html",
            "formatted_body": html,
        }
        resp = await client.room_send(room_id, "m.room.message", content)
        if hasattr(resp, "event_id"):
            return True
        logger.error("matrix send failed: %s", resp)
        return False
    except Exception as e:
        logger.error("matrix exception: %s", e)
        return False
    finally:
        # Always release the client's HTTP session.
        await client.close()
|
|
|
|
|
|
def _send_matrix(channel_cfg: dict, notif: Notification) -> bool:
    """Run the Matrix send coroutine on the shared event loop and wait for it.

    Blocks the calling thread for up to 15 seconds.  Returns the coroutine's
    result, or False when no loop was registered via setup(), the wait timed
    out, or the coroutine raised.

    NOTE(review): presumably called from a worker thread — waiting on the
    future from the loop's own thread would stall the loop; confirm callers.
    """
    if _loop is None:
        logger.warning("matrix: event loop not available")
        return False
    future = asyncio.run_coroutine_threadsafe(_send_matrix_async(channel_cfg, notif), _loop)
    try:
        return future.result(timeout=15)
    except Exception as e:
        # After a timeout the coroutine is still scheduled on the loop; cancel
        # it so it does not keep running after we reported failure.  cancel()
        # is a no-op when the future already finished.
        future.cancel()
        logger.error("matrix send timed out or failed: %s", e)
        return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Channel dispatcher
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps a channel's configured ``type`` value to the driver function that delivers it.
_DRIVERS = dict(
    pushover=_send_pushover,
    email=_send_email,
    mattermost=_send_mattermost,
    signal=_send_signal,
    sms_voipms=_send_sms_voipms,
    matrix=_send_matrix,
)
|
|
|
|
|
|
def _dispatch_to_channel(channel_name: str, channel_cfg: dict, notif: Notification) -> bool:
    """Deliver *notif* through one named channel, respecting its ``min_level``.

    Returns True both when the driver reports success and when the
    notification is below the channel's ``min_level`` (default WARNING) and is
    therefore skipped.  Returns False for an unknown channel ``type`` or a
    failed send.
    """
    threshold = channel_cfg.get("min_level", "WARNING").upper()
    if _level_value(notif.level) < _level_value(threshold):
        logger.debug(
            "channel '%s': skipping level %s (min_level=%s)", channel_name, notif.level, threshold
        )
        # Filtered on purpose, so this is not reported as a failure.
        return True

    kind = channel_cfg.get("type", "")
    send = _DRIVERS.get(kind)
    if send is not None:
        return send(channel_cfg, notif)
    logger.warning("unknown channel type '%s' for channel '%s'", kind, channel_name)
    return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Central dispatch function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_url(host_name: str) -> str:
    """Return the plugins-page link for *host_name*, or "" when no ``base_url`` is configured."""
    root = _config.get("base_url", "").rstrip("/")
    return f"{root}/plugins#{host_name}" if root else ""
|
|
|
|
|
|
def send_notification(host_name: str, notif: Notification) -> dict:
    """Dispatch *notif* to all managers/owner of *host_name*.

    Looks up the host's owner + managers, resolves each user's
    notification_channels, and dispatches.  Silently does nothing if
    no users are configured.

    A RECOVER for a host with recorded alerts goes to exactly the channels
    that delivered a WARNING/CRITICAL for it, ignoring ``min_level``, and the
    record is cleared afterwards.  A notification that a channel filters out
    by ``min_level`` is reported as True in the result but is not logged as
    sent and does not mark the channel as alerted.

    If ``notif.url`` is empty it is filled in (on the passed object) from the
    configured ``base_url``.

    Returns a dict of {channel_name: bool} results.
    """
    from . import users as users_mod
    from . import hbdclass

    if not users_mod.users_enabled():
        return {}

    # Collect recipient usernames: owner + managers
    host = hbdclass.Host.hosts.get(host_name)
    if host is None:
        logger.debug("send_notification: host '%s' not found", host_name)
        return {}

    recipients: set[str] = set()
    owner = getattr(host, "owner", None)
    if owner:
        recipients.add(owner)
    # "or []" tolerates a managers attribute that is present but None.
    recipients.update(getattr(host, "managers", None) or [])

    if not recipients:
        logger.debug("send_notification: no owner/managers for '%s'", host_name)
        return {}

    # Fill url if not already set
    if not notif.url:
        notif.url = _build_url(host_name)

    global_channels: dict = _config.get("notification_channels", {})
    results: dict = {}
    level = notif.level.upper()
    is_alert = level in ("WARNING", "CRITICAL")
    is_recover = level == "RECOVER"

    # For RECOVER: send to every channel that previously fired an alert for this host,
    # regardless of that channel's min_level.
    if is_recover and host_name in _alerted_channels:
        # pop() hands over the recorded channels and clears the record in one step.
        for channel_name in _alerted_channels.pop(host_name):
            channel_cfg = global_channels.get(channel_name)
            if not channel_cfg:
                continue
            try:
                driver = _DRIVERS.get(channel_cfg.get("type", ""))
                if driver:
                    ok = driver(channel_cfg, notif)
                    results[channel_name] = ok
                    if ok:
                        logger.info("recover sent to channel '%s': %s", channel_name, notif.title)
            except Exception as e:
                logger.error("error sending recover to channel '%s': %s", channel_name, e)
                results[channel_name] = False
        return results

    for username in recipients:
        user = users_mod.get_user(username)
        if user is None:
            logger.debug("send_notification: user '%s' not found", username)
            continue
        for channel_name in user.notification_channels:
            if channel_name in results:
                continue  # already dispatched to this channel this notification
            channel_cfg = global_channels.get(channel_name)
            if not channel_cfg:
                logger.warning("channel '%s' not defined in notification_channels", channel_name)
                results[channel_name] = False
                continue
            try:
                # _dispatch_to_channel returns True for a min_level-filtered
                # notification as well, so work out here whether anything was
                # actually delivered.
                min_level = channel_cfg.get("min_level", "WARNING").upper()
                filtered = _level_value(notif.level) < _level_value(min_level)
                ok = _dispatch_to_channel(channel_name, channel_cfg, notif)
                results[channel_name] = ok
                if filtered:
                    # Nothing was sent: do not log a delivery and do not
                    # remember the channel for a later RECOVER.
                    continue
                if ok:
                    logger.info("notification sent to channel '%s': %s", channel_name, notif.title)
                    if is_alert:
                        _alerted_channels.setdefault(host_name, set()).add(channel_name)
                else:
                    logger.warning("failed to send notification to channel '%s'", channel_name)
            except Exception as e:
                logger.error("error sending to channel '%s': %s", channel_name, e)
                results[channel_name] = False

    return results
|