Perth0603 committed on
Commit
d8f11da
·
verified ·
1 Parent(s): 6e01a39

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +546 -507
app.py CHANGED
@@ -1,507 +1,546 @@
1
- import os
2
- import csv
3
- import re
4
- import threading
5
- from typing import Optional, List, Dict, Any
6
- from difflib import SequenceMatcher
7
-
8
- import joblib
9
- import numpy as np
10
- import pandas as pd
11
- from fastapi import FastAPI
12
- from fastapi.responses import JSONResponse
13
- from huggingface_hub import hf_hub_download
14
- from pydantic import BaseModel
15
- from urllib.parse import urlparse
16
-
17
- try:
18
- import xgboost as xgb # type: ignore
19
- except Exception:
20
- xgb = None
21
-
22
-
23
# Environment defaults suitable for HF Spaces (the /data volume is writable there).
os.environ.setdefault("HOME", "/data")
os.environ.setdefault("XDG_CACHE_HOME", "/data/.cache")
os.environ.setdefault("HF_HOME", "/data/.cache")
os.environ.setdefault("TRANSFORMERS_CACHE", "/data/.cache")
os.environ.setdefault("TORCH_HOME", "/data/.cache")


# Config
# Hugging Face repo/file holding the serialized URL model bundle.
# Each setting checks the new-style HF_* variable first, then a legacy name,
# then falls back to a hard-coded default.
URL_REPO = os.environ.get(
    "HF_URL_MODEL_ID",
    os.environ.get("URL_REPO", "Perth0603/Random-Forest-Model-for-PhishingDetection"),
)
URL_REPO_TYPE = os.environ.get("HF_URL_REPO_TYPE", os.environ.get("URL_REPO_TYPE", "model"))
URL_FILENAME = os.environ.get("HF_URL_FILENAME", os.environ.get("URL_FILENAME", "rf_url_phishing_xgboost_bst.joblib"))
CACHE_DIR = os.environ.get("HF_CACHE_DIR", "/data/.cache")
os.makedirs(CACHE_DIR, exist_ok=True)

# Polarity override: "PHISH" or "LEGIT"; empty means default (class 1 = PHISH)
URL_POSITIVE_CLASS_ENV = os.environ.get("URL_POSITIVE_CLASS", "").strip().upper()

# CSV configuration (defaults to files in same directory)
BASE_DIR = os.path.dirname(__file__)
AUTOCALIB_PHISHY_CSV = os.environ.get("AUTOCALIB_PHISHY_CSV", os.path.join(BASE_DIR, "autocalib_phishy.csv"))
AUTOCALIB_LEGIT_CSV = os.environ.get("AUTOCALIB_LEGIT_CSV", os.path.join(BASE_DIR, "autocalib_legit.csv"))
KNOWN_HOSTS_CSV = os.environ.get("KNOWN_HOSTS_CSV", os.path.join(BASE_DIR, "known_hosts.csv"))


app = FastAPI(title="PhishWatch URL API", version="2.0.0")
52
-
53
-
54
class PredictUrlPayload(BaseModel):
    """Request body for /predict-url."""

    # Raw URL (or free-form text containing one) to classify.
    url: str
56
-
57
-
58
# Lazily-loaded model bundle (a dict with "model", "feature_cols", "url_col",
# "model_type"); populated exactly once by _load_url_model() under _url_lock.
_url_bundle: Optional[Dict[str, Any]] = None
_url_lock = threading.Lock()
60
-
61
-
62
- def _normalize_host(value: str) -> str:
63
- v = value.strip().lower()
64
- if v.startswith("www."):
65
- v = v[4:]
66
- return v
67
-
68
-
69
def _host_matches_any(host: str, known: List[str]) -> bool:
    """Return True when *host* equals, or is a subdomain of, any entry in *known*.

    Both sides are normalized the same way as ``_normalize_host`` (trim,
    lower-case, strip a leading ``www.``) before comparing.
    """
    def _norm(h: str) -> str:
        h = h.strip().lower()
        return h[4:] if h.startswith("www.") else h

    base = _norm(host)
    return any(base == k or base.endswith("." + k) for k in map(_norm, known))
