225 lines
6.1 KiB
Python
225 lines
6.1 KiB
Python
"""DNS update helper and pure asyncio worker for heartbeat daemon."""
|
|
|
|
from __future__ import annotations
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
from typing import Optional
|
|
import asyncio
|
|
|
|
|
|
def create_nsupdate_payload(
    hostname: str, newip: str, dyndomain: str, dnsttl: str = "5"
) -> str:
    """Build the command script that is fed to ``nsupdate`` on stdin.

    The script replaces the address record and the TXT timestamp record of
    ``<hostname>.dy.<dyndomain>``.

    Args:
        hostname: Host label; ``.dy.<dyndomain>`` is appended to form the FQDN.
        newip: New address. A value containing ``:`` is written to an AAAA
            record, anything else to an A record.
        dyndomain: Domain under which the ``dy`` label lives.
        dnsttl: TTL placed on both added records.

    Returns:
        The script text: four ``update`` lines, ``send``, ``answer`` and a
        trailing empty line.
    """
    # Function-scope import keeps this block independent of module imports.
    import time

    fqdn = f"{hostname}.dy.{dyndomain}"
    # The two variants of the script differ only in the record type.
    rtype = "AAAA" if ":" in newip else "A"
    created = time.strftime("%Y-%m-%d.%H:%M:%S", time.gmtime())

    commands = [
        f"update delete {fqdn} {rtype}",
        f"update add {fqdn} {dnsttl} {rtype} {newip}",
        f"update delete {fqdn} TXT",
        f'update add {fqdn} {dnsttl} TXT "Created: {created}"',
        "send",
        "answer",
        # Two empty entries yield the terminating "\n\n" after "answer".
        "",
        "",
    ]
    return "\n".join(commands)
|
|
|
|
|
|
def nsupdate(
    hostname: str,
    newip: str,
    dyndomain: str,
    nsupdate_bin: str = "/usr/local/bin/nsupdate",
    rndc_key: str = "/etc/dhcpc/rndc-key",
) -> Optional[str]:
    """Perform a DNS update by running the ``nsupdate`` command.

    The update script is written to the child's stdin; stdout and stderr are
    captured together and searched for ``status: NOERROR``.

    Args:
        hostname: Host label to update.
        newip: New IPv4 or IPv6 address.
        dyndomain: Domain passed to the payload builder.
        nsupdate_bin: Path of the ``nsupdate`` executable.
        rndc_key: Key file handed to ``nsupdate`` via ``-k``.

    Returns:
        None on success, otherwise a non-empty error description: the
        combined stdout/stderr of the command, or a short message when the
        command could not be started or printed nothing.
    """
    payload = create_nsupdate_payload(hostname, newip, dyndomain)
    cmd = [nsupdate_bin, "-k", rndc_key, "-v"]
    try:
        proc = Popen(
            cmd, shell=False, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=STDOUT
        )
    except OSError as e:
        return f"nsupdate: execution failed: {e}"
    except Exception as e:
        return f"nsupdate: some error occured: {e}"

    # The context manager guarantees the pipes are closed and the child is
    # reaped even if communicate() raises.
    with proc:
        raw, _ = proc.communicate(payload.encode())

    # Tool output is not guaranteed to be valid UTF-8; never fail on decoding.
    out = raw.decode(errors="replace") if raw else ""
    if "status: NOERROR" in out:
        return None
    if not out.strip():
        # Callers treat any falsy return as success, so a silent failure must
        # still produce a non-empty message.
        return f"nsupdate: no output (exit status {proc.returncode})"
    return out
|
|
|
|
|
|
async def _dns_worker_log(loop, log, name, message) -> None:
    """Call ``log(name, message)`` in the default executor, ignoring failures."""
    if not log:
        return
    try:
        await loop.run_in_executor(None, log, name, message)
    except Exception:
        # Logging is best-effort: a broken logger must not stop DNS updates.
        pass


def _dns_worker_task_done(queue) -> None:
    """Mark one queue item as processed, tolerating queues that reject it."""
    try:
        queue.task_done()
    except Exception:
        # Best-effort bookkeeping; a failure here must not stop the worker.
        pass


