Avinashnalla7 commited on
Commit
fedbef2
·
verified ·
1 Parent(s): 9dccc19

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +184 -32
app.py CHANGED
@@ -6,12 +6,13 @@ import hashlib
6
  from collections import deque
7
  from typing import Any, Dict, Optional
8
 
 
9
  from fastapi import FastAPI, Request, Header, HTTPException
10
  from fastapi.responses import JSONResponse
11
  import gradio as gr
12
 
13
  # -----------------------------------------------------------------------------
14
- # Configuration (set these as HF Space "Secrets" or "Variables")
15
  # -----------------------------------------------------------------------------
16
  WEBHOOK_TOKEN = os.getenv("WEBHOOK_TOKEN", "").strip()
17
  WEBHOOK_HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "").strip()
@@ -22,6 +23,17 @@ MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "200"))
22
  PERSIST_TO_FILE = os.getenv("PERSIST_TO_FILE", "0").strip() == "1"
23
  PERSIST_PATH = os.getenv("PERSIST_PATH", "webhook_messages.jsonl").strip()
24
 
 
 
 
 
 
 
 
 
 
 
 
25
  # -----------------------------------------------------------------------------
26
  # In-memory state
27
  # -----------------------------------------------------------------------------
@@ -29,6 +41,10 @@ MESSAGES = deque(maxlen=MAX_MESSAGES)
29
  TOTAL_COUNT = 0
30
  LAST_RECEIVED_TS: Optional[float] = None
31
 
 
 
 
 
32
 
33
  def _now_ts() -> float:
34
  return time.time()
@@ -47,7 +63,7 @@ def _safe_json_dumps(obj: Any) -> str:
47
  return str(obj)
48
 
49
 
50
- def _append_message(payload: Dict[str, Any], meta: Dict[str, Any]) -> None:
51
  global TOTAL_COUNT, LAST_RECEIVED_TS
52
  TOTAL_COUNT += 1
53
  LAST_RECEIVED_TS = _now_ts()
@@ -57,6 +73,7 @@ def _append_message(payload: Dict[str, Any], meta: Dict[str, Any]) -> None:
57
  "received_at": _ts_to_str(LAST_RECEIVED_TS),
58
  "meta": meta,
59
  "payload": payload,
 
60
  }
61
 
62
  MESSAGES.appendleft(entry)
@@ -86,39 +103,97 @@ def _verify_hmac_signature(raw_body: bytes, provided_sig: Optional[str]) -> None
86
  raise HTTPException(status_code=401, detail="Invalid signature.")
87
 
88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
  # -----------------------------------------------------------------------------
90
- # FastAPI app (webhook listener)
91
  # -----------------------------------------------------------------------------
92
- app = FastAPI(title="Webhook Listener", version="1.0.0")
93
 
94
 
95
  @app.get("/health")
96
  def health():
97
  return {
98
  "ok": True,
 
 
 
 
 
99
  "total_count": TOTAL_COUNT,
100
  "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
101
  }
102
 
103
 
104
- # IMPORTANT: define both with and without trailing slash
105
- @app.post("/wakeup")
 
 
 
 
 
 
 
 
 
 
 
106
  @app.post("/wakeup/")
107
- @app.post("/webhook")
108
  @app.post("/webhook/")
109
  async def webhook(
110
  request: Request,
111
- x_sa_signature: Optional[str] = Header(default=None),
112
- x_sa_token: Optional[str] = Header(default=None),
113
  ):
 
 
114
  raw_body = await request.body()
115
 
116
- # Token auth (optional)
117
  if WEBHOOK_TOKEN:
118
  token_value = request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token
119
  _verify_token(token_value)
120
 
121
- # HMAC auth (optional)
122
  if WEBHOOK_HMAC_SECRET:
123
  sig_value = request.headers.get("X-SA-Signature") or x_sa_signature
124
  _verify_hmac_signature(raw_body, sig_value)