76
-
77
-
78
- _URL_EXTRACT_RE = re.compile(r"(https?://[^\s<>\"'\)\]]+)", re.IGNORECASE)
79
-
80
- def _sanitize_input_url(text: str) -> str:
81
- v = (text or "").strip()
82
- if v.startswith("@"):
83
- v = v.lstrip("@").strip()
84
- m = _URL_EXTRACT_RE.search(v)
85
- if m:
86
- v = m.group(1)
87
- v = v.strip("<>[]()")
88
- return v
89
-
90
- _SCHEME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")
91
- def _ensure_scheme(u: str) -> str:
92
- u = (u or "").strip()
93
- return u if _SCHEME_RE.match(u) else ("http://" + u)
94
-
95
- def _read_urls_from_csv(path: str) -> List[str]:
96
- urls: List[str] = []
97
- try:
98
- with open(path, newline="", encoding="utf-8") as f:
99
- reader = csv.DictReader(f)
100
- if "url" in (reader.fieldnames or []):
101
- for row in reader:
102
- val = str(row.get("url", "")).strip()
103
- if val:
104
- urls.append(val)
105
- else:
106
- f.seek(0)
107
- f2 = csv.reader(f)
108
- for row in f2:
109
- if not row:
110
- continue
111
- val = str(row[0]).strip()
112
- if val.lower() == "url":
113
- continue
114
- if val:
115
- urls.append(val)
116
- except FileNotFoundError:
117
- pass
118
- except Exception as e:
119
- print(f"[csv] failed reading URLs from {path}: {e}")
120
- return urls
121
-
122
-
123
- def _read_hosts_from_csv(path: str) -> Dict[str, str]:
124
- out: Dict[str, str] = {}
125
- try:
126
- with open(path, newline="", encoding="utf-8") as f:
127
- reader = csv.DictReader(f)
128
- fields = [x.lower() for x in (reader.fieldnames or [])]
129
- if "host" in fields and "label" in fields:
130
- for row in reader:
131
- host = str(row.get("host", "")).strip()
132
- label = str(row.get("label", "")).strip().upper()
133
- if host and label in ("PHISH", "LEGIT"):
134
- out[host] = label
135
- except FileNotFoundError:
136
- pass
137
- except Exception as e:
138
- print(f"[csv] failed reading hosts from {path}: {e}")
139
- return out
140
-
141
-
142
def _engineer_features(urls: List[str], feature_cols: List[str]) -> pd.DataFrame:
    """Build the model feature matrix for a batch of URL strings.

    Computes lexical URL counts, host/SLD/TLD statistics, brand-similarity
    scores, and a homoglyph indicator, then reindexes to ``feature_cols`` so
    column order matches exactly what the trained model expects (missing
    columns are zero-filled).
    """
    s = pd.Series(urls, dtype=str)
    out = pd.DataFrame()

    # Base URL-wide counts used by older models
    out["url_len"] = s.str.len().fillna(0)
    out["count_dot"] = s.str.count(r"\.")
    out["count_hyphen"] = s.str.count("-")
    out["count_digit"] = s.str.count(r"\d")
    out["count_at"] = s.str.count("@")
    out["count_qmark"] = s.str.count(r"\?")
    out["count_eq"] = s.str.count("=")
    out["count_slash"] = s.str.count("/")
    # NaN from the 0-length guard maps back to a ratio of 0.
    out["digit_ratio"] = (out["count_digit"] / out["url_len"].replace(0, np.nan)).fillna(0)
    out["has_ip"] = s.str.contains(r"(?:\d{1,3}\.){3}\d{1,3}").astype(int)
    for tok in ["login", "verify", "secure", "update", "bank", "pay", "account", "webscr"]:
        out[f"has_{tok}"] = s.str.contains(tok, case=False, regex=False).astype(int)
    out["starts_https"] = s.str.startswith("https").astype(int)
    out["ends_with_exe"] = s.str.endswith(".exe").astype(int)
    out["ends_with_zip"] = s.str.endswith(".zip").astype(int)

    # Host/SLD/TLD derived features used by newer models
    hosts = s.apply(lambda x: (urlparse(_ensure_scheme(x)).hostname or "").lower())
    out["host_len"] = hosts.str.len().fillna(0)

    # Subdomain count: number of labels minus 2 (for sld.tld); never below 0
    label_counts = hosts.str.count(r"\.") + 1
    sub_count = (label_counts - 2).clip(lower=0)
    out["subdomain_count"] = sub_count.fillna(0)

    # TLD and SLD extraction (simple heuristic; handles common cases)
    parts_series = hosts.str.split(".")
    tld_series = parts_series.apply(lambda p: p[-1] if len(p) >= 1 else "")
    sld_series = parts_series.apply(lambda p: p[-2] if len(p) >= 2 else "")

    # Suspicious TLD flag (expand as needed)
    suspicious_tlds = {
        "tk", "ml", "ga", "cf", "gq", "xyz", "top", "buzz", "icu",
        "fit", "rest", "work", "click", "country", "zip"
    }
    out["tld_suspicious"] = tld_series.apply(lambda t: 1 if t.lower() in suspicious_tlds else 0)

    # Punycode indicator (IDN hosts; often used in homoglyph attacks)
    out["has_punycode"] = hosts.str.contains("xn--").astype(int)

    # SLD stats
    out["sld_len"] = sld_series.str.len().fillna(0)
    def _ratio_digits(txt: str) -> float:
        # Fraction of characters in *txt* that are digits (0.0 for empty).
        txt = txt or ""
        if not txt:
            return 0.0
        digits = sum(c.isdigit() for c in txt)
        return float(digits) / float(len(txt))
    out["sld_digit_ratio"] = sld_series.apply(_ratio_digits)

    def _shannon_entropy(txt: str) -> float:
        # Character-level Shannon entropy in bits (0.0 for empty input);
        # high values flag random-looking, DGA-style domain labels.
        txt = txt or ""
        if not txt:
            return 0.0
        counts: Dict[str, int] = {}
        for ch in txt:
            counts[ch] = counts.get(ch, 0) + 1
        total = float(len(txt))
        entropy = 0.0
        for n in counts.values():
            p = n / total
            entropy -= p * np.log2(p)
        return float(entropy)
    out["sld_entropy"] = sld_series.apply(_shannon_entropy)

    # Brand similarity features (lightweight; stdlib only)
    common_brands = [
        "facebook", "google", "youtube", "apple", "microsoft",
        "paypal", "amazon", "netflix", "instagram", "whatsapp",
        "tiktok", "twitter", "telegram", "linkedin", "bank", "login"
    ]

    def _max_brand_similarity(host: str) -> float:
        # Best SequenceMatcher ratio of host/SLD against any known brand.
        host = host or ""
        if not host:
            return 0.0
        # Compare against host and sld specifically
        best = 0.0
        sld_local = host.split(".")[-2] if "." in host else host
        for brand in common_brands:
            best = max(
                best,
                SequenceMatcher(None, host, brand).ratio(),
                SequenceMatcher(None, sld_local, brand).ratio(),
            )
        return float(best)

    def _like_brand(host: str, brand: str, threshold: float = 0.82) -> int:
        # 1 when *host* contains or closely resembles *brand*, else 0.
        h = host or ""
        if not h:
            return 0
        if brand in h:
            return 1
        sld_local = h.split(".")[-2] if "." in h else h
        score = max(
            SequenceMatcher(None, h, brand).ratio(),
            SequenceMatcher(None, sld_local, brand).ratio(),
        )
        return 1 if score >= threshold else 0

    out["max_brand_sim"] = hosts.apply(_max_brand_similarity)
    out["like_facebook"] = hosts.apply(lambda h: _like_brand(h, "facebook"))

    # Lookalike/homoglyph detection: unusual Unicode symbols that resemble ASCII letters
    # Examples: Cyrillic а (U+0430) looks like 'a', Greek α (U+03B1) looks like 'a', etc.
    def _detect_lookalike_chars(url: str) -> int:
        """
        Detects if URL contains Unicode characters that visually resemble ASCII letters.
        Common lookalikes used in phishing:
        - Cyrillic: а, е, о, р, с, х, у, ч, ы, ь (look like a,e,o,p,c,x,y,4,b,b)
        - Greek: α, ο (look like a, o)
        - Latin Extended: ɑ, ɢ, ᴅ, ɡ, ɪ, ɴ, ɪ (look like a,G,D,g,i,N,I)
        """
        url_str = url or ""

        # Cyrillic characters that look like ASCII letters
        lookalikes_cyrillic = {
            'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
            'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
            'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
        }

        # Greek characters that look like ASCII letters
        lookalikes_greek = {
            'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
        }

        # Latin Extended lookalikes
        lookalikes_latin = {
            'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
            'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
        }

        all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}

        for char in url_str:
            if char in all_lookalikes:
                return 1
        return 0

    out["has_lookalike_chars"] = s.apply(_detect_lookalike_chars)

    # Return columns in the exact order expected by the model; fill any
    # still-missing engineered columns with zeros to stay robust across
    # model updates.
    return out.reindex(columns=feature_cols, fill_value=0)
