Avinashnalla7 commited on
Commit
e2591b5
·
verified ·
1 Parent(s): 6491b95

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +118 -18
app.py CHANGED
@@ -1,13 +1,35 @@
1
- import os, json, time, hashlib
2
- from fastapi import FastAPI, Request, HTTPException
 
 
 
 
 
3
  import requests
 
 
4
 
5
- WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup/")
 
 
 
6
  TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30"))
7
 
8
- app = FastAPI()
 
 
 
 
 
 
 
 
 
9
 
10
- def norm(v):
 
 
 
11
  if v is None:
12
  return ""
13
  if isinstance(v, str):
@@ -15,31 +37,93 @@ def norm(v):
15
  return "" if s.lower() in ("null", "none") else s
16
  return v
17
 
18
- def compute_fingerprint(payload: dict) -> str:
 
 
 
 
 
 
 
19
  """
20
  MVP fingerprint: stable hash from fields that should represent the same upload.
21
- Upgrade later to sha256(file-bytes) when you have a download URL.
 
22
  """
23
  parts = {
24
  "rid": norm(payload.get("rid")),
25
  "file_type": norm(payload.get("file_type")),
26
  "case_vendor_rid": norm(payload.get("case_vendor_rid")),
 
27
  "date_modified": norm(payload.get("date_modified")),
28
  }
29
  raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8")
30
  return hashlib.sha256(raw).hexdigest()
31
 
32
- # ultra-light “db” for HF: in-memory set (fine for relay testing)
33
- SEEN = set()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  @app.post("/webhook")
36
  async def webhook(req: Request):
37
- payload = await req.json()
 
 
 
 
 
 
 
38
 
39
  # 1) processing state default
40
  state = norm(payload.get("sa_processing_state"))
41
  if not state:
42
- payload["sa_processing_state"] = "new" # null -> new [oai_citation:3‡API Integration Setup Guide_otter.ai.txt](sediment://file_0000000079e8722fb44223541f5b71cb)
43
 
44
  # 2) fingerprint + dedupe
45
  fp = norm(payload.get("sa_fingerprint"))
@@ -47,20 +131,36 @@ async def webhook(req: Request):
47
  fp = compute_fingerprint(payload)
48
  payload["sa_fingerprint"] = fp
49
 
50
- if fp in SEEN:
 
51
  payload["sa_processing_state"] = "duplicate"
52
  return {"ok": True, "duplicate": True, "fingerprint": fp}
53
 
54
- SEEN.add(fp)
 
 
 
 
55
 
56
- # 3) forward/relay
57
  try:
58
- r = requests.post(WAKEUP_URL, json=payload, timeout=TIMEOUT)
 
 
 
59
  return {
60
  "ok": True,
61
- "forward_status": r.status_code,
62
- "forward_body": r.text[:2000],
63
  "fingerprint": fp,
 
 
 
64
  }
65
  except requests.RequestException as e:
66
- raise HTTPException(status_code=502, detail=f"relay_failed: {e}")
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import hashlib
5
+ import threading
6
+ from typing import Any, Dict, Optional
7
+
8
  import requests
9
+ from fastapi import FastAPI, Request, HTTPException
10
+ from fastapi.responses import JSONResponse
11
 
12
+ # ----------------------------
13
+ # Config
14
+ # ----------------------------
15
+ WAKEUP_URL = os.getenv("WAKEUP_URL", "http://163.245.192.149:8080/wakeup").strip()
16
  TIMEOUT = int(os.getenv("RELAY_TIMEOUT", "30"))
17
 
18
+ # Flood guard: keep this at 1 until you're 100% sure you're safe
19
+ MAX_INFLIGHT = int(os.getenv("MAX_INFLIGHT", "1"))
20
+
21
+ # Dedupe cache TTL (seconds). Keep reasonable so it doesn't grow forever.
22
+ SEEN_TTL_SECONDS = int(os.getenv("SEEN_TTL_SECONDS", "86400")) # 24h default
23
+
24
+ # How many chars of downstream response to return to caller
25
+ FORWARD_BODY_LIMIT = int(os.getenv("FORWARD_BODY_LIMIT", "2000"))
26
+
27
+ app = FastAPI(title="Inserio Relay", version="1.0.0")
28
 
29
+ # ----------------------------
30
+ # Helpers
31
+ # ----------------------------
32
+ def norm(v: Any) -> Any:
33
  if v is None:
34
  return ""
35
  if isinstance(v, str):
 
37
  return "" if s.lower() in ("null", "none") else s
38
  return v
39
 
