Update app.py
Browse files
app.py
CHANGED
|
@@ -19,6 +19,26 @@ try:
|
|
| 19 |
except Exception:
|
| 20 |
xgb = None
|
| 21 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 22 |
|
| 23 |
# Environment defaults suitable for HF Spaces
|
| 24 |
os.environ.setdefault("HOME", "/data")
|
|
@@ -47,14 +67,42 @@ AUTOCALIB_PHISHY_CSV = os.environ.get("AUTOCALIB_PHISHY_CSV", os.path.join(BASE_
|
|
| 47 |
AUTOCALIB_LEGIT_CSV = os.environ.get("AUTOCALIB_LEGIT_CSV", os.path.join(BASE_DIR, "autocalib_legit.csv"))
|
| 48 |
KNOWN_HOSTS_CSV = os.environ.get("KNOWN_HOSTS_CSV", os.path.join(BASE_DIR, "known_hosts.csv"))
|
| 49 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 50 |
|
| 51 |
-
app = FastAPI(
|
|
|
|
|
|
|
|
|
|
|
|
|
| 52 |
|
| 53 |
|
|
|
|
| 54 |
class PredictUrlPayload(BaseModel):
|
| 55 |
url: str
|
| 56 |
|
| 57 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 58 |
_url_bundle: Optional[Dict[str, Any]] = None
|
| 59 |
_url_lock = threading.Lock()
|
| 60 |
|
|
@@ -247,38 +295,22 @@ def _engineer_features(urls: List[str], feature_cols: List[str]) -> pd.DataFrame
|
|
| 247 |
out["max_brand_sim"] = hosts.apply(_max_brand_similarity)
|
| 248 |
out["like_facebook"] = hosts.apply(lambda h: _like_brand(h, "facebook"))
|
| 249 |
|
| 250 |
-
# Lookalike/homoglyph detection
|
| 251 |
-
# Examples: Cyrillic а (U+0430) looks like 'a', Greek α (U+03B1) looks like 'a', etc.
|
| 252 |
def _detect_lookalike_chars(url: str) -> int:
|
| 253 |
-
"""
|
| 254 |
-
Detects if URL contains Unicode characters that visually resemble ASCII letters.
|
| 255 |
-
Common lookalikes used in phishing:
|
| 256 |
-
- Cyrillic: а, е, о, р, с, х, у, ч, ы, ь (look like a,e,o,p,c,x,y,4,b,b)
|
| 257 |
-
- Greek: α, ο (look like a, o)
|
| 258 |
-
- Latin Extended: ɑ, ɢ, ᴅ, ɡ, ɪ, ɴ, ɪ (look like a,G,D,g,i,N,I)
|
| 259 |
-
"""
|
| 260 |
url_str = url or ""
|
| 261 |
-
|
| 262 |
-
# Cyrillic characters that look like ASCII letters
|
| 263 |
lookalikes_cyrillic = {
|
| 264 |
'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
|
| 265 |
'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
|
| 266 |
'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
|
| 267 |
}
|
| 268 |
-
|
| 269 |
-
# Greek characters that look like ASCII letters
|
| 270 |
lookalikes_greek = {
|
| 271 |
'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
|
| 272 |
}
|
| 273 |
-
|
| 274 |
-
# Latin Extended lookalikes
|
| 275 |
lookalikes_latin = {
|
| 276 |
'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
|
| 277 |
'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
|
| 278 |
}
|
| 279 |
-
|
| 280 |
all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}
|
| 281 |
-
|
| 282 |
for char in url_str:
|
| 283 |
if char in all_lookalikes:
|
| 284 |
return 1
|
|
@@ -286,9 +318,6 @@ def _engineer_features(urls: List[str], feature_cols: List[str]) -> pd.DataFrame
|
|
| 286 |
|
| 287 |
out["has_lookalike_chars"] = s.apply(_detect_lookalike_chars)
|
| 288 |
|
| 289 |
-
# Return columns in the exact order expected by the model; fill any
|
| 290 |
-
# still-missing engineered columns with zeros to stay robust across
|
| 291 |
-
# model updates.
|
| 292 |
return out.reindex(columns=feature_cols, fill_value=0)
|
| 293 |
|
| 294 |
|
|
@@ -314,17 +343,162 @@ def _normalize_url_string(url: str) -> str:
|
|
| 314 |
return (url or "").strip().rstrip("/")
|
| 315 |
|
| 316 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 317 |
@app.get("/")
|
| 318 |
def root():
|
| 319 |
-
return {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 320 |
|
| 321 |
|
| 322 |
@app.post("/predict-url")
|
| 323 |
def predict_url(payload: PredictUrlPayload):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 324 |
try:
|
| 325 |
_load_url_model()
|
| 326 |
|
| 327 |
-
# Load CSVs on every request
|
| 328 |
phishy_list = _read_urls_from_csv(AUTOCALIB_PHISHY_CSV)
|
| 329 |
legit_list = _read_urls_from_csv(AUTOCALIB_LEGIT_CSV)
|
| 330 |
host_map = _read_hosts_from_csv(KNOWN_HOSTS_CSV)
|
|
@@ -343,7 +517,7 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 343 |
if not url_str:
|
| 344 |
return JSONResponse(status_code=400, content={"error": "Empty url"})
|
| 345 |
|
| 346 |
-
# URL-level override via CSV lists
|
| 347 |
norm_url = _normalize_url_string(url_str)
|
| 348 |
phishy_set = { _normalize_url_string(u) for u in phishy_list }
|
| 349 |
legit_set = { _normalize_url_string(u) for u in legit_list }
|
|
@@ -355,6 +529,7 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 355 |
phish_proba = 0.99 if label == "PHISH" else 0.01
|
| 356 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 357 |
return {
|
|
|
|
| 358 |
"label": label,
|
| 359 |
"predicted_label": int(predicted_label),
|
| 360 |
"score": float(score),
|
|
@@ -365,7 +540,7 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 365 |
"override": {"reason": "csv_url_match"},
|
| 366 |
}
|
| 367 |
|
| 368 |
-
# Known-host override
|
| 369 |
host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
|
| 370 |
if host and host_map:
|
| 371 |
for h, lbl in host_map.items():
|
|
@@ -376,6 +551,7 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 376 |
phish_proba = 0.99 if label == "PHISH" else 0.01
|
| 377 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 378 |
return {
|
|
|
|
| 379 |
"label": label,
|
| 380 |
"predicted_label": int(predicted_label),
|
| 381 |
"score": float(score),
|
|
@@ -383,95 +559,80 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 383 |
"backend": str(model_type),
|
| 384 |
"threshold": 0.5,
|
| 385 |
"url_col": url_col,
|
|
|
|
| 386 |
}
|
| 387 |
|
| 388 |
-
# Lookalike character guard
|
| 389 |
-
|
| 390 |
-
|
| 391 |
-
|
| 392 |
-
|
| 393 |
-
|
| 394 |
-
|
| 395 |
-
|
| 396 |
-
|
| 397 |
-
|
| 398 |
-
|
| 399 |
-
|
| 400 |
-
|
| 401 |
-
|
| 402 |
-
|
| 403 |
-
|
| 404 |
-
|
| 405 |
-
|
| 406 |
-
|
| 407 |
-
|
| 408 |
-
|
| 409 |
-
|
| 410 |
-
|
| 411 |
-
|
| 412 |
-
|
| 413 |
-
|
| 414 |
-
|
| 415 |
-
phish_proba
|
| 416 |
-
|
| 417 |
-
|
| 418 |
-
|
| 419 |
-
|
| 420 |
-
|
| 421 |
-
|
| 422 |
-
|
| 423 |
-
|
| 424 |
-
|
| 425 |
-
|
| 426 |
-
|
| 427 |
-
|
| 428 |
-
|
| 429 |
-
|
| 430 |
-
|
| 431 |
-
|
| 432 |
-
|
| 433 |
-
|
| 434 |
-
|
| 435 |
-
|
| 436 |
-
|
| 437 |
-
|
| 438 |
-
|
| 439 |
-
|
| 440 |
-
|
| 441 |
-
|
| 442 |
-
|
| 443 |
-
|
| 444 |
-
|
| 445 |
-
|
| 446 |
-
|
| 447 |
-
|
| 448 |
-
|
| 449 |
-
|
| 450 |
-
|
| 451 |
-
|
| 452 |
-
|
| 453 |
-
|
| 454 |
-
|
| 455 |
-
|
| 456 |
-
|
| 457 |
-
|
| 458 |
-
|
| 459 |
-
phish_proba = 0.90
|
| 460 |
-
score = phish_proba
|
| 461 |
-
return {
|
| 462 |
-
"label": label,
|
| 463 |
-
"predicted_label": int(predicted_label),
|
| 464 |
-
"score": float(score),
|
| 465 |
-
"phishing_probability": float(phish_proba),
|
| 466 |
-
"backend": "typosquat_guard",
|
| 467 |
-
"threshold": 0.5,
|
| 468 |
-
"url_col": url_col,
|
| 469 |
-
"rule": "typosquat_guard",
|
| 470 |
-
}
|
| 471 |
-
except Exception:
|
| 472 |
-
pass
|
| 473 |
-
|
| 474 |
-
# Mirror inference flow for probability of class 1
|
| 475 |
feats = _engineer_features([url_str], feature_cols)
|
| 476 |
if model_type == "xgboost_bst":
|
| 477 |
if xgb is None:
|
|
@@ -484,15 +645,14 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 484 |
pred = model.predict(feats)[0]
|
| 485 |
raw_p_class1 = 1.0 if int(pred) == 1 else 0.0
|
| 486 |
|
| 487 |
-
# Polarity: strictly env or default (class1==PHISH)
|
| 488 |
phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
|
| 489 |
-
|
| 490 |
phish_proba = raw_p_class1 if phish_is_positive else (1.0 - raw_p_class1)
|
| 491 |
label = "PHISH" if phish_proba >= 0.5 else "LEGIT"
|
| 492 |
predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
|
| 493 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 494 |
|
| 495 |
return {
|
|
|
|
| 496 |
"label": label,
|
| 497 |
"predicted_label": int(predicted_label),
|
| 498 |
"score": float(score),
|
|
@@ -502,6 +662,4 @@ def predict_url(payload: PredictUrlPayload):
|
|
| 502 |
"url_col": url_col,
|
| 503 |
}
|
| 504 |
except Exception as e:
|
| 505 |
-
return JSONResponse(status_code=500, content={"error": str(e)})
|
| 506 |
-
|
| 507 |
-
|
|
|
|
| 19 |
except Exception:
|
| 20 |
xgb = None
|
| 21 |
|
| 22 |
+
# NLP libraries for Text Preprocessing (Module 2)
|
| 23 |
+
try:
    import nltk
    from nltk.tokenize import word_tokenize
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer, WordNetLemmatizer
    from textblob import TextBlob

    # NLTK data packages used by Module 2, mapped to the category directory
    # that nltk.data.find() expects. "punkt_tab" is listed alongside "punkt"
    # because recent NLTK releases load the tabular Punkt data from
    # word_tokenize(); fetching both keeps old and new NLTK versions working.
    _NLTK_RESOURCES = {
        'punkt': 'tokenizers',
        'punkt_tab': 'tokenizers',
        'stopwords': 'corpora',
        'wordnet': 'corpora',
        'omw-1.4': 'corpora',
    }

    # Download required NLTK data on startup (only what is not already present)
    for resource, category in _NLTK_RESOURCES.items():
        try:
            nltk.data.find(f'{category}/{resource}')
        except LookupError:
            # nltk.download() signals failure through its return value rather
            # than by raising, so report it instead of failing silently.
            if not nltk.download(resource, quiet=True):
                print(f"[WARNING] Could not download NLTK resource: {resource}")

    NLTK_AVAILABLE = True
except Exception as e:
    # Any import problem (nltk/textblob missing) disables Module 2 but must not
    # prevent the rest of the API from starting.
    print(f"[WARNING] NLP libraries not available: {e}")
    NLTK_AVAILABLE = False
|
| 41 |
+
|
| 42 |
|
| 43 |
# Environment defaults suitable for HF Spaces
|
| 44 |
os.environ.setdefault("HOME", "/data")
|
|
|
|
| 67 |
AUTOCALIB_LEGIT_CSV = os.environ.get("AUTOCALIB_LEGIT_CSV", os.path.join(BASE_DIR, "autocalib_legit.csv"))
|
| 68 |
KNOWN_HOSTS_CSV = os.environ.get("KNOWN_HOSTS_CSV", os.path.join(BASE_DIR, "known_hosts.csv"))
|
| 69 |
|
| 70 |
+
# Initialize NLP components for Module 2
|
| 71 |
+
if NLTK_AVAILABLE:
    try:
        stemmer = PorterStemmer()
        lemmatizer = WordNetLemmatizer()
        # Reading the stop-word list touches the NLTK corpus on disk and raises
        # LookupError when it is missing (e.g. the startup download failed).
        stop_words = set(stopwords.words('english'))
    except LookupError as e:
        # Degrade to "Module 2 unavailable" instead of crashing the whole app
        # at import time; /preprocess-text checks this flag before doing work.
        print(f"[WARNING] NLTK data not available, text preprocessing disabled: {e}")
        NLTK_AVAILABLE = False

# Phishing-specific suspicious keywords (as per methodology Section 3.7.2)
PHISHING_KEYWORDS = {
    'urgent', 'verify', 'suspended', 'locked', 'confirm', 'update',
    'click', 'prize', 'winner', 'congratulations', 'expire', 'act now',
    'account', 'security', 'password', 'credit card', 'bank', 'payment',
    'refund', 'tax', 'irs', 'social security', 'ssn', 'login', 'signin',
    'alert', 'warning', 'action required', 'unusual activity', 'compromised',
}
|
| 84 |
+
|
| 85 |
|
| 86 |
+
# FastAPI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI(
    title="PhishWatch Pro API",
    version="3.0.0",
    description=(
        "Complete phishing detection system with URL analysis (Module 4) "
        "and Text Preprocessing (Module 2)"
    ),
)
|
| 91 |
|
| 92 |
|
| 93 |
+
# Pydantic Models
|
| 94 |
class PredictUrlPayload(BaseModel):
    # Request body accepted by POST /predict-url.
    url: str  # the URL submitted for analysis
|
| 96 |
|
| 97 |
|
| 98 |
+
class PreprocessTextPayload(BaseModel):
    # Request body accepted by POST /preprocess-text. Every flag defaults to
    # True, so a request carrying only `text` runs all optional steps.
    text: str  # raw text to preprocess
    include_sentiment: bool = True  # run sentiment analysis and phishing indicators
    include_stemming: bool = True  # produce the stemmed variant
    include_lemmatization: bool = True  # produce the lemmatized variant
    remove_stopwords: bool = True  # drop stop words from the filtered tokens
|
| 104 |
+
|
| 105 |
+
|
| 106 |
_url_bundle: Optional[Dict[str, Any]] = None
|
| 107 |
_url_lock = threading.Lock()
|
| 108 |
|
|
|
|
| 295 |
out["max_brand_sim"] = hosts.apply(_max_brand_similarity)
|
| 296 |
out["like_facebook"] = hosts.apply(lambda h: _like_brand(h, "facebook"))
|
| 297 |
|
| 298 |
+
# Lookalike/homoglyph detection
|
|
|
|
| 299 |
def _detect_lookalike_chars(url: str) -> int:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 300 |
url_str = url or ""
|
|
|
|
|
|
|
| 301 |
lookalikes_cyrillic = {
|
| 302 |
'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
|
| 303 |
'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
|
| 304 |
'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
|
| 305 |
}
|
|
|
|
|
|
|
| 306 |
lookalikes_greek = {
|
| 307 |
'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
|
| 308 |
}
|
|
|
|
|
|
|
| 309 |
lookalikes_latin = {
|
| 310 |
'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
|
| 311 |
'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
|
| 312 |
}
|
|
|
|
| 313 |
all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}
|
|
|
|
| 314 |
for char in url_str:
|
| 315 |
if char in all_lookalikes:
|
| 316 |
return 1
|
|
|
|
| 318 |
|
| 319 |
out["has_lookalike_chars"] = s.apply(_detect_lookalike_chars)
|
| 320 |
|
|
|
|
|
|
|
|
|
|
| 321 |
return out.reindex(columns=feature_cols, fill_value=0)
|
| 322 |
|
| 323 |
|
|
|
|
| 343 |
return (url or "").strip().rstrip("/")
|
| 344 |
|
| 345 |
|
| 346 |
+
# ============================================================================
|
| 347 |
+
# API ENDPOINTS
|
| 348 |
+
# ============================================================================
|
| 349 |
+
|
| 350 |
@app.get("/")
def root():
    """Return a service descriptor: status, per-module availability, endpoints."""
    # Module 2 is only usable when the NLP imports succeeded at startup.
    module_status = {
        "module_2_text_preprocessing": NLTK_AVAILABLE,
        "module_4_url_analyzer": True,
    }
    endpoint_list = [
        "/predict-url (Module 4: URL Analysis)",
        "/preprocess-text (Module 2: Text Preprocessing)",
    ]
    return {
        "status": "ok",
        "service": "PhishWatch Pro API",
        "modules": module_status,
        "endpoints": endpoint_list,
    }
|
| 364 |
+
|
| 365 |
+
|
| 366 |
+
def _match_phishing_keywords(text_lower: str) -> List[str]:
    """Return the PHISHING_KEYWORDS entries present in *text_lower*, sorted.

    A keyword only counts when it starts at a word boundary. This keeps
    inflected forms ("expires", "accounts") matching while rejecting hits
    buried inside unrelated words ("irs" in "first", "tax" in "syntax").
    The result is sorted because PHISHING_KEYWORDS is a set and its iteration
    order is not stable between processes.
    """
    return sorted(
        kw for kw in PHISHING_KEYWORDS
        if re.search(r"\b" + re.escape(kw), text_lower)
    )


@app.post("/preprocess-text")
def preprocess_text(payload: PreprocessTextPayload):
    """
    Module 2: Text Preprocessing

    Implements the complete NLP pipeline as per methodology Section 3.7.2:
    - Tokenization
    - Stemming & Lemmatization
    - Stop word removal
    - Sentiment analysis (emotional/persuasive language detection)

    Returns a dict with the tokens, the cleaned/stemmed/lemmatized text
    variants, and (when ``include_sentiment`` is set) sentiment data plus
    phishing indicators. Responds with 503 when the NLP libraries are not
    available, 400 when the text is empty, and 500 on any unexpected error.
    """
    if not NLTK_AVAILABLE:
        return JSONResponse(
            status_code=503,
            content={
                "error": "NLP libraries not available",
                "message": "Please install: pip install nltk textblob"
            }
        )

    try:
        text = (payload.text or "").strip()
        if not text:
            return JSONResponse(status_code=400, content={"error": "Empty text"})

        # Step 1: Tokenization
        tokens = word_tokenize(text.lower())

        # Step 2: Keep alphanumeric tokens; stop word removal is optional
        tokens_filtered = [t for t in tokens if t.isalnum()]
        if payload.remove_stopwords:
            tokens_filtered = [t for t in tokens_filtered if t not in stop_words]

        # Step 3: Stemming (optional)
        stemmed_tokens = []
        if payload.include_stemming:
            stemmed_tokens = [stemmer.stem(t) for t in tokens_filtered]

        # Step 4: Lemmatization (optional)
        lemmatized_tokens = []
        if payload.include_lemmatization:
            lemmatized_tokens = [lemmatizer.lemmatize(t) for t in tokens_filtered]

        # Step 5: Sentiment Analysis & Phishing Indicators (optional)
        sentiment_data = {}
        phishing_indicators = {}
        if payload.include_sentiment:
            blob = TextBlob(text)
            polarity = float(blob.sentiment.polarity)          # -1 (negative) to 1 (positive)
            subjectivity = float(blob.sentiment.subjectivity)  # 0 (objective) to 1 (subjective)

            if polarity > 0.1:
                classification = "positive"
            elif polarity < -0.1:
                classification = "negative"
            else:
                classification = "neutral"

            sentiment_data = {
                "polarity": polarity,
                "subjectivity": subjectivity,
                "classification": classification,
            }

            # Detect phishing-specific emotional/persuasive language
            detected_keywords = _match_phishing_keywords(text.lower())
            keyword_count = len(detected_keywords)

            # Calculate risk score based on keyword density and emotional manipulation
            keyword_density = keyword_count / max(len(tokens_filtered), 1)
            urgency_detected = any(kw in detected_keywords for kw in [
                'urgent', 'expire', 'act now', 'suspended', 'locked', 'warning', 'alert'
            ])
            emotional_appeal = subjectivity > 0.6

            if keyword_count >= 3 or urgency_detected:
                risk_level = "HIGH"
            elif keyword_count >= 1:
                risk_level = "MEDIUM"
            else:
                risk_level = "LOW"

            phishing_indicators = {
                "suspicious_keywords": detected_keywords,
                "keyword_count": keyword_count,
                "keyword_density": float(keyword_density),
                "urgency_detected": urgency_detected,
                "emotional_appeal": emotional_appeal,
                "high_subjectivity": subjectivity > 0.6,
                # Weighted sum of the signals above, capped at 1.0
                "risk_score": min(1.0,
                    keyword_count * 0.12 +
                    (0.25 if urgency_detected else 0) +
                    (0.20 if emotional_appeal else 0) +
                    (keyword_density * 0.3)
                ),
                "risk_level": risk_level,
            }

        # Prepare cleaned text variants (None when a step was skipped or
        # produced no tokens)
        cleaned_text = " ".join(tokens_filtered)
        stemmed_text = " ".join(stemmed_tokens) if stemmed_tokens else None
        lemmatized_text = " ".join(lemmatized_tokens) if lemmatized_tokens else None

        return {
            "module": "text_preprocessing",
            "original_text": text,
            "tokens": tokens[:100],  # Limit for readability
            "token_count": len(tokens),
            "filtered_tokens": tokens_filtered[:100],
            "filtered_token_count": len(tokens_filtered),
            "cleaned_text": cleaned_text,
            "stemmed_text": stemmed_text,
            "lemmatized_text": lemmatized_text,
            "sentiment": sentiment_data if sentiment_data else None,
            "phishing_indicators": phishing_indicators if phishing_indicators else None,
            "preprocessing_applied": {
                "tokenization": True,
                "stopword_removal": payload.remove_stopwords,
                "stemming": payload.include_stemming,
                "lemmatization": payload.include_lemmatization,
                "sentiment_analysis": payload.include_sentiment
            }
        }

    except Exception as e:
        # Endpoint boundary: report the failure as a 500 response.
        return JSONResponse(status_code=500, content={"error": str(e)})
|
| 484 |
|
| 485 |
|
| 486 |
@app.post("/predict-url")
|
| 487 |
def predict_url(payload: PredictUrlPayload):
|
| 488 |
+
"""
|
| 489 |
+
Module 4: URL Analyzer
|
| 490 |
+
|
| 491 |
+
Analyzes URLs for phishing using Random Forest model with:
|
| 492 |
+
- Structural analysis (length, symbols, patterns)
|
| 493 |
+
- Domain analysis (SLD, TLD, subdomains)
|
| 494 |
+
- Typosquatting detection
|
| 495 |
+
- Lookalike character detection
|
| 496 |
+
- Brand similarity analysis
|
| 497 |
+
"""
|
| 498 |
try:
|
| 499 |
_load_url_model()
|
| 500 |
|
| 501 |
+
# Load CSVs on every request
|
| 502 |
phishy_list = _read_urls_from_csv(AUTOCALIB_PHISHY_CSV)
|
| 503 |
legit_list = _read_urls_from_csv(AUTOCALIB_LEGIT_CSV)
|
| 504 |
host_map = _read_hosts_from_csv(KNOWN_HOSTS_CSV)
|
|
|
|
| 517 |
if not url_str:
|
| 518 |
return JSONResponse(status_code=400, content={"error": "Empty url"})
|
| 519 |
|
| 520 |
+
# URL-level override via CSV lists
|
| 521 |
norm_url = _normalize_url_string(url_str)
|
| 522 |
phishy_set = { _normalize_url_string(u) for u in phishy_list }
|
| 523 |
legit_set = { _normalize_url_string(u) for u in legit_list }
|
|
|
|
| 529 |
phish_proba = 0.99 if label == "PHISH" else 0.01
|
| 530 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 531 |
return {
|
| 532 |
+
"module": "url_analyzer",
|
| 533 |
"label": label,
|
| 534 |
"predicted_label": int(predicted_label),
|
| 535 |
"score": float(score),
|
|
|
|
| 540 |
"override": {"reason": "csv_url_match"},
|
| 541 |
}
|
| 542 |
|
| 543 |
+
# Known-host override
|
| 544 |
host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
|
| 545 |
if host and host_map:
|
| 546 |
for h, lbl in host_map.items():
|
|
|
|
| 551 |
phish_proba = 0.99 if label == "PHISH" else 0.01
|
| 552 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 553 |
return {
|
| 554 |
+
"module": "url_analyzer",
|
| 555 |
"label": label,
|
| 556 |
"predicted_label": int(predicted_label),
|
| 557 |
"score": float(score),
|
|
|
|
| 559 |
"backend": str(model_type),
|
| 560 |
"threshold": 0.5,
|
| 561 |
"url_col": url_col,
|
| 562 |
+
"override": {"reason": "known_host_match"},
|
| 563 |
}
|
| 564 |
|
| 565 |
+
# Lookalike character guard
|
| 566 |
+
lookalikes_cyrillic = {
|
| 567 |
+
'а': 'a', 'е': 'e', 'о': 'o', 'р': 'p', 'с': 'c', 'х': 'x',
|
| 568 |
+
'у': 'y', 'ч': '4', 'ы': 'b', 'ь': 'b', 'і': 'i', 'ї': 'yi',
|
| 569 |
+
'ґ': 'g', 'ė': 'e', 'ń': 'n', 'ș': 's', 'ț': 't'
|
| 570 |
+
}
|
| 571 |
+
lookalikes_greek = {
|
| 572 |
+
'α': 'a', 'ο': 'o', 'ν': 'v', 'τ': 't', 'ρ': 'p'
|
| 573 |
+
}
|
| 574 |
+
lookalikes_latin = {
|
| 575 |
+
'ɑ': 'a', 'ɢ': 'g', 'ᴅ': 'd', 'ɡ': 'g', 'ɪ': 'i',
|
| 576 |
+
'ɴ': 'n', 'ᴘ': 'p', 'ᴠ': 'v', 'ᴡ': 'w', 'ɨ': 'i'
|
| 577 |
+
}
|
| 578 |
+
all_lookalikes = {**lookalikes_cyrillic, **lookalikes_greek, **lookalikes_latin}
|
| 579 |
+
|
| 580 |
+
for char in url_str:
|
| 581 |
+
if char in all_lookalikes:
|
| 582 |
+
phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
|
| 583 |
+
label = "PHISH"
|
| 584 |
+
predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
|
| 585 |
+
phish_proba = 0.95
|
| 586 |
+
score = phish_proba
|
| 587 |
+
return {
|
| 588 |
+
"module": "url_analyzer",
|
| 589 |
+
"label": label,
|
| 590 |
+
"predicted_label": int(predicted_label),
|
| 591 |
+
"score": float(score),
|
| 592 |
+
"phishing_probability": float(phish_proba),
|
| 593 |
+
"backend": "lookalike_guard",
|
| 594 |
+
"threshold": 0.5,
|
| 595 |
+
"url_col": url_col,
|
| 596 |
+
"rule": "lookalike_character_detected",
|
| 597 |
+
}
|
| 598 |
+
|
| 599 |
+
# Typosquat guard
|
| 600 |
+
s_host = (urlparse(_ensure_scheme(url_str)).hostname or "").lower()
|
| 601 |
+
s_sld = s_host.split(".")[-2] if "." in s_host else s_host
|
| 602 |
+
def _normalize_brand(s: str) -> str:
|
| 603 |
+
return re.sub(r"[^a-z]", "", s.lower())
|
| 604 |
+
s_clean = _normalize_brand(s_sld)
|
| 605 |
+
brands = [
|
| 606 |
+
"facebook","linkedin","paypal","google","amazon","apple",
|
| 607 |
+
"microsoft","instagram","netflix","twitter","whatsapp"
|
| 608 |
+
]
|
| 609 |
+
|
| 610 |
+
if s_clean:
|
| 611 |
+
best = 0.0
|
| 612 |
+
for b in brands:
|
| 613 |
+
best = max(best, SequenceMatcher(None, s_clean, _normalize_brand(b)).ratio())
|
| 614 |
+
has_digits = bool(re.search(r"\d", s_sld))
|
| 615 |
+
has_hyphen = ("-" in s_sld)
|
| 616 |
+
is_official = any(s_host.endswith(f"{_normalize_brand(b)}.com") for b in brands)
|
| 617 |
+
if (best >= 0.90) and (has_digits or has_hyphen) and (not is_official):
|
| 618 |
+
phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
|
| 619 |
+
label = "PHISH"
|
| 620 |
+
predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
|
| 621 |
+
phish_proba = 0.90
|
| 622 |
+
score = phish_proba
|
| 623 |
+
return {
|
| 624 |
+
"module": "url_analyzer",
|
| 625 |
+
"label": label,
|
| 626 |
+
"predicted_label": int(predicted_label),
|
| 627 |
+
"score": float(score),
|
| 628 |
+
"phishing_probability": float(phish_proba),
|
| 629 |
+
"backend": "typosquat_guard",
|
| 630 |
+
"threshold": 0.5,
|
| 631 |
+
"url_col": url_col,
|
| 632 |
+
"rule": "typosquat_guard",
|
| 633 |
+
}
|
| 634 |
+
|
| 635 |
+
# ML model inference
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 636 |
feats = _engineer_features([url_str], feature_cols)
|
| 637 |
if model_type == "xgboost_bst":
|
| 638 |
if xgb is None:
|
|
|
|
| 645 |
pred = model.predict(feats)[0]
|
| 646 |
raw_p_class1 = 1.0 if int(pred) == 1 else 0.0
|
| 647 |
|
|
|
|
| 648 |
phish_is_positive = True if URL_POSITIVE_CLASS_ENV == "" else (URL_POSITIVE_CLASS_ENV == "PHISH")
|
|
|
|
| 649 |
phish_proba = raw_p_class1 if phish_is_positive else (1.0 - raw_p_class1)
|
| 650 |
label = "PHISH" if phish_proba >= 0.5 else "LEGIT"
|
| 651 |
predicted_label = 1 if ((label == "PHISH") == phish_is_positive) else 0
|
| 652 |
score = phish_proba if label == "PHISH" else (1.0 - phish_proba)
|
| 653 |
|
| 654 |
return {
|
| 655 |
+
"module": "url_analyzer",
|
| 656 |
"label": label,
|
| 657 |
"predicted_label": int(predicted_label),
|
| 658 |
"score": float(score),
|
|
|
|
| 662 |
"url_col": url_col,
|
| 663 |
}
|
| 664 |
except Exception as e:
|
| 665 |
+
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|