293
-
294
-
295
def _load_url_model():
    """Load the URL model bundle exactly once into the module-global ``_url_bundle``.

    Double-checked locking (safe under CPython's GIL): the cheap unguarded
    check avoids taking the lock on the hot path, the re-check inside the lock
    prevents a duplicate load. Prefers a local copy of URL_FILENAME in the
    working directory; otherwise downloads from the configured HF repo into
    CACHE_DIR.
    """
    global _url_bundle
    if _url_bundle is None:
        with _url_lock:
            if _url_bundle is None:  # re-check after acquiring the lock
                local_path = os.path.join(os.getcwd(), URL_FILENAME)
                if os.path.exists(local_path):
                    _url_bundle = joblib.load(local_path)
                else:
                    model_path = hf_hub_download(
                        repo_id=URL_REPO,
                        filename=URL_FILENAME,
                        repo_type=URL_REPO_TYPE,
                        cache_dir=CACHE_DIR,
                    )
                    _url_bundle = joblib.load(model_path)
311
-
312
-
313
- def _normalize_url_string(url: str) -> str:
314
- return (url or "").strip().rstrip("/")
315
-
316
-
317
@app.get("/")
def root():
    """Liveness probe for the URL-only backend."""
    payload = {"status": "ok", "backend": "url-only"}
    return payload