async def dns_update_worker(
    hbdclass,
    cfg: dict,
    async_queue=None,
    log: Optional[callable] = None,
    pushmsg: Optional[callable] = None,
    loop: Optional[asyncio.AbstractEventLoop] = None,
):
    """Pure async DNS worker that processes updates from an asyncio.Queue.

    Each queue item is a ``(name, addr)`` pair. For every domain listed in
    ``cfg["dyndomains"]`` the blocking ``nsupdate`` helper is run in the
    loop's default executor, and the combined outcome is reported via ``log``.

    Args:
        hbdclass: Unused here; kept for interface compatibility.
        cfg: Configuration mapping. Reads ``dyndomains``, ``nsupdate_bin``
            and ``rndc_key``.
        async_queue: Queue to consume. The worker returns immediately when
            this is None.
        log: Optional ``log(name, message)`` callable, run in the executor.
        pushmsg: Optional ``pushmsg(title, body)`` callable, run in the
            executor whenever an update fails.
        loop: Event loop to use; defaults to the running loop.

    Exits when it receives a None sentinel or when fetching from the queue
    raises.
    """
    if loop is None:
        loop = asyncio.get_running_loop()

    dnsq = async_queue
    # Identity check: an *empty* queue whose type defines __len__ is still a
    # perfectly usable queue.
    if dnsq is None:
        await _dns_worker_log(
            loop, log, None, "dns_update_worker: no queue available"
        )
        return

    while True:
        try:
            item = await dnsq.get()
        except Exception as e:
            await _dns_worker_log(
                loop, log, None, f"dns_update_worker: error getting item: {e}"
            )
            break

        if item is None:
            break

        try:
            name, addr = item
        except (TypeError, ValueError):
            # Not a 2-item iterable: skip it, but leave a trace in the log.
            _dns_worker_task_done(dnsq)
            await _dns_worker_log(
                loop,
                log,
                None,
                f"dns_update_worker: ignoring malformed item: {item!r}",
            )
            continue

        m = f"changed address to {addr}"
        for dyndomain in cfg.get("dyndomains", []):
            try:
                err = await loop.run_in_executor(
                    None,
                    nsupdate,
                    name,
                    addr,
                    dyndomain,
                    cfg.get("nsupdate_bin", "/usr/local/bin/nsupdate"),
                    cfg.get("rndc_key", "/etc/dhcpc/rndc-key"),
                )
            except Exception as e:
                # One bad item must not kill the worker and silently stop all
                # later updates; report it like any other failure.
                err = f"nsupdate: unexpected error: {e}"

            if err:
                m += f", DNS update failed: {err}"
                if pushmsg:
                    try:
                        await loop.run_in_executor(
                            None,
                            pushmsg,
                            "error: nsupdate failed",
                            f"{name}.dy.{dyndomain}: {m}",
                        )
                    except Exception:
                        # Notification is best-effort; the failure is still
                        # included in the log message below.
                        pass
            else:
                m += ", DNS updated."

        _dns_worker_task_done(dnsq)
        await _dns_worker_log(loop, log, name, m)

    await _dns_worker_log(loop, log, None, "dns_update_worker exiting")
|
|
|
|
|
|
def start_dns_worker(
    hbdclass,
    cfg: dict,
    log: Optional[callable] = None,
    pushmsg: Optional[callable] = None,
    loop: Optional[asyncio.AbstractEventLoop] = None,
):
    """Start the async DNS worker and return its Task.

    Replaces ``hbdclass.Host.dnsQ`` with a bridge around a new asyncio.Queue
    so that synchronous ``put()`` calls, including calls from other threads,
    still reach the worker.

    Args:
        hbdclass: Object exposing a ``Host`` attribute whose ``dnsQ`` is
            overwritten with the bridge.
        cfg: Configuration mapping passed through to the worker.
        log: Optional logging callable passed through to the worker.
        pushmsg: Optional notification callable passed through to the worker.
        loop: Event loop that will run the worker. Defaults to the running
            loop, or the current event loop when none is running.

    Returns:
        The Task created on ``loop`` for ``dns_update_worker``.
    """
    if loop is None:
        # Prefer the running loop; get_event_loop() without one is deprecated
        # and is only kept as a fallback for callers that start the loop later.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.get_event_loop()

    # Create asyncio.Queue and wrap it in a bridge for thread-safe puts.
    async_q = asyncio.Queue()

    class _QueueBridge:
        """Thread-safe wrapper around asyncio.Queue for synchronous callers."""

        def __init__(self, loop, aq):
            self._loop = loop
            self._aq = aq

        def put(self, item):
            """Enqueue ``item`` from any thread without blocking."""
            try:
                running = asyncio.get_running_loop()
            except RuntimeError:
                running = None

            # Touch the queue directly only when called from the worker's own
            # loop; a different loop running in this thread is just as unsafe
            # as a plain foreign thread.
            if running is self._loop:
                self._aq.put_nowait(item)
                return

            try:
                self._loop.call_soon_threadsafe(self._aq.put_nowait, item)
            except RuntimeError:
                # The loop is already closed (shutdown); nothing can consume
                # the item any more, so it is dropped.
                pass

        def task_done(self):
            """Delegate task_done to the wrapped asyncio.Queue."""
            try:
                self._aq.task_done()
            except ValueError:
                # Called more often than items were queued; harmless here.
                pass

    bridge = _QueueBridge(loop, async_q)
    hbdclass.Host.dnsQ = bridge

    task = loop.create_task(
        dns_update_worker(
            hbdclass, cfg, async_queue=async_q, log=log, pushmsg=pushmsg, loop=loop
        )
    )
    return task
|