Perth0603 committed on
Commit
016c0e8
·
verified ·
1 Parent(s): e2e3793

Upload 4 files

Browse files
Files changed (1) hide show
  1. app.py +102 -0
app.py CHANGED
@@ -6,6 +6,7 @@ os.environ.setdefault("TRANSFORMERS_CACHE", "/data/.cache")
6
  os.environ.setdefault("TORCH_HOME", "/data/.cache")
7
 
8
  from typing import Optional, List, Dict, Any
 
9
  from urllib.parse import urlparse
10
  import threading
11
  import re
@@ -131,6 +132,103 @@ _KNOWN_LEGIT_HOSTS: List[str] = [
131
  _KNOWN_PHISH_HOSTS: List[str] = [
132
  ]
133
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
134
  # -------------------------
135
  # URL features (must match training)
136
  # -------------------------
@@ -294,6 +392,8 @@ def _startup():
294
  print(f"[startup] text model load failed: {e}")
295
  try:
296
  _load_url_model()
 
 
297
  global _url_phish_is_positive
298
  b = _url_bundle
299
  if isinstance(b, dict) and _url_phish_is_positive is None:
@@ -343,6 +443,8 @@ def predict(payload: PredictPayload):
343
  def predict_url(payload: PredictUrlPayload):
344
  try:
345
  _load_url_model()
 
 
346
  bundle = _url_bundle
347
  if not isinstance(bundle, dict) or "model" not in bundle:
348
  raise RuntimeError("Loaded URL artifact is not a bundle dict with 'model'.")
 
6
  os.environ.setdefault("TORCH_HOME", "/data/.cache")
7
 
8
  from typing import Optional, List, Dict, Any
9
+ import csv
10
  from urllib.parse import urlparse
11
  import threading
12
  import re
 
132
  _KNOWN_PHISH_HOSTS: List[str] = [
133
  ]
134
 
135
+ # Helpers to normalize and match hosts by suffix (handles subdomains)
136
+ def _normalize_host(value: str) -> str:
137
+ v = value.strip().lower()
138
+ if v.startswith("www."):
139
+ v = v[4:]
140
+ return v
141
+
142
def _host_matches_any(host: str, known: List[str]) -> bool:
    """Tell whether *host* equals, or is a subdomain of, any entry in *known*.

    Both *host* and every entry are passed through ``_normalize_host``
    before comparison. A match is either exact equality or a suffix match
    on a label boundary (``"a.example.com"`` matches ``"example.com"``,
    ``"badexample.com"`` does not).

    Args:
        host: Host name to test.
        known: Host names to compare against.

    Returns:
        True on the first matching entry, otherwise False. A blank *host*
        never matches, and blank entries in *known* are ignored.
    """
    base = _normalize_host(host)
    if not base:
        # Nothing meaningful to compare; do not let "" equal a blank entry.
        return False
    for item in known:
        k = _normalize_host(item)
        if not k:
            # An entry that normalizes to "" would turn the suffix test
            # into endswith("."), matching unrelated hosts.
            continue
        if base == k or base.endswith("." + k):
            return True
    return False
149
+
150
+ # Optional CSV configuration
151
+ def _read_urls_from_csv(path: str) -> List[str]:
152
+ urls: List[str] = []
153
+ try:
154
+ with open(path, newline="", encoding="utf-8") as f:
155
+ reader = csv.DictReader(f)
156
+ if "url" in (reader.fieldnames or []):
157
+ for row in reader:
158
+ val = str(row.get("url", "")).strip()
159
+ if val:
160
+ urls.append(val)
161
+ else:
162
+ f.seek(0)
163
+ f2 = csv.reader(f)
164
+ for row in f2:
165
+ if not row:
166
+ continue
167
+ val = str(row[0]).strip()
168
+ if val.lower() == "url":
169
+ continue
170
+ if val:
171
+ urls.append(val)
172
+ except Exception as e:
173
+ print(f"[csv] failed reading URLs from {path}: {e}")
174
+ return urls
175
+
176
+ def _read_hosts_from_csv(path: str) -> Dict[str, str]:
177
+ host_to_label: Dict[str, str] = {}
178
+ try:
179
+ with open(path, newline="", encoding="utf-8") as f:
180
+ reader = csv.DictReader(f)
181
+ fields = [x.lower() for x in (reader.fieldnames or [])]
182
+ if "host" in fields and "label" in fields:
183
+ for row in reader:
184
+ host = str(row.get("host", "")).strip().lower()
185
+ label = str(row.get("label", "")).strip().upper()
186
+ if host and label in ("PHISH", "LEGIT"):
187
+ host_to_label[host] = label
188
+ else:
189
+ f.seek(0)
190
+ f2 = csv.reader(f)
191
+ for row in f2:
192
+ if len(row) < 2:
193
+ continue
194
+ host = str(row[0]).strip().lower()
195
+ label = str(row[1]).strip().upper()
196
+ if host.lower() == "host" and label == "LABEL":
197
+ continue
198
+ if host and label in ("PHISH", "LEGIT"):
199
+ host_to_label[host] = label
200
+ except Exception as e:
201
+ print(f"[csv] failed reading hosts from {path}: {e}")
202
+ return host_to_label
203
+
204
def _load_csv_configs_if_any():
    """Refresh the module-level URL and host lists from optional CSV files.

    File locations come from the environment variables
    ``AUTOCALIB_PHISHY_CSV``, ``AUTOCALIB_LEGIT_CSV`` and ``KNOWN_HOSTS_CSV``;
    each defaults to a file next to this module (``autocalib_phishy.csv``,
    ``autocalib_legit.csv``, ``known_hosts.csv``).

    A list is only replaced when its file exists and yields at least one
    entry; otherwise the current contents are left untouched. All lists are
    updated in place so existing references to them stay valid.

    NOTE(review): this re-reads every file on each call; if it sits on a
    per-request path, consider caching by file modification time.
    """
    base_dir = os.path.dirname(__file__)
    phishy_csv = os.environ.get(
        "AUTOCALIB_PHISHY_CSV", os.path.join(base_dir, "autocalib_phishy.csv")
    )
    legit_csv = os.environ.get(
        "AUTOCALIB_LEGIT_CSV", os.path.join(base_dir, "autocalib_legit.csv")
    )
    hosts_csv = os.environ.get(
        "KNOWN_HOSTS_CSV", os.path.join(base_dir, "known_hosts.csv")
    )

    if os.path.exists(phishy_csv):
        urls = _read_urls_from_csv(phishy_csv)
        if urls:
            print(f"[csv] loaded phishy URLs: {len(urls)} from {phishy_csv}")
            _AUTOCALIB_PHISHY_URLS[:] = urls

    if os.path.exists(legit_csv):
        urls = _read_urls_from_csv(legit_csv)
        if urls:
            print(f"[csv] loaded legit URLs: {len(urls)} from {legit_csv}")
            _AUTOCALIB_LEGIT_URLS[:] = urls

    if os.path.exists(hosts_csv):
        mapping = _read_hosts_from_csv(hosts_csv)
        if mapping:
            print(f"[csv] loaded known hosts: {len(mapping)} from {hosts_csv}")
            legit_hosts = [h for h, label in mapping.items() if label == "LEGIT"]
            phish_hosts = [h for h, label in mapping.items() if label == "PHISH"]
            # Build the new contents first, then swap each list with one
            # slice assignment (as done for the URL lists above) instead of
            # clear() followed by appends, which leaves the lists empty or
            # half-filled while the refresh is in progress.
            _KNOWN_LEGIT_HOSTS[:] = legit_hosts
            _KNOWN_PHISH_HOSTS[:] = phish_hosts
231
+
232
  # -------------------------
233
  # URL features (must match training)
234
  # -------------------------
 
392
  print(f"[startup] text model load failed: {e}")
393
  try:
394
  _load_url_model()
395
+ # Load CSV-driven config if present
396
+ _load_csv_configs_if_any()
397
  global _url_phish_is_positive
398
  b = _url_bundle
399
  if isinstance(b, dict) and _url_phish_is_positive is None:
 
443
  def predict_url(payload: PredictUrlPayload):
444
  try:
445
  _load_url_model()
446
+ # Load CSV-based config if present (hot-reload safe)
447
+ _load_csv_configs_if_any()
448
  bundle = _url_bundle
449
  if not isinstance(bundle, dict) or "model" not in bundle:
450
  raise RuntimeError("Loaded URL artifact is not a bundle dict with 'model'.")