320
-
321
-
322
@app.post("/predict-url")
def predict_url(payload: PredictUrlPayload):
    """Classify a URL as PHISH or LEGIT.

    Decision order (first match wins): exact-URL CSV overrides -> known-host
    overrides -> lookalike (homoglyph) character guard -> typosquat guard ->
    ML model probability. All failures are reported as a 500 JSON body rather
    than raised, so the endpoint never surfaces a bare traceback.
    """
    try:
        _load_url_model()

        # Load CSVs on every request (keeps behavior in sync without code edits)
        phishy_list = _read_urls_from_csv(AUTOCALIB_PHISHY_CSV)
        legit_list = _read_urls_from_csv(AUTOCALIB_LEGIT_CSV)
        host_map = _read_hosts_from_csv(KNOWN_HOSTS_CSV)

        bundle = _url_bundle
        if not isinstance(bundle, dict) or "model" not in bundle:
            raise RuntimeError("Loaded URL artifact is not a bundle dict with 'model'.")

        model = bundle["model"]
        feature_cols: List[str] = bundle.get("feature_cols") or []
        url_col: str = bundle.get("url_col") or "url"
        model_type: str = bundle.get("model_type") or ""

        raw_input = (payload.url or "").strip()
        url_str = _sanitize_input_url(raw_input)
        if not url_str:
            return JSONResponse(status_code=400, content={"error": "Empty url"})

        # URL-level override via CSV lists (normalized exact match, ignoring trailing slash)
        norm_url = _normalize_url_string(url_str)
        phishy_set = { _normalize_url_string(u) for u in phishy_list }
        legit_set = { _normalize_url_string(u) for u in legit_list }

        if norm_url in phishy_set or norm_url in legit_set:
            # phish_is_positive: does model class 1 mean PHISH? (env var can flip it)
            phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
            label = "PHISH" if norm_url in phishy_set else "LEGIT"
            predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
            phish_proba = 0.99 if label == "PHISH" else 0.01
            # score = confidence in the emitted label (not the phish probability)
            score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
            return {
                "label": label,
                "predicted_label": int(predicted_label),
                "score": float(score),
                "phishing_probability": float(phish_proba),
                "backend": str(model_type),
                "threshold": 0.5,
                "url_col": url_col,
                "override": {"reason": "csv_url_match"},
            }

        # Known-host override (suffix match)
        host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
        if host and host_map:
            for h, lbl in host_map.items():
                if _host_matches_any(host, [h]):
                    phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
                    label = lbl
                    predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
                    phish_proba = 0.99 if label == "PHISH" else 0.01
                    score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
                    return {
                        "label": label,
                        "predicted_label": int(predicted_label),
                        "score": float(score),
                        "phishing_probability": float(phish_proba),
                        "backend": str(model_type),
                        "threshold": 0.5,
                        "url_col": url_col,
                    }

        # Lookalike character guard: detect homoglyph/lookalike attacks.
        # Any single confusable character forces a PHISH verdict at 0.95.
        try:
            # Cyrillic characters that look like ASCII letters
            lookalikes_cyrillic = {
                'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
                'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
                'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
            }

            # Greek characters that look like ASCII letters
            lookalikes_greek = {
                'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
            }

            # Latin Extended lookalikes
            lookalikes_latin = {
                'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
                'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
            }

            all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}

            for char in url_str:
                if char in all_lookalikes:
                    phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
                    label = "PHISH"
                    predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
                    phish_proba = 0.95
                    score = phish_proba
                    return {
                        "label": label,
                        "predicted_label": int(predicted_label),
                        "score": float(score),
                        "phishing_probability": float(phish_proba),
                        "backend": "lookalike_guard",
                        "threshold": 0.5,
                        "url_col": url_col,
                        "rule": "lookalike_character_detected",
                    }
        except Exception:
            pass  # guard is best-effort; fall through to the next check

        # Typosquat guard: mirror notebook fallback logic.
        # Flags SLDs that are near-identical to a known brand AND contain a
        # digit or hyphen, unless the host is the official brand .com domain.
        try:
            s_host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
            s_sld = s_host.split(".")[-2] if "." in s_host else s_host
            def _normalize_brand(s: str) -> str:
                # keep only ASCII letters so "pay-pal1" compares as "paypal"
                return re.sub(r"[^a-z]", "", s.lower())
            s_clean = _normalize_brand(s_sld)
            brands = [
                "facebook","linkedin","paypal","google","amazon","apple",
                "microsoft","instagram","netflix","twitter","whatsapp"
            ]
            def _sim(a: str, b: str) -> float:
                # rapidfuzz when available; stdlib SequenceMatcher as fallback
                try:
                    from rapidfuzz import fuzz  # type: ignore
                    return float(fuzz.ratio(a, b)) / 100.0
                except Exception:
                    from difflib import SequenceMatcher
                    return SequenceMatcher(None, a, b).ratio()
            if s_clean:
                best = 0.0
                for b in brands:
                    best = max(best, _sim(s_clean, _normalize_brand(b)))
                has_digits = bool(re.search(r"\d", s_sld))
                has_hyphen = ("-" in s_sld)
                is_official = any(s_host.endswith(f"{_normalize_brand(b)}.com") for b in brands)
                if (best >= 0.90) and (has_digits or has_hyphen) and (not is_official):
                    phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
                    label = "PHISH"
                    predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
                    phish_proba = 0.90
                    score = phish_proba
                    return {
                        "label": label,
                        "predicted_label": int(predicted_label),
                        "score": float(score),
                        "phishing_probability": float(phish_proba),
                        "backend": "typosquat_guard",
                        "threshold": 0.5,
                        "url_col": url_col,
                        "rule": "typosquat_guard",
                    }
        except Exception:
            pass  # guard is best-effort; fall through to the model

        # Mirror inference flow for probability of class 1
        feats = _engineer_features([url_str], feature_cols)
        if model_type == "xgboost_bst":
            if xgb is None:
                raise RuntimeError("xgboost not installed")
            dmat = xgb.DMatrix(feats)
            raw_p_class1 = float(model.predict(dmat)[0])
        elif hasattr(model, "predict_proba"):
            raw_p_class1 = float(model.predict_proba(feats)[:, 1][0])
        else:
            # Hard-label-only models: map the class prediction to 0/1 "probability".
            pred = model.predict(feats)[0]
            raw_p_class1 = 1.0 if int(pred) == 1 else 0.0

        # Polarity: strictly env or default (class1==PHISH)
        phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")

        phish_proba = raw_p_class1 if phish_is_positive else (1.0 - raw_p_class1)
        label = "PHISH" if phish_proba >= 0.5 else "LEGIT"
        predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
        score = phish_proba if label == "PHISH" else (1.0 - phish_proba)

        return {
            "label": label,
            "predicted_label": int(predicted_label),
            "score": float(score),
            "phishing_probability": float(phish_proba),
            "backend": str(model_type),
            "threshold": 0.5,
            "url_col": url_col,
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
506
-
507
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import csv
3
+ import re
4
+ import threading
5
+ from typing import Optional, List, Dict, Any
6
+ from difflib import SequenceMatcher
7
+
8
+ import joblib
9
+ import numpy as np
10
+ import pandas as pd
11
+ from fastapi import FastAPI
12
+ from fastapi.responses import JSONResponse
13
+ from huggingface_hub import hf_hub_download
14
+ from pydantic import BaseModel
15
+ from urllib.parse import urlparse
16
+
17
+ try:
18
+ import xgboost as xgb # type: ignore
19
+ except Exception:
20
+ xgb = None
21
+
22
+
23
# Environment defaults suitable for HF Spaces (the /data volume is writable there).
os.environ.setdefault("HOME", "/data")
os.environ.setdefault("XDG_CACHE_HOME", "/data/.cache")
os.environ.setdefault("HF_HOME", "/data/.cache")
os.environ.setdefault("TRANSFORMERS_CACHE", "/data/.cache")
os.environ.setdefault("TORCH_HOME", "/data/.cache")


# Config
# Hugging Face repo/file holding the serialized URL model bundle.
# Each setting checks the new-style HF_* variable first, then a legacy name,
# then falls back to a hard-coded default.
URL_REPO = os.environ.get(
    "HF_URL_MODEL_ID",
    os.environ.get("URL_REPO", "Perth0603/Random-Forest-Model-for-PhishingDetection"),
)
URL_REPO_TYPE = os.environ.get("HF_URL_REPO_TYPE", os.environ.get("URL_REPO_TYPE", "model"))
URL_FILENAME = os.environ.get("HF_URL_FILENAME", os.environ.get("URL_FILENAME", "rf_url_phishing_xgboost_bst.joblib"))
CACHE_DIR = os.environ.get("HF_CACHE_DIR", "/data/.cache")
os.makedirs(CACHE_DIR, exist_ok=True)

# Polarity override: "PHISH" or "LEGIT"; empty means default (class 1 = PHISH)
URL_POSITIVE_CLASS_ENV = os.environ.get("URL_POSITIVE_CLASS", "").strip().upper()

# CSV configuration (defaults to files in same directory)
BASE_DIR = os.path.dirname(__file__)
AUTOCALIB_PHISHY_CSV = os.environ.get("AUTOCALIB_PHISHY_CSV", os.path.join(BASE_DIR, "autocalib_phishy.csv"))
AUTOCALIB_LEGIT_CSV = os.environ.get("AUTOCALIB_LEGIT_CSV", os.path.join(BASE_DIR, "autocalib_legit.csv"))
KNOWN_HOSTS_CSV = os.environ.get("KNOWN_HOSTS_CSV", os.path.join(BASE_DIR, "known_hosts.csv"))


app = FastAPI(title="PhishWatch URL API", version="2.0.0")
52
+
53
+
54
class PredictUrlPayload(BaseModel):
    """Request body for /predict-url."""

    # Raw URL (or free-form text containing one) to classify.
    url: str
56
+
57
+
58
# Lazily-loaded model bundle (a dict with "model", "feature_cols", "url_col",
# "model_type"); populated exactly once by _load_url_model() under _url_lock.
_url_bundle: Optional[Dict[str, Any]] = None
_url_lock = threading.Lock()
60
+
61
+
62
+ def _normalize_host(value: str) -> str:
63
+ v = value.strip().lower()
64
+ if v.startswith("www."):
65
+ v = v[4:]
66
+ return v
67
+
68
+
69
def _host_matches_any(host: str, known: List[str]) -> bool:
    """Return True when *host* equals, or is a subdomain of, any entry in *known*.

    Both sides are normalized the same way as ``_normalize_host`` (trim,
    lower-case, strip a leading ``www.``) before comparing.
    """
    def _norm(h: str) -> str:
        h = h.strip().lower()
        return h[4:] if h.startswith("www.") else h

    base = _norm(host)
    return any(base == k or base.endswith("." + k) for k in map(_norm, known))
76
+
77
+
78
+ _URL_EXTRACT_RE = re.compile(r"(https?://[^\s<>\"'\)\]]+)", re.IGNORECASE)
79
+
80
+ def _sanitize_input_url(text: str) -> str:
81
+ v = (text or "").strip()
82
+ if v.startswith("@"):
83
+ v = v.lstrip("@").strip()
84
+ m = _URL_EXTRACT_RE.search(v)
85
+ if m:
86
+ v = m.group(1)
87
+ v = v.strip("<>[]()")
88
+ return v
89
+
90
+ _SCHEME_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")
91
+ def _ensure_scheme(u: str) -> str:
92
+ u = (u or "").strip()
93
+ return u if _SCHEME_RE.match(u) else ("http://" + u)
94
+
95
+ def _read_urls_from_csv(path: str) -> List[str]:
96
+ urls: List[str] = []
97
+ try:
98
+ with open(path, newline="", encoding="utf-8") as f:
99
+ reader = csv.DictReader(f)
100
+ if "url" in (reader.fieldnames or []):
101
+ for row in reader:
102
+ val = str(row.get("url", "")).strip()
103
+ if val:
104
+ urls.append(val)
105
+ else:
106
+ f.seek(0)
107
+ f2 = csv.reader(f)
108
+ for row in f2:
109
+ if not row:
110
+ continue
111
+ val = str(row[0]).strip()
112
+ if val.lower() == "url":
113
+ continue
114
+ if val:
115
+ urls.append(val)
116
+ except FileNotFoundError:
117
+ pass
118
+ except Exception as e:
119
+ print(f"[csv] failed reading URLs from {path}: {e}")
120
+ return urls
121
+
122
+
123
+ def _read_hosts_from_csv(path: str) -> Dict[str, str]:
124
+ out: Dict[str, str] = {}
125
+ try:
126
+ with open(path, newline="", encoding="utf-8") as f:
127
+ reader = csv.DictReader(f)
128
+ fields = [x.lower() for x in (reader.fieldnames or [])]
129
+ if "host" in fields and "label" in fields:
130
+ for row in reader:
131
+ host = str(row.get("host", "")).strip()
132
+ label = str(row.get("label", "")).strip().upper()
133
+ if host and label in ("PHISH", "LEGIT"):
134
+ out[host] = label
135
+ except FileNotFoundError:
136
+ pass
137
+ except Exception as e:
138
+ print(f"[csv] failed reading hosts from {path}: {e}")
139
+ return out
140
+
141
+
142
def _engineer_features(urls: List[str], feature_cols: List[str]) -> pd.DataFrame:
    """
    MODULE 4: URL Analyzer - Feature Engineering
    Analyzes URL construction, domain composition, and critical components.

    Computes lexical URL counts, host/SLD/TLD statistics, brand-similarity
    scores, and a homoglyph indicator, then reindexes to ``feature_cols`` so
    column order matches what the trained model expects (missing columns are
    zero-filled).
    """
    s = pd.Series(urls, dtype=str)
    out = pd.DataFrame()

    # Base URL-wide counts used by older models
    out["url_len"] = s.str.len().fillna(0)
    out["count_dot"] = s.str.count(r"\.")
    out["count_hyphen"] = s.str.count("-")
    out["count_digit"] = s.str.count(r"\d")
    out["count_at"] = s.str.count("@")
    out["count_qmark"] = s.str.count(r"\?")
    out["count_eq"] = s.str.count("=")
    out["count_slash"] = s.str.count("/")
    # NaN from the 0-length guard maps back to a ratio of 0.
    out["digit_ratio"] = (out["count_digit"] / out["url_len"].replace(0, np.nan)).fillna(0)
    out["has_ip"] = s.str.contains(r"(?:\d{1,3}\.){3}\d{1,3}").astype(int)
    for tok in ["login", "verify", "secure", "update", "bank", "pay", "account", "webscr"]:
        out[f"has_{tok}"] = s.str.contains(tok, case=False, regex=False).astype(int)
    out["starts_https"] = s.str.startswith("https").astype(int)
    out["ends_with_exe"] = s.str.endswith(".exe").astype(int)
    out["ends_with_zip"] = s.str.endswith(".zip").astype(int)

    # Host/SLD/TLD derived features used by newer models
    hosts = s.apply(lambda x: (urlparse(_ensure_scheme(x)).hostname or "").lower())
    out["host_len"] = hosts.str.len().fillna(0)

    # Subdomain count: number of labels minus 2 (for sld.tld); never below 0
    label_counts = hosts.str.count(r"\.") + 1
    sub_count = (label_counts - 2).clip(lower=0)
    out["subdomain_count"] = sub_count.fillna(0)

    # TLD and SLD extraction (simple heuristic; handles common cases)
    parts_series = hosts.str.split(".")
    tld_series = parts_series.apply(lambda p: p[-1] if len(p) >= 1 else "")
    sld_series = parts_series.apply(lambda p: p[-2] if len(p) >= 2 else "")

    # Suspicious TLD flag (expand as needed)
    suspicious_tlds = {
        "tk", "ml", "ga", "cf", "gq", "xyz", "top", "buzz", "icu",
        "fit", "rest", "work", "click", "country", "zip"
    }
    out["tld_suspicious"] = tld_series.apply(lambda t: 1 if t.lower() in suspicious_tlds else 0)

    # Punycode indicator (internationalized domain names - often used in homoglyph attacks)
    out["has_punycode"] = hosts.str.contains("xn--").astype(int)

    # SLD stats
    out["sld_len"] = sld_series.str.len().fillna(0)
    def _ratio_digits(txt: str) -> float:
        # Fraction of characters in *txt* that are digits (0.0 for empty).
        txt = txt or ""
        if not txt:
            return 0.0
        digits = sum(c.isdigit() for c in txt)
        return float(digits) / float(len(txt))
    out["sld_digit_ratio"] = sld_series.apply(_ratio_digits)

    def _shannon_entropy(txt: str) -> float:
        # Character-level Shannon entropy in bits (0.0 for empty input);
        # high values flag random-looking, DGA-style domain labels.
        txt = txt or ""
        if not txt:
            return 0.0
        counts: Dict[str, int] = {}
        for ch in txt:
            counts[ch] = counts.get(ch, 0) + 1
        total = float(len(txt))
        entropy = 0.0
        for n in counts.values():
            p = n / total
            entropy -= p * np.log2(p)
        return float(entropy)
    out["sld_entropy"] = sld_series.apply(_shannon_entropy)

    # Brand similarity features (lightweight; stdlib only)
    common_brands = [
        "facebook", "google", "youtube", "apple", "microsoft",
        "paypal", "amazon", "netflix", "instagram", "whatsapp",
        "tiktok", "twitter", "telegram", "linkedin", "bank", "login"
    ]

    def _max_brand_similarity(host: str) -> float:
        # Best SequenceMatcher ratio of host/SLD against any known brand.
        host = host or ""
        if not host:
            return 0.0
        # Compare against host and sld specifically
        best = 0.0
        sld_local = host.split(".")[-2] if "." in host else host
        for brand in common_brands:
            best = max(
                best,
                SequenceMatcher(None, host, brand).ratio(),
                SequenceMatcher(None, sld_local, brand).ratio(),
            )
        return float(best)

    def _like_brand(host: str, brand: str, threshold: float = 0.82) -> int:
        # 1 when *host* contains or closely resembles *brand*, else 0.
        h = host or ""
        if not h:
            return 0
        if brand in h:
            return 1
        sld_local = h.split(".")[-2] if "." in h else h
        score = max(
            SequenceMatcher(None, h, brand).ratio(),
            SequenceMatcher(None, sld_local, brand).ratio(),
        )
        return 1 if score >= threshold else 0

    out["max_brand_sim"] = hosts.apply(_max_brand_similarity)
    out["like_facebook"] = hosts.apply(lambda h: _like_brand(h, "facebook"))

    # Lookalike/homoglyph detection: unusual Unicode symbols that resemble ASCII letters
    def _detect_lookalike_chars(url: str) -> int:
        """
        Detects if URL contains Unicode characters that visually resemble ASCII letters.
        Common lookalikes used in phishing homoglyph attacks:
        - Cyrillic: а, е, о, р, с, х, у, ч, ы, ь (look like a,e,o,p,c,x,y,4,b,b)
        - Greek: α, ο (look like a, o)
        - Latin Extended: ɑ, ɢ, ᴅ, ɡ, ɪ, ɴ, ɪ (look like a,G,D,g,i,N,I)
        """
        url_str = url or ""

        # Cyrillic characters that look like ASCII letters
        lookalikes_cyrillic = {
            'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
            'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
            'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
        }

        # Greek characters that look like ASCII letters
        lookalikes_greek = {
            'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
        }

        # Latin Extended lookalikes
        lookalikes_latin = {
            'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
            'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
        }

        all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}

        for char in url_str:
            if char in all_lookalikes:
                return 1
        return 0

    out["has_lookalike_chars"] = s.apply(_detect_lookalike_chars)

    # Return columns in the exact order expected by the model; fill any
    # still-missing engineered columns with zeros to stay robust across
    # model updates.
    return out.reindex(columns=feature_cols, fill_value=0)