@@ -131,6 +206,89 @@ async def webhook(
131
  except Exception:
132
  payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")}
133
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
  meta = {
135
  "client": request.client.host if request.client else None,
136
  "path": str(request.url.path),
@@ -142,22 +300,13 @@ async def webhook(
142
  },
143
  }
144
 
145
- _append_message(payload=payload, meta=meta)
146
 
147
- return JSONResponse(
148
- status_code=200,
149
- content={
150
- "ok": True,
151
- "message": "Webhook received.",
152
- "total_count": TOTAL_COUNT,
153
- "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
154
- "echo": payload,
155
- },
156
- )
157
 
158
 
159
  # -----------------------------------------------------------------------------
160
- # Gradio UI (simple dashboard)
161
  # -----------------------------------------------------------------------------
162
  def ui_status() -> str:
163
  return _safe_json_dumps(
@@ -166,6 +315,13 @@ def ui_status() -> str:
166
  "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
167
  "buffer_size": len(MESSAGES),
168
  "max_buffer": MAX_MESSAGES,
 
 
 
 
 
 
 
169
  "auth": {
170
  "token_enabled": bool(WEBHOOK_TOKEN),
171
  "token_header": WEBHOOK_TOKEN_HEADER,
@@ -173,8 +329,7 @@ def ui_status() -> str:
173
  },
174
  "endpoints": {
175
  "health": "/health",
176
- "webhook_post": "/wakeup",
177
- "webhook_post_alias": "/webhook",
178
  },
179
  }
180
  )
@@ -194,12 +349,9 @@ def ui_clear() -> str:
194
  return "Cleared in-memory messages."
195
 
196
 
197
- with gr.Blocks(title="Webhook Listener Dashboard") as demo:
198
- gr.Markdown("# Webhook Listener Dashboard")
199
- gr.Markdown(
200
- "Send a POST request with JSON to `/wakeup` or `/webhook` and it will appear below.\n\n"
201
- "This is for payload debugging / confirming delivery."
202
- )
203
 
204
  with gr.Row():
205
  status_box = gr.Code(label="Status", value=ui_status(), language="json")
@@ -208,7 +360,7 @@ with gr.Blocks(title="Webhook Listener Dashboard") as demo:
208
  with gr.Row():
209
  limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit")
210
 
211
- recent_box = gr.Code(label="Recent payloads (newest first)", value=ui_recent(25), language="json")
212
 
213
  def refresh(limit_val: int):
214
  return ui_status(), ui_recent(limit_val)
 
6
  from collections import deque
7
  from typing import Any, Dict, Optional
8
 
9
+ import requests
10
  from fastapi import FastAPI, Request, Header, HTTPException
11
  from fastapi.responses import JSONResponse
12
  import gradio as gr
13
 
14
  # -----------------------------------------------------------------------------
15
+ # Configuration (HF Space Variables/Secrets)
16
  # -----------------------------------------------------------------------------
17
  WEBHOOK_TOKEN = os.getenv("WEBHOOK_TOKEN", "").strip()
18
  WEBHOOK_HMAC_SECRET = os.getenv("WEBHOOK_HMAC_SECRET", "").strip()
 
23
  PERSIST_TO_FILE = os.getenv("PERSIST_TO_FILE", "0").strip() == "1"
24
  PERSIST_PATH = os.getenv("PERSIST_PATH", "webhook_messages.jsonl").strip()
25
 
26
+ # Relay forwarding
27
+ WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup").strip().rstrip("/")
28
+ RELAY_TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30"))
29
+
30
+ # Dedupe window (seconds). Default 24h.
31
+ SEEN_TTL_SECONDS = int(os.getenv("SEEN_TTL_SECONDS", "86400"))
32
+ MAX_SEEN = int(os.getenv("MAX_SEEN", "5000"))
33
+
34
+ # Limit concurrent forwards (prevents flood / money burn)
35
+ MAX_INFLIGHT = int(os.getenv("MAX_INFLIGHT", "1"))
36
+
37
  # -----------------------------------------------------------------------------
38
  # In-memory state
39
  # -----------------------------------------------------------------------------
 
41
  TOTAL_COUNT = 0
42
  LAST_RECEIVED_TS: Optional[float] = None
43
 
44
+ # seen fingerprints -> last_seen_ts
45
+ SEEN: Dict[str, float] = {}
46
+ INFLIGHT = 0
47
+
48
 
49
  def _now_ts() -> float:
50
  return time.time()
 
63
  return str(obj)
64
 
65
 
66
+ def _append_message(payload: Dict[str, Any], meta: Dict[str, Any], relay: Dict[str, Any]) -> None:
67
  global TOTAL_COUNT, LAST_RECEIVED_TS
68
  TOTAL_COUNT += 1
69
  LAST_RECEIVED_TS = _now_ts()
 
73
  "received_at": _ts_to_str(LAST_RECEIVED_TS),
74
  "meta": meta,
75
  "payload": payload,
76
+ "relay": relay,
77
  }
78
 
79
  MESSAGES.appendleft(entry)
 
103
  raise HTTPException(status_code=401, detail="Invalid signature.")
104
 
105
 
