Avinashnalla7 commited on
Commit
0abb7e3
·
verified ·
1 Parent(s): e2591b5

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +194 -140
app.py CHANGED
@@ -1,166 +1,220 @@
1
  import os
2
  import json
3
  import time
 
4
  import hashlib
5
- import threading
6
  from typing import Any, Dict, Optional
7
 
8
- import requests
9
- from fastapi import FastAPI, Request, HTTPException
10
  from fastapi.responses import JSONResponse
 
11
 
12
- # ----------------------------
13
- # Config
14
- # ----------------------------
15
- WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup").strip()
16
- TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30"))
17
-
18
- # Flood guard: keep this at 1 until you're 100% sure you're safe
19
- MAX_INFLIGHT = int(os.getenv("MAX_INFLIGHT", "1"))
20
-
21
- # Dedupe cache TTL (seconds). Keep reasonable so it doesn't grow forever.
22
- SEEN_TTL_SECONDS = int(os.getenv("SEEN_TTL_SECONDS", "86400")) # 24h default
23
-
24
- # How many chars of downstream response to return to caller
25
- FORWARD_BODY_LIMIT = int(os.getenv("FORWARD_BODY_LIMIT", "2000"))
26
-
27
- app = FastAPI(title="Inserio Relay", version="1.0.0")
28
-
29
- # ----------------------------
30
- # Helpers
31
- # ----------------------------
32
- def norm(v: Any) -> Any:
33
- if v is None:
34
- return ""
35
- if isinstance(v, str):
36
- s = v.strip()
37
- return "" if s.lower() in ("null", "none") else s
38
- return v
39
-
40
- def normalize_url(url: str) -> str:
41
- """
42
- Avoid trailing slash issues. Uvicorn/FastAPI often 307 redirect when trailing
43
- slash mismatches the route. We'll send to the canonical version.
44
- """
45
- return url.rstrip("/")
46
-
47
- def compute_fingerprint(payload: Dict[str, Any]) -> str:
48
- """
49
- MVP fingerprint: stable hash from fields that should represent the same upload.
50
- NOTE: This is NOT file-bytes fingerprint. That must be done downstream (163 server)
51
- after downloading the attachment bytes from QuickBase.
52
- """
53
- parts = {
54
- "rid": norm(payload.get("rid")),
55
- "file_type": norm(payload.get("file_type")),
56
- "case_vendor_rid": norm(payload.get("case_vendor_rid")),
57
- "case_rid": norm(payload.get("case_rid")),
58
- "date_modified": norm(payload.get("date_modified")),
59
  }
60
- raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8")
61
- return hashlib.sha256(raw).hexdigest()
62
-
63
- # ----------------------------
64
- # In-memory dedupe store with TTL
65
- # fp -> last_seen_epoch
66
- # ----------------------------
67
- SEEN: Dict[str, float] = {}
68
- SEEN_LOCK = threading.Lock()
69
-
70
- def seen_prune(now: float) -> None:
71
- cutoff = now - SEEN_TTL_SECONDS
72
- dead = [fp for fp, ts in SEEN.items() if ts < cutoff]
73
- for fp in dead:
74
- SEEN.pop(fp, None)
75
-
76
- def seen_check_and_set(fp: str) -> bool:
77
- """
78
- Returns True if already seen (duplicate), else records and returns False.
79
- """
80
- now = time.time()
81
- with SEEN_LOCK:
82
- seen_prune(now)
83
- if fp in SEEN:
84
- SEEN[fp] = now
85
- return True
86
- SEEN[fp] = now
87
- return False
88
-
89
- # ----------------------------
90
- # Flood guard semaphore
91
- # ----------------------------
92
- INFLIGHT = threading.BoundedSemaphore(value=max(1, MAX_INFLIGHT))
93
-
94
- # ----------------------------
95
- # Routes
96
- # ----------------------------
97
- @app.get("/")
98
- def root():
99
  return {
100
  "ok": True,
101
- "service": "inserio-relay",
102
- "post": ["/webhook", "/wakeup"],
103
- "forward_to": normalize_url(WAKEUP_URL),
104
- "max_inflight": MAX_INFLIGHT,
105
- "seen_ttl_seconds": SEEN_TTL_SECONDS,
106
  }
