diff --git a/docs/PLUGIN_DEVELOPMENT.md b/docs/PLUGIN_DEVELOPMENT.md index 0297b23..ed6caf0 100644 --- a/docs/PLUGIN_DEVELOPMENT.md +++ b/docs/PLUGIN_DEVELOPMENT.md @@ -8,6 +8,7 @@ This guide explains how to create custom plugins for the Heartbeat monitoring sy - [Plugin Types](#plugin-types) - [Creating a Plugin](#creating-a-plugin) - [Plugin Lifecycle](#plugin-lifecycle) +- [Server-initiated InfoPlugin refresh](#server-initiated-infoplugin-refresh) - [Configuration](#configuration) - [Best Practices](#best-practices) - [Examples](#examples) @@ -250,6 +251,28 @@ Understanding the plugin lifecycle helps you implement plugins correctly: └─> Plugin releases resources, closes connections ``` +## Server-initiated InfoPlugin refresh + +When a heartbeat packet arrives from a host the server has no plugin data for (e.g. after a server restart), the server sets `request_update = 1` in the ACK reply. The client detects this flag and immediately re-runs all InfoPlugins — clearing their cached results first — then resends the data as PLG messages. + +This means InfoPlugin data will always reach the server as soon as possible without requiring a client restart. No action is needed from plugin authors: the framework handles cache invalidation and re-collection automatically. + +The lifecycle for this case looks like: + +``` +Server restarts, host reconnects + └─> hbd receives HTB with no existing plugin_data for host + └─> hbd sets request_update=1 in ACK + +Client receives ACK + └─> Detects request_update flag + └─> Clears _cache on every registered InfoPlugin + └─> Calls collect() on each InfoPlugin + └─> Sends fresh PLG messages to server +``` + +If you write an `InfoPlugin` with side effects in `_collect_info()` (opening connections, writing files, etc.), be aware it may be called more than once per client session when this mechanism triggers. + ## Configuration ### Plugin-Specific Configuration diff --git a/docs/USERS.md b/docs/USERS.md index af34350..e5f114c 100644 --- a/docs/USERS.md +++ b/docs/USERS.md @@ -46,6 +46,24 @@ default_owner: andreas # owns hosts with no explicit owner # falls back to the first admin user if omitted ``` +### Client-declared host ownership + +A host can declare its own owner directly in the hbc or hbc_mini client configuration. This is useful for hosts that are not listed in the server config, or during initial setup before a server-side config entry has been created. + +**`~/.hbc.yaml`** (hbc): +```yaml +owner: andreas +``` + +**`~/.hbc.json`** (hbc_mini): +```json +{ "owner": "andreas" } +``` + +When set, the value is included in the `os_info` plugin data sent to the server. The server applies it as `host.owner` the first time `os_info` arrives, provided no owner has been configured server-side for that host. Server-configured ownership always takes precedence. + +--- + ### Assigning roles to hosts ```yaml diff --git a/hbd/client/main.py b/hbd/client/main.py index 3b246ed..2b5ebbe 100644 --- a/hbd/client/main.py +++ b/hbd/client/main.py @@ -59,6 +59,7 @@ class AsyncConnection: self._dead = False self._ever_opened = False self._open_fail_count = 0 # consecutive failures before first success + self.request_info_event: asyncio.Event = asyncio.Event() self.logger = logging.getLogger(f"hbc.conn.{addr}") @@ -138,6 +139,9 @@ class AsyncConnection: self.ackcount += 1 self.logger.debug(f"ACK received, RTT: {rtt:.1f}ms") + if msg.get("request_update"): + self.logger.info("server requested plugin info refresh") + self.request_info_event.set() class HeartbeatProtocol(asyncio.DatagramProtocol): @@ -338,15 +342,35 @@ async def heartbeat_sender(conn: AsyncConnection, interval: int): raise +async def _info_plugin_refresh_loop(conn: AsyncConnection, info_plugins: List): + """Wait for server requests to re-send InfoPlugin data.""" + logger = logging.getLogger("hbc.plugins") + while running: + await conn.request_info_event.wait() + if not running: + break + conn.request_info_event.clear() + logger.info("refreshing InfoPlugins on server request") + for plugin in info_plugins: + plugin._cache = None + try: + data = await plugin.collect() + if data: + await conn.sendto({"plugin": plugin.name, **data}, "PLG") + logger.info(f"Resent {plugin.name} data") + except Exception as e: + logger.error(f"Error re-collecting {plugin.name}: {e}", exc_info=True) + + async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry): """Collect and send plugin data. - + Args: conn: Connection to send on registry: Plugin registry """ logger = logging.getLogger("hbc.plugins") - + # Collect InfoPlugins once at startup info_plugins = registry.get_by_type(InfoPlugin) for plugin in info_plugins: @@ -359,34 +383,31 @@ async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry): logger.info(f"Sent {plugin.name} data") except Exception as e: logger.error(f"Error collecting {plugin.name}: {e}", exc_info=True) - + # Schedule MonitorPlugins # Group plugins by interval from collections import defaultdict by_interval = defaultdict(list) - + monitor_plugins = registry.get_by_type(MonitorPlugin) for plugin in monitor_plugins: by_interval[plugin.interval].append(plugin) - - # Create tasks for each interval - tasks = [] + + # Create tasks for each interval; always include the info-refresh watcher + tasks = [asyncio.create_task(_info_plugin_refresh_loop(conn, info_plugins))] for interval, plugins in by_interval.items(): - task = asyncio.create_task( + tasks.append(asyncio.create_task( plugin_collector_interval(conn, plugins, interval) - ) - tasks.append(task) - - # Wait for all tasks - if tasks: - try: - await asyncio.gather(*tasks, return_exceptions=True) - except asyncio.CancelledError: - logger.debug("Plugin collector cancelled, cancelling sub-tasks") - for task in tasks: - if not task.done(): - task.cancel() - raise + )) + + try: + await asyncio.gather(*tasks, return_exceptions=True) + except asyncio.CancelledError: + logger.debug("Plugin collector cancelled, cancelling sub-tasks") + for task in tasks: + if not task.done(): + task.cancel() + raise async def plugin_collector_interval( diff --git a/hbd/server/udp.py b/hbd/server/udp.py index 1de0b21..def7be9 100644 --- a/hbd/server/udp.py +++ b/hbd/server/udp.py @@ -350,8 +350,10 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict): if msg.get("ID") == "HTB": host.doesack = msg.get("acks", -1) - # send ACK back + # send ACK back; ask client to resend plugin info when we have none yet rmsg = {"time": time.time()} + if not host.plugin_data: + rmsg["request_update"] = 1 opkt = dicttos("ACK", rmsg) try: transport.sendto(opkt, addr) diff --git a/scripts/hbc_mini.py b/scripts/hbc_mini.py index 494d453..9e8748a 100755 --- a/scripts/hbc_mini.py +++ b/scripts/hbc_mini.py @@ -791,7 +791,7 @@ class _HeartbeatProtocol(asyncio.DatagramProtocol): msg_id = msg.get("ID") now = time.time() if msg_id == "ACK": - self._conn._handle_ack(now) + self._conn._handle_ack(msg, now) elif msg_id == "CMD": asyncio.create_task(_handle_command(self._conn, msg)) elif msg_id == "UPD": @@ -818,6 +818,7 @@ class AsyncConnection: self.rtts: List[float] = [0.0] self._transport: Optional[asyncio.DatagramTransport] = None self._dead = False + self._request_info: asyncio.Event = asyncio.Event() self._log = logging.getLogger(f"hbc.conn.{addr}") async def open(self) -> bool: @@ -836,12 +837,14 @@ class AsyncConnection: self._transport.close() self._transport = None - def _handle_ack(self, now: float): + def _handle_ack(self, msg: Dict[str, Any], now: float): rtt = (now - self.lastsend) * 1000.0 self.rtts.append(rtt) if len(self.rtts) > 10: self.rtts.pop(0) self.ackcount += 1 + if msg.get("request_update"): + self._request_info.set() async def sendto(self, msg: Dict[str, Any], msg_id: str = "HTB"): if self._dead: @@ -974,6 +977,19 @@ async def _run_monitor_group(conn: AsyncConnection, plugins: List[Plugin], inter await _sleep(interval) +async def _info_refresh_loop(conn: AsyncConnection, info: List[Plugin]): + log = logging.getLogger("hbc.plugins") + while _running: + await conn._request_info.wait() + if not _running: + break + conn._request_info.clear() + log.info("refreshing InfoPlugins on server request") + for plugin in info: + plugin._cache = None + await _run_info_plugins(conn, info) + + async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]): info = [p for p in plugins if isinstance(p, InfoPlugin)] monitor = [p for p in plugins if isinstance(p, MonitorPlugin)] @@ -984,12 +1000,10 @@ async def _plugin_collector(conn: AsyncConnection, plugins: List[Plugin]): for p in monitor: by_interval[p.interval].append(p) - if by_interval: - await asyncio.gather( - *[asyncio.create_task(_run_monitor_group(conn, grp, iv)) - for iv, grp in by_interval.items()], - return_exceptions=True, - ) + tasks = [asyncio.create_task(_info_refresh_loop(conn, info))] + tasks += [asyncio.create_task(_run_monitor_group(conn, grp, iv)) + for iv, grp in by_interval.items()] + await asyncio.gather(*tasks, return_exceptions=True) # ---------------------------------------------------------------------------