Inserio_relay / app.py
Avinashnalla7's picture
Update app.py
e9d6910 verified
import os
import json
import time
import hmac
import hashlib
from collections import deque
from typing import Any, Dict, Optional
import requests
from fastapi import FastAPI, Request, Header, HTTPException
from fastapi.responses import JSONResponse
import gradio as gr
# -----------------------------------------------------------------------------
# Config (HF Variables/Secrets)
# -----------------------------------------------------------------------------
WEBHOOK_TOKEN = os.getenv("WEBHOOK_TOKEN", "").strip()
WEBHOOK_HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "").strip()
WEBHOOK_TOKEN_HEADER = os.getenv("WEBHOOK_TOKEN_HEADER", "X-SA-Token").strip()
MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "200"))
PERSIST_TO_FILE = os.getenv("PERSIST_TO_FILE", "0").strip() == "1"
PERSIST_PATH = os.getenv("PERSIST_PATH", "webhook_messages.jsonl").strip()
# Downstream wakeup server
# IMPORTANT: This must be the *actual* endpoint. Example:
# http://163.245.192.149:8080/wakeup
WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup").strip()
WAKEUP_URL = WAKEUP_URL.rstrip("/") # normalize to no trailing slash
RELAY_TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30"))
# Dedupe window
SEEN_TTL_SECONDS = int(os.getenv("SEEN_TTL_SECONDS", "86400"))
MAX_SEEN = int(os.getenv("MAX_SEEN", "5000"))
# Flood guard
MAX_INFLIGHT = int(os.getenv("MAX_INFLIGHT", "1"))
# -----------------------------------------------------------------------------
# In-memory state
# -----------------------------------------------------------------------------
MESSAGES = deque(maxlen=MAX_MESSAGES)
TOTAL_COUNT = 0
LAST_RECEIVED_TS: Optional[float] = None
SEEN: Dict[str, float] = {}
INFLIGHT = 0
def _now_ts() -> float:
return time.time()
def _ts_to_str(ts: Optional[float]) -> str:
if not ts:
return "n/a"
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))
def _safe_json_dumps(obj: Any) -> str:
try:
return json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False)
except Exception:
return str(obj)
def _append_message(payload: Dict[str, Any], meta: Dict[str, Any], relay: Dict[str, Any]) -> None:
global TOTAL_COUNT, LAST_RECEIVED_TS
TOTAL_COUNT += 1
LAST_RECEIVED_TS = _now_ts()
entry = {
"received_at_ts": LAST_RECEIVED_TS,
"received_at": _ts_to_str(LAST_RECEIVED_TS),
"meta": meta,
"payload": payload,
"relay": relay,
}
MESSAGES.appendleft(entry)
if PERSIST_TO_FILE:
try:
with open(PERSIST_PATH, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
except Exception:
pass
def _verify_token(provided_token: Optional[str]) -> None:
if not WEBHOOK_TOKEN:
return
if not provided_token or provided_token.strip() != WEBHOOK_TOKEN:
raise HTTPException(status_code=401, detail="Invalid or missing webhook token.")
def _verify_hmac_signature(raw_body: bytes, provided_sig: Optional[str]) -> None:
if not WEBHOOK_HMAC_SECRET:
return
if not provided_sig:
raise HTTPException(status_code=401, detail="Missing signature header.")
mac = hmac.new(WEBHOOK_HMAC_SECRET.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
if not hmac.compare_digest(mac, provided_sig.strip()):
raise HTTPException(status_code=401, detail="Invalid signature.")
def _norm(v: Any) -> str:
if v is None:
return ""
if isinstance(v, str):
s = v.strip()
if s.lower() in ("null", "none"):
return ""
return s
return str(v)
def compute_fingerprint(payload: Dict[str, Any]) -> str:
# MVP fingerprint. Upgrade later to sha256(file_bytes) when you have file URL/bytes.
parts = {
"event_type": _norm(payload.get("event_type")),
"rid": _norm(payload.get("rid")),
"file_type": _norm(payload.get("file_type")),
"case_vendor_rid": _norm(payload.get("case_vendor_rid")),
"case_rid": _norm(payload.get("case_rid")),
"date_modified": _norm(payload.get("date_modified")),
}
raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8")
return hashlib.sha256(raw).hexdigest()
def _gc_seen(now: float) -> None:
cutoff = now - SEEN_TTL_SECONDS
dead = [k for k, ts in SEEN.items() if ts < cutoff]
for k in dead:
SEEN.pop(k, None)
if len(SEEN) > MAX_SEEN:
items = sorted(SEEN.items(), key=lambda kv: kv[1]) # oldest first
for k, _ in items[: len(SEEN) - MAX_SEEN]:
SEEN.pop(k, None)
# -----------------------------------------------------------------------------
# FastAPI app
# -----------------------------------------------------------------------------
app = FastAPI(title="Inserio Relay", version="1.0.0")
@app.get("/")
def root():
return {
"ok": True,
"service": "inserio-relay",
"post": ["/webhook", "/webhook/", "/wakeup", "/wakeup/"],
"forward_to": WAKEUP_URL,
"ui": "/ui",
"seen_ttl_seconds": SEEN_TTL_SECONDS,
"max_inflight": MAX_INFLIGHT,
}
@app.get("/health")
def health():
return {
"ok": True,
"service": "inserio-relay",
"forward_to": WAKEUP_URL,
"total_count": TOTAL_COUNT,
"last_received_at": _ts_to_str(LAST_RECEIVED_TS),
"seen_size": len(SEEN),
"seen_ttl_seconds": SEEN_TTL_SECONDS,
"max_inflight": MAX_INFLIGHT,
}
# Accept BOTH slash + no-slash (QuickBase can hit either)
@app.post("/webhook")
@app.post("/webhook/")
@app.post("/wakeup")
@app.post("/wakeup/")
async def webhook(
request: Request,
x_sa_signature: Optional[str] = Header(default=None, convert_underscores=False),
x_sa_token: Optional[str] = Header(default=None, convert_underscores=False),
):
global INFLIGHT
raw_body = await request.body()
# Auth (optional)
if WEBHOOK_TOKEN:
token_value = request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token
_verify_token(token_value)
if WEBHOOK_HMAC_SECRET:
sig_value = request.headers.get("X-SA-Signature") or x_sa_signature
_verify_hmac_signature(raw_body, sig_value)
# Parse JSON
try:
payload = await request.json()
if not isinstance(payload, dict):
payload = {"_non_object_payload": payload}
except Exception:
payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")}
now = _now_ts()
_gc_seen(now)
# Default state
if not _norm(payload.get("sa_processing_state")):
payload["sa_processing_state"] = "new"
# Fingerprint
fp = _norm(payload.get("sa_fingerprint"))
if not fp:
fp = compute_fingerprint(payload)
payload["sa_fingerprint"] = fp
# Dedupe
duplicate = fp in SEEN
if not duplicate:
SEEN[fp] = now
# Flood guard
if INFLIGHT >= MAX_INFLIGHT:
relay_info = {
"ok": True,
"duplicate": duplicate,
"fingerprint": fp,
"forwarded": False,
"reason": "rate_limited_max_inflight",
}
meta = {
"client": request.client.host if request.client else None,
"path": str(request.url.path),
}
_append_message(payload=payload, meta=meta, relay=relay_info)
return JSONResponse(status_code=200, content=relay_info)
# Do NOT forward duplicates
if duplicate:
relay_info = {
"ok": True,
"duplicate": True,
"fingerprint": fp,
"forwarded": False,
"reason": "duplicate",
}
meta = {
"client": request.client.host if request.client else None,
"path": str(request.url.path),
}
_append_message(payload=payload, meta=meta, relay=relay_info)
return JSONResponse(status_code=200, content=relay_info)
# Forward non-duplicate
forward_url = WAKEUP_URL # EXACT. No extra /wakeup appended.
INFLIGHT += 1
try:
r = requests.post(forward_url, json=payload, timeout=RELAY_TIMEOUT)
forward_status = r.status_code
forward_body = r.text[:2000]
except requests.RequestException as e:
forward_status = 502
forward_body = f"relay_failed: {e}"
finally:
INFLIGHT -= 1
relay_info = {
"ok": True,
"duplicate": False,
"fingerprint": fp,
"forward_url": forward_url,
"forward_status": forward_status,
"forward_body": forward_body,
}
meta = {
"client": request.client.host if request.client else None,
"path": str(request.url.path),
}
_append_message(payload=payload, meta=meta, relay=relay_info)
return JSONResponse(status_code=200, content=relay_info)
# -----------------------------------------------------------------------------
# Gradio dashboard mounted at /ui (doesn't interfere with API routes)
# -----------------------------------------------------------------------------
def ui_status() -> str:
return _safe_json_dumps(
{
"total_count": TOTAL_COUNT,
"last_received_at": _ts_to_str(LAST_RECEIVED_TS),
"buffer_size": len(MESSAGES),
"max_buffer": MAX_MESSAGES,
"relay": {
"forward_url": WAKEUP_URL,
"timeout": RELAY_TIMEOUT,
"max_inflight": MAX_INFLIGHT,
"seen_size": len(SEEN),
"seen_ttl_seconds": SEEN_TTL_SECONDS,
},
"auth": {
"token_enabled": bool(WEBHOOK_TOKEN),
"token_header": WEBHOOK_TOKEN_HEADER,
"hmac_enabled": bool(WEBHOOK_HMAC_SECRET),
},
"endpoints": {
"health": "/health",
"post": ["/webhook", "/webhook/", "/wakeup", "/wakeup/"],
"ui": "/ui",
},
}
)
def ui_recent(limit: int = 25) -> str:
limit = max(1, min(int(limit), 200))
items = list(MESSAGES)[:limit]
return _safe_json_dumps(items)
def ui_clear() -> str:
global TOTAL_COUNT, LAST_RECEIVED_TS
MESSAGES.clear()
TOTAL_COUNT = 0
LAST_RECEIVED_TS = None
return "Cleared in-memory messages."
with gr.Blocks(title="Inserio Relay Dashboard") as demo:
gr.Markdown("# Inserio Relay Dashboard")
gr.Markdown("POST JSON to `/webhook` and it will be logged and forwarded to the wakeup server.")
with gr.Row():
status_box = gr.Code(label="Status", value=ui_status(), language="json")
clear_btn = gr.Button("Clear messages", variant="stop")
with gr.Row():
limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit")
recent_box = gr.Code(label="Recent messages (newest first)", value=ui_recent(25), language="json")
def refresh(limit_val: int):
return ui_status(), ui_recent(limit_val)
demo.load(refresh, inputs=[limit], outputs=[status_box, recent_box], every=2)
limit.change(refresh, inputs=[limit], outputs=[status_box, recent_box])
clear_btn.click(lambda: (ui_clear(), ui_status(), ui_recent(25)), outputs=[recent_box, status_box, recent_box])
app = gr.mount_gradio_app(app, demo, path="/ui")