107
 
108
- @app.get("/health")
109
- def health():
110
- return {"ok": True}
111
 
 
 
 
112
  @app.post("/webhook")
113
- async def webhook(req: Request):
114
- # Validate JSON
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
115
  try:
116
- payload = await req.json()
 
 
117
  except Exception:
118
- raise HTTPException(status_code=400, detail="invalid_json")
 
 
 
 
 
 
 
 
 
 
 
 
 
119
 
120
- if not isinstance(payload, dict):
121
- raise HTTPException(status_code=400, detail="payload_must_be_object")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
 
123
- # 1) processing state default
124
- state = norm(payload.get("sa_processing_state"))
125
- if not state:
126
- payload["sa_processing_state"] = "new"
127
 
128
- # 2) fingerprint + dedupe
129
- fp = norm(payload.get("sa_fingerprint"))
130
- if not fp:
131
- fp = compute_fingerprint(payload)
132
- payload["sa_fingerprint"] = fp
133
 
134
- if seen_check_and_set(fp):
135
- # Do not forward duplicates (safe default)
136
- payload["sa_processing_state"] = "duplicate"
137
- return {"ok": True, "duplicate": True, "fingerprint": fp}
138
 
139
- # 3) forward/relay with flood guard
140
- if not INFLIGHT.acquire(blocking=False):
141
- # You are getting flooded; do NOT forward.
142
- # This is the safety brake Mark warned about.
143
- raise HTTPException(status_code=429, detail="too_many_inflight_requests")
 
144
 
145
- try:
146
- forward_url = normalize_url(WAKEUP_URL)
147
 
148
- r = requests.post(forward_url, json=payload, timeout=TIMEOUT)
 
 
 
 
 
149
 
150
- return {
151
- "ok": True,
152
- "duplicate": False,
153
- "fingerprint": fp,
154
- "forward_url": forward_url,
155
- "forward_status": r.status_code,
156
- "forward_body": (r.text or "")[:FORWARD_BODY_LIMIT],
157
- }
158
- except requests.RequestException as e:
159
- raise HTTPException(status_code=502, detail=f"relay_failed: {e}")
160
- finally:
161
- INFLIGHT.release()
162
 
163
- # Alias route because people keep trying /wakeup on HF
164
- @app.post("/wakeup")
165
- async def wakeup_alias(req: Request):
166
- return await webhook(req)
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import json
3
  import time
4
+ import hmac
5
  import hashlib
6
+ from collections import deque
7
  from typing import Any, Dict, Optional
8
 
9
+ from fastapi import FastAPI, Request, Header, HTTPException
 
10
  from fastapi.responses import JSONResponse
11
+ import gradio as gr
12
 
