Files
heartbeat/hbd/http.py
T
2026-02-08 14:32:12 -05:00

213 lines
7.5 KiB
Python

"""HTTP server implementation using aiohttp and jinja2."""
import asyncio
import json
import time
import urllib.parse
import os
import logging
from aiohttp import web
from fastapi.templating import Jinja2Templates
import jinja2
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _render_template(html_str: str, **context) -> str:
    """Render *html_str* as a Jinja2 template using *context* as its variables.

    The string is compiled with ``jinja2.Template`` and rendered with the
    keyword arguments supplied by the caller; the rendered text is returned.
    """
    return jinja2.Template(html_str).render(**context)
async def start(
    host: str,
    port: int,
    config,
    hbdclass,
    msgs_getter,
    log=None,
    email=None,
    pushmsg=None,
    msg_to_websockets=None,
    tcss=None,
    DEBUG=0,
    verbose=False,
    get_now=None,
    VER="",
):
    """Start an aiohttp web server and block until cancelled.

    This function is intended to be awaited inside the main asyncio event loop.

    Args:
        host: Address handed to ``web.TCPSite``.
        port: TCP port handed to ``web.TCPSite``.
        config: Mapping-like object read with ``.get()`` / ``[]``. Keys read
            here: ``tz``, ``templates_dir``, ``http_extra_scripts``,
            ``wss_port`` and ``ws_port``.
        hbdclass: Object exposing ``Host.hosts`` (indexable by host name) and
            the ``ubHost`` table builders used by the index page.
        msgs_getter: Zero-argument callable returning the sliceable message
            list.
        log: Optional callable invoked as ``log(hostname, text)``.
        email: Accepted for call compatibility; not used by this function.
        pushmsg: Accepted for call compatibility; not used by this function.
        msg_to_websockets: Accepted for call compatibility; not used here.
        tcss: Optional markup appended inside ``<head>`` of the index page.
        DEBUG: Accepted for call compatibility; not used by this function.
        verbose: When true, print a line once the server is listening.
        get_now: Optional zero-argument callable whose result is passed to
            ``time.localtime``; defaults to ``time.time``.
        VER: Version string shown in the index page heading.
    """
    get_now = get_now or time.time

    def _host_without_port(netloc):
        """Return the host part of ``host[:port]``, keeping IPv6 brackets."""
        if netloc.startswith("["):
            closing = netloc.find("]")
            if closing != -1:
                # "[::1]:8080" -> "[::1]"; a plain split(":") would yield "[".
                return netloc[: closing + 1]
        return netloc.split(":")[0]

    # Last jinja2 environment built by live(), reused while templates_dir is
    # unchanged so compiled templates are not thrown away on every request.
    env_cache = {"dir": None, "env": None}

    def _template_env(templates_dir):
        """Return a jinja2 environment for *templates_dir*, building it once."""
        if env_cache["env"] is None or env_cache["dir"] != templates_dir:
            # NOTE(review): autoescape is left at the jinja2 default, so
            # live.html has to escape host/message fields itself - confirm.
            env_cache["env"] = jinja2.Environment(
                loader=jinja2.FileSystemLoader(templates_dir)
            )
            env_cache["dir"] = templates_dir
        return env_cache["env"]

    async def index(request):
        """Serve the plain HTML status page (host table plus message table)."""
        res = []
        res.append('<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">')
        res.append("<html>")
        res.append("<head>")
        res.append("<title>Heartbeat</title>")
        if tcss:
            res.append(tcss)
        res.append("</head>")
        res.append('<body BGCOLOR = "#FFFFFF" LINK = "#008000" VLINK = "#008000">')
        res.append(f"<H2>Heartbeat status {VER}</h2>")
        res += hbdclass.ubHost.buildhosttable()
        res += hbdclass.ubHost.buildmsgtable(msgs_getter())
        res.append(
            "<p> %s (%s)</p>"
            % (
                time.strftime("%H:%M:%S", time.localtime(get_now())),
                config.get("tz", "CET-1CDT"),
            )
        )
        res.append("</body></html>")
        body = "\n".join(res)
        return web.Response(text=body, content_type="text/html")

    async def api_hosts(request):
        """Return every host's ``jsons()`` document as one JSON array."""
        lst = [hbdclass.Host.hosts[h].jsons() for h in hbdclass.Host.hosts]
        # jsons() yields JSON text; join and parse so json_response emits a
        # real array rather than a list of strings.
        return web.json_response(json.loads("[" + ",".join(lst) + "]"))

    async def api_messages(request):
        """Return the 30 most recent messages as JSON."""
        lst = msgs_getter()[-30:]
        return web.json_response(lst)

    async def cmd(request):
        """Queue a ``CMD`` entry for host ``h`` carrying command ``c``."""
        qa = request.rel_url.query
        uname = qa.get("h")
        ucmd = qa.get("c")
        if not ucmd or not uname:
            return web.Response(status=400, text="need h= and c= arguments")
        if uname not in hbdclass.Host.hosts:
            return web.Response(status=400, text=f"h={uname} not found")
        # NOTE(review): query values are normally already percent-decoded by
        # the URL layer, so this unquote() is a second decode - confirm that
        # clients rely on it before removing.
        hbdclass.Host.hosts[uname].cmds.append(
            ("CMD", {"cmd": urllib.parse.unquote(ucmd)})
        )
        return web.Response(text=f"cmd {uname} queued")

    async def drop(request):
        """Remove host ``h`` from the host table."""
        qa = request.rel_url.query
        uname = qa.get("h")
        if not uname:
            return web.Response(status=400, text="need h= argument")
        if uname not in hbdclass.Host.hosts:
            return web.Response(status=400, text=f"h={uname} not found")
        if log:
            log(uname, "dropped")
        del hbdclass.Host.hosts[uname]
        return web.Response(text="Done")

    async def register(request):
        """Call ``registerDns()`` on host ``h`` and return its result as text."""
        qa = request.rel_url.query
        uname = qa.get("h")
        if not uname:
            return web.Response(status=400, text="need h= argument")
        if uname not in hbdclass.Host.hosts:
            return web.Response(status=400, text=f"h={uname} not found")
        ll = hbdclass.Host.hosts[uname].registerDns()
        if log:
            log(uname, ll)
        return web.Response(text=str(ll))

    async def update(request):
        """Queue an ``UPD`` entry with code ``c`` for host ``h``.

        ``h=All`` targets every host whose ``cver`` is at least 2.
        """
        qa = request.rel_url.query
        # NOTE(review): same double-decode caveat as in cmd() - confirm.
        uname = urllib.parse.unquote(qa.get("h", ""))
        ucode = qa.get("c")
        if not ucode or not uname:
            return web.Response(status=400, text="need h= and c= arguments")
        if uname == "All":
            names = [
                n for n in hbdclass.Host.hosts if hbdclass.Host.hosts[n].cver >= 2
            ]
        elif uname in hbdclass.Host.hosts:
            names = [uname]
        else:
            return web.Response(status=400, text=f"h={uname} not found")
        out = []
        for n in names:
            status = "OK"
            try:
                r = {"csum": None, "code": ucode}
                hbdclass.Host.hosts[n].cmds.append(("UPD", r))
            except Exception as e:
                # Best effort per host: report the failure and carry on, but
                # never report "OK" just because the exception text is empty.
                logger.exception("queueing update for %s failed", n)
                status = str(e) or type(e).__name__
            out.append(f"update started for {n}: {status}")
        return web.Response(text="\n".join(out))

    async def restart(request):
        """Acknowledge a restart request; only logs, no restart is performed."""
        # signal main application to perform restart if needed
        # not implemented here - return OK
        if log:
            log(None, "restart request")
        return web.Response(text="restart request")

    async def live(request):
        """Render ``live.html`` with current hosts, messages and the ws URL."""
        # Templates default to the "templates" directory next to this module.
        pkg_dir = os.path.dirname(__file__)
        templates_dir = config.get("templates_dir", os.path.join(pkg_dir, "templates"))
        env = _template_env(templates_dir)
        extra_scripts = config.get("http_extra_scripts", "")
        # The websocket URL reuses the host name the client used to reach
        # this server, with the HTTP port stripped.
        ws_host = _host_without_port(request.host)
        if config.get("wss_port"):
            heartbeat_ws_url = f"wss://{ws_host}:{config['wss_port']}/hbd"
        else:
            heartbeat_ws_url = f"ws://{ws_host}:{config.get('ws_port', 50005)}/hbd"
        tmpl = env.get_template("live.html")
        body = tmpl.render(
            title="Heartbeat",
            header="Heartbeat",
            request=request,
            heartbeat_ws_url=heartbeat_ws_url,
            extra_scripts=extra_scripts,
            hosts=[
                hbdclass.Host.hosts[h].stateinfo() for h in sorted(hbdclass.Host.hosts)
            ],
            messages=msgs_getter()[-30:],
        )
        return web.Response(text=body, content_type="text/html")

    async def static(request):
        """Serve files from the package static directory.

        URL form: /static/<path>
        """
        p = request.match_info.get("path", "")
        logger.debug("static file requested: %s", p)
        base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static"))
        # normalize and prevent directory traversal
        target = os.path.abspath(os.path.normpath(os.path.join(base, p)))
        if not target.startswith(base + os.sep) and target != base:
            return web.Response(status=403, text="Forbidden")
        # isfile() is already False for missing paths and for directories.
        if not os.path.isfile(target):
            return web.Response(status=404, text="Not Found")
        logger.info("serving static file: %s", target)
        return web.FileResponse(path=target)

    async def favicon(request):
        """Serve favicon.ico from the package static directory."""
        base = os.path.abspath(os.path.join(os.path.dirname(__file__), "static/images"))
        target = os.path.join(base, "favicon.ico")
        if not os.path.isfile(target):
            return web.Response(status=404, text="Not Found")
        return web.FileResponse(path=target)

    app = web.Application()
    app.add_routes(
        [
            web.get("/", index),
            web.get("/api/0/hosts", api_hosts),
            web.get("/api/0/messages", api_messages),
            web.get("/c", cmd),
            web.get("/d", drop),
            web.get("/n", register),
            web.get("/u", update),
            web.get("/r", restart),
            web.get("/live", live),
            web.get("/static/{path:.*}", static),
            web.get("/favicon.ico", favicon),
        ]
    )
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        # Start inside the try so the runner is cleaned up even when binding
        # the socket fails (e.g. the port is already in use).
        site = web.TCPSite(runner, host, port)
        await site.start()
        if verbose:
            print(f"HTTP server started on {host}:{port}")
        # Never completes on its own; the await ends when the task is cancelled.
        await asyncio.Future()
    finally:
        await runner.cleanup()