Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import time | |
| import hmac | |
| import hashlib | |
| from collections import deque | |
| from typing import Any, Dict, Optional | |
| import requests | |
| from fastapi import FastAPI, Request, Header, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import gradio as gr | |
| # ----------------------------------------------------------------------------- | |
| # Config (HF Variables/Secrets) | |
| # ----------------------------------------------------------------------------- | |
# Inbound authentication. Each check is skipped by the verifier helpers when
# its secret is the empty string.
WEBHOOK_TOKEN = os.environ.get("WEBHOOK_TOKEN", "").strip()
WEBHOOK_HMAC_SECRET = os.environ.get("WEBHOOK_HMAC_SECRET", "").strip()
WEBHOOK_TOKEN_HEADER = os.environ.get("WEBHOOK_TOKEN_HEADER", "X-SA-Token").strip()

# Message log: size of the in-memory buffer and optional JSONL persistence
# (enabled only when PERSIST_TO_FILE is exactly "1").
MAX_MESSAGES = int(os.environ.get("MAX_MESSAGES", "200"))
PERSIST_TO_FILE = "1" == os.environ.get("PERSIST_TO_FILE", "0").strip()
PERSIST_PATH = os.environ.get("PERSIST_PATH", "webhook_messages.jsonl").strip()

# Downstream wakeup server
# IMPORTANT: This must be the *actual* endpoint. Example:
# http://163.245.192.149:8080/wakeup
# The value is normalized to carry no trailing slash.
WAKEUP_URL = (
    os.environ.get("WAKEUP_URL", "http://163.245.192.149:8080/wakeup")
    .strip()
    .rstrip("/")
)
RELAY_TIMEOUT = int(os.environ.get("RELAY_TIMEOUT", "30"))

# Dedupe window: how long a fingerprint is remembered and how many are kept.
SEEN_TTL_SECONDS = int(os.environ.get("SEEN_TTL_SECONDS", "86400"))
MAX_SEEN = int(os.environ.get("MAX_SEEN", "5000"))

# Flood guard: maximum number of forwards allowed to be in progress at once.
MAX_INFLIGHT = int(os.environ.get("MAX_INFLIGHT", "1"))
| # ----------------------------------------------------------------------------- | |
| # In-memory state | |
| # ----------------------------------------------------------------------------- | |
# Bounded log of received webhooks; entries are pushed on the left, so
# iteration yields the newest entry first.
MESSAGES = deque((), maxlen=MAX_MESSAGES)

# Fingerprint -> timestamp at which that fingerprint was first recorded.
SEEN: Dict[str, float] = dict()

# Counters: messages logged since start (or last clear) and forwards
# currently in progress.
TOTAL_COUNT = 0
INFLIGHT = 0

# Timestamp of the most recently logged message; None until one arrives.
LAST_RECEIVED_TS: Optional[float] = None
| def _now_ts() -> float: | |
| return time.time() | |
| def _ts_to_str(ts: Optional[float]) -> str: | |
| if not ts: | |
| return "n/a" | |
| return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) | |
| def _safe_json_dumps(obj: Any) -> str: | |
| try: | |
| return json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False) | |
| except Exception: | |
| return str(obj) | |
def _append_message(payload: Dict[str, Any], meta: Dict[str, Any], relay: Dict[str, Any]) -> None:
    """Record one received webhook in the in-memory log (and optionally on disk).

    Increments ``TOTAL_COUNT``, updates ``LAST_RECEIVED_TS`` and pushes a new
    entry onto the left of ``MESSAGES`` so the newest entry comes first.

    When ``PERSIST_TO_FILE`` is enabled the entry is also appended as one JSON
    line to ``PERSIST_PATH``. Persistence is best-effort: a failure is logged
    as a warning and never propagates to the caller.

    Args:
        payload: The webhook payload as stored/forwarded.
        meta: Request metadata to store alongside the payload.
        relay: Outcome of the relay decision for this payload.
    """
    global TOTAL_COUNT, LAST_RECEIVED_TS
    # Local import keeps this block self-contained; the file's import section
    # does not bring in ``logging``.
    import logging

    TOTAL_COUNT += 1
    LAST_RECEIVED_TS = _now_ts()
    entry = {
        "received_at_ts": LAST_RECEIVED_TS,
        "received_at": _ts_to_str(LAST_RECEIVED_TS),
        "meta": meta,
        "payload": payload,
        "relay": relay,
    }
    MESSAGES.appendleft(entry)

    if not PERSIST_TO_FILE:
        return

    try:
        # Serialize first so a serialization error cannot leave a partial line.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        with open(PERSIST_PATH, "a", encoding="utf-8") as f:
            f.write(line)
    except (OSError, TypeError, ValueError) as exc:
        # OSError: file cannot be opened/written. TypeError/ValueError: entry is
        # not JSON-serializable or not encodable as UTF-8. Keep serving either way.
        logging.getLogger(__name__).warning(
            "Could not persist webhook entry to %s: %s", PERSIST_PATH, exc
        )