296
+
297
+
298
def _load_url_model():
    """Load the URL model bundle exactly once into the module-global ``_url_bundle``.

    Double-checked locking (safe under CPython's GIL): the cheap unguarded
    check avoids taking the lock on the hot path, the re-check inside the lock
    prevents a duplicate load. Prefers a local copy of URL_FILENAME in the
    working directory; otherwise downloads from the configured HF repo into
    CACHE_DIR.
    """
    global _url_bundle
    if _url_bundle is None:
        with _url_lock:
            if _url_bundle is None:  # re-check after acquiring the lock
                local_path = os.path.join(os.getcwd(), URL_FILENAME)
                if os.path.exists(local_path):
                    _url_bundle = joblib.load(local_path)
                else:
                    model_path = hf_hub_download(
                        repo_id=URL_REPO,
                        filename=URL_FILENAME,
                        repo_type=URL_REPO_TYPE,
                        cache_dir=CACHE_DIR,
                    )
                    _url_bundle = joblib.load(model_path)
314
+
315
+
316
+ def _normalize_url_string(url: str) -> str:
317
+ return (url or "").strip().rstrip("/")
318
+
319
+
320
@app.get("/")
def root():
    """Service banner / liveness probe for Module 4 (URL Analyzer)."""
    info = {
        "status": "ok",
        "service": "PhishWatch Pro - Module 4: URL Analyzer",
        "backend": "Random Forest (GPU accelerated)",
    }
    return info