40
+ def normalize_url(url: str) -> str:
41
+ """
42
+ Avoid trailing slash issues. Uvicorn/FastAPI often 307 redirect when trailing
43
+ slash mismatches the route. We'll send to the canonical version.
44
+ """
45
+ return url.rstrip("/")
46
+
47
+ def compute_fingerprint(payload: Dict[str, Any]) -> str:
48
  """
49
  MVP fingerprint: stable hash from fields that should represent the same upload.
50
+ NOTE: This is NOT file-bytes fingerprint. That must be done downstream (163 server)
51
+ after downloading the attachment bytes from QuickBase.
52
  """
53
  parts = {
54
  "rid": norm(payload.get("rid")),
55
  "file_type": norm(payload.get("file_type")),
56
  "case_vendor_rid": norm(payload.get("case_vendor_rid")),
57
+ "case_rid": norm(payload.get("case_rid")),
58
  "date_modified": norm(payload.get("date_modified")),
59
  }
60
  raw = json.dumps(parts, sort_keys=True, separators=(",", ":")).encode("utf-8")
61
  return hashlib.sha256(raw).hexdigest()
62
 
63
+ # ----------------------------
64
+ # In-memory dedupe store with TTL
65
+ # fp -> last_seen_epoch
66
+ # ----------------------------
67
+ SEEN: Dict[str, float] = {}
68
+ SEEN_LOCK = threading.Lock()
69
+
70
+ def seen_prune(now: float) -> None:
71
+ cutoff = now - SEEN_TTL_SECONDS
72
+ dead = [fp for fp, ts in SEEN.items() if ts < cutoff]
73
+ for fp in dead:
74
+ SEEN.pop(fp, None)
75
+
76
+ def seen_check_and_set(fp: str) -> bool:
77
+ """
78
+ Returns True if already seen (duplicate), else records and returns False.
79
+ """
80
+ now = time.time()
81
+ with SEEN_LOCK:
82
+ seen_prune(now)
83
+ if fp in SEEN:
84
+ SEEN[fp] = now
85
+ return True
86
+ SEEN[fp] = now
87
+ return False
88
+
89
+ # ----------------------------
90
+ # Flood guard semaphore
91
+ # ----------------------------
92
+ INFLIGHT = threading.BoundedSemaphore(value=max(1, MAX_INFLIGHT))
93
+
94
+ # ----------------------------
95
+ # Routes
96
+ # ----------------------------
97
+ @app.get("/")
98
+ def root():
99
+ return {
100
+ "ok": True,
101
+ "service": "inserio-relay",
102
+ "post": ["/webhook", "/wakeup"],
103
+ "forward_to": normalize_url(WAKEUP_URL),
104
+ "max_inflight": MAX_INFLIGHT,
105
+ "seen_ttl_seconds": SEEN_TTL_SECONDS,
106
+ }
107
+
108
+ @app.get("/health")
109
+ def health():
110
+ return {"ok": True}
111
 
112
  @app.post("/webhook")
113
  async def webhook(req: Request):
114
+ # Validate JSON
115
+ try:
116
+ payload = await req.json()
117
+ except Exception:
118
+ raise HTTPException(status_code=400, detail="invalid_json")
119
+
120
+ if not isinstance(payload, dict):
121
+ raise HTTPException(status_code=400, detail="payload_must_be_object")
122
 
123
  # 1) processing state default
124
  state = norm(payload.get("sa_processing_state"))
125
  if not state:
126
+ payload["sa_processing_state"] = "new"
127
 
128
  # 2) fingerprint + dedupe
129
  fp = norm(payload.get("sa_fingerprint"))
 
131
  fp = compute_fingerprint(payload)
132
  payload["sa_fingerprint"] = fp
133
 
134
+ if seen_check_and_set(fp):
135
+ # Do not forward duplicates (safe default)
136
  payload["sa_processing_state"] = "duplicate"
137
  return {"ok": True, "duplicate": True, "fingerprint": fp}
138
 
139
+ # 3) forward/relay with flood guard
140
+ if not INFLIGHT.acquire(blocking=False):
141
+ # You are getting flooded; do NOT forward.
142
+ # This is the safety brake Mark warned about.
143
+ raise HTTPException(status_code=429, detail="too_many_inflight_requests")
144
 
 
145
  try:
146
+ forward_url = normalize_url(WAKEUP_URL)
147
+
148
+ r = requests.post(forward_url, json=payload, timeout=TIMEOUT)
149
+
150
  return {
151
  "ok": True,
152
+ "duplicate": False,
 
153
  "fingerprint": fp,
154
+ "forward_url": forward_url,
155
+ "forward_status": r.status_code,
156
+ "forward_body": (r.text or "")[:FORWARD_BODY_LIMIT],
157
  }
158
  except requests.RequestException as e:
159
+ raise HTTPException(status_code=502, detail=f"relay_failed: {e}")
160
+ finally:
161
+ INFLIGHT.release()
162
+
163
+ # Alias route because people keep trying /wakeup on HF
164
+ @app.post("/wakeup")
165
+ async def wakeup_alias(req: Request):
166
+ return await webhook(req)