def _verify_token(provided_token: Optional[str]) -> None:
    """Validate the shared-secret token sent with a webhook request.

    Does nothing when ``WEBHOOK_TOKEN`` is empty (token auth disabled).

    Args:
        provided_token: Token value taken from the request, or ``None`` when
            the header was absent. Surrounding whitespace is ignored.

    Raises:
        HTTPException: 401 when the token is missing, blank, or does not match.
    """
    if not WEBHOOK_TOKEN:
        return

    candidate = (provided_token or "").strip()
    # Compare as bytes with a constant-time primitive: a plain ``!=`` can leak
    # how many leading characters matched, and compare_digest() on ``str``
    # rejects non-ASCII input with TypeError.
    matches = hmac.compare_digest(
        candidate.encode("utf-8"), WEBHOOK_TOKEN.encode("utf-8")
    )
    if not candidate or not matches:
        raise HTTPException(status_code=401, detail="Invalid or missing webhook token.")
def _verify_hmac_signature(raw_body: bytes, provided_sig: Optional[str]) -> None:
    """Validate the HMAC-SHA256 signature of a webhook request body.

    Does nothing when ``WEBHOOK_HMAC_SECRET`` is empty (signature auth
    disabled). The expected signature is the lowercase hex digest of
    HMAC-SHA256 over the raw request bytes, keyed with the UTF-8 encoded
    secret.

    Args:
        raw_body: Exact bytes of the request body.
        provided_sig: Signature value from the request, or ``None`` when the
            header was absent. Surrounding whitespace is ignored.

    Raises:
        HTTPException: 401 when the signature is missing or does not match.
    """
    if not WEBHOOK_HMAC_SECRET:
        return
    if not provided_sig:
        raise HTTPException(status_code=401, detail="Missing signature header.")

    expected = hmac.new(
        WEBHOOK_HMAC_SECRET.encode("utf-8"), raw_body, hashlib.sha256
    ).hexdigest()
    # Compare bytes, not str: compare_digest() raises TypeError for str inputs
    # containing non-ASCII characters, which would surface as an unhandled
    # error instead of a clean 401 for a malformed header.
    supplied = provided_sig.strip().encode("utf-8")
    if not hmac.compare_digest(expected.encode("ascii"), supplied):
        raise HTTPException(status_code=401, detail="Invalid signature.")