327
+
328
+
329
@app.post("/predict-url")
def predict_url(payload: PredictUrlPayload):
    """
    MODULE 4: URL Analyzer
    Analyzes URL construction, domain composition, and critical components
    Returns phishing risk score with confidence level and threat type

    Decision pipeline (first match wins):
      1. Exact-URL override from the phishy/legit CSV lists.
      2. Known-host override (suffix match against KNOWN_HOSTS_CSV).
      3. Homoglyph guard: any lookalike (Cyrillic/Greek/Latin-variant) character.
      4. Typosquat guard: near-miss of a known brand SLD with digits/hyphens.
      5. Model inference (XGBoost booster or any predict_proba estimator).
    """
    try:
        _load_url_model()

        # Load CSVs on every request (keeps behavior in sync without code edits)
        phishy_list = _read_urls_from_csv(AUTOCALIB_PHISHY_CSV)
        legit_list = _read_urls_from_csv(AUTOCALIB_LEGIT_CSV)
        host_map = _read_hosts_from_csv(KNOWN_HOSTS_CSV)

        bundle = _url_bundle
        if not isinstance(bundle, dict) or "model" not in bundle:
            raise RuntimeError("Loaded URL artifact is not a bundle dict with 'model'.")

        model = bundle["model"]
        feature_cols: List[str] = bundle.get("feature_cols") or []
        url_col: str = bundle.get("url_col") or "url"
        model_type: str = bundle.get("model_type") or ""

        raw_input = (payload.url or "").strip()
        url_str = _sanitize_input_url(raw_input)
        if not url_str:
            return JSONResponse(status_code=400, content={"error": "Empty url"})

        # Polarity is fixed per process (env override or default class 1 == PHISH);
        # compute it once instead of re-deriving it in every branch.
        phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")

        def _verdict(label: str, phish_proba: float, backend: str, extra: Dict[str, Any]) -> Dict[str, Any]:
            # Shared response builder for every override/guard/model branch.
            # `predicted_label` encodes the label under the configured polarity;
            # `score` is the confidence in the emitted label (not always P(phish)).
            predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
            score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
            resp: Dict[str, Any] = {
                "label": label,
                "predicted_label": int(predicted_label),
                "score": float(score),
                "phishing_probability": float(phish_proba),
                "backend": str(backend),
                "threshold": 0.5,
                "url_col": url_col,
            }
            resp.update(extra)
            return resp

        # 1) URL-level override via CSV lists (normalized exact match, ignoring
        #    trailing slash). If a URL appears in both lists, phishy wins.
        norm_url = _normalize_url_string(url_str)
        phishy_set = {_normalize_url_string(u) for u in phishy_list}
        legit_set = {_normalize_url_string(u) for u in legit_list}

        if norm_url in phishy_set or norm_url in legit_set:
            label = "PHISH" if norm_url in phishy_set else "LEGIT"
            return _verdict(
                label,
                0.99 if label == "PHISH" else 0.01,
                model_type,
                {
                    "override": {"reason": "csv_url_match", "module": "4_url_analyzer"},
                    "threat_type": "known_phishing_url" if label == "PHISH" else "known_safe",
                },
            )

        # 2) Known-host override (suffix match); first matching entry in the CSV wins.
        host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
        if host and host_map:
            for known_host, lbl in host_map.items():
                if _host_matches_any(host, [known_host]):
                    return _verdict(
                        lbl,
                        0.99 if lbl == "PHISH" else 0.01,
                        model_type,
                        {
                            "override": {"reason": "known_host_match", "module": "4_url_analyzer"},
                            "threat_type": "known_phishing_domain" if lbl == "PHISH" else "known_safe",
                        },
                    )

        # 3) Lookalike character guard: detect homoglyph/lookalike attacks
        #    (heuristic indicator; best-effort, never raises into the handler).
        try:
            lookalikes_cyrillic = {
                'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
                'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
                'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
            }
            lookalikes_greek = {
                'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
            }
            lookalikes_latin = {
                'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
                'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
            }
            all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}

            if any(ch in all_lookalikes for ch in url_str):
                return _verdict(
                    "PHISH",
                    0.95,
                    "homoglyph_guard",
                    {
                        "rule": "homoglyph_character_detected",
                        "threat_type": "homoglyph_attack",
                        "module": "4_url_analyzer_heuristic",
                    },
                )
        except Exception:
            pass  # heuristic only — fall through to the next stage

        # 4) Typosquat guard: detect brand impersonation with typos
        #    (heuristic indicator; best-effort, never raises into the handler).
        try:
            s_host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
            # Second-level domain; naive split (does not handle multi-part TLDs
            # like co.uk) but sufficient for this heuristic.
            s_sld = s_host.split(".")[-2] if "." in s_host else s_host

            def _normalize_brand(s: str) -> str:
                return re.sub(r"[^a-z]", "", s.lower())

            s_clean = _normalize_brand(s_sld)
            brands = [
                "facebook", "linkedin", "paypal", "google", "amazon", "apple",
                "microsoft", "instagram", "netflix", "twitter", "whatsapp"
            ]

            def _sim(a: str, b: str) -> float:
                # Prefer rapidfuzz when available; fall back to stdlib difflib
                # (SequenceMatcher is already imported at module level).
                try:
                    from rapidfuzz import fuzz  # type: ignore
                    return float(fuzz.ratio(a, b)) / 100.0
                except Exception:
                    return SequenceMatcher(None, a, b).ratio()

            if s_clean:
                best = max((_sim(s_clean, _normalize_brand(b)) for b in brands), default=0.0)
                has_digits = bool(re.search(r"\d", s_sld))
                has_hyphen = ("-" in s_sld)
                # BUG FIX: the previous bare endswith(f"{brand}.com") also matched
                # hosts like "x-facebook.com", wrongly treating them as official
                # and bypassing the guard. Require the exact apex domain or a
                # dot-separated subdomain of it.
                is_official = any(
                    s_host == f"{_normalize_brand(b)}.com"
                    or s_host.endswith(f".{_normalize_brand(b)}.com")
                    for b in brands
                )
                if (best >= 0.90) and (has_digits or has_hyphen) and (not is_official):
                    return _verdict(
                        "PHISH",
                        0.90,
                        "typosquat_guard",
                        {
                            "rule": "typosquat_detected",
                            "threat_type": "brand_impersonation",
                            "module": "4_url_analyzer_heuristic",
                        },
                    )
        except Exception:
            pass  # heuristic only — fall through to model inference

        # 5) Model inference (primary detection).
        feats = _engineer_features([url_str], feature_cols)
        if model_type == "xgboost_bst":
            if xgb is None:
                raise RuntimeError("xgboost not installed")
            dmat = xgb.DMatrix(feats)
            raw_p_class1 = float(model.predict(dmat)[0])
        elif hasattr(model, "predict_proba"):
            raw_p_class1 = float(model.predict_proba(feats)[:, 1][0])
        else:
            # Hard-label-only estimator: degrade to a 0/1 "probability".
            pred = model.predict(feats)[0]
            raw_p_class1 = 1.0 if int(pred) == 1 else 0.0

        phish_proba = raw_p_class1 if phish_is_positive else (1.0 - raw_p_class1)
        label = "PHISH" if phish_proba >= 0.5 else "LEGIT"

        # Determine threat type from engineered features (assumes
        # _engineer_features emits these column names — confirm against it).
        threat_type = "unknown"
        if label == "PHISH":
            if feats["has_ip"].iloc[0] == 1:
                threat_type = "ip_based_phishing"
            elif feats["has_lookalike_chars"].iloc[0] == 1:
                threat_type = "homoglyph_phishing"
            elif feats["subdomain_count"].iloc[0] > 3:
                threat_type = "subdomain_abuse"
            elif feats["tld_suspicious"].iloc[0] == 1:
                threat_type = "suspicious_tld"
            elif any(feats[f"has_{tok}"].iloc[0] == 1 for tok in ["login", "verify", "secure", "bank", "pay"]):
                threat_type = "phishing_lure"
            else:
                threat_type = "anomalous_url_structure"

        return _verdict(
            label,
            phish_proba,
            model_type,
            {
                "threat_type": threat_type,
                "module": "4_url_analyzer_random_forest",
                "features": {
                    "url_length": float(feats["url_len"].iloc[0]),
                    "subdomain_count": float(feats["subdomain_count"].iloc[0]),
                    "has_ip": bool(feats["has_ip"].iloc[0]),
                    "suspicious_tld": bool(feats["tld_suspicious"].iloc[0]),
                    "has_punycode": bool(feats["has_punycode"].iloc[0]),
                },
            },
        )
    except Exception as e:
        # Top-level endpoint boundary: surface the error as a JSON 500.
        return JSONResponse(status_code=500, content={"error": str(e)})