"""Tiny FastAPI app that records incoming webhooks and lists them on a page.

POST /webhook stores the request headers and body in memory, POST /clear
empties the store, and GET / renders the stored entries.
"""

import html
import json
from datetime import datetime, timezone

from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse

app = FastAPI()

# Upper bound on stored entries; anything beyond it is discarded.
MAX_LOGS = 50

# In-memory storage, newest entry first.
webhook_logs = []

# Shown on the dashboard when nothing has been recorded yet.
_EMPTY_STATE = "\n\nNo webhooks received yet\n\n"


def _format_body(raw: bytes) -> str:
    """Return *raw* pretty-printed as JSON, or decoded as text if it is not JSON."""
    try:
        return json.dumps(json.loads(raw), indent=2)
    except (ValueError, RecursionError):
        # ValueError covers both invalid JSON and undecodable bytes;
        # RecursionError covers pathologically nested documents.
        return raw.decode("utf-8", errors="ignore")


def _render_entry(position: int, log: dict) -> str:
    """Render one stored webhook as a dashboard fragment.

    Headers and body come straight from the sender, so both are escaped
    before being placed in the page.
    """
    headers = html.escape(json.dumps(log["headers"], indent=2))
    body = html.escape(log["body"])
    return f"""
#{position} {log["time"]}
Headers
{headers}
Body
{body}
"""


@app.post("/webhook")
async def webhook(request: Request):
    """Record the incoming request and acknowledge it.

    The body is stored pretty-printed when it parses as JSON, otherwise as
    UTF-8 text with undecodable bytes dropped. Only the newest ``MAX_LOGS``
    entries are kept.
    """
    raw = await request.body()
    entry = {
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
        "headers": dict(request.headers),
        "body": _format_body(raw),
    }
    webhook_logs.insert(0, entry)
    del webhook_logs[MAX_LOGS:]
    return {"status": "ok"}


@app.post("/clear")
async def clear_logs():
    """Drop every stored webhook and redirect back to the dashboard."""
    webhook_logs.clear()
    # 303 makes the browser follow up with a GET instead of re-POSTing.
    return RedirectResponse("/", status_code=303)


@app.get("/", response_class=HTMLResponse)
async def dashboard():
    """Return the page listing all stored webhooks, newest first."""
    # NOTE(review): the templates carry text only, no tags; confirm whether
    # markup (e.g. <pre> around the indented JSON) is expected here.
    items = "".join(
        _render_entry(position, log)
        for position, log in enumerate(webhook_logs, start=1)
    )
    page = f""" Webhook Viewer

📡 Webhook Viewer

{len(webhook_logs)} requests
{items or _EMPTY_STATE}
"""
    return page