import os import json import time import hmac import hashlib from collections import deque from typing import Any, Dict, Optional import requests from fastapi import FastAPI, Request, Header, HTTPException from fastapi.responses import JSONResponse import gradio as gr # ----------------------------------------------------------------------------- # Config (HF Variables/Secrets) # ----------------------------------------------------------------------------- WEBHOOK_TOKEN = os.getenv("WEBHOOK_TOKEN", "").strip() WEBHOOK_HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "").strip() WEBHOOK_TOKEN_HEADER = os.getenv("WEBHOOK_TOKEN_HEADER", "X-SA-Token").strip() MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "200")) PERSIST_TO_FILE = os.getenv("PERSIST_TO_FILE", "0").strip() == "1" PERSIST_PATH = os.getenv("PERSIST_PATH", "webhook_messages.jsonl").strip() # Downstream wakeup server # IMPORTANT: This must be the *actual* endpoint. Example: # http://163.245.192.149:8080/wakeup WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup").strip() WAKEUP_URL = WAKEUP_URL.rstrip("/") # normalize to no trailing slash RELAY_TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30")) # Dedupe window SEEN_TTL_SECONDS = int(os.getenv("SEEN_TTL_SECONDS", "86400")) MAX_SEEN = int(os.getenv("MAX_SEEN", "5000")) # Flood guard MAX_INFLIGHT = int(os.getenv("MAX_INFLIGHT", "1")) # ----------------------------------------------------------------------------- # In-memory state # ----------------------------------------------------------------------------- MESSAGES = deque(maxlen=MAX_MESSAGES) TOTAL_COUNT = 0 LAST_RECEIVED_TS: Optional[float] = None SEEN: Dict[str, float] = {} INFLIGHT = 0 def _now_ts() -> float: return time.time() def _ts_to_str(ts: Optional[float]) -> str: if not ts: return "n/a" return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) def _safe_json_dumps(obj: Any) -> str: try: return json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False) except Exception: return str(obj) def _append_message(payload: Dict[str, Any], meta: Dict[str, Any], relay: Dict[str, Any]) -> None: global TOTAL_COUNT, LAST_RECEIVED_TS TOTAL_COUNT += 1 LAST_RECEIVED_TS = _now_ts() entry = { "received_at_ts": LAST_RECEIVED_TS, "received_at": _ts_to_str(LAST_RECEIVED_TS), "meta": meta, "payload": payload, "relay": relay, } MESSAGES.appendleft(entry) if PERSIST_TO_FILE: try: with open(PERSIST_PATH, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") except Exception: pass def _verify_token(provided_token: Optional[str]) -> None: if not WEBHOOK_TOKEN: return if not provided_token or provided_token.strip() != WEBHOOK_TOKEN: raise HTTPException(status_code=401, detail="Invalid or missing webhook token.") def _verify_hmac_signature(raw_body: bytes, provided_sig: Optional[str]) -> None: if not WEBHOOK_HMAC_SECRET: return if not provided_sig: raise HTTPException(status_code=401, detail="Missing signature header.") mac = hmac.new(WEBHOOK_HMAC_SECRET.encode("utf-8"), raw_body, hashlib.sha256).hexdigest() if not hmac.compare_digest(mac, provided_sig.strip()): raise HTTPException(status_code=401, detail="Invalid signature.") def _norm(v: Any) -> str: if v is None: return "" if isinstance(v, str): s = v.strip() if s.lower() in ("null", "none"): return "" return s return str(v) def compute_fingerprint(payload: Dict[str, Any]) -> str: # MVP fingerprint. Upgrade later to sha256(file_bytes) when you have file URL/bytes. parts = { "event_type": _norm(payload.get("event_type")), "rid": _norm(payload.get("rid")), "file_type": _norm(payload.get("file_type")), "case_vendor_rid": _norm(payload.get("case_vendor_rid")), "case_rid": _norm(payload.get("case_rid")), "date_modified": _norm(payload.get("date_modified")), } raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8") return hashlib.sha256(raw).hexdigest() def _gc_seen(now: float) -> None: cutoff = now - SEEN_TTL_SECONDS dead = [k for k, ts in SEEN.items() if ts < cutoff] for k in dead: SEEN.pop(k, None) if len(SEEN) > MAX_SEEN: items = sorted(SEEN.items(), key=lambda kv: kv[1]) # oldest first for k, _ in items[: len(SEEN) - MAX_SEEN]: SEEN.pop(k, None) # ----------------------------------------------------------------------------- # FastAPI app # ----------------------------------------------------------------------------- app = FastAPI(title="Inserio Relay", version="1.0.0") @app.get("/") def root(): return { "ok": True, "service": "inserio-relay", "post": ["/webhook", "/webhook/", "/wakeup", "/wakeup/"], "forward_to": WAKEUP_URL, "ui": "/ui", "seen_ttl_seconds": SEEN_TTL_SECONDS, "max_inflight": MAX_INFLIGHT, } @app.get("/health") def health(): return { "ok": True, "service": "inserio-relay", "forward_to": WAKEUP_URL, "total_count": TOTAL_COUNT, "last_received_at": _ts_to_str(LAST_RECEIVED_TS), "seen_size": len(SEEN), "seen_ttl_seconds": SEEN_TTL_SECONDS, "max_inflight": MAX_INFLIGHT, } # Accept BOTH slash + no-slash (QuickBase can hit either) @app.post("/webhook") @app.post("/webhook/") @app.post("/wakeup") @app.post("/wakeup/") async def webhook( request: Request, x_sa_signature: Optional[str] = Header(default=None, convert_underscores=False), x_sa_token: Optional[str] = Header(default=None, convert_underscores=False), ): global INFLIGHT raw_body = await request.body() # Auth (optional) if WEBHOOK_TOKEN: token_value = request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token _verify_token(token_value) if WEBHOOK_HMAC_SECRET: sig_value = request.headers.get("X-SA-Signature") or x_sa_signature _verify_hmac_signature(raw_body, sig_value) # Parse JSON try: payload = await request.json() if not isinstance(payload, dict): payload = {"_non_object_payload": payload} except Exception: payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")} now = _now_ts() _gc_seen(now) # Default state if not _norm(payload.get("sa_processing_state")): payload["sa_processing_state"] = "new" # Fingerprint fp = _norm(payload.get("sa_fingerprint")) if not fp: fp = compute_fingerprint(payload) payload["sa_fingerprint"] = fp # Dedupe duplicate = fp in SEEN if not duplicate: SEEN[fp] = now # Flood guard if INFLIGHT >= MAX_INFLIGHT: relay_info = { "ok": True, "duplicate": duplicate, "fingerprint": fp, "forwarded": False, "reason": "rate_limited_max_inflight", } meta = { "client": request.client.host if request.client else None, "path": str(request.url.path), } _append_message(payload=payload, meta=meta, relay=relay_info) return JSONResponse(status_code=200, content=relay_info) # Do NOT forward duplicates if duplicate: relay_info = { "ok": True, "duplicate": True, "fingerprint": fp, "forwarded": False, "reason": "duplicate", } meta = { "client": request.client.host if request.client else None, "path": str(request.url.path), } _append_message(payload=payload, meta=meta, relay=relay_info) return JSONResponse(status_code=200, content=relay_info) # Forward non-duplicate forward_url = WAKEUP_URL # EXACT. No extra /wakeup appended. INFLIGHT += 1 try: r = requests.post(forward_url, json=payload, timeout=RELAY_TIMEOUT) forward_status = r.status_code forward_body = r.text[:2000] except requests.RequestException as e: forward_status = 502 forward_body = f"relay_failed: {e}" finally: INFLIGHT -= 1 relay_info = { "ok": True, "duplicate": False, "fingerprint": fp, "forward_url": forward_url, "forward_status": forward_status, "forward_body": forward_body, } meta = { "client": request.client.host if request.client else None, "path": str(request.url.path), } _append_message(payload=payload, meta=meta, relay=relay_info) return JSONResponse(status_code=200, content=relay_info) # ----------------------------------------------------------------------------- # Gradio dashboard mounted at /ui (doesn't interfere with API routes) # ----------------------------------------------------------------------------- def ui_status() -> str: return _safe_json_dumps( { "total_count": TOTAL_COUNT, "last_received_at": _ts_to_str(LAST_RECEIVED_TS), "buffer_size": len(MESSAGES), "max_buffer": MAX_MESSAGES, "relay": { "forward_url": WAKEUP_URL, "timeout": RELAY_TIMEOUT, "max_inflight": MAX_INFLIGHT, "seen_size": len(SEEN), "seen_ttl_seconds": SEEN_TTL_SECONDS, }, "auth": { "token_enabled": bool(WEBHOOK_TOKEN), "token_header": WEBHOOK_TOKEN_HEADER, "hmac_enabled": bool(WEBHOOK_HMAC_SECRET), }, "endpoints": { "health": "/health", "post": ["/webhook", "/webhook/", "/wakeup", "/wakeup/"], "ui": "/ui", }, } ) def ui_recent(limit: int = 25) -> str: limit = max(1, min(int(limit), 200)) items = list(MESSAGES)[:limit] return _safe_json_dumps(items) def ui_clear() -> str: global TOTAL_COUNT, LAST_RECEIVED_TS MESSAGES.clear() TOTAL_COUNT = 0 LAST_RECEIVED_TS = None return "Cleared in-memory messages." with gr.Blocks(title="Inserio Relay Dashboard") as demo: gr.Markdown("# Inserio Relay Dashboard") gr.Markdown("POST JSON to `/webhook` and it will be logged and forwarded to the wakeup server.") with gr.Row(): status_box = gr.Code(label="Status", value=ui_status(), language="json") clear_btn = gr.Button("Clear messages", variant="stop") with gr.Row(): limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit") recent_box = gr.Code(label="Recent messages (newest first)", value=ui_recent(25), language="json") def refresh(limit_val: int): return ui_status(), ui_recent(limit_val) demo.load(refresh, inputs=[limit], outputs=[status_box, recent_box], every=2) limit.change(refresh, inputs=[limit], outputs=[status_box, recent_box]) clear_btn.click(lambda: (ui_clear(), ui_status(), ui_recent(25)), outputs=[recent_box, status_box, recent_box]) app = gr.mount_gradio_app(app, demo, path="/ui")