hbc proper termination, hbd config reloadable
This commit is contained in:
+74
-22
@@ -28,12 +28,13 @@ from .plugin import PluginRegistry, PluginLoader, InfoPlugin, MonitorPlugin
|
||||
# Constants
|
||||
PORT = 50003
|
||||
INTERVAL = 10
|
||||
VER = 6
|
||||
MAXRECV = 32767
|
||||
|
||||
# Global state
|
||||
running = True
|
||||
dorestart = False
|
||||
shutdown_event: Optional[asyncio.Event] = None
|
||||
active_tasks: List[asyncio.Task] = []
|
||||
|
||||
|
||||
class AsyncConnection:
|
||||
@@ -101,7 +102,6 @@ class AsyncConnection:
|
||||
# Add standard fields
|
||||
msg["name"] = shortname(self.name)
|
||||
msg["id"] = self.conn_id
|
||||
msg["ver"] = VER
|
||||
msg["time"] = time.time()
|
||||
|
||||
# Encode message
|
||||
@@ -278,9 +278,25 @@ async def heartbeat_sender(conn: AsyncConnection, interval: int):
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending heartbeat: {e}", exc_info=True)
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Heartbeat sender cancelled")
|
||||
raise
|
||||
|
||||
# Wait for next interval
|
||||
await asyncio.sleep(interval)
|
||||
# Wait for next interval or shutdown event
|
||||
try:
|
||||
if shutdown_event:
|
||||
await asyncio.wait_for(
|
||||
shutdown_event.wait(),
|
||||
timeout=interval
|
||||
)
|
||||
break
|
||||
else:
|
||||
await asyncio.sleep(interval)
|
||||
except asyncio.TimeoutError:
|
||||
pass # Normal timeout, continue loop
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Heartbeat sender cancelled during sleep")
|
||||
raise
|
||||
|
||||
|
||||
async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry):
|
||||
@@ -324,7 +340,14 @@ async def plugin_collector(conn: AsyncConnection, registry: PluginRegistry):
|
||||
|
||||
# Wait for all tasks
|
||||
if tasks:
|
||||
await asyncio.gather(*tasks)
|
||||
try:
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Plugin collector cancelled, cancelling sub-tasks")
|
||||
for task in tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
raise
|
||||
|
||||
|
||||
async def plugin_collector_interval(
|
||||
@@ -350,13 +373,30 @@ async def plugin_collector_interval(
|
||||
plugin_msg = {"plugin": plugin.name, **data}
|
||||
await conn.sendto(plugin_msg, "PLG")
|
||||
logger.debug(f"Sent {plugin.name} data")
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Plugin collector cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error collecting {plugin.name}: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
await asyncio.sleep(interval)
|
||||
# Wait for next interval or shutdown event
|
||||
try:
|
||||
if shutdown_event:
|
||||
await asyncio.wait_for(
|
||||
shutdown_event.wait(),
|
||||
timeout=interval
|
||||
)
|
||||
break
|
||||
else:
|
||||
await asyncio.sleep(interval)
|
||||
except asyncio.TimeoutError:
|
||||
pass # Normal timeout, continue loop
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Plugin collector cancelled during sleep")
|
||||
raise
|
||||
|
||||
|
||||
def shortname(name: str) -> str:
|
||||
@@ -368,6 +408,15 @@ def stop():
|
||||
"""Stop the event loop."""
|
||||
global running
|
||||
running = False
|
||||
|
||||
# Set shutdown event to wake up sleeping tasks
|
||||
if shutdown_event:
|
||||
shutdown_event.set()
|
||||
|
||||
# Cancel all active tasks
|
||||
for task in active_tasks:
|
||||
if not task.done():
|
||||
task.cancel()
|
||||
|
||||
|
||||
async def cleanup(connections: List[AsyncConnection]):
|
||||
@@ -393,7 +442,11 @@ async def cleanup(connections: List[AsyncConnection]):
|
||||
|
||||
async def async_main(args, config):
|
||||
"""Async main function."""
|
||||
global running
|
||||
global running, shutdown_event, active_tasks
|
||||
|
||||
# Create shutdown event
|
||||
shutdown_event = asyncio.Event()
|
||||
active_tasks = []
|
||||
|
||||
logger = logging.getLogger("hbc.main")
|
||||
|
||||
@@ -464,31 +517,30 @@ async def async_main(args, config):
|
||||
else:
|
||||
logger.warning(f"Plugin directory not found: {plugin_dir}")
|
||||
|
||||
# Start async tasks
|
||||
tasks = []
|
||||
|
||||
# Heartbeat senders (one per connection)
|
||||
for conn in connections:
|
||||
task = asyncio.create_task(heartbeat_sender(conn, interval))
|
||||
tasks.append(task)
|
||||
|
||||
# Plugin collector (uses all connections, but we'll use first one)
|
||||
if connections and registry.get_enabled():
|
||||
task = asyncio.create_task(plugin_collector(connections[0], registry))
|
||||
tasks.append(task)
|
||||
|
||||
# Setup signal handlers
|
||||
loop = asyncio.get_event_loop()
|
||||
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||
loop.add_signal_handler(sig, stop)
|
||||
|
||||
# Start async tasks
|
||||
# Heartbeat senders (one per connection)
|
||||
for conn in connections:
|
||||
task = asyncio.create_task(heartbeat_sender(conn, interval))
|
||||
active_tasks.append(task)
|
||||
|
||||
# Plugin collector (uses all connections, but we'll use first one)
|
||||
if connections and registry.get_enabled():
|
||||
task = asyncio.create_task(plugin_collector(connections[0], registry))
|
||||
active_tasks.append(task)
|
||||
|
||||
# Wait for stop or tasks to complete
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
await asyncio.gather(*active_tasks, return_exceptions=True)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Tasks cancelled")
|
||||
|
||||
# Cleanup
|
||||
logger.info("Shutting down...")
|
||||
await cleanup(connections)
|
||||
await loader.unload_all()
|
||||
|
||||
|
||||
+2
-1
@@ -46,7 +46,8 @@ def main(argv=None):
|
||||
if args.debug > 0:
|
||||
config["debug"] = args.debug
|
||||
|
||||
run_server(config)
|
||||
# Pass config_path for reloading support
|
||||
run_server(config, config_path=args.configfile)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Configuration loader and defaults for hbd (HeartBeat Daemon/Server)."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
@@ -95,6 +96,91 @@ def load_config(path=None):
|
||||
return cfg
|
||||
|
||||
|
||||
class ReloadableConfig:
    """Configuration wrapper that supports reloading at runtime.

    Wraps the configuration dictionary and provides:
    - Read-only, dict-compatible access (``get``, ``[]``, ``in``, ``keys``,
      ``items``, ``values``, iteration and ``len``)
    - ``reload()``, a coroutine that re-reads the config file and replaces
      the wrapped dictionary with a single assignment
    - An ``asyncio.Lock`` so that overlapping ``reload()`` calls run one
      after another

    The lock is an asyncio primitive: it serializes coroutines on one event
    loop and gives no protection between OS threads.
    """

    def __init__(self, initial_config, config_path=None):
        """Initialize with initial configuration.

        Args:
            initial_config: Initial configuration dictionary
            config_path: Path to config file for reloading (optional)
        """
        self._config = initial_config
        self._config_path = config_path
        # Created on first reload() instead of here: before Python 3.10 an
        # asyncio.Lock binds to the loop that is current at construction time,
        # which may not be the loop the service later runs on.
        self._lock = None
        self._logger = logging.getLogger(__name__)

    async def reload(self, config_path=None):
        """Reload configuration from file.

        Args:
            config_path: Path to config file (uses stored path if not provided)

        Returns:
            New configuration dictionary

        Raises:
            ValueError: If neither ``config_path`` nor a stored path is set.
            Exception: Whatever ``load_config`` raises; the wrapped config is
                left untouched in that case.
        """
        path = config_path or self._config_path
        if not path:
            raise ValueError("No config path specified for reload")

        if self._lock is None:
            self._lock = asyncio.Lock()

        async with self._lock:
            try:
                # NOTE: load_config() is a plain synchronous call, so the
                # event loop is blocked while the file is read and parsed.
                new_config = load_config(path)
            except Exception as e:
                self._logger.error(f"Failed to reload config from {path}: {e}", exc_info=True)
                # self._config has not been touched: existing config stays.
                raise

            # Swap only after a successful load.
            self._config = new_config
            self._logger.info(f"Configuration reloaded from {path}")
            return new_config

    def get(self, key, default=None):
        """Get a config value (dict-compatible)."""
        return self._config.get(key, default)

    def __getitem__(self, key):
        """Get a config value via subscript (dict-compatible)."""
        return self._config[key]

    def __contains__(self, key):
        """Check if key exists (dict-compatible)."""
        return key in self._config

    def __iter__(self):
        """Iterate over config keys (dict-compatible)."""
        return iter(self._config)

    def __len__(self):
        """Return the number of top-level config keys (dict-compatible)."""
        return len(self._config)

    def keys(self):
        """Return config keys (dict-compatible)."""
        return self._config.keys()

    def items(self):
        """Return config items (dict-compatible)."""
        return self._config.items()

    def values(self):
        """Return config values (dict-compatible)."""
        return self._config.values()

    @property
    def config(self):
        """Get the underlying config dict (for components that need full dict)."""
        return self._config
|
||||
|
||||
|
||||
def get_watchhosts(config):
|
||||
"""Extract watchhosts from config, supporting both new and legacy formats.
|
||||
|
||||
|
||||
@@ -291,7 +291,6 @@ class Host:
|
||||
self.interval = 0
|
||||
self.doesack = -1
|
||||
self.cmds = []
|
||||
self.cver = 0
|
||||
self.connections = {}
|
||||
# Plugin data storage: {plugin_name: [(timestamp, data), ...]}
|
||||
self.plugin_data = {}
|
||||
@@ -307,7 +306,6 @@ class Host:
|
||||
if self.watched:
|
||||
d["name"] = "<b>%s</b>" % d["name"]
|
||||
d["dyn"] = str(self.dyn)
|
||||
d["ver"] = str(self.cver)
|
||||
d["num"] = self.num
|
||||
for c in ["IPv4", "IPv6"]:
|
||||
if c in self.connections:
|
||||
@@ -323,7 +321,6 @@ class Host:
|
||||
d = {}
|
||||
d["name"] = "Name"
|
||||
d["dyn"] = "Dyn"
|
||||
d["ver"] = "Ver"
|
||||
d["num"] = "??"
|
||||
for c in ["IPv4", "IPv6"]:
|
||||
cs = ubConnection.headerdict(c)
|
||||
@@ -371,9 +368,6 @@ class Host:
|
||||
def jsons(self):
|
||||
return json.dumps(self.stateinfo())
|
||||
|
||||
def setcver(self, cver):
|
||||
self.cver = cver
|
||||
|
||||
def isDynDns(self):
|
||||
return self.dyn
|
||||
|
||||
@@ -483,7 +477,6 @@ class Host:
|
||||
"IPv6.state",
|
||||
("IPv6.rtt", 'style="text-align: right;"'),
|
||||
("IPv6.statetime", 'style="text-align: right;"'),
|
||||
"ver",
|
||||
]
|
||||
|
||||
hostfields_short = [
|
||||
|
||||
+1
-1
@@ -115,7 +115,7 @@ async def start(
|
||||
if uname != "All":
|
||||
names = [uname]
|
||||
else:
|
||||
names = [n for n in hbdclass.Host.hosts if hbdclass.Host.hosts[n].cver >= 2]
|
||||
names = [n for n in hbdclass.Host.hosts]
|
||||
out = []
|
||||
for n in names:
|
||||
err = None
|
||||
|
||||
+114
-8
@@ -50,19 +50,78 @@ def cleanup_function(config, hbdclass):
|
||||
logger.info("Cleanup complete.")
|
||||
|
||||
|
||||
async def _run_async(config):
|
||||
async def reload_configuration(config_obj, config_path, components):
    """Reload configuration and update all components.

    Reloads the config file through ``config_obj``, then pushes the new
    configuration to the notify module and, when one is present, to the
    threshold checker. Any exception is logged and turned into a ``False``
    return value; nothing is re-raised.

    Args:
        config_obj: ReloadableConfig instance
        config_path: Path to config file
        components: Dict with threshold_checker and other components

    Returns:
        True if reload succeeded, False otherwise
    """
    banner = "=" * 60
    # Stays None until the file has been loaded, so the error path can tell
    # "nothing changed" apart from "new config loaded, a component failed".
    new_config = None
    try:
        logger.info(banner)
        logger.info("Starting configuration reload...")
        logger.info(banner)

        # Reload config file
        new_config = await config_obj.reload(config_path)

        # Update notify module
        notify_mod.reload_config(new_config)

        # Reload threshold checker; the entry may be missing or None
        checker = components.get('threshold_checker')
        if checker is not None:
            checker.reload(new_config)

        # Note: Changes to the following require restart:
        # - hb_port, hbd_port, ws_port (already bound)
        # - SSL certificates (already loaded)
        # - pickfile (already opened)
        # - journal settings (journal already initialized)

        # These are reloadable and effective immediately:
        # - notification_channels
        # - threshold_configs
        # - hosts (watchhosts, dyndnshosts, notification_channels)
        # - grace period (used on next heartbeat)
        # - debug/verbose flags (used on next message)

        logger.info(banner)
        logger.info("Configuration reload completed successfully")
        logger.info(banner)
        return True

    except Exception as e:
        logger.error(banner)
        logger.error(f"Failed to reload configuration: {e}", exc_info=True)
        if new_config is None:
            logger.error("Keeping previous configuration")
        else:
            # The file was loaded and config_obj already holds the new
            # values; only a later component update failed.
            logger.error("New configuration was loaded but only partially applied")
        logger.error(banner)
        return False
|
||||
|
||||
|
||||
async def _run_async(config, config_path=None):
|
||||
loop = asyncio.get_running_loop()
|
||||
shutdown_event = asyncio.Event()
|
||||
reload_event = asyncio.Event()
|
||||
|
||||
# Signal handlers for graceful shutdown
|
||||
# Signal handlers for graceful shutdown and reload
|
||||
def signal_handler(signum, frame):
|
||||
sig_name = signal.Signals(signum).name if hasattr(signal, "Signals") else signum
|
||||
logger.info(f"Received {sig_name}, initiating shutdown...")
|
||||
loop.call_soon_threadsafe(shutdown_event.set)
|
||||
|
||||
def reload_handler(signum, frame):
|
||||
sig_name = signal.Signals(signum).name if hasattr(signal, "Signals") else signum
|
||||
logger.info(f"Received {sig_name}, initiating config reload...")
|
||||
loop.call_soon_threadsafe(reload_event.set)
|
||||
|
||||
# Register signal handlers
|
||||
loop.add_signal_handler(signal.SIGINT, signal_handler, signal.SIGINT, None)
|
||||
loop.add_signal_handler(signal.SIGTERM, signal_handler, signal.SIGTERM, None)
|
||||
loop.add_signal_handler(signal.SIGHUP, reload_handler, signal.SIGHUP, None)
|
||||
|
||||
from . import http as http_mod
|
||||
from . import dns as dns_mod
|
||||
@@ -83,6 +142,12 @@ async def _run_async(config):
|
||||
journal=msg_journal,
|
||||
)
|
||||
logger.info("Threshold checker initialized")
|
||||
|
||||
# Components dict for reload orchestration
|
||||
components = {
|
||||
'threshold_checker': threshold_checker,
|
||||
'msg_journal': msg_journal,
|
||||
}
|
||||
|
||||
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||
# Disable IPV6_V6ONLY option to enable dual-stack (listen on IPv4 as well)
|
||||
@@ -128,7 +193,6 @@ async def _run_async(config):
|
||||
port=config.get("hbd_port", 50004),
|
||||
config=config,
|
||||
hbdclass=hbdclass,
|
||||
threshold_checker=threshold_checker,
|
||||
tcss=None,
|
||||
verbose=config.get("verbose", False),
|
||||
get_now=lambda: time.time(),
|
||||
@@ -193,10 +257,43 @@ async def _run_async(config):
|
||||
except Exception as e:
|
||||
logger.exception("websocket server failed to start: %s", e)
|
||||
|
||||
# Main event loop - monitor shutdown and reload events
|
||||
try:
|
||||
# run forever until shutdown event is set
|
||||
await shutdown_event.wait()
|
||||
logger.info("Shutdown signal received, stopping services...")
|
||||
while True:
|
||||
# Wait for either shutdown or reload event
|
||||
done, pending = await asyncio.wait(
|
||||
[
|
||||
asyncio.create_task(shutdown_event.wait()),
|
||||
asyncio.create_task(reload_event.wait()),
|
||||
],
|
||||
return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
# Check which event was triggered
|
||||
if shutdown_event.is_set():
|
||||
logger.info("Shutdown signal received, stopping services...")
|
||||
# Cancel pending wait tasks
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
break
|
||||
|
||||
if reload_event.is_set():
|
||||
# Clear the event for next reload
|
||||
reload_event.clear()
|
||||
|
||||
# Cancel pending wait tasks
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
# Perform reload if config_path is available
|
||||
if config_path:
|
||||
await reload_configuration(config, config_path, components)
|
||||
else:
|
||||
logger.warning("Cannot reload: no config path available")
|
||||
|
||||
# Continue main loop
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Error in main loop: %s", e)
|
||||
finally:
|
||||
@@ -298,10 +395,14 @@ def load_pickled_hosts(config, hbdclass):
|
||||
logger.info("no pickled data")
|
||||
|
||||
|
||||
def run(config):
|
||||
def run(config, config_path=None):
|
||||
"""Start the hbd service (blocking).
|
||||
|
||||
Manually manages the event loop to ensure clean shutdown.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary
|
||||
config_path: Path to config file (for reload support)
|
||||
"""
|
||||
import os
|
||||
|
||||
@@ -312,13 +413,18 @@ def run(config):
|
||||
|
||||
notify_mod.initlog(logfile=config.get("logfile", "messages.log"))
|
||||
eventlog(None, "INFO", f"hbd version {__version__} starting up")
|
||||
|
||||
if config_path:
|
||||
logger.info(f"Config file: {config_path} (reload with SIGHUP)")
|
||||
else:
|
||||
logger.warning("No config path provided - reload via SIGHUP disabled")
|
||||
|
||||
# Create and set the event loop manually
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(_run_async(config))
|
||||
loop.run_until_complete(_run_async(config, config_path=config_path))
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Received KeyboardInterrupt, shutting down...")
|
||||
except Exception as e:
|
||||
|
||||
@@ -62,6 +62,20 @@ def setup(cfg: dict):
|
||||
_config = dict(cfg)
|
||||
|
||||
|
||||
def reload_config(cfg: dict):
    """Replace the module-level notification configuration.

    Called during a runtime config reload: a shallow copy of ``cfg``
    becomes the new module-level ``_config``.

    Args:
        cfg: New configuration dictionary
    """
    global _config

    # Copy first so later changes to the caller's dict do not leak in here.
    refreshed = dict(cfg)
    _config = refreshed
    logger.info("Notification configuration reloaded")
|
||||
|
||||
|
||||
def send_email(toaddrs, smtpserver, sender, subject, body, debug=0):
|
||||
"""Send a plain email via SMTP. Returns True on success."""
|
||||
try:
|
||||
|
||||
@@ -316,6 +316,31 @@ class ThresholdChecker:
|
||||
total_thresholds
|
||||
)
|
||||
|
||||
def reload(self, config: Dict[str, Any]):
    """Reload threshold configuration from new config dict.

    Clears ``threshold_configs``, ``thresholds`` and ``host_config_mapping``
    and re-parses them from ``config`` via ``_parse_config``. If parsing
    raises, the three containers are put back to their pre-reload contents
    before the exception propagates, so a bad config file does not leave the
    checker without thresholds.

    No other attributes are touched by this method.

    Args:
        config: New configuration dictionary

    Raises:
        Exception: Whatever ``_parse_config`` raises; the previous
            thresholds are restored first.
    """
    import copy

    logger.info("Reloading threshold configuration...")

    state_attrs = ("threshold_configs", "thresholds", "host_config_mapping")

    # Keep both the container objects and shallow copies of their contents,
    # so a failed parse can be undone without changing container identity.
    originals = {name: getattr(self, name) for name in state_attrs}
    snapshots = {name: copy.copy(obj) for name, obj in originals.items()}

    # Clear old configuration
    for obj in originals.values():
        obj.clear()

    try:
        # Parse new configuration
        self._parse_config(config)
    except Exception:
        for name in state_attrs:
            container = originals[name]
            container.clear()
            # dicts and sets refill with update(), lists with extend()
            if hasattr(container, "update"):
                container.update(snapshots[name])
            else:
                container.extend(snapshots[name])
            # Re-bind in case _parse_config replaced the attribute.
            setattr(self, name, container)
        logger.error("Threshold reload failed, previous thresholds restored")
        raise

    total_thresholds = sum(len(cfg) for cfg in self.threshold_configs.values())
    if total_thresholds == 0 and len(self.thresholds) > 0:
        total_thresholds = len(self.thresholds)

    logger.info("Threshold configuration reloaded: %d total thresholds", total_thresholds)
|
||||
|
||||
def _parse_config(self, config: Dict[str, Any]):
|
||||
"""Parse threshold configuration from YAML structure.
|
||||
|
||||
|
||||
+2
-17
@@ -127,10 +127,7 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
host.doesack = msg.get("acks", -1)
|
||||
# send ACK back
|
||||
rmsg = {"time": __import__("time").time()}
|
||||
if host.cver < 1:
|
||||
opkt = b"ACK"
|
||||
else:
|
||||
opkt = dicttos("ACK", rmsg)
|
||||
opkt = dicttos("ACK", rmsg)
|
||||
try:
|
||||
transport.sendto(opkt, addr)
|
||||
except Exception as e:
|
||||
@@ -174,7 +171,6 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
host.setcver(msg.get("ver", 0))
|
||||
|
||||
try:
|
||||
conn, res = host.conndata(cid, ip, rtt, now)
|
||||
@@ -301,22 +297,11 @@ def handle_datagram(msg: dict, addr, transport, ctx: dict):
|
||||
del host.cmds[0]
|
||||
if log:
|
||||
log(uname, "command sent")
|
||||
if host.cver < 1:
|
||||
rmsg = rmsg["cmd"]
|
||||
elif op == "UPD":
|
||||
del host.cmds[0]
|
||||
if log:
|
||||
log(uname, "update initiated")
|
||||
if host.cver < 1:
|
||||
if log:
|
||||
log(uname, " ver 0 does not support UPD")
|
||||
continue
|
||||
if host.cver < 1:
|
||||
opkt = rmsg if isinstance(rmsg, (bytes, str)) else str(rmsg)
|
||||
if isinstance(opkt, str):
|
||||
opkt = opkt.encode()
|
||||
else:
|
||||
opkt = dicttos(op, rmsg)
|
||||
opkt = dicttos(op, rmsg)
|
||||
try:
|
||||
transport.sendto(opkt, addr)
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user