Compare commits

..

8 Commits

Author SHA1 Message Date
andreas 4349ae217a version 5.2.4
Release / release (push) Successful in 5s
2026-05-08 08:50:06 -04:00
andreas b3aa7b585f udp/config: fall back to default_owner when os_info has no owner; log debug
- When os_info arrives with no owner field, apply default_owner from server config
- Stop applying default_owner unconditionally in get_host_access (now deferred to os_info handling)
- os_info plugin logs debug message when injecting owner from client config

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 08:49:42 -04:00
andreas 88a3c09b51 hbc/server: request InfoPlugin refresh when host has no plugin data; update docs
- Server sets request_update=1 in ACK when host.plugin_data is empty
- hbc: AsyncConnection.request_info_event; handle_ack sets it on request_update
- hbc: _info_plugin_refresh_loop clears InfoPlugin caches and resends on demand
- hbc_mini: same via _request_info event and _info_refresh_loop
- docs/USERS.md: document client-declared owner config key
- docs/PLUGIN_DEVELOPMENT.md: document server-initiated InfoPlugin refresh

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 07:37:41 -04:00
andreas 0504402a8a hbc/hbc_mini: add owner config; include in os_info; server applies to host
- owner: optional top-level config key in ~/.hbc.yaml / ~/.hbc.json
- Propagated into plugin configs at load time so os_info can include it
- os_info PLG data carries owner field when set
- udp: sets host.owner from os_info if not already configured server-side
- live.html: format event log timestamps as YYYY-MM-DD HH:MM:SS (24-hour)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 07:25:47 -04:00
andreas ca58c18802 eventlog: store structured dicts; filter by user; clock: fix minute hand step
- eventlog() now stores {ts, host, level, service, message} dicts instead of strings
- WebSocket sends/broadcasts filter event log messages by the user's managed hosts
- live.html renders structured log entries with level-coloured spans
- Swiss railway clock minute hand now holds until second hand reaches 12, then steps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 07:00:17 -04:00
andreas 1ddc4b8132 threshold/alerts: strip _status_code suffix from displayed metric names
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 06:19:16 -04:00
andreas 5e1720ed32 notify: use plain URL in Mattermost plugin metrics link
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-07 10:43:18 -04:00
andreas 77f127fe60 hbc/hbc_mini: consolidate startup log into single line
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-07 10:33:31 -04:00
17 changed files with 187 additions and 55 deletions
+23
View File
@@ -8,6 +8,7 @@ This guide explains how to create custom plugins for the Heartbeat monitoring sy
- [Plugin Types](#plugin-types) - [Plugin Types](#plugin-types)
- [Creating a Plugin](#creating-a-plugin) - [Creating a Plugin](#creating-a-plugin)
- [Plugin Lifecycle](#plugin-lifecycle) - [Plugin Lifecycle](#plugin-lifecycle)
- [Server-initiated InfoPlugin refresh](#server-initiated-infoplugin-refresh)
- [Configuration](#configuration) - [Configuration](#configuration)
- [Best Practices](#best-practices) - [Best Practices](#best-practices)
- [Examples](#examples) - [Examples](#examples)
@@ -250,6 +251,28 @@ Understanding the plugin lifecycle helps you implement plugins correctly:
└─> Plugin releases resources, closes connections └─> Plugin releases resources, closes connections
``` ```
## Server-initiated InfoPlugin refresh
When a heartbeat packet arrives from a host the server has no plugin data for (e.g. after a server restart), the server sets `request_update = 1` in the ACK reply. The client detects this flag and immediately re-runs all InfoPlugins — clearing their cached results first — then resends the data as PLG messages.
This means InfoPlugin data will always reach the server as soon as possible without requiring a client restart. No action is needed from plugin authors: the framework handles cache invalidation and re-collection automatically.
The lifecycle for this case looks like:
```
Server restarts, host reconnects
└─> hbd receives HTB with no existing plugin_data for host
└─> hbd sets request_update=1 in ACK
Client receives ACK
└─> Detects request_update flag
└─> Clears _cache on every registered InfoPlugin
└─> Calls collect() on each InfoPlugin
└─> Sends fresh PLG messages to server
```
If you write an `InfoPlugin` with side effects in `_collect_info()` (opening connections, writing files, etc.), be aware it may be called more than once per client session when this mechanism triggers.
## Configuration ## Configuration
### Plugin-Specific Configuration ### Plugin-Specific Configuration
+18
View File
@@ -46,6 +46,24 @@ default_owner: andreas # owns hosts with no explicit owner
# falls back to the first admin user if omitted # falls back to the first admin user if omitted
``` ```
### Client-declared host ownership
A host can declare its own owner directly in the hbc or hbc_mini client configuration. This is useful for hosts that are not listed in the server config, or during initial setup before a server-side config entry has been created.
**`~/.hbc.yaml`** (hbc):
```yaml
owner: andreas
```
**`~/.hbc.json`** (hbc_mini):
```json
{ "owner": "andreas" }
```
When set, the value is included in the `os_info` plugin data sent to the server. The server applies it as `host.owner` the first time `os_info` arrives, provided no owner has been configured server-side for that host. Server-configured ownership always takes precedence.
---
### Assigning roles to hosts ### Assigning roles to hosts
```yaml ```yaml
+1 -1
View File
@@ -14,4 +14,4 @@ Install options:
""" """
__all__ = ["__version__"] __all__ = ["__version__"]
__version__ = "5.2.3" __version__ = "5.2.4"
+3
View File
@@ -16,6 +16,9 @@ CLIENT_DEFAULTS = {
"hb_port": 50003, # Port where hbd servers listen "hb_port": 50003, # Port where hbd servers listen
"interval": 10, # Heartbeat interval in seconds "interval": 10, # Heartbeat interval in seconds
# Host identity
"owner": None, # Optional username to set as this host's owner on the server
# Runtime flags # Runtime flags
"foreground": False, "foreground": False,
"verbose": False, "verbose": False,
+37 -18
View File
@@ -59,6 +59,7 @@ class AsyncConnection:
self._dead = False self._dead = False
self._ever_opened = False self._ever_opened = False
self._open_fail_count = 0 # consecutive failures before first success self._open_fail_count = 0 # consecutive failures before first success
self.request_info_event: asyncio.Event = asyncio.Event()
self.logger = logging.getLogger(f"hbc.conn.{addr}") self.logger = logging.getLogger(f"hbc.conn.{addr}")
@@ -138,6 +139,9 @@ class AsyncConnection:
self.ackcount += 1 self.ackcount += 1
self.logger.debug(f"ACK received, RTT: {rtt:.1f}ms") self.logger.debug(f"ACK received, RTT: {rtt:.1f}ms")
if msg.get("request_update"):
self.logger.info("server requested plugin info refresh")
self.request_info_event.set()
class HeartbeatProtocol(asyncio.DatagramProtocol): class HeartbeatProtocol(asyncio.DatagramProtocol):
@@ -338,6 +342,26 @@ async def heartbeat_sender(conn: AsyncConnection, interval: int):
raise raise
async def _info_plugin_refresh_loop(conn: AsyncConnection, info_plugins: List):
"""Wait for server requests to re-send InfoPlugin data."""
logger = logging.getLogger("hbc.plugins")
while running:
await conn.request_info_event.wait()
if not running:
break
conn.request_info_event.clear()
logger.info("refreshing InfoPlugins on server request")
for plugin in info_plugins:
plugin._cache = None
try:
data = await plugin.collect()
if data:
await conn.sendto({"plugin": plugin.name, **data}, "PLG")
logger.info(f"Resent {plugin.name} data")
except Exception as e:
logger.error(f"Error re-collecting {plugin.name}: {e}", exc_info=True)
async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry): async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry):
"""Collect and send plugin data. """Collect and send plugin data.
@@ -369,24 +393,21 @@ async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry):
for plugin in monitor_plugins: for plugin in monitor_plugins:
by_interval[plugin.interval].append(plugin) by_interval[plugin.interval].append(plugin)
# Create tasks for each interval # Create tasks for each interval; always include the info-refresh watcher
tasks = [] tasks = [asyncio.create_task(_info_plugin_refresh_loop(conn, info_plugins))]
for interval, plugins in by_interval.items(): for interval, plugins in by_interval.items():
task = asyncio.create_task( tasks.append(asyncio.create_task(
plugin_collector_interval(conn, plugins, interval) plugin_collector_interval(conn, plugins, interval)
) ))
tasks.append(task)
# Wait for all tasks try:
if tasks: await asyncio.gather(*tasks, return_exceptions=True)
try: except asyncio.CancelledError:
await asyncio.gather(*tasks, return_exceptions=True) logger.debug("Plugin collector cancelled, cancelling sub-tasks")
except asyncio.CancelledError: for task in tasks:
logger.debug("Plugin collector cancelled, cancelling sub-tasks") if not task.done():
for task in tasks: task.cancel()
if not task.done(): raise
task.cancel()
raise
async def plugin_collector_interval( async def plugin_collector_interval(
@@ -495,9 +516,7 @@ async def async_main(args, config):
hb_port = config.get("hb_port", PORT) hb_port = config.get("hb_port", PORT)
interval = config.get("interval", INTERVAL) interval = config.get("interval", INTERVAL)
logger.info(f"hbc {__version__} starting on {iam}") logger.info(f"hbc {__version__} on {iam} -> {hb_hosts} port={hb_port}, interval={interval}s")
logger.info(f"Starting hbc for {iam} -> {hb_hosts}")
logger.info(f"Port: {hb_port}, Interval: {interval}s")
# Create connections # Create connections
connections = [] connections = []
+4 -1
View File
@@ -364,7 +364,10 @@ class PluginLoader:
# Instantiate plugin with config — check plugins subdict first, # Instantiate plugin with config — check plugins subdict first,
# then top-level keys (e.g. nagios_runner: ... at root of config). # then top-level keys (e.g. nagios_runner: ... at root of config).
plugin_instance_config = plugins_subconfig.get(obj.name) or raw_config.get(obj.name, {}) plugin_instance_config = dict(plugins_subconfig.get(obj.name) or raw_config.get(obj.name) or {})
# Propagate top-level owner so os_info (and any future plugin) can report it.
if "owner" in raw_config and "owner" not in plugin_instance_config:
plugin_instance_config["owner"] = raw_config["owner"]
plugin = obj(config=plugin_instance_config) plugin = obj(config=plugin_instance_config)
# Initialize plugin # Initialize plugin
+3
View File
@@ -62,6 +62,9 @@ class OSInfoPlugin(InfoPlugin):
"hbc_version": hbc_version, "hbc_version": hbc_version,
"hbc_type": "full", "hbc_type": "full",
} }
if self.config.get("owner"):
self.logger.debug(f"Adding owner from config: {self.config['owner']}")
data["owner"] = self.config["owner"]
# Add Linux-specific distribution info # Add Linux-specific distribution info
if platform.system() == "Linux": if platform.system() == "Linux":
+1 -1
View File
@@ -309,7 +309,7 @@ def get_host_access(config, hostname) -> dict:
""" """
host_cfg = get_host_config(config, hostname) host_cfg = get_host_config(config, hostname)
owner = host_cfg.get("owner") or get_default_owner(config) owner = host_cfg.get("owner") # or get_default_owner(config)
managers = host_cfg.get("managers", []) managers = host_cfg.get("managers", [])
if isinstance(managers, str): if isinstance(managers, str):
+10 -3
View File
@@ -106,11 +106,18 @@ def closelog():
def eventlog(host, lvl, m, service=None): def eventlog(host, lvl, m, service=None):
ts = time.time() ts = time.time()
msg = {
"ts": ts,
"host": host or None,
"level": lvl,
"service": service,
"message": m,
}
data.msgs.append(msg)
s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {lvl} " s = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))} {lvl} "
if host: if host:
s += f"{host} " s += f"{host} "
s += m s += m
data.msgs.append(s)
logger.info(s) logger.info(s)
if logf: if logf:
try: try:
@@ -118,7 +125,7 @@ def eventlog(host, lvl, m, service=None):
logf.flush() logf.flush()
except Exception as e: except Exception as e:
logger.warning("failed to write to logfile: %s", e) logger.warning("failed to write to logfile: %s", e)
msg_to_websockets("message", s) msg_to_websockets("message", msg)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -209,7 +216,7 @@ def _send_mattermost(channel_cfg: dict, notif: Notification) -> bool:
return False return False
text = f"**{notif.title}**\n{notif.body}" text = f"**{notif.title}**\n{notif.body}"
if notif.url: if notif.url:
text += f"\n[Plugin metrics]({notif.url})" text += f"\n[Plugin metrics] {notif.url}"
ses = {"url": host, "scheme": "http", "basepath": "/api/v4", "port": 8065} ses = {"url": host, "scheme": "http", "basepath": "/api/v4", "port": 8065}
mm = Driver(ses) mm = Driver(ses)
payload: dict = {"text": text, "channel": channel, "username": channel_cfg.get("username", "hbd")} payload: dict = {"text": text, "channel": channel, "username": channel_cfg.get("username", "hbd")}
+1 -1
View File
@@ -438,7 +438,7 @@
<div class="alert-header"> <div class="alert-header">
<span class="alert-level ${level}">${alert.level}</span> <span class="alert-level ${level}">${alert.level}</span>
<a class="alert-hostname" href="/plugins#${alert.hostname}">${alert.hostname}</a> <a class="alert-hostname" href="/plugins#${alert.hostname}">${alert.hostname}</a>
<span class="alert-metric">${alert.metric_path.includes('.') ? alert.metric_path.slice(alert.metric_path.indexOf('.') + 1) : alert.metric_path}</span> <span class="alert-metric">${(alert.metric_path.includes('.') ? alert.metric_path.slice(alert.metric_path.indexOf('.') + 1) : alert.metric_path).replace(/_status_code$/, '')}</span>
</div> </div>
<div class="alert-details"> <div class="alert-details">
<span>${valueText}</span> <span>${valueText}</span>
+1 -1
View File
@@ -214,7 +214,7 @@
ctx.restore(); ctx.restore();
} }
hand((m + s / 60) / 60 * Math.PI * 2 - Math.PI / 2, hand((sFrac >= 58.5 ? m + 1 : m) / 60 * Math.PI * 2 - Math.PI / 2,
R * 0.88, -R * 0.12, SIZE * 0.027, '#222'); /* minute */ R * 0.88, -R * 0.12, SIZE * 0.027, '#222'); /* minute */
hand((h + m / 60) / 12 * Math.PI * 2 - Math.PI / 2, hand((h + m / 60) / 12 * Math.PI * 2 - Math.PI / 2,
R * 0.58, -R * 0.12, SIZE * 0.039, '#222'); /* hour */ R * 0.58, -R * 0.12, SIZE * 0.039, '#222'); /* hour */
+28 -2
View File
@@ -183,11 +183,24 @@
line-height: 1.0; line-height: 1.0;
} }
#messages div { #messages .log-entry {
padding: 5px 0; padding: 5px 0;
border-bottom: 1px solid #f0f0f0; border-bottom: 1px solid #f0f0f0;
display: flex;
gap: 0.5em;
align-items: baseline;
} }
.log-ts { color: #888; white-space: nowrap; }
.log-level { font-weight: bold; min-width: 6em; }
.log-host { font-weight: 600; }
.log-service { color: #888; }
.log-warning .log-level { color: #b8860b; }
.log-critical .log-level { color: #c00; }
.log-recover .log-level { color: #2a7a2a; }
.log-info .log-level { color: #555; }
/* Modal for connection status messages */ /* Modal for connection status messages */
.connection-modal { .connection-modal {
display: none; display: none;
@@ -460,7 +473,20 @@
update_table(state.data); update_table(state.data);
} else if (state.type == "message") { } else if (state.type == "message") {
var msgs = document.getElementById("messages"); var msgs = document.getElementById("messages");
msgs.insertAdjacentHTML("afterbegin", "<div>" + state.data + "</div>"); var msg = state.data;
var _d = new Date(msg.ts * 1000);
function _p(n) { return n < 10 ? '0' + n : '' + n; }
var ts_str = _d.getFullYear() + '-' + _p(_d.getMonth()+1) + '-' + _p(_d.getDate())
+ ' ' + _p(_d.getHours()) + ':' + _p(_d.getMinutes()) + ':' + _p(_d.getSeconds());
var lvl = (msg.level || "INFO").toLowerCase();
var html = '<div class="log-entry log-' + lvl + '">';
html += '<span class="log-ts">' + ts_str + '</span>';
html += '<span class="log-level">' + (msg.level || "") + '</span>';
if (msg.host) html += '<span class="log-host">' + msg.host + '</span>';
if (msg.service) html += '<span class="log-service">' + msg.service + '</span>';
html += '<span class="log-msg">' + msg.message + '</span>';
html += '</div>';
msgs.insertAdjacentHTML("afterbegin", html);
} }
cnt++; cnt++;
}; };
+4 -4
View File
@@ -1044,8 +1044,8 @@ class ThresholdChecker:
# Format operator symbol # Format operator symbol
op_symbol = threshold.operator.value op_symbol = threshold.operator.value
# Short metric label: strip the plugin-name prefix for readability # Short metric label: strip the plugin-name prefix and _status_code suffix
short_path = metric_path.partition(".")[2] or metric_path short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
# Use a display-friendly value (inf is the sentinel for "overdue") # Use a display-friendly value (inf is the sentinel for "overdue")
import math import math
@@ -1109,7 +1109,7 @@ class ThresholdChecker:
if host is not None and not host.watched: if host is not None and not host.watched:
eventlog(host_name, lvl, message, service="threshold") eventlog(host_name, lvl, message, service="threshold")
return return
short_path = metric_path.partition(".")[2] or metric_path short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
title = f"[{lvl}] {host_name} {short_path}" title = f"[{lvl}] {host_name} {short_path}"
# Strip the "metric = " prefix from message so body is just the value/detail # Strip the "metric = " prefix from message so body is just the value/detail
prefix = short_path + " = " prefix = short_path + " = "
@@ -1349,7 +1349,7 @@ class ThresholdChecker:
# Format operator symbol # Format operator symbol
op_symbol = threshold.operator.value op_symbol = threshold.operator.value
short_path = metric_path.partition(".")[2] or metric_path short_path = (metric_path.partition(".")[2] or metric_path).removesuffix("_status_code")
# Time to re-notify # Time to re-notify
if threshold_value is not None: if threshold_value is not None:
+8 -1
View File
@@ -350,8 +350,10 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if msg.get("ID") == "HTB": if msg.get("ID") == "HTB":
host.doesack = msg.get("acks", -1) host.doesack = msg.get("acks", -1)
# send ACK back # send ACK back; ask client to resend plugin info when we have none yet
rmsg = {"time": time.time()} rmsg = {"time": time.time()}
if not host.plugin_data:
rmsg["request_update"] = 1
opkt = dicttos("ACK", rmsg) opkt = dicttos("ACK", rmsg)
try: try:
transport.sendto(opkt, addr) transport.sendto(opkt, addr)
@@ -368,6 +370,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
if k not in ("ID", "plugin", "id", "name")} if k not in ("ID", "plugin", "id", "name")}
# Store plugin data with timestamp # Store plugin data with timestamp
host.add_plugin_data(plugin_name, plugin_data, timestamp=now) host.add_plugin_data(plugin_name, plugin_data, timestamp=now)
# If os_info reports an owner and none is configured server-side, apply it
if plugin_name == "os_info":
if not host.owner:
host.owner = plugin_data.get("owner", config_mod.get_default_owner(cfg))
if DEBUG > 1: if DEBUG > 1:
print(f"Stored plugin data for {uname}: {plugin_name}") print(f"Stored plugin data for {uname}: {plugin_name}")
+6 -2
View File
@@ -85,11 +85,13 @@ async def handler(request):
except Exception as e: except Exception as e:
logger.error("Error sending initial hosts: %s", e) logger.error("Error sending initial hosts: %s", e)
# Send recent messages # Send recent messages, filtered to hosts this user may see
if data.msgs: if data.msgs:
try: try:
for m in data.msgs: for m in data.msgs:
await ws.send_str(json.dumps({"type": "message", "data": m})) host_name = m.get("host") if isinstance(m, dict) else None
if not host_name or _user_can_see_host(user, host_name):
await ws.send_str(json.dumps({"type": "message", "data": m}))
except Exception as e: except Exception as e:
logger.error("Error sending initial messages: %s", e) logger.error("Error sending initial messages: %s", e)
@@ -128,6 +130,8 @@ def broadcast(typ: str, payload) -> bool:
host_name: Optional[str] = None host_name: Optional[str] = None
if typ in ("host", "plugin"): if typ in ("host", "plugin"):
host_name = payload.get("raw_name") or payload.get("host") or payload.get("name") host_name = payload.get("raw_name") or payload.get("host") or payload.get("name")
elif typ == "message" and isinstance(payload, dict):
host_name = payload.get("host")
jmsg = json.dumps({"type": typ, "data": payload}) jmsg = json.dumps({"type": typ, "data": payload})
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "hbd" name = "hbd"
version = "5.2.3" version = "5.2.4"
description = "Heartbeat monitoring system — client (hbc) and server (hbd)" description = "Heartbeat monitoring system — client (hbc) and server (hbd)"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
+30 -11
View File
@@ -41,7 +41,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
# updated by scripts/bumpminor.sh # updated by scripts/bumpminor.sh
__version__ = "5.2.3" __version__ = "5.2.4"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Protocol (mirrors hbd/common/proto.py) # Protocol (mirrors hbd/common/proto.py)
@@ -114,6 +114,7 @@ def _stodict(data: bytes) -> Dict[str, Any]:
_DEFAULTS: Dict[str, Any] = { _DEFAULTS: Dict[str, Any] = {
"hb_port": 50003, "hb_port": 50003,
"interval": 10, "interval": 10,
"owner": None,
"plugins": {}, "plugins": {},
} }
@@ -239,6 +240,8 @@ class OSInfoPlugin(InfoPlugin):
"hbc_version": __version__, "hbc_version": __version__,
"hbc_type": "mini", "hbc_type": "mini",
} }
if self.config.get("owner"):
data["owner"] = self.config["owner"]
if platform.system() == "Linux": if platform.system() == "Linux":
data.update(_linux_distro()) data.update(_linux_distro())
elif platform.system() == "Darwin": elif platform.system() == "Darwin":
@@ -716,7 +719,9 @@ async def _load_plugins(cfg: Dict[str, Any]) -> List[Plugin]:
plugins_cfg: Dict[str, Any] = cfg.get("plugins", {}) plugins_cfg: Dict[str, Any] = cfg.get("plugins", {})
loaded: List[Plugin] = [] loaded: List[Plugin] = []
for cls in _ALL_PLUGIN_CLASSES: for cls in _ALL_PLUGIN_CLASSES:
plugin_cfg = plugins_cfg.get(cls.name) or cfg.get(cls.name, {}) plugin_cfg = dict(plugins_cfg.get(cls.name) or cfg.get(cls.name) or {})
if "owner" in cfg and "owner" not in plugin_cfg:
plugin_cfg["owner"] = cfg["owner"]
plugin: Plugin = cls(config=plugin_cfg) plugin: Plugin = cls(config=plugin_cfg)
try: try:
ok = await plugin.initialize() ok = await plugin.initialize()
@@ -786,7 +791,7 @@ class _HeartbeatProtocol(asyncio.DatagramProtocol):
msg_id = msg.get("ID") msg_id = msg.get("ID")
now = time.time() now = time.time()
if msg_id == "ACK": if msg_id == "ACK":
self._conn._handle_ack(now) self._conn._handle_ack(msg, now)
elif msg_id == "CMD": elif msg_id == "CMD":
asyncio.create_task(_handle_command(self._conn, msg)) asyncio.create_task(_handle_command(self._conn, msg))
elif msg_id == "UPD": elif msg_id == "UPD":
@@ -813,6 +818,7 @@ class AsyncConnection:
self.rtts: List[float] = [0.0] self.rtts: List[float] = [0.0]
self._transport: Optional[asyncio.DatagramTransport] = None self._transport: Optional[asyncio.DatagramTransport] = None
self._dead = False self._dead = False
self._request_info: asyncio.Event = asyncio.Event()
self._log = logging.getLogger(f"hbc.conn.{addr}") self._log = logging.getLogger(f"hbc.conn.{addr}")
async def open(self) -> bool: async def open(self) -> bool:
@@ -831,12 +837,14 @@ class AsyncConnection:
self._transport.close() self._transport.close()
self._transport = None self._transport = None
def _handle_ack(self, now: float): def _handle_ack(self, msg: Dict[str, Any], now: float):
rtt = (now - self.lastsend) * 1000.0 rtt = (now - self.lastsend) * 1000.0
self.rtts.append(rtt) self.rtts.append(rtt)
if len(self.rtts) > 10: if len(self.rtts) > 10:
self.rtts.pop(0) self.rtts.pop(0)
self.ackcount += 1 self.ackcount += 1
if msg.get("request_update"):
self._request_info.set()
async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"): async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"):
if self._dead: if self._dead:
@@ -969,6 +977,19 @@ async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], inter
await _sleep(interval) await _sleep(interval)
async def _info_refresh_loop(conn: AsyncConnection, info: List[Plugin]):
log = logging.getLogger("hbc.plugins")
while _running:
await conn._request_info.wait()
if not _running:
break
conn._request_info.clear()
log.info("refreshing InfoPlugins on server request")
for plugin in info:
plugin._cache = None
await _run_info_plugins(conn, info)
async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]): async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
info = [p for p in plugins if isinstance(p, InfoPlugin)] info = [p for p in plugins if isinstance(p, InfoPlugin)]
monitor = [p for p in plugins if isinstance(p, MonitorPlugin)] monitor = [p for p in plugins if isinstance(p, MonitorPlugin)]
@@ -979,12 +1000,10 @@ async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]):
for p in monitor: for p in monitor:
by_interval[p.interval].append(p) by_interval[p.interval].append(p)
if by_interval: tasks = [asyncio.create_task(_info_refresh_loop(conn, info))]
await asyncio.gather( tasks += [asyncio.create_task(_run_monitor_group(conn, grp, iv))
*[asyncio.create_task(_run_monitor_group(conn, grp, iv)) for iv, grp in by_interval.items()]
for iv, grp in by_interval.items()], await asyncio.gather(*tasks, return_exceptions=True)
return_exceptions=True,
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -1038,7 +1057,7 @@ async def _async_main(args, cfg: Dict[str, Any]) -> int:
port = cfg.get("hb_port", PORT) port = cfg.get("hb_port", PORT)
interval = cfg.get("interval", INTERVAL) interval = cfg.get("interval", INTERVAL)
log.info("starting hbc_mini %s on %s -> %s port=%d interval=%ds",__version__, iam, args.hosts, port, interval) log.info("hbc_mini %s on %s -> %s port=%d interval=%ds",__version__, iam, args.hosts, port, interval)
connections: List[AsyncConnection] = [] connections: List[AsyncConnection] = []
conn_id = 1 conn_id = 1