13
+ # -----------------------------------------------------------------------------
14
+ # Configuration (set these as HF Space "Secrets" or "Variables")
15
+ # -----------------------------------------------------------------------------
16
+ WEBHOOK_TOKEN = os.getenv("WEBHOOK_TOKEN", "").strip()
17
+ WEBHOOK_HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "").strip()
18
+ WEBHOOK_TOKEN_HEADER = os.getenv("WEBHOOK_TOKEN_HEADER", "X-SA-Token").strip()
19
+
20
+ MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "200"))
21
+
22
+ PERSIST_TO_FILE = os.getenv("PERSIST_TO_FILE", "0").strip() == "1"
23
+ PERSIST_PATH = os.getenv("PERSIST_PATH", "webhook_messages.jsonl").strip()
24
+
25
+ # -----------------------------------------------------------------------------
26
+ # In-memory state
27
+ # -----------------------------------------------------------------------------
28
+ MESSAGES = deque(maxlen=MAX_MESSAGES)
29
+ TOTAL_COUNT = 0
30
+ LAST_RECEIVED_TS: Optional[float] = None
31
+
32
+
33
+ def _now_ts() -> float:
34
+ return time.time()
35
+
36
+
37
+ def _ts_to_str(ts: Optional[float]) -> str:
38
+ if not ts:
39
+ return "n/a"
40
+ return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts))
41
+
42
+
43
+ def _safe_json_dumps(obj: Any) -> str:
44
+ try:
45
+ return json.dumps(obj, indent=2, sort_keys=True, ensure_ascii=False)
46
+ except Exception:
47
+ return str(obj)
48
+
49
+
50
+ def _append_message(payload: Dict[str, Any], meta: Dict[str, Any]) -> None:
51
+ global TOTAL_COUNT, LAST_RECEIVED_TS
52
+ TOTAL_COUNT += 1
53
+ LAST_RECEIVED_TS = _now_ts()
54
+
55
+ entry = {
56
+ "received_at_ts": LAST_RECEIVED_TS,
57
+ "received_at": _ts_to_str(LAST_RECEIVED_TS),
58
+ "meta": meta,
59
+ "payload": payload,
60
  }