106
+ def _norm(v: Any) -> str:
107
+ if v is None:
108
+ return ""
109
+ if isinstance(v, str):
110
+ s = v.strip()
111
+ if s.lower() in ("null", "none"):
112
+ return ""
113
+ return s
114
+ return str(v)
115
+
116
+
117
+ def compute_fingerprint(payload: Dict[str, Any]) -> str:
118
+ """
119
+ Payload-based fingerprint (MVP).
120
+ When you have a file download URL, upgrade to sha256(file_bytes).
121
+ """
122
+ parts = {
123
+ "event_type": _norm(payload.get("event_type")),
124
+ "rid": _norm(payload.get("rid")),
125
+ "file_type": _norm(payload.get("file_type")),
126
+ "case_vendor_rid": _norm(payload.get("case_vendor_rid")),
127
+ "case_rid": _norm(payload.get("case_rid")),
128
+ "date_modified": _norm(payload.get("date_modified")),
129
+ }
130
+ raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8")
131
+ return hashlib.sha256(raw).hexdigest()
132
+
133
+
134
+ def _gc_seen(now: float) -> None:
135
+ # TTL cleanup
136
+ cutoff = now - SEEN_TTL_SECONDS
137
+ dead = [k for k, ts in SEEN.items() if ts < cutoff]
138
+ for k in dead:
139
+ SEEN.pop(k, None)
140
+
141
+ # size cap cleanup (drop oldest)
142
+ if len(SEEN) > MAX_SEEN:
143
+ items = sorted(SEEN.items(), key=lambda kv: kv[1]) # oldest first
144
+ for k, _ in items[: len(SEEN) - MAX_SEEN]:
145
+ SEEN.pop(k, None)
146
+
147
+
148
  # -----------------------------------------------------------------------------
149
+ # FastAPI app
150
  # -----------------------------------------------------------------------------
151
+ app = FastAPI(title="Inserio Relay", version="1.0.0")
152
 
153
 
154
  @app.get("/health")
155
  def health():
156
  return {
157
  "ok": True,
158
+ "service": "inserio-relay",
159
+ "post": ["/webhook/", "/wakeup/"],
160
+ "forward_to": f"{WAKEUP_URL}/wakeup",
161
+ "max_inflight": MAX_INFLIGHT,
162
+ "seen_ttl_seconds": SEEN_TTL_SECONDS,
163
  "total_count": TOTAL_COUNT,
164
  "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
165
  }
166
 
167
 
168
+ @app.get("/")
169
+ def root():
170
+ # A simple JSON so HF "App" tab doesn't show Not Found
171
+ return {
172
+ "ok": True,
173
+ "service": "inserio-relay",
174
+ "post": ["/webhook/", "/wakeup/"],
175
+ "forward_to": f"{WAKEUP_URL}/wakeup",
176
+ "max_inflight": MAX_INFLIGHT,
177
+ "seen_ttl_seconds": SEEN_TTL_SECONDS,
178
+ }
179
+
180
+
181
  @app.post("/wakeup/")
 
182
  @app.post("/webhook/")
183
  async def webhook(
184
  request: Request,
185
+ x_sa_signature: Optional[str] = Header(default=None, convert_underscores=False),
186
+ x_sa_token: Optional[str] = Header(default=None, convert_underscores=False),
187
  ):
188
+ global INFLIGHT
189
+
190
  raw_body = await request.body()
191
 
192
+ # Auth (optional)
193
  if WEBHOOK_TOKEN:
194
  token_value = request.headers.get(WEBHOOK_TOKEN_HEADER) or x_sa_token
195
  _verify_token(token_value)
196
 
 
197
  if WEBHOOK_HMAC_SECRET:
198
  sig_value = request.headers.get("X-SA-Signature") or x_sa_signature
199
  _verify_hmac_signature(raw_body, sig_value)
 
206
  except Exception:
207
  payload = {"_raw_body": raw_body.decode("utf-8", errors="replace")}
208
 
