re-factor notifications, add sms and matrix as channels

This commit is contained in:
Andreas Wrede
2026-04-12 11:04:00 -04:00
parent 7f049a4e26
commit 0199ca4693
11 changed files with 887 additions and 864 deletions
+83
View File
@@ -47,6 +47,34 @@ def build_parser():
help="Username (informational only, for display)",
)
# --- notify ---
notify_p = subparsers.add_parser(
"notify",
help="Send a test message via a configured notification channel",
)
notify_p.add_argument("-c", "--config", dest="configfile", help="Config file path (YAML)")
notify_p.add_argument(
"channel",
help="Channel name as defined in notification_channels",
)
notify_p.add_argument(
"message",
nargs="?",
default="Test notification from hbd",
help="Message body (default: 'Test notification from hbd')",
)
notify_p.add_argument(
"--level",
default="WARNING",
choices=["INFO", "WARNING", "CRITICAL", "RECOVER"],
help="Notification level (default: WARNING)",
)
notify_p.add_argument(
"--title",
default=None,
help="Notification title (default: '[LEVEL] test')",
)
return parser
@@ -75,6 +103,57 @@ def cmd_passwd(args):
print(f" password: {hashed}")
def cmd_notify(args):
"""Send a test message via a single notification channel."""
from .config import load_config
from .notify import Notification, _dispatch_to_channel, setup
config = load_config(args.configfile)
setup(config)
channels = config.get("notification_channels", {})
if args.channel not in channels:
available = ", ".join(channels.keys()) if channels else "(none)"
print(f"Error: channel '{args.channel}' not found in notification_channels.", file=sys.stderr)
print(f"Available channels: {available}", file=sys.stderr)
sys.exit(1)
channel_cfg = channels[args.channel]
level = args.level.upper()
title = args.title or f"[{level}] test"
base_url = config.get("base_url", "").rstrip("/")
notif = Notification(
title=title,
body=args.message,
level=level,
url=f"{base_url}/plugins" if base_url else "",
)
# Bypass min_level for explicit test sends; run async channels directly
import asyncio
ch_type = channel_cfg.get("type", "")
print(f"Sending via {args.channel} ({ch_type}): {title}{args.message}")
if ch_type in ("matrix", "sms_voipms"):
from .notify import _send_matrix_async, _send_sms_voipms_async
driver_async = _send_matrix_async if ch_type == "matrix" else _send_sms_voipms_async
ok = asyncio.run(driver_async(channel_cfg, notif))
else:
from .notify import _DRIVERS
driver = _DRIVERS.get(ch_type)
if driver is None:
print(f"Error: unknown channel type '{ch_type}'", file=sys.stderr)
sys.exit(1)
ok = driver(channel_cfg, notif)
if ok:
print("OK")
else:
print("FAILED — check logs for details", file=sys.stderr)
sys.exit(1)
def main(argv=None):
parser = build_parser()
args = parser.parse_args(argv)
@@ -83,6 +162,10 @@ def main(argv=None):
cmd_passwd(args)
return
if args.command == "notify":
cmd_notify(args)
return
# Default: run the server (supports both `hbd serve ...` and `hbd ...`)
config = load_config(args.configfile)
+16 -120
View File
@@ -22,7 +22,7 @@ SERVER_DEFAULTS = {
"logfile": os.path.join(os.path.expanduser("~"), ".hb.log"),
# Notification channels
"notification_channels": {}, # Named channels with type and credentials
"default_notification_channels": [], # Default channels if host doesn't specify
"base_url": "", # Base URL for notification links (e.g. https://hbd.example.com)
# Monitoring settings
"interval": 20, # Expected heartbeat interval (for server checks)
@@ -34,8 +34,7 @@ SERVER_DEFAULTS = {
"default_owner": None, # Username that owns hosts with no explicit owner
# Host management
"hosts": {}, # New unified host definitions (optional)
"watchhosts": [], # Hosts to monitor and notify about (legacy)
"hosts": {}, # Unified host definitions
"dyndnshosts": [], # Hosts with dynamic DNS (legacy)
"drophosts": [], # Hosts to ignore
"dyndomains": ["wrede.org"],
@@ -216,34 +215,18 @@ class ReloadableConfig:
def get_watchhosts(config):
"""Extract watchhosts from config, supporting both new and legacy formats.
Args:
config: Configuration dictionary
"""Extract watched hostnames from config (hosts with watch: true).
Returns:
List of hostnames to watch
"""
watchhosts = []
# New format: hosts section with watch attribute
if "hosts" in config:
hosts_config = config["hosts"]
if isinstance(hosts_config, dict):
for host_name, host_attrs in hosts_config.items():
if isinstance(host_attrs, dict) and host_attrs.get("watch", False):
watchhosts.append(host_name)
# Legacy format: watchhosts list
if "watchhosts" in config:
legacy_watchhosts = config.get("watchhosts", [])
if isinstance(legacy_watchhosts, (list, set)):
watchhosts.extend(legacy_watchhosts)
elif isinstance(legacy_watchhosts, dict):
# Old dict format: {"host1": {attrs}, "host2": {attrs}}
watchhosts.extend(legacy_watchhosts.keys())
return list(set(watchhosts)) # Remove duplicates
hosts_config = config.get("hosts", {})
if isinstance(hosts_config, dict):
for host_name, host_attrs in hosts_config.items():
if isinstance(host_attrs, dict) and host_attrs.get("watch", False):
watchhosts.append(host_name)
return watchhosts
def get_dyndnshosts(config):
@@ -275,105 +258,18 @@ def get_dyndnshosts(config):
def get_host_config(config, hostname):
"""Get configuration for a specific host.
Args:
config: Configuration dictionary
hostname: Host name
"""Get configuration for a specific host from the hosts section.
Returns:
Dictionary with host attributes or empty dict
"""
if "hosts" in config:
hosts_config = config.get("hosts", {})
if isinstance(hosts_config, dict) and hostname in hosts_config:
return hosts_config[hostname] if isinstance(hosts_config[hostname], dict) else {}
# Check legacy watchhosts for notification settings
if "watchhosts" in config:
watchhosts = config.get("watchhosts", {})
if isinstance(watchhosts, dict) and hostname in watchhosts:
legacy_attrs = watchhosts[hostname]
if isinstance(legacy_attrs, dict):
# Convert legacy format to new format
return {
"watch": True,
"notify": legacy_attrs.get("notify"),
"notify_src": legacy_attrs.get("src"),
}
hosts_config = config.get("hosts", {})
if isinstance(hosts_config, dict) and hostname in hosts_config:
val = hosts_config[hostname]
return val if isinstance(val, dict) else {}
return {}
def get_notification_channels_for_host(config, hostname):
"""Get notification channels configured for a specific host.
Args:
config: Configuration dictionary
hostname: Host name
Returns:
List of channel names to use for this host
"""
host_config = get_host_config(config, hostname)
# Check if host specifies notification channels
channels = host_config.get("notification_channels", [])
if channels:
if isinstance(channels, str):
return [channels]
elif isinstance(channels, list):
return channels
# Fall back to default channels
default_channels = config.get("default_notification_channels", [])
if default_channels:
if isinstance(default_channels, str):
return [default_channels]
elif isinstance(default_channels, list):
return default_channels
# No channels configured, return empty list (will use legacy global config)
return []
def get_channel_config(config, channel_name):
"""Get configuration for a specific notification channel.
Args:
config: Configuration dictionary
channel_name: Name of the notification channel
Returns:
Dictionary with channel configuration or None if not found
"""
channels = config.get("notification_channels", {})
if isinstance(channels, dict) and channel_name in channels:
return channels[channel_name]
return None
def get_notification_channels_config(config, hostname):
"""Get list of notification channel configurations for a host.
Args:
config: Configuration dictionary
hostname: Host name
Returns:
List of (channel_name, channel_config) tuples
"""
channel_names = get_notification_channels_for_host(config, hostname)
channels = []
for channel_name in channel_names:
channel_config = get_channel_config(config, channel_name)
if channel_config and channel_config.get("type"):
channels.append((channel_name, channel_config))
return channels
# ---------------------------------------------------------------------------
# User / host-access helpers
# ---------------------------------------------------------------------------
+3 -1
View File
@@ -244,7 +244,9 @@ async def start(
host = config.get("hb_host", "localhost")
extra_scripts = config.get("http_extra_scripts", "")
host = request.host # includes port if non-standard
scheme = "wss" if request.secure else "ws"
forwarded_proto = request.headers.get("X-Forwarded-Proto", "")
is_secure = request.secure or forwarded_proto.lower() == "https"
scheme = "wss" if is_secure else "ws"
heartbeat_ws_url = f"{scheme}://{host}/ws"
tmpl = env.get_template("live.html")
body = tmpl.render(
+1 -1
View File
@@ -162,7 +162,7 @@ async def _run_async(config, config_path=None):
from . import journal as journal_mod
from . import threshold as threshold_mod
notify_mod.setup(config)
notify_mod.setup(config, loop=loop)
# Initialize message journal
msg_journal = journal_mod.get_journal(config)
+407 -232
View File
@@ -1,37 +1,103 @@
"""Notification helpers: email, pushover, mattermost, signal and dispatcher."""
"""Notification helpers: email, pushover, matrix, mattermost, signal, sms and dispatcher.
Channel types supported:
pushover - Pushover app notifications
email - SMTP email
matrix - Matrix (via matrix-nio)
mattermost - Mattermost webhook
signal - Signal via signal-cli subprocess
sms_voipms - SMS via voip.ms REST API
Each channel can specify ``min_level: WARNING|CRITICAL`` (default: WARNING).
Notifications are dispatched to the owner + managers of the host, each via
their own ``notification_channels`` list. When no users are configured the
server runs silently (no notifications sent).
"""
import asyncio
import logging
from typing import Optional
import http.client
import urllib.parse
import subprocess
import smtplib
import subprocess
import time
import sys
from dataclasses import dataclass, field
from typing import Optional
from . import data
from . import ws as ws_mod
from . import main as main_mod
DEFAULT_PUSHPROVIDERS = ["all", "pushover", "mattermost", "signal"]
msg_to_websockets = ws_mod.broadcast
# module-level configuration set via setup()
_config = {}
logger = logging.getLogger(__name__)
msg_to_websockets = ws_mod.broadcast
# Module-level state set via setup()
_config: dict = {}
_loop: Optional[asyncio.AbstractEventLoop] = None
# Tracks which channels fired a WARNING/CRITICAL per host.
# {host_name: set of channel_names} — used to route RECOVER to the same channels.
_alerted_channels: dict = {}
logf = None
# ---------------------------------------------------------------------------
# Level ordering
# ---------------------------------------------------------------------------
_LEVEL_ORDER = {"RECOVER": 0, "INFO": 0, "WARNING": 1, "CRITICAL": 2}
def _level_value(level: str) -> int:
return _LEVEL_ORDER.get(level.upper(), 0)
# ---------------------------------------------------------------------------
# Notification dataclass
# ---------------------------------------------------------------------------
@dataclass
class Notification:
"""Structured notification payload."""
title: str # e.g. "[CRITICAL] webserver01"
body: str # detail message
level: str # RECOVER | WARNING | CRITICAL | INFO
url: str = "" # link to plugin metrics page
# ---------------------------------------------------------------------------
# Module setup
# ---------------------------------------------------------------------------
def setup(cfg: dict, loop: Optional[asyncio.AbstractEventLoop] = None):
"""Initialize notifier from configuration dict and event loop."""
global _config, _loop
_config = dict(cfg)
if loop is not None:
_loop = loop
def reload_config(cfg: dict):
"""Reload notification configuration on SIGHUP."""
global _config
_config = dict(cfg)
logger.info("Notification configuration reloaded")
# ---------------------------------------------------------------------------
# Event log (websocket + file + in-memory)
# ---------------------------------------------------------------------------
def initlog(logfile):
global logf
try:
logf = open(logfile, "a+")
except Exception as e:
import sys
print("cannot open logfile %s, using STDERR: %s" % (logfile, e))
logf = sys.stderr
return logf
def closelog():
global logf
if logf and logf != sys.stderr:
@@ -40,6 +106,7 @@ def closelog():
except Exception:
pass
def eventlog(host, lvl, m, service=None):
ts = time.time()
s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {lvl} "
@@ -56,91 +123,29 @@ def eventlog(host, lvl, m, service=None):
logger.warning("failed to write to logfile: %s", e)
msg_to_websockets("message", s)
def setup(cfg: dict):
"""Initialize notifier defaults from a configuration dict."""
global _config
_config = dict(cfg)
# ---------------------------------------------------------------------------
# Low-level channel drivers
# ---------------------------------------------------------------------------
def reload_config(cfg: dict):
"""Reload notification configuration.
This function updates the module-level notification configuration
during runtime config reloads.
Args:
cfg: New configuration dictionary
"""
global _config
_config = dict(cfg)
logger.info("Notification configuration reloaded")
def send_email(toaddrs, smtpserver, sender, subject, body, debug=0):
"""Send a plain email via SMTP. Returns True on success."""
try:
smtpport = _config.get("smtpport", 587)
server = smtplib.SMTP(smtpserver, smtpport)
if debug > 0:
server.set_debuglevel(1)
if smtpport == 587:
server.starttls()
server.ehlo()
smtpuser = _config.get("smtpuser", None)
smtppassword = _config.get("smtppassword", None)
if smtpuser and smtppassword:
server.login(smtpuser, smtppassword)
server.sendmail(sender, toaddrs, body)
except Exception as e:
logger.warning("email send failed: %s", e)
try:
server.quit()
except Exception:
pass
def _send_pushover(channel_cfg: dict, notif: Notification) -> bool:
import http.client
import urllib.parse
token = channel_cfg.get("token", "")
user = channel_cfg.get("user", "")
if not token or not user:
logger.warning("pushover: missing token or user")
return False
try:
server.quit()
except Exception:
pass
return True
def email(subject: str, msg: str, debug: int = 0) -> bool:
"""Convenience wrapper exposed to the rest of the application.
Uses module-level configuration to supply recipient list, smtp server
and sender address.
"""
toaddrs = _config.get("toemail")
fromemail = _config.get("fromemail")
smtpserver = _config.get("smtpserver")
if not toaddrs or not fromemail or not smtpserver:
logger.warning(
"email config incomplete: toemail=%s, fromemail=%s, smtpserver=%s",
toaddrs,
fromemail,
smtpserver,
)
return False
date = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime())
body = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % (
toaddrs[0] if toaddrs else "",
fromemail,
subject,
date,
msg,
)
return send_email(toaddrs, smtpserver, fromemail, subject, body, debug=debug)
def pushover(token: str, user: str, msg: str, debug: int = 0) -> bool:
"""Send message via Pushover API."""
params: dict = {"token": token, "user": user, "title": notif.title, "message": notif.body}
if notif.url:
params["url"] = notif.url
params["url_title"] = "Plugin metrics"
conn = http.client.HTTPSConnection("api.pushover.net:443")
try:
conn.request(
"POST",
"/1/messages.json",
urllib.parse.urlencode({"token": token, "user": user, "message": msg}),
urllib.parse.urlencode(params),
{"Content-type": "application/x-www-form-urlencoded"},
)
r = conn.getresponse()
@@ -151,176 +156,346 @@ def pushover(token: str, user: str, msg: str, debug: int = 0) -> bool:
return False
def pushmattermost(
host: str,
token: str,
channel: str,
msg: str,
username: str = "hbd",
icon: Optional[str] = None,
debug: int = 0,
) -> bool:
"""Send a message to Mattermost via simple webhook driver if available.
def _send_email(channel_cfg: dict, notif: Notification) -> bool:
recipients = channel_cfg.get("recipients", [])
sender = channel_cfg.get("sender", "")
smtp_server = channel_cfg.get("smtp_server", "")
smtp_port = channel_cfg.get("smtp_port", 587)
smtp_user = channel_cfg.get("smtp_user")
smtp_password = channel_cfg.get("smtp_password")
This helper tries to import mattermostdriver.Driver and uses webhooks if present.
If the import fails it returns False.
"""
if not recipients or not sender or not smtp_server:
logger.warning("email: missing recipients, sender, or smtp_server")
return False
date = time.strftime("%a, %d %b %Y %H:%M:%S %z", time.localtime())
body_text = notif.body
if notif.url:
body_text += f"\n\n{notif.url}"
raw = "To: %s\nFrom: %s\nSubject: %s\nDate: %s\n\n%s" % (
recipients[0] if isinstance(recipients, list) else recipients,
sender,
notif.title,
date,
body_text,
)
try:
server = smtplib.SMTP(smtp_server, smtp_port)
if smtp_port == 587:
server.starttls()
server.ehlo()
if smtp_user and smtp_password:
server.login(smtp_user, smtp_password)
server.sendmail(sender, recipients, raw)
server.quit()
return True
except Exception as e:
logger.warning("email send failed: %s", e)
try:
server.quit()
except Exception:
pass
return False
def _send_mattermost(channel_cfg: dict, notif: Notification) -> bool:
try:
from mattermostdriver import Driver
except Exception:
except ImportError:
logger.error("mattermostdriver not installed")
return False
host = channel_cfg.get("host", "")
token = channel_cfg.get("token", "")
channel = channel_cfg.get("channel", "")
if not host or not token or not channel:
logger.warning("mattermost: missing host, token, or channel")
return False
text = f"**{notif.title}**\n{notif.body}"
if notif.url:
text += f"\n[Plugin metrics]({notif.url})"
ses = {"url": host, "scheme": "http", "basepath": "/api/v4", "port": 8065}
mm = Driver(ses)
payload = {"text": msg, "channel": channel, "username": username}
payload: dict = {"text": text, "channel": channel, "username": channel_cfg.get("username", "hbd")}
icon = channel_cfg.get("icon")
if icon:
payload["icon_url"] = icon
try:
rc = mm.webhooks.call_webhook(token, payload)
logger.debug("mattermost rc: %s", rc)
return bool(rc is None or rc == "")
except Exception as e:
logger.error("mattermost error: %s", e)
return False
def pushsignal(
signal_cli_bin: str, user: str, recipient: str, msg: str, debug: int = 0
) -> bool:
"""Send a message via signal-cli (requires local installation).
Uses subprocess to call signal-cli. Returns True if the command succeeded.
"""
CLI = [signal_cli_bin, "-u", user, "send", "-m", msg, recipient]
logger.debug("signal cli: %s", CLI)
def _send_signal(channel_cfg: dict, notif: Notification) -> bool:
cli = channel_cfg.get("cli_path", "/usr/local/bin/signal-cli")
user = channel_cfg.get("user", "")
recipient = channel_cfg.get("recipient", "")
if not user or not recipient:
logger.warning("signal: missing user or recipient")
return False
msg = f"{notif.title}\n{notif.body}"
if notif.url:
msg += f"\n{notif.url}"
try:
res = subprocess.run(CLI, capture_output=True)
res = subprocess.run([cli, "-u", user, "send", "-m", msg, recipient], capture_output=True)
if res.returncode != 0:
logger.error("signal failed: %s".res.stderr.decode())
logger.error("signal failed: %s", res.stderr.decode())
return False
logger.debug("signal sent: %s", res.stdout.decode())
return True
except Exception as e:
logger.exception("signal exception: %s", e)
return False
def _dispatch_to_channel(channel_name: str, channel_config: dict, msg: str, debug: int = 0) -> bool:
"""Dispatch a message to a specific notification channel.
Args:
channel_name: Name of the channel (for logging)
channel_config: Channel configuration dictionary with 'type' and type-specific fields
msg: Message to send
debug: Debug level
Returns:
True if notification sent successfully, False otherwise
"""
channel_type = channel_config.get("type")
if channel_type == "pushover":
return pushover(
channel_config.get("token", ""),
channel_config.get("user", ""),
msg,
debug=debug
)
elif channel_type == "email":
# Build email from channel config
recipients = channel_config.get("recipients", [])
sender = channel_config.get("sender", "")
smtp_server = channel_config.get("smtp_server", "")
smtp_port = channel_config.get("smtp_port", 587)
smtp_user = channel_config.get("smtp_user")
smtp_password = channel_config.get("smtp_password")
if not recipients or not sender or not smtp_server:
logger.warning(
"Email channel '%s' missing required fields: recipients=%s, sender=%s, smtp_server=%s",
channel_name, recipients, sender, smtp_server
)
return False
# Temporarily update _config for email() function
old_config = dict(_config)
_config["toemail"] = recipients
_config["fromemail"] = sender
_config["smtpserver"] = smtp_server
_config["smtpport"] = smtp_port
if smtp_user:
_config["smtpuser"] = smtp_user
if smtp_password:
_config["smtppassword"] = smtp_password
result = email("Heartbeat notification", msg, debug=debug)
# Restore config
_config.clear()
_config.update(old_config)
return result
elif channel_type == "signal":
return pushsignal(
channel_config.get("cli_path", "/usr/local/bin/signal-cli"),
channel_config.get("user", ""),
channel_config.get("recipient", ""),
msg,
debug=debug
)
elif channel_type == "mattermost":
return pushmattermost(
channel_config.get("host", ""),
channel_config.get("token", ""),
channel_config.get("channel", ""),
msg,
username=channel_config.get("username", "hbd"),
icon=channel_config.get("icon"),
debug=debug
)
else:
logger.warning("Unknown channel type '%s' for channel '%s'", channel_type, channel_name)
async def _send_sms_voipms_async(channel_cfg: dict, notif: Notification) -> bool:
"""Send SMS via voip.ms REST API using multipart form-data POST."""
import json
import aiohttp
api_user = channel_cfg.get("api_user", "")
api_password = channel_cfg.get("api_password", "")
did = channel_cfg.get("did", "")
dst = channel_cfg.get("dst", "")
if not api_user or not api_password or not did or not dst:
logger.warning("sms_voipms: missing api_user, api_password, did, or dst")
return False
# SMS body: title + body, truncated to 160 chars
text = f"{notif.title}: {notif.body}"
if len(text) > 160:
text = text[:157] + "..."
form_data = {
"api_username": api_user,
"api_password": api_password,
"method": "sendSMS",
"did": did,
"dst": dst,
"message": text,
}
try:
async with aiohttp.ClientSession() as session:
with aiohttp.MultipartWriter("form-data") as mp:
for key, value in form_data.items():
part = mp.append(value)
part.set_content_disposition("form-data", name=key)
async with session.post("https://voip.ms/api/v1/rest.php", data=mp) as resp:
body = await resp.text()
if resp.status != 200:
logger.error("sms_voipms HTTP %s: %s", resp.status, body)
return False
result = json.loads(body)
if result.get("status") == "success":
return True
logger.error("sms_voipms error: %s", result.get("status"))
return False
except Exception as e:
logger.error("sms_voipms exception: %s", e)
return False
def pushmsg_for_host(hostname: str, msg: str, debug: int = 0) -> dict:
"""Send notification for a specific host using its configured channels.
This function looks up the host's notification channels from the config
and sends the message to those channels.
Args:
hostname: Name of the host to send notification for
msg: Message to send
debug: Debug level
Returns:
Dictionary of results per channel: {"channel_name": True/False}
def _send_sms_voipms(channel_cfg: dict, notif: Notification) -> bool:
"""Dispatch voip.ms SMS send onto the shared event loop."""
if _loop is None:
logger.warning("sms_voipms: event loop not available")
return False
future = asyncio.run_coroutine_threadsafe(_send_sms_voipms_async(channel_cfg, notif), _loop)
try:
return future.result(timeout=15)
except Exception as e:
logger.error("sms_voipms send timed out or failed: %s", e)
return False
async def _send_matrix_async(channel_cfg: dict, notif: Notification) -> bool:
"""Send a Matrix message using matrix-nio."""
try:
from nio import AsyncClient, RoomMessageText # noqa: F401
except ImportError:
logger.error("matrix-nio not installed; pip install matrix-nio")
return False
from nio import AsyncClient
homeserver = channel_cfg.get("homeserver", "")
access_token = channel_cfg.get("access_token", "")
room_id = channel_cfg.get("room_id", "")
if not homeserver or not access_token or not room_id:
logger.warning("matrix: missing homeserver, access_token, or room_id")
return False
text = f"{notif.title}\n{notif.body}"
if notif.url:
text += f"\n{notif.url}"
html = f"<strong>{notif.title}</strong><br>{notif.body}"
if notif.url:
html += f'<br><a href="{notif.url}">Plugin metrics</a>'
client = AsyncClient(homeserver)
client.access_token = access_token
try:
from nio import RoomSendResponse
content = {
"msgtype": "m.text",
"body": text,
"format": "org.matrix.custom.html",
"formatted_body": html,
}
resp = await client.room_send(room_id, "m.room.message", content)
if hasattr(resp, "event_id"):
return True
logger.error("matrix send failed: %s", resp)
return False
except Exception as e:
logger.error("matrix exception: %s", e)
return False
finally:
await client.close()
def _send_matrix(channel_cfg: dict, notif: Notification) -> bool:
"""Dispatch matrix send onto the shared event loop."""
if _loop is None:
logger.warning("matrix: event loop not available")
return False
future = asyncio.run_coroutine_threadsafe(_send_matrix_async(channel_cfg, notif), _loop)
try:
return future.result(timeout=15)
except Exception as e:
logger.error("matrix send timed out or failed: %s", e)
return False
# ---------------------------------------------------------------------------
# Channel dispatcher
# ---------------------------------------------------------------------------
_DRIVERS = {
"pushover": _send_pushover,
"email": _send_email,
"mattermost": _send_mattermost,
"signal": _send_signal,
"sms_voipms": _send_sms_voipms,
"matrix": _send_matrix,
}
def _dispatch_to_channel(channel_name: str, channel_cfg: dict, notif: Notification) -> bool:
"""Send *notif* to a single named channel, honouring min_level."""
min_level = channel_cfg.get("min_level", "WARNING").upper()
if _level_value(notif.level) < _level_value(min_level):
logger.debug(
"channel '%s': skipping level %s (min_level=%s)", channel_name, notif.level, min_level
)
return True # not an error — filtered intentionally
ch_type = channel_cfg.get("type", "")
driver = _DRIVERS.get(ch_type)
if driver is None:
logger.warning("unknown channel type '%s' for channel '%s'", ch_type, channel_name)
return False
return driver(channel_cfg, notif)
# ---------------------------------------------------------------------------
# Central dispatch function
# ---------------------------------------------------------------------------
def _build_url(host_name: str) -> str:
base_url = _config.get("base_url", "").rstrip("/")
if not base_url:
return ""
return f"{base_url}/plugins#{host_name}"
def send_notification(host_name: str, notif: Notification) -> dict:
"""Dispatch *notif* to all managers/owner of *host_name*.
Looks up the host's owner + managers, resolves each user's
notification_channels, and dispatches. Silently does nothing if
no users are configured.
Returns a dict of {channel_name: bool} results.
"""
from . import config as config_mod
# Get notification channels for this host
channels = config_mod.get_notification_channels_config(_config, hostname)
if not channels:
logger.warning("No notification channels configured for host '%s'", hostname)
from . import users as users_mod
from . import hbdclass
if not users_mod.users_enabled():
return {}
# Dispatch to each channel
results = {}
for channel_name, channel_config in channels:
try:
success = _dispatch_to_channel(channel_name, channel_config, msg, debug=debug)
results[channel_name] = success
if success:
logger.info("Notification sent to channel '%s': %s", channel_name, msg)
else:
logger.warning("Failed to send notification to channel '%s'", channel_name)
except Exception as e:
logger.error("Error sending to channel '%s': %s", channel_name, e)
results[channel_name] = False
# Collect recipient usernames: owner + managers
host = hbdclass.Host.hosts.get(host_name)
if host is None:
logger.debug("send_notification: host '%s' not found", host_name)
return {}
recipients: set[str] = set()
owner = getattr(host, "owner", None)
if owner:
recipients.add(owner)
for m in getattr(host, "managers", []):
recipients.add(m)
if not recipients:
logger.debug("send_notification: no owner/managers for '%s'", host_name)
return {}
# Fill url if not already set
if not notif.url:
notif.url = _build_url(host_name)
global_channels: dict = _config.get("notification_channels", {})
results: dict = {}
level = notif.level.upper()
is_alert = level in ("WARNING", "CRITICAL")
is_recover = level in ("RECOVER",)
# For RECOVER: send to every channel that previously fired an alert for this host,
# regardless of that channel's min_level.
if is_recover and host_name in _alerted_channels:
for channel_name in list(_alerted_channels[host_name]):
channel_cfg = global_channels.get(channel_name)
if not channel_cfg:
continue
try:
ch_type = channel_cfg.get("type", "")
driver = _DRIVERS.get(ch_type)
if driver:
ok = driver(channel_cfg, notif)
results[channel_name] = ok
if ok:
logger.info("recover sent to channel '%s': %s", channel_name, notif.title)
except Exception as e:
logger.error("error sending recover to channel '%s': %s", channel_name, e)
# Clear the alerted set once recovery is delivered
del _alerted_channels[host_name]
return results
for username in recipients:
user = users_mod.get_user(username)
if user is None:
logger.debug("send_notification: user '%s' not found", username)
continue
for channel_name in user.notification_channels:
if channel_name in results:
continue # already dispatched to this channel this notification
channel_cfg = global_channels.get(channel_name)
if not channel_cfg:
logger.warning("channel '%s' not defined in notification_channels", channel_name)
results[channel_name] = False
continue
try:
ok = _dispatch_to_channel(channel_name, channel_cfg, notif)
results[channel_name] = ok
if ok:
logger.info("notification sent to channel '%s': %s", channel_name, notif.title)
if is_alert:
_alerted_channels.setdefault(host_name, set()).add(channel_name)
else:
logger.warning("failed to send notification to channel '%s'", channel_name)
except Exception as e:
logger.error("error sending to channel '%s': %s", channel_name, e)
results[channel_name] = False
return results
+56 -7
View File
@@ -81,7 +81,7 @@
#ntable th {
border: 1px solid #e0e0e0;
text-align: left;
padding: 8px 10px;
padding: 2px 4px;
}
#ntable tr:nth-child(even) {
@@ -92,8 +92,24 @@
background-color: #e3f2fd;
}
#ntable tbody tr.row-warning {
background-color: #fff8c5;
}
#ntable tbody tr.row-critical {
background-color: #fde8e8;
}
#ntable tbody tr.row-warning:hover {
background-color: #fff0a0;
}
#ntable tbody tr.row-critical:hover {
background-color: #f9c8c8;
}
#ntable th {
padding: 12px 10px;
padding: 6px 8px;
background-color: #2196f3;
color: white;
font-weight: 600;
@@ -143,7 +159,7 @@
/* Message styling */
#messages {
font-size: 0.85em;
line-height: 1.6;
line-height: 1.0;
}
#messages div {
@@ -217,6 +233,19 @@
}
}
function updateRowAlert(row, data) {
var criticalUnacked = data.alert_critical_unacked || 0;
var criticalAcked = data.alert_critical_acked || 0;
var warningUnacked = data.alert_warning_unacked || 0;
var warningAcked = data.alert_warning_acked || 0;
row.classList.remove('row-warning', 'row-critical');
if (criticalUnacked > 0 || criticalAcked > 0) {
row.classList.add('row-critical');
} else if (warningUnacked > 0 || warningAcked > 0) {
row.classList.add('row-warning');
}
}
function createRow(data) {
var row = document.createElement("tr");
var c_name = document.createElement("td");
@@ -284,12 +313,31 @@
var table = document.getElementById("ntablebody"); // find table to append to
table.appendChild(row); // append row to table
name_idx[c_name] = row;
updateRowAlert(row, data);
}
function formatTS(ts) {
const milliseconds = ts * 1000;
const dateObject = new Date(milliseconds);
return dateObject.toLocaleString("de-DE");
const now = new Date();
const d = new Date(ts * 1000);
const pad = n => String(n).padStart(2, '0');
const timeStr = `${pad(d.getHours())}:${pad(d.getMinutes())}:${pad(d.getSeconds())}`;
// Same calendar day → show time only
if (d.toDateString() === now.toDateString()) {
return timeStr;
}
// Within 8 days → show "-X d hh:mm:ss"
const todayStart = new Date(now.getFullYear(), now.getMonth(), now.getDate());
const dStart = new Date(d.getFullYear(), d.getMonth(), d.getDate());
const diffDays = Math.round((todayStart - dStart) / 86400000);
if (diffDays < 8) {
return `-${diffDays}d ${timeStr}`;
}
// Older → date only
return `${d.getFullYear()}-${pad(d.getMonth() + 1)}-${pad(d.getDate())}`;
}
function update_table(data) {
@@ -345,6 +393,7 @@
name_idx[data.name].cells[4 + i * 4].innerHTML = state;
name_idx[data.name].cells[5 + i * 4].innerHTML = latency;
}
updateRowAlert(name_idx[data.name], data);
}
function WS_Connect() {
@@ -427,7 +476,7 @@
</thead>
<tbody id="ntablebody">
{% for host in hosts %}
<tr>
<tr class="{% if host.alert_critical_unacked > 0 or host.alert_critical_acked > 0 %}row-critical{% elif host.alert_warning_unacked > 0 or host.alert_warning_acked > 0 %}row-warning{% endif %}">
<td>{{ host.name }}</td>
<td style="text-align: center; color: #ff9800; font-weight: bold;">
{%- set warning_unacked = host.alert_warning_unacked -%}
+16 -4
View File
@@ -1003,9 +1003,15 @@ class ThresholdChecker:
value: Any,
):
"""Send notification and log to journal/eventlog."""
# Send notification using host-specific channels
try:
notify_mod.pushmsg_for_host(host_name, f"{lvl}: {host_name} - {message}")
notify_mod.send_notification(
host_name,
notify_mod.Notification(
title=f"[{lvl}] {host_name}",
body=message,
level=lvl,
),
)
logger.info("Notification sent: %s", message)
except Exception as e:
logger.error("Failed to send notification: %s", e)
@@ -1137,9 +1143,15 @@ class ThresholdChecker:
else:
message = f"REMINDER ({alert_state.level.name}): {host_name} - {metric_path} = {value} (ongoing for {int(now - alert_state.since)}s)"
# Send re-notification using host-specific channels
try:
notify_mod.pushmsg_for_host(host_name, message)
notify_mod.send_notification(
host_name,
notify_mod.Notification(
title=f"[REMINDER/{alert_state.level.name}] {host_name}",
body=message,
level=alert_state.level.name,
),
)
alert_state.last_notification = now
alert_state.notification_count += 1
logger.info("Re-notification sent: %s", message)
+27 -24
View File
@@ -171,7 +171,7 @@ def dicttos(ID, d):
DROPOVERDUE = 7 * 24 * 3600 # seconds before an overdue host becomes UNKNOWN
def _make_timer_callbacks(uname, host, watchhosts, ctx):
def _make_timer_callbacks(uname, host, ctx):
"""Return (on_overdue, on_unknown) async callbacks for connection timer logic.
Captured values are bound at call time so callbacks are safe to use in loops.
@@ -191,9 +191,11 @@ def _make_timer_callbacks(uname, host, watchhosts, ctx):
now = time.time()
connection.newstate(connection.__class__.OVERDUE, now, cfg.get("grace", 2))
msg = f"{connection.afam} overdue"
eventlog(uname, "CRITICAL" if uname in watchhosts else "WARNING", msg)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, f"{uname} {msg}")
eventlog(uname, "CRITICAL", msg)
notify_mod.send_notification(
uname,
notify_mod.Notification(title=f"[CRITICAL] {uname}", body=msg, level="CRITICAL"),
)
if threshold_checker:
threshold_checker.check_value(
host_name=uname,
@@ -218,8 +220,6 @@ def restore_connection_timers(hbdclass, ctx):
now = time.time()
cfg = ctx.get("config", {})
grace = cfg.get("grace", 2)
from . import config as config_mod
watchhosts = config_mod.get_watchhosts(cfg)
restored = 0
for uname, host in list(hbdclass.Host.hosts.items()):
@@ -229,7 +229,7 @@ def restore_connection_timers(hbdclass, ctx):
if state == hbdclass.Connection.DOWN:
continue
on_overdue, on_unknown = _make_timer_callbacks(uname, host, watchhosts, ctx)
on_overdue, on_unknown = _make_timer_callbacks(uname, host, ctx)
if state == hbdclass.Connection.UP and interval > 0:
elapsed = now - conn.lastbeat
@@ -322,9 +322,6 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
host = hbdcls.Host.hosts[uname]
newh = False
# Get watchhosts once for use throughout message handling
watchhosts = config_mod.get_watchhosts(cfg)
cid = msg.get("id", 0)
try:
rtt = float(msg.get("rtt"))
@@ -390,8 +387,10 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if res:
eventlog(uname, "WARNING", res)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, "%s %s" % (host.name, res))
notify_mod.send_notification(
uname,
notify_mod.Notification(title=f"[WARNING] {uname}", body=res, level="WARNING"),
)
interval = int(msg.get("interval", 0) or 0)
shutdown = msg.get("shutdown", 0)
@@ -401,13 +400,12 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if boot:
eventlog(uname, "INFO", "booted")
if uname in watchhosts:
m = "%s booted" % (host.name)
notify_mod.pushmsg_for_host(uname, m)
notify_mod.send_notification(
uname,
notify_mod.Notification(title=f"[INFO] {uname}", body=f"{host.name} booted", level="INFO"),
)
if message:
eventlog(uname, "INFO", "msg: %s" % message, service=service)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, message)
if conn.getstate() != hbdcls.Connection.UP:
lasts = conn.state
@@ -420,8 +418,10 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
else:
m = "%s back after being %s for %s" % (conn.afam, lasts, dur(d))
eventlog(uname, "RECOVER", m)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, "%s %s is back" % (uname, conn.afam))
notify_mod.send_notification(
uname,
notify_mod.Notification(title=f"[RECOVER] {uname}", body=m, level="RECOVER"),
)
if boot or newh:
host.upcount = host.doesack
@@ -429,20 +429,23 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
host.upcount += 1
if shutdown:
eventlog(uname, "INFO", "%s shutdown" % conn.afam)
if uname in watchhosts:
notify_mod.pushmsg_for_host(uname, "%s %s shutdown" % (uname, conn.afam))
m = "%s shutdown" % conn.afam
eventlog(uname, "INFO", m)
notify_mod.send_notification(
uname,
notify_mod.Notification(title=f"[INFO] {uname}", body=m, level="INFO"),
)
conn.newstate(hbdcls.Connection.DOWN, now)
if interval > 0:
host.interval = interval
# Timer-based reachability monitoring
# Reset overdue timer on every heartbeat
if interval > 0 and conn.getstate() != hbdcls.Connection.DOWN:
grace = cfg.get("grace", 2)
timeout_seconds = interval + grace
on_overdue, _ = _make_timer_callbacks(uname, host, watchhosts, ctx)
on_overdue, _ = _make_timer_callbacks(uname, host, ctx)
conn.reset_overdue_timer(timeout_seconds, on_overdue)
# Check RTT thresholds using the threshold checker