61
+
62
+ MESSAGES.appendleft(entry)
63
+
64
+ if PERSIST_TO_FILE:
65
+ try:
66
+ with open(PERSIST_PATH, "a", encoding="utf-8") as f:
67
+ f.write(json.dumps(entry, ensure_ascii=False) + "\n")
68
+ except Exception:
69
+ pass
70
+
71
+
72
+ def _verify_token(provided_token: Optional[str]) -> None:
73
+ if not WEBHOOK_TOKEN:
74
+ return
75
+ if not provided_token or provided_token.strip() != WEBHOOK_TOKEN:
76
+ raise HTTPException(status_code=401, detail="Invalid or missing webhook token.")
77
+
78
+
79
+ def _verify_hmac_signature(raw_body: bytes, provided_sig: Optional[str]) -> None:
80
+ if not WEBHOOK_HMAC_SECRET:
81
+ return
82
+ if not provided_sig:
83
+ raise HTTPException(status_code=401, detail="Missing signature header.")
84
+ mac = hmac.new(WEBHOOK_HMAC_SECRET.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
85
+ if not hmac.compare_digest(mac, provided_sig.strip()):
86
+ raise HTTPException(status_code=401, detail="Invalid signature.")
87
+
88
+
89
+ # -----------------------------------------------------------------------------
90
+ # FastAPI app (webhook listener)
91
+ # -----------------------------------------------------------------------------
92
+ app = FastAPI(title="Webhook Listener", version="1.0.0")
93
+
94
+
95
+ @app.get("/health")
96
+ def health():
 
 
 
97
  return {
98
  "ok": True,
99
+ "total_count": TOTAL_COUNT,
100
+ "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
 
 
 
101
  }
102
 
 
 
 
103
 
104
+ # IMPORTANT: define both with and without trailing slash
105
+ @app.post("/wakeup")
106
+ @app.post("/wakeup/")
107
  @app.post("/webhook")
108
+ @app.post("/webhook/")
109
+ async def webhook(
110
+ request: Request,
111
+ x_sa_signature: Optional[str] = Header(default=None),
112
+ x_sa_token: Optional[str] = Header(default=None),
113
+ ):
114
+ raw_body = await request.body()
115
+
116
+ # Token auth (optional)
117
+ if WEBHOOK_TOKEN:
118
+ token_value = request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token
119
+ _verify_token(token_value)
120
+
121
+ # HMAC auth (optional)
122
+ if WEBHOOK_HMAC_SECRET:
123
+ sig_value = request.headers.get("X-SA-Signature") or x_sa_signature
124
+ _verify_hmac_signature(raw_body, sig_value)
125
+
126
+ # Parse JSON
127
  try:
128
+ payload = await request.json()
129
+ if not isinstance(payload, dict):
130
+ payload = {"_non_object_payload": payload}
131
  except Exception:
132
+ payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")}
133
+
134
+ meta = {
135
+ "client": request.client.host if request.client else None,
136
+ "path": str(request.url.path),
137
+ "headers": {
138
+ "user-agent": request.headers.get("user-agent"),
139
+ "content-type": request.headers.get("content-type"),
140
+ WEBHOOK_TOKEN_HEADER: "***" if request.headers.get(WEBHOOK_TOKEN_HEADER) else None,
141
+ "x-sa-signature": "***" if request.headers.get("x-sa-signature") else None,
142
+ },
143
+ }
144
+
145
+ _append_message(payload=payload, meta=meta)
146
 
147
+ return JSONResponse(
148
+ status_code=200,
149
+ content={
150
+ "ok": True,
151
+ "message": "Webhook received.",
152
+ "total_count": TOTAL_COUNT,
153
+ "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
154
+ "echo": payload,
155
+ },
156
+ )
157
+
158
+
159
+ # -----------------------------------------------------------------------------
160
+ # Gradio UI (simple dashboard)
161
+ # -----------------------------------------------------------------------------
162
+ def ui_status() -> str:
163
+ return _safe_json_dumps(
164
+ {
165
+ "total_count": TOTAL_COUNT,
166
+ "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
167
+ "buffer_size": len(MESSAGES),
168
+ "max_buffer": MAX_MESSAGES,
169
+ "auth": {
170
+ "token_enabled": bool(WEBHOOK_TOKEN),
171
+ "token_header": WEBHOOK_TOKEN_HEADER,
172
+ "hmac_enabled": bool(WEBHOOK_HMAC_SECRET),
173
+ },
174
+ "endpoints": {
175
+ "health": "/health",
176
+ "webhook_post": "/wakeup",
177
+ "webhook_post_alias": "/webhook",
178
+ },
179
+ }
180
+ )
181
 
 
 
 
 
182
 
183
+ def ui_recent(limit: int = 25) -> str:
184
+ limit = max(1, min(int(limit), 200))
185
+ items = list(MESSAGES)[:limit]
186
+ return _safe_json_dumps(items)
 
187
 
 
 
 
 
188
 
189
+ def ui_clear() -> str:
190
+ global TOTAL_COUNT, LAST_RECEIVED_TS
191
+ MESSAGES.clear()
192
+ TOTAL_COUNT = 0
193
+ LAST_RECEIVED_TS = None
194
+ return "Cleared in-memory messages."
195
 
 
 
196
 
197
+ with gr.Blocks(title="Webhook Listener Dashboard") as demo:
198
+ gr.Markdown("# Webhook Listener Dashboard")
199
+ gr.Markdown(
200
+ "Send a POST request with JSON to `/wakeup` or `/webhook` and it will appear below.\n\n"
201
+ "This is for payload debugging / confirming delivery."
202
+ )
203
 
204
+ with gr.Row():
205
+ status_box = gr.Code(label="Status", value=ui_status(), language="json")
206
+ clear_btn = gr.Button("Clear messages", variant="stop")
 
 
 
 
 
 
 
 
 
207
 
208
+ with gr.Row():
209
+ limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit")
210
+
211
+ recent_box = gr.Code(label="Recent payloads (newest first)", value=ui_recent(25), language="json")
212
+
213
+ def refresh(limit_val: int):
214
+ return ui_status(), ui_recent(limit_val)
215
+
216
+ demo.load(refresh, inputs=[limit], outputs=[status_box, recent_box], every=2)
217
+ limit.change(refresh, inputs=[limit], outputs=[status_box, recent_box])
218
+ clear_btn.click(lambda: (ui_clear(), ui_status(), ui_recent(25)), outputs=[recent_box, status_box, recent_box])
219
+
220
+ app = gr.mount_gradio_app(app, demo, path="/")