| def _norm(v: Any) -> str: | |
| if v is None: | |
| return "" | |
| if isinstance(v, str): | |
| s = v.strip() | |
| if s.lower() in ("null", "none"): | |
| return "" | |
| return s | |
| return str(v) | |
def compute_fingerprint(payload: Dict[str, Any]) -> str:
    """Return a SHA-256 hex fingerprint derived from selected payload fields.

    The fields ``event_type``, ``rid``, ``file_type``, ``case_vendor_rid``,
    ``case_rid`` and ``date_modified`` are normalized with ``_norm`` and
    serialized as compact, key-sorted JSON before hashing, so payloads that
    agree on those fields produce the same fingerprint.
    """
    # MVP fingerprint. Upgrade later to sha256(file_bytes) when you have file URL/bytes.
    identifying_fields = (
        "case_rid",
        "case_vendor_rid",
        "date_modified",
        "event_type",
        "file_type",
        "rid",
    )
    normalized = {name: _norm(payload.get(name)) for name in identifying_fields}
    canonical = json.dumps(normalized, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()
def _gc_seen(now: float) -> None:
    """Prune the ``SEEN`` fingerprint table.

    First removes every fingerprint whose timestamp is older than
    ``SEEN_TTL_SECONDS`` relative to *now*; then, if more than ``MAX_SEEN``
    entries remain, removes the oldest ones until the table is back at the cap.
    """
    oldest_allowed = now - SEEN_TTL_SECONDS
    # Collect keys first: the dict must not change size while being iterated.
    expired = [fp for fp, seen_at in SEEN.items() if seen_at < oldest_allowed]
    for fp in expired:
        SEEN.pop(fp, None)

    overflow = len(SEEN) - MAX_SEEN
    if overflow > 0:
        oldest_first = sorted(SEEN, key=SEEN.get)
        for fp in oldest_first[:overflow]:
            SEEN.pop(fp, None)
| # ----------------------------------------------------------------------------- | |
| # FastAPI app | |
| # ----------------------------------------------------------------------------- | |
# ASGI application object for the relay API.
app = FastAPI(
    version="1.0.0",
    title="Inserio Relay",
)
def root():
    """Return a static description of the service and its relay settings.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.
    """
    accepted_paths = ["/webhook", "/webhook/", "/wakeup", "/wakeup/"]
    description = dict(
        ok=True,
        service="inserio-relay",
        post=accepted_paths,
        forward_to=WAKEUP_URL,
        ui="/ui",
        seen_ttl_seconds=SEEN_TTL_SECONDS,
        max_inflight=MAX_INFLIGHT,
    )
    return description
def health():
    """Return a snapshot of the relay's counters and settings.

    NOTE(review): no route decorator is visible on this function in this view;
    confirm how it is registered with ``app``.
    """
    snapshot = dict(
        ok=True,
        service="inserio-relay",
        forward_to=WAKEUP_URL,
        total_count=TOTAL_COUNT,
        last_received_at=_ts_to_str(LAST_RECEIVED_TS),
        seen_size=len(SEEN),
        seen_ttl_seconds=SEEN_TTL_SECONDS,
        max_inflight=MAX_INFLIGHT,
    )
    return snapshot
def _log_and_reply(request: Request, payload: Dict[str, Any], relay_info: Dict[str, Any]) -> JSONResponse:
    """Log the request with its relay outcome and build the HTTP 200 reply."""
    meta = {
        "client": request.client.host if request.client else None,
        "path": str(request.url.path),
    }
    _append_message(payload=payload, meta=meta, relay=relay_info)
    return JSONResponse(status_code=200, content=relay_info)


# Accept BOTH slash + no-slash (QuickBase can hit either)
# NOTE(review): no route decorators are visible on this handler in this view;
# confirm it is registered for the POST paths advertised by root().
async def webhook(
    request: Request,
    x_sa_signature: Optional[str] = Header(default=None, convert_underscores=False),
    x_sa_token: Optional[str] = Header(default=None, convert_underscores=False),
):
    """Receive a webhook, de-duplicate it and relay it to ``WAKEUP_URL``.

    Steps:
      1. Optional auth: token check (when ``WEBHOOK_TOKEN`` is set) and
         HMAC-SHA256 body signature check (when ``WEBHOOK_HMAC_SECRET`` is set).
      2. Parse the body as JSON. A non-object JSON value is wrapped as
         ``{"_non_object_payload": ...}``; an unparseable body is wrapped as
         ``{"_raw_body": "<decoded text>"}``.
      3. Default ``sa_processing_state`` to ``"new"`` and ensure the payload
         carries an ``sa_fingerprint`` (computed when not supplied).
      4. Flood guard: when ``MAX_INFLIGHT`` forwards are already running the
         event is logged and answered with ``forwarded: False`` /
         ``reason: "rate_limited_max_inflight"``.
      5. Duplicates (fingerprint already in ``SEEN``) are logged, not forwarded.
      6. Otherwise the payload is POSTed to ``WAKEUP_URL`` and the downstream
         status and the first 2000 characters of its body are reported.

    Every outcome is logged via ``_append_message`` and answered with HTTP 200;
    a relay failure is reported as ``forward_status: 502`` inside the body.

    Raises:
        HTTPException: 401 from the auth helpers when a configured check fails.
    """
    # Local imports keep this block self-contained; the file's import section
    # does not bring these modules in.
    import asyncio
    import functools

    global INFLIGHT

    raw_body = await request.body()

    # Auth (optional)
    if WEBHOOK_TOKEN:
        _verify_token(request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token)
    if WEBHOOK_HMAC_SECRET:
        _verify_hmac_signature(
            raw_body, request.headers.get("X-SA-Signature") or x_sa_signature
        )

    # Parse JSON
    try:
        payload = json.loads(raw_body)
    except (ValueError, RecursionError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")}
    else:
        if not isinstance(payload, dict):
            payload = {"_non_object_payload": payload}

    now = _now_ts()
    _gc_seen(now)

    # Default state
    if not _norm(payload.get("sa_processing_state")):
        payload["sa_processing_state"] = "new"

    # Fingerprint
    fp = _norm(payload.get("sa_fingerprint")) or compute_fingerprint(payload)
    payload["sa_fingerprint"] = fp

    # Dedupe
    duplicate = fp in SEEN

    # Flood guard. The fingerprint is deliberately NOT recorded in SEEN here:
    # an event dropped by the guard was never delivered, so a later retry must
    # not be mistaken for a duplicate.
    if INFLIGHT >= MAX_INFLIGHT:
        return _log_and_reply(
            request,
            payload,
            {
                "ok": True,
                "duplicate": duplicate,
                "fingerprint": fp,
                "forwarded": False,
                "reason": "rate_limited_max_inflight",
            },
        )

    # Do NOT forward duplicates
    if duplicate:
        return _log_and_reply(
            request,
            payload,
            {
                "ok": True,
                "duplicate": True,
                "fingerprint": fp,
                "forwarded": False,
                "reason": "duplicate",
            },
        )

    # Forward non-duplicate
    forward_url = WAKEUP_URL  # EXACT. No extra /wakeup appended.

    # No await may sit between the flood-guard check above and these two
    # updates; that keeps check-and-increment atomic on the event loop.
    SEEN[fp] = now
    INFLIGHT += 1
    try:
        # requests is blocking; run it in the default executor so the event
        # loop (other requests, the dashboard) stays responsive while the
        # downstream server answers. This is also what lets the flood guard
        # above observe an in-progress forward.
        post_call = functools.partial(
            requests.post, forward_url, json=payload, timeout=RELAY_TIMEOUT
        )
        r = await asyncio.get_running_loop().run_in_executor(None, post_call)
        forward_status = r.status_code
        forward_body = r.text[:2000]
    except requests.RequestException as e:
        forward_status = 502
        forward_body = f"relay_failed: {e}"
    finally:
        INFLIGHT -= 1

    return _log_and_reply(
        request,
        payload,
        {
            "ok": True,
            "duplicate": False,
            "fingerprint": fp,
            "forward_url": forward_url,
            "forward_status": forward_status,
            "forward_body": forward_body,
        },
    )
| # ----------------------------------------------------------------------------- | |
| # Gradio dashboard mounted at /ui (doesn't interfere with API routes) | |
| # ----------------------------------------------------------------------------- | |
def ui_status() -> str:
    """Return the dashboard status panel content as pretty-printed JSON.

    The document reports message counters and buffer usage, the relay
    settings, which auth checks are enabled, and the service's endpoints.
    """
    relay_section = {
        "forward_url": WAKEUP_URL,
        "timeout": RELAY_TIMEOUT,
        "max_inflight": MAX_INFLIGHT,
        "seen_size": len(SEEN),
        "seen_ttl_seconds": SEEN_TTL_SECONDS,
    }
    auth_section = {
        "token_enabled": bool(WEBHOOK_TOKEN),
        "token_header": WEBHOOK_TOKEN_HEADER,
        "hmac_enabled": bool(WEBHOOK_HMAC_SECRET),
    }
    endpoints_section = {
        "health": "/health",
        "post": ["/webhook", "/webhook/", "/wakeup", "/wakeup/"],
        "ui": "/ui",
    }
    status = {
        "total_count": TOTAL_COUNT,
        "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
        "buffer_size": len(MESSAGES),
        "max_buffer": MAX_MESSAGES,
        "relay": relay_section,
        "auth": auth_section,
        "endpoints": endpoints_section,
    }
    return _safe_json_dumps(status)
def ui_recent(limit: int = 25) -> str:
    """Return up to *limit* of the newest logged messages as pretty JSON.

    *limit* is coerced with ``int()`` and clamped to the range 1..200.
    """
    requested = int(limit)
    if requested < 1:
        capped = 1
    elif requested > 200:
        capped = 200
    else:
        capped = requested
    newest_first = list(MESSAGES)
    return _safe_json_dumps(newest_first[:capped])
def ui_clear() -> str:
    """Empty the in-memory message log and reset its counters.

    Resets ``TOTAL_COUNT`` to 0 and ``LAST_RECEIVED_TS`` to ``None``; the
    ``SEEN`` fingerprint table is left untouched. Returns a confirmation
    message.
    """
    global TOTAL_COUNT, LAST_RECEIVED_TS
    LAST_RECEIVED_TS = None
    TOTAL_COUNT = 0
    MESSAGES.clear()
    return "Cleared in-memory messages."
# NOTE(review): indentation was lost in this view, so the grouping of
# components into rows below is reconstructed from statement order — confirm
# against the deployed layout.
with gr.Blocks(title="Inserio Relay Dashboard") as demo:
    gr.Markdown("# Inserio Relay Dashboard")
    gr.Markdown("POST JSON to `/webhook` and it will be logged and forwarded to the wakeup server.")

    with gr.Row():
        status_box = gr.Code(label="Status", value=ui_status(), language="json")
        clear_btn = gr.Button("Clear messages", variant="stop")

    with gr.Row():
        limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit")
        recent_box = gr.Code(label="Recent messages (newest first)", value=ui_recent(25), language="json")

    def refresh(limit_val: int):
        """Return fresh (status, recent-messages) content for the two panels."""
        return ui_status(), ui_recent(limit_val)

    def clear_and_refresh(limit_val: int):
        """Clear the in-memory log, then return refreshed panel content."""
        ui_clear()
        return refresh(limit_val)

    # Poll every 2 seconds so the panels track incoming webhooks.
    demo.load(refresh, inputs=[limit], outputs=[status_box, recent_box], every=2)
    limit.change(refresh, inputs=[limit], outputs=[status_box, recent_box])
    # Each output component is listed exactly once and the current slider value
    # is honoured. The previous handler named recent_box twice, pushed the
    # plain-text confirmation into that JSON panel, and hard-coded a limit of 25.
    clear_btn.click(clear_and_refresh, inputs=[limit], outputs=[status_box, recent_box])

# Serve the dashboard from the same ASGI app under /ui.
app = gr.mount_gradio_app(app, demo, path="/ui")