209
+ now = _now_ts()
210
+ _gc_seen(now)
211
+
212
+ # Enforce default state
213
+ if not _norm(payload.get("sa_processing_state")):
214
+ payload["sa_processing_state"] = "new"
215
+
216
+ # Fingerprint
217
+ fp = _norm(payload.get("sa_fingerprint"))
218
+ if not fp:
219
+ fp = compute_fingerprint(payload)
220
+ payload["sa_fingerprint"] = fp
221
+
222
+ # Dedupe check
223
+ duplicate = fp in SEEN
224
+ if not duplicate:
225
+ SEEN[fp] = now
226
+
227
+ # Flood guard
228
+ if INFLIGHT >= MAX_INFLIGHT:
229
+ relay_info = {
230
+ "ok": True,
231
+ "duplicate": duplicate,
232
+ "fingerprint": fp,
233
+ "forwarded": False,
234
+ "reason": "rate_limited_max_inflight",
235
+ }
236
+ meta = {
237
+ "client": request.client.host if request.client else None,
238
+ "path": str(request.url.path),
239
+ "headers": {
240
+ "user-agent": request.headers.get("user-agent"),
241
+ "content-type": request.headers.get("content-type"),
242
+ },
243
+ }
244
+ _append_message(payload=payload, meta=meta, relay=relay_info)
245
+ return JSONResponse(status_code=200, content=relay_info)
246
+
247
+ # Forward (even if duplicate? NO. If duplicate, don’t forward.)
248
+ forward_status = None
249
+ forward_body = None
250
+ forward_url = f"{WAKEUP_URL}/wakeup"
251
+
252
+ if duplicate:
253
+ relay_info = {
254
+ "ok": True,
255
+ "duplicate": True,
256
+ "fingerprint": fp,
257
+ "forwarded": False,
258
+ "reason": "duplicate",
259
+ }
260
+ meta = {
261
+ "client": request.client.host if request.client else None,
262
+ "path": str(request.url.path),
263
+ "headers": {
264
+ "user-agent": request.headers.get("user-agent"),
265
+ "content-type": request.headers.get("content-type"),
266
+ },
267
+ }
268
+ _append_message(payload=payload, meta=meta, relay=relay_info)
269
+ return JSONResponse(status_code=200, content=relay_info)
270
+
271
+ # Forward non-duplicate
272
+ INFLIGHT += 1
273
+ try:
274
+ r = requests.post(forward_url, json=payload, timeout=RELAY_TIMEOUT)
275
+ forward_status = r.status_code
276
+ forward_body = r.text[:2000]
277
+ except requests.RequestException as e:
278
+ forward_status = 502
279
+ forward_body = f"relay_failed: {e}"
280
+ finally:
281
+ INFLIGHT -= 1
282
+
283
+ relay_info = {
284
+ "ok": True,
285
+ "duplicate": False,
286
+ "fingerprint": fp,
287
+ "forward_url": forward_url,
288
+ "forward_status": forward_status,
289
+ "forward_body": forward_body,
290
+ }
291
+
292
  meta = {
293
  "client": request.client.host if request.client else None,
294
  "path": str(request.url.path),
 
300
  },
301
  }
302
 
303
+ _append_message(payload=payload, meta=meta, relay=relay_info)
304
 
305
+ return JSONResponse(status_code=200, content=relay_info)
 
 
 
 
 
 
 
 
 
306
 
307
 
308
  # -----------------------------------------------------------------------------
309
+ # Gradio dashboard
310
  # -----------------------------------------------------------------------------
311
  def ui_status() -> str:
312
  return _safe_json_dumps(
 
315
  "last_received_at": _ts_to_str(LAST_RECEIVED_TS),
316
  "buffer_size": len(MESSAGES),
317
  "max_buffer": MAX_MESSAGES,
318
+ "relay": {
319
+ "forward_url": f"{WAKEUP_URL}/wakeup",
320
+ "timeout": RELAY_TIMEOUT,
321
+ "max_inflight": MAX_INFLIGHT,
322
+ "seen_size": len(SEEN),
323
+ "seen_ttl_seconds": SEEN_TTL_SECONDS,
324
+ },
325
  "auth": {
326
  "token_enabled": bool(WEBHOOK_TOKEN),
327
  "token_header": WEBHOOK_TOKEN_HEADER,
 
329
  },
330
  "endpoints": {
331
  "health": "/health",
332
+ "post": ["/wakeup/", "/webhook/"],
 
333
  },
334
  }
335
  )
 
349
  return "Cleared in-memory messages."
350
 
351
 
352
+ with gr.Blocks(title="Inserio Relay Dashboard") as demo:
353
+ gr.Markdown("# Inserio Relay Dashboard")
354
+ gr.Markdown("POST JSON to `/webhook/` (or `/wakeup/`). It will be logged and forwarded to the wakeup server.")
 
 
 
355
 
356
  with gr.Row():
357
  status_box = gr.Code(label="Status", value=ui_status(), language="json")
 
360
  with gr.Row():
361
  limit = gr.Slider(1, 200, value=25, step=1, label="Recent message limit")
362
 
363
+ recent_box = gr.Code(label="Recent messages (newest first)", value=ui_recent(25), language="json")
364
 
365
  def refresh(limit_val: int):
366
  return ui_status(), ui_recent(limit_val)