| | import os |
| | import re |
| | import json |
| | import time |
| | import sys |
| | import asyncio |
| | import socket |
| | import random |
| | import logging |
| | import warnings |
| | import unicodedata |
| | import email |
| | from email.policy import default |
| | from typing import List, Dict, Optional, Any |
| | from urllib.parse import urlparse |
| |
|
| | |
| | import httpx |
| | import uvicorn |
| | import joblib |
| | import torch |
| | import numpy as np |
| | import pandas as pd |
| | from fastapi import FastAPI, HTTPException |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel |
| | from groq import AsyncGroq, RateLimitError, APIError |
| | from dotenv import load_dotenv |
| | from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning |
| | from playwright.async_api import async_playwright |
| |
|
| | |
| | import config |
| | from models import get_ml_models, get_dl_models, FinetunedBERT |
| | from feature_extraction import process_row |
| |
|
# Load environment variables from .env before anything reads os.environ
# (the Groq key rotator below depends on this).
load_dotenv()
# Make the Message_model package importable for the semantic predictor.
sys.path.append(os.path.join(config.BASE_DIR, 'Message_model'))


# Optional dependency: if the semantic model package is absent, the API
# still runs — the 'semantic' ensemble member is simply skipped.
try:
    from predict import PhishingPredictor
except ImportError:
    PhishingPredictor = None
| |
|
| | |
| | |
| | |
class UltraColorFormatter(logging.Formatter):
    """Logging formatter that wraps each record in ANSI colors per level."""

    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    NEON_BLUE = "\x1b[38;5;39m"
    NEON_GREEN = "\x1b[38;5;82m"
    NEON_PURPLE = "\x1b[38;5;129m"
    YELLOW = "\x1b[33m"
    ORANGE = "\x1b[38;5;208m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[31;1m"
    WHITE_BOLD = "\x1b[37;1m"
    RESET = "\x1b[0m"

    # Per-level templates.  INFO stays uncolored so the log_* helpers in
    # this module can inject their own colors into the message itself.
    FORMATS = {
        logging.DEBUG: GREY + " 🐞 [DEBUG] %(message)s" + RESET,
        logging.INFO: "%(message)s" + RESET,
        logging.WARNING: ORANGE + " ⚠️ [WARNING] %(message)s" + RESET,
        logging.ERROR: RED + " ❌ [ERROR] %(message)s" + RESET,
        logging.CRITICAL: BOLD_RED + "\n🚨 [CRITICAL] %(message)s\n" + RESET,
    }

    def format(self, record):
        """Render *record* with the color template matching its level."""
        template = self.FORMATS.get(record.levelno)
        return logging.Formatter(template).format(record)
| |
|
# Module logger: colored output on stdout.  Existing handlers are cleared
# first so repeated imports (e.g. uvicorn auto-reload) don't duplicate lines.
logger = logging.getLogger("PhishingAPI")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(UltraColorFormatter())
if logger.hasHandlers():
    logger.handlers.clear()
logger.addHandler(ch)
| |
|
| | |
def log_section(title):
    """Log *title* centered inside a purple 70-character box banner."""
    purple = UltraColorFormatter.NEON_PURPLE
    bar = '─' * 70
    logger.info(f"\n{purple}┌{bar}┐")
    logger.info(f"{purple}│ {UltraColorFormatter.WHITE_BOLD}{title.center(68)}{purple} │")
    logger.info(f"{purple}└{bar}┘{UltraColorFormatter.RESET}")
| |
|
def log_step(icon, text):
    """Log a top-level progress step in cyan, prefixed with *icon*."""
    message = f"{UltraColorFormatter.CYAN} {icon} {text}{UltraColorFormatter.RESET}"
    logger.info(message)
| |
|
def log_substep(text, value=""):
    """Log an indented sub-step; a non-empty *value* is appended in green."""
    if value:
        val_str = f": {UltraColorFormatter.NEON_GREEN}{value}{UltraColorFormatter.RESET}"
    else:
        val_str = ""
    logger.info(f"{UltraColorFormatter.GREY} ├─ {text}{val_str}")
| |
|
def log_success(text):
    """Log a green success line with a check mark."""
    green = UltraColorFormatter.NEON_GREEN
    logger.info(f"{green} ✅ {text}{UltraColorFormatter.RESET}")
| |
|
def log_metric(label, value, warning=False):
    """Log a metric line; orange when *warning* is set, otherwise neon blue."""
    if warning:
        color = UltraColorFormatter.ORANGE
    else:
        color = UltraColorFormatter.NEON_BLUE
    logger.info(f" {color}📊 {label}: {UltraColorFormatter.WHITE_BOLD}{value}{UltraColorFormatter.RESET}")
| |
|
| | |
| |
|
| | |
# Service-level limits.  Some consumers live outside this chunk of the file.
MAX_INPUT_CHARS = 4000        # presumably caps analyzed text length — consumer not shown here
MAX_CONCURRENT_REQUESTS = 5   # sizes request_semaphore defined below
MAX_URLS_TO_ANALYZE = 15      # presumably caps URLs per request — consumer not shown here
LLM_MAX_RETRIES = 3           # presumably the Groq retry budget — consumer not shown here
| |
|
# FastAPI application instance; metadata is surfaced in the OpenAPI docs.
app = FastAPI(
    title="Phishing Detection API (Robust Ensemble)",
    description="Multilingual phishing detection using Weighted Ensemble (ML/DL) + LLM Semantic Analysis + Live Scraping",
    version="2.6.0"
)
| |
|
# Allow cross-origin calls from any origin.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# overly permissive for production — consider an explicit origin allowlist.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
# Global throttle: at most MAX_CONCURRENT_REQUESTS analyses in flight.
request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
| |
|
| | |
class MessageInput(BaseModel):
    """Request payload: raw message parts submitted for analysis."""
    sender: Optional[str] = ""
    subject: Optional[str] = ""
    text: Optional[str] = ""
    # Pydantic deep-copies mutable field defaults per instance, so {} is safe.
    metadata: Optional[Dict] = {}
| |
|
class PredictionResponse(BaseModel):
    """Response payload: verdict plus supporting explanation."""
    confidence: float        # 0-100; >50 leans phishing (see SYSTEM_PROMPT rules)
    reasoning: str
    highlighted_text: str    # message text with @@...@@ markers on red flags
    final_decision: str      # "phishing" | "legitimate"
    suggestion: str
| |
|
| | |
class SmartAPIKeyRotator:
    """Round-robin rotator over one or more Groq API keys.

    Keys are read from the GROQ_API_KEYS env var (comma-separated), falling
    back to the single GROQ_API_KEY.  One AsyncGroq client is built per key.
    """

    def __init__(self):
        keys_str = os.environ.get('GROQ_API_KEYS', '')
        self.keys = [k.strip() for k in keys_str.split(',') if k.strip()]
        if not self.keys:
            # Fallback: single-key configuration.
            single_key = os.environ.get('GROQ_API_KEY')
            if single_key:
                self.keys = [single_key]

        if not self.keys:
            logger.critical("CRITICAL: No GROQ_API_KEYS found in environment variables!")
        else:
            log_substep("API Key Rotator", f"Initialized with {len(self.keys)} keys")

        self.clients = [AsyncGroq(api_key=k) for k in self.keys]
        self.num_keys = len(self.clients)
        self.current_index = 0  # index of the next client to hand out

    def get_client_and_rotate(self):
        """Return the next client in round-robin order, or None if keyless."""
        if not self.clients:
            return None
        client = self.clients[self.current_index]
        self.current_index = (self.current_index + 1) % self.num_keys
        return client
| |
|
| | |
# Global registries populated by load_models() at startup.
ml_models = {}          # name -> fitted sklearn-style classifier
dl_models = {}          # name -> torch model in eval mode
bert_model = None       # FinetunedBERT instance, if present on disk
semantic_model = None   # PhishingPredictor instance, if importable
key_rotator: Optional[SmartAPIKeyRotator] = None
ip_cache = {}           # hostname -> geo/ISP lookup result (process lifetime)
| |
|
def clean_and_parse_json(text: str) -> Dict:
    """Best-effort parse of an LLM response into a dict.

    Tries strict JSON first, then strips markdown code fences
    (```json ... ```), and finally falls back to the outermost {...} span.
    Returns {} when nothing parseable is found.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Strip markdown code fences the LLM may wrap the payload in.  The fence
    # marker is matched explicitly (backticks included) so the word "json"
    # appearing inside values is left untouched.
    text = re.sub(r"```json\s*", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```", "", text)

    try:
        # Fall back to the outermost brace-delimited span.
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end != -1:
            json_str = text[start:end + 1]
            return json.loads(json_str)
    except Exception:
        pass
    logger.error(f"Failed to parse JSON from LLM response: {text[:50]}...")
    return {}
| |
|
class EnsembleScorer:
    """Fuses model predictions and network heuristics into one 0-100 score."""

    # Relative evidence weights.  Families can be absent at runtime, so the
    # final score is re-normalized by the weight actually accumulated.
    WEIGHTS = {'ml': 0.30, 'dl': 0.20, 'bert': 0.20, 'semantic': 0.10, 'network': 0.20}

    @staticmethod
    def calculate_technical_score(predictions: Dict, network_data: List[Dict], urls: List[str]) -> Dict:
        """Return {'score', 'details', 'network_risk'} from available evidence.

        predictions maps model name -> {'raw_score': float, presumably in
        [0, 1] — see get_model_predictions}; network_data is the list of
        geo/ISP dicts from get_network_data_raw; urls is unused here but
        kept for interface stability.
        """
        score_accum = 0.0    # sum of (score on 0-100 scale) * weight
        weight_accum = 0.0   # sum of weights that actually contributed
        details = []

        log_step("🧮", "Calculating Ensemble Weights")

        # --- Classic ML family: average the per-model probabilities. ---
        ml_scores = [p['raw_score'] for k, p in predictions.items() if k in ['logistic', 'svm', 'xgboost']]
        if ml_scores:
            avg_ml = np.mean(ml_scores)
            score_accum += avg_ml * EnsembleScorer.WEIGHTS['ml'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['ml']
            details.append(f"ML Consensus: {avg_ml:.2f}")
            log_substep("ML Models Consensus", f"{avg_ml:.4f} (Weight: {EnsembleScorer.WEIGHTS['ml']})")

        # --- Deep learning family. ---
        dl_scores = [p['raw_score'] for k, p in predictions.items() if k in ['attention_blstm', 'rcnn']]
        if dl_scores:
            avg_dl = np.mean(dl_scores)
            score_accum += avg_dl * EnsembleScorer.WEIGHTS['dl'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['dl']
            details.append(f"DL Consensus: {avg_dl:.2f}")
            log_substep("Deep Learning Consensus", f"{avg_dl:.4f} (Weight: {EnsembleScorer.WEIGHTS['dl']})")

        # --- Finetuned BERT. ---
        if 'bert' in predictions:
            bert_s = predictions['bert']['raw_score']
            score_accum += bert_s * EnsembleScorer.WEIGHTS['bert'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['bert']
            details.append(f"BERT Score: {bert_s:.2f}")
            log_substep("BERT Finetuned", f"{bert_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['bert']})")

        # --- Semantic text model (note: not appended to `details`). ---
        if 'semantic' in predictions:
            sem_s = predictions['semantic']['raw_score']
            score_accum += sem_s * EnsembleScorer.WEIGHTS['semantic'] * 100
            weight_accum += EnsembleScorer.WEIGHTS['semantic']
            log_substep("Semantic Analysis", f"{sem_s:.4f} (Weight: {EnsembleScorer.WEIGHTS['semantic']})")

        # --- Network heuristics: penalties accumulate per host, already on
        #     a 0-100 scale (so no *100 below), capped at 100. ---
        net_risk = 0.0
        net_reasons = []
        for net_info in network_data:
            if net_info.get('proxy') or net_info.get('hosting'):
                net_risk += 40
                net_reasons.append("Hosted/Proxy IP")

            org = str(net_info.get('org', '')).lower()
            isp = str(net_info.get('isp', '')).lower()
            suspicious_hosts = ['hostinger', 'namecheap', 'digitalocean', 'hetzner', 'ovh', 'flokinet']

            if any(x in org or x in isp for x in suspicious_hosts):
                net_risk += 20
                net_reasons.append(f"Cheap Cloud Provider ({org[:15]}...)")

        net_risk = min(net_risk, 100)
        # Network weight is always applied, even with no network data (risk 0).
        score_accum += net_risk * EnsembleScorer.WEIGHTS['network']
        weight_accum += EnsembleScorer.WEIGHTS['network']

        log_substep("Network Risk Calculated", f"{net_risk:.2f} (Weight: {EnsembleScorer.WEIGHTS['network']})")
        if net_reasons:
            details.append(f"Network Penalties: {', '.join(list(set(net_reasons)))}")

        # Re-normalize by the weight actually used; 50.0 = neutral fallback.
        if weight_accum == 0:
            final_score = 50.0
        else:
            final_score = score_accum / weight_accum

        return {
            "score": min(max(final_score, 0), 100),
            "details": "; ".join(details),
            "network_risk": net_risk
        }
| |
|
def load_models():
    """Load ML/DL/BERT/semantic models and initialize the API key rotator.

    Best-effort startup: every asset is optional, so the service can run
    with a reduced ensemble.  Failures were previously swallowed silently
    (`except Exception: pass`); they are now logged as warnings so missing
    ensemble members are diagnosable, while startup still proceeds.
    """
    global ml_models, dl_models, bert_model, semantic_model, key_rotator
    log_section("SYSTEM STARTUP: LOADING ASSETS")

    models_dir = config.MODELS_DIR

    # Classic ML models (sklearn-style, persisted with joblib).
    for model_name in ['logistic', 'svm', 'xgboost']:
        try:
            path = os.path.join(models_dir, f'{model_name}.joblib')
            if os.path.exists(path):
                ml_models[model_name] = joblib.load(path)
                log_substep("ML Model Loaded", model_name)
        except Exception as e:
            logger.warning(f"Could not load ML model '{model_name}': {e}")

    # Deep learning models: rebuild the architecture, then restore weights.
    for model_name in ['attention_blstm', 'rcnn']:
        try:
            path = os.path.join(models_dir, f'{model_name}.pt')
            if os.path.exists(path):
                template = get_dl_models(input_dim=len(config.NUMERICAL_FEATURES))
                model = template[model_name]
                model.load_state_dict(torch.load(path, map_location='cpu'))
                model.eval()
                dl_models[model_name] = model
                log_substep("DL Model Loaded", model_name)
        except Exception as e:
            logger.warning(f"Could not load DL model '{model_name}': {e}")

    # Finetuned BERT (directory checkpoint).
    bert_path = os.path.join(config.BASE_DIR, 'finetuned_bert')
    if os.path.exists(bert_path):
        try:
            bert_model = FinetunedBERT(bert_path)
            log_substep("BERT Model", "Loaded Successfully")
        except Exception as e:
            logger.warning(f"Could not load BERT model: {e}")

    # Semantic text model (only if the optional package imported at startup).
    sem_path = os.path.join(config.BASE_DIR, 'Message_model', 'final_semantic_model')
    if os.path.exists(sem_path) and PhishingPredictor:
        try:
            semantic_model = PhishingPredictor(model_path=sem_path)
            log_substep("Semantic Model", "Loaded Successfully")
        except Exception as e:
            logger.warning(f"Could not load semantic model: {e}")

    key_rotator = SmartAPIKeyRotator()
| |
|
| | |
def extract_visible_text_and_links(raw_email: str) -> tuple:
    """
    Parse a full raw email using Python's email library and extract:
    - extracted_text (merged plain text + HTML text + metadata)
    - links (list of all URLs found anywhere)

    Returns (extracted_text, cleaned_links); cleaned_links contains only
    deduplicated http(s) URLs.  Never raises: parsing failures degrade to
    returning the raw input text.
    """
    log_step("📨", "Parsing Email MIME Structure")
    if not raw_email:
        logger.warning("Parsing received empty email input")
        return "", []

    extracted_text_parts = []
    links = set()

    try:
        msg = email.message_from_string(raw_email, policy=default)

        # Surface key headers at the top of the extracted text.
        metadata = {
            "from": msg.get("From", ""),
            "to": msg.get("To", ""),
            "subject": msg.get("Subject", "")
        }
        for k, v in metadata.items():
            if v:
                extracted_text_parts.append(f"{k.capitalize()}: {v}")
                log_substep(f"Metadata [{k}]", v[:50] + "..." if len(v) > 50 else v)

        part_count = 0
        for part in msg.walk():
            part_count += 1
            content_type = part.get_content_type()
            content_disposition = str(part.get("Content-Disposition") or "")

            try:
                if content_type == "text/plain":
                    text_data = part.get_payload(decode=True)
                    if text_data:
                        text_str = text_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        extracted_text_parts.append(text_str)
                        links.update(re.findall(r'https?://\S+', text_str))

                elif content_type == "text/html":
                    html_data = part.get_payload(decode=True)
                    if html_data:
                        html_str = html_data.decode(part.get_content_charset() or "utf-8", errors="ignore")
                        soup = BeautifulSoup(html_str, "html.parser")
                        extracted_text_parts.append(soup.get_text(separator="\n"))
                        # Harvest URLs from anchors and images (tracking pixels).
                        for a in soup.find_all("a", href=True):
                            links.add(a["href"])
                        for img in soup.find_all("img", src=True):
                            links.add(img["src"])

                elif "attachment" in content_disposition.lower() or "inline" in content_disposition.lower():
                    filename = part.get_filename()
                    if filename:
                        # BUG FIX: the f-string previously contained no
                        # placeholder, so the attachment name was never
                        # interpolated into the marker text.
                        extracted_text_parts.append(f"[Attachment found: {filename}]")
                        log_substep("Attachment", filename)

            except Exception as e:
                logger.warning(f"Error parsing email part: {e}")

    except Exception as e:
        logger.error(f"Email Parsing Failed: {e}")

    extracted_text = "\n".join(extracted_text_parts).strip()

    # Fallbacks when MIME parsing yielded nothing usable.
    if not extracted_text:
        if "<html" in raw_email.lower() or "<body" in raw_email.lower() or "<div" in raw_email.lower():
            log_substep("Fallback", "Input appears to be Raw HTML, stripping tags...")
            try:
                soup = BeautifulSoup(raw_email, "html.parser")
                extracted_text = soup.get_text(separator="\n")

                for a in soup.find_all("a", href=True):
                    links.add(a["href"])
                for img in soup.find_all("img", src=True):
                    links.add(img["src"])
            except Exception:
                extracted_text = raw_email
        else:
            extracted_text = raw_email

    # Regex sweep over the raw source catches URLs the parsers missed.
    links.update(re.findall(r'https?://\S+', raw_email))

    cleaned_links = []
    for link in links:
        link = link.strip().strip("<>").replace('"', "")
        if link.startswith("http://") or link.startswith("https://"):
            cleaned_links.append(link)

    log_success(f"Parsed Content. Extracted {len(cleaned_links)} unique URLs.")
    return extracted_text, cleaned_links
| |
|
async def extract_url_features(urls: List[str]) -> pd.DataFrame:
    """Extract phishing features for every URL concurrently in worker threads.

    Returns the input URLs as a DataFrame with one feature column set per
    URL appended; a URL whose extraction raised contributes an empty row.
    """
    if not urls:
        return pd.DataFrame()

    log_step("🧬", f"Extracting Features for {len(urls)} URLs")
    frame = pd.DataFrame({'url': urls})
    # Shared caches let process_row skip repeated WHOIS / SSL lookups.
    whois_cache = {}
    ssl_cache = {}

    jobs = [
        asyncio.to_thread(process_row, row, whois_cache, ssl_cache)
        for _, row in frame.iterrows()
    ]
    raw_results = await asyncio.gather(*jobs, return_exceptions=True)

    features = []
    for idx, item in enumerate(raw_results):
        if isinstance(item, Exception):
            logger.error(f"Feature extraction error on {urls[idx]}: {item}")
            features.append({})
        else:
            features.append(item)

    log_substep("Feature Extraction", "Complete")
    return pd.concat([frame, pd.DataFrame(features)], axis=1)
| |
|
def get_model_predictions(features_df: pd.DataFrame, message_text: str) -> Dict:
    """Run every loaded model and collect raw scores.

    Returns model name -> {'raw_score': float}.  URL-based models use the
    riskiest (max) score across all URLs; ML/DL failures degrade to a
    neutral 0.5 while BERT/semantic failures are simply skipped.

    Fix: the bare `except:` clauses (which also swallow KeyboardInterrupt /
    SystemExit) are narrowed to `except Exception` and now log the cause.
    """
    predictions = {}
    num_feats = config.NUMERICAL_FEATURES
    cat_feats = config.CATEGORICAL_FEATURES

    if not features_df.empty:
        try:
            log_step("🤖", "Running Machine Learning Inference")
            X = features_df[num_feats + cat_feats].copy()
            X[num_feats] = X[num_feats].fillna(-1)
            X[cat_feats] = X[cat_feats].fillna('N/A')

            # Classic ML models: probability of the positive (phishing) class.
            for name, model in ml_models.items():
                try:
                    probas = model.predict_proba(X)[:, 1]
                    raw_score = float(np.max(probas))
                    predictions[name] = {'raw_score': raw_score}
                    log_substep(f"ML: {name.ljust(10)}", f"{raw_score:.4f}")
                except Exception as e:
                    # Neutral fallback keeps the ensemble weighting stable.
                    logger.warning(f"ML model '{name}' failed: {e}")
                    predictions[name] = {'raw_score': 0.5}

            # Deep learning models share one numeric input tensor.
            if dl_models:
                X_num = torch.tensor(X[num_feats].values.astype(np.float32))
                with torch.no_grad():
                    for name, model in dl_models.items():
                        try:
                            out = model(X_num)
                            raw_score = float(torch.max(out).item())
                            predictions[name] = {'raw_score': raw_score}
                            log_substep(f"DL: {name.ljust(10)}", f"{raw_score:.4f}")
                        except Exception as e:
                            logger.warning(f"DL model '{name}' failed: {e}")
                            predictions[name] = {'raw_score': 0.5}

            # BERT scores the raw URL strings; averaged across URLs.
            if bert_model:
                try:
                    scores = bert_model.predict_proba(features_df['url'].tolist())
                    avg_score = float(np.mean([s[1] for s in scores]))
                    predictions['bert'] = {'raw_score': avg_score}
                    log_substep("BERT Inference", f"{avg_score:.4f}")
                except Exception as e:
                    logger.warning(f"BERT inference failed: {e}")
        except Exception as e:
            logger.error(f"Feature Pipeline Error: {e}")

    # Semantic model works on message text, independent of URL features.
    if semantic_model and message_text:
        try:
            log_step("🧠", "Running Semantic Text Analysis")
            res = semantic_model.predict(message_text)
            predictions['semantic'] = {'raw_score': float(res['phishing_probability'])}
            log_substep("Semantic Prob", f"{res['phishing_probability']:.4f}")
        except Exception as e:
            logger.warning(f"Semantic analysis failed: {e}")

    return predictions
| |
|
async def get_network_data_raw(urls: List[str]) -> List[Dict]:
    """Resolve up to 5 unique hostnames and geo-locate them via ip-api.com.

    Results are memoized in the module-level ip_cache for the process
    lifetime.  Lookup failures are logged and skipped; never raises.

    Fix: the bare `except:` around urlparse (which also swallowed
    KeyboardInterrupt/SystemExit) is narrowed to ValueError.
    """
    data = []
    unique_hosts = set()

    for url_str in urls:
        try:
            parsed = urlparse(url_str if url_str.startswith(('http', 'https')) else f"http://{url_str}")
            if parsed.hostname:
                unique_hosts.add(parsed.hostname)
        except ValueError:
            # Malformed URL — skip it.
            pass

    target_hosts = list(unique_hosts)[:5]
    log_step("🌍", f"Geo-Locating Hosts: {target_hosts}")

    async with httpx.AsyncClient(timeout=3.0) as client:
        for host in target_hosts:
            if host in ip_cache:
                data.append(ip_cache[host])
                log_substep("Cache Hit", host)
                continue

            try:
                # DNS resolution is blocking — run it in a worker thread.
                ip = await asyncio.to_thread(socket.gethostbyname, host)
                resp = await client.get(f"http://ip-api.com/json/{ip}?fields=status,message,country,isp,org,as,proxy,hosting")
                if resp.status_code == 200:
                    geo = resp.json()
                    if geo.get('status') == 'success':
                        geo['ip'] = ip
                        geo['host'] = host
                        data.append(geo)
                        ip_cache[host] = geo
                        log_substep(f"Resolved {host}", f"{geo.get('org', 'Unknown')} [{geo.get('country', 'UNK')}]")
            except Exception:
                # DNS failure, HTTP timeout, or bad JSON — unresolvable host.
                log_substep("Failed to resolve", host)

            # Be polite to ip-api.com's free-tier rate limit.
            await asyncio.sleep(0.2)
    return data
| |
|
async def scrape_landing_page(urls: list[str]) -> dict:
    """Fetch each URL in a headless Chromium and return visible page text.

    Returns {url: first 300 chars of normalized visible text, or an error
    string}.  Only the first 10 URLs are scraped; all fetches run
    concurrently, each inside its own browser instance.
    """
    # Cap the workload (note: a local cap of 10, not MAX_URLS_TO_ANALYZE).
    urls = urls[:10]

    results = {}

    async def scrape_single(url: str):
        # `nonlocal` is not strictly required (results is only mutated,
        # never rebound) but is harmless and kept as-is.
        nonlocal results
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                # Desktop Chrome user agent to avoid trivial bot blocks.
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
                )
                page = await context.new_page()

                try:
                    target_url = url if url.startswith(("http", "https")) else f"http://{url}"
                    await page.goto(target_url, timeout=10000, wait_until="domcontentloaded")

                    content = await page.content()

                    # Drop non-content elements before extracting text.
                    soup = BeautifulSoup(content, "html.parser")
                    for tag in soup(["script", "style", "nav", "footer", "svg", "noscript"]):
                        tag.decompose()

                    text = soup.get_text(separator=" ", strip=True)
                    # Normalize unicode tricks (full-width chars, ligatures).
                    text = unicodedata.normalize("NFKC", text)

                    # Keep only a short preview for the LLM evidence dossier.
                    results[url] = text[:300]

                except Exception as e:
                    results[url] = f"Error accessing page: {str(e)}"

                finally:
                    await browser.close()

        except Exception as e:
            results[url] = f"Scraping failed: {str(e)}"

    # Fan out all scrapes concurrently.
    tasks = [scrape_single(u) for u in urls]
    await asyncio.gather(*tasks)

    return results
| |
|
| |
|
| | |
| |
|
| | SYSTEM_PROMPT = """You are the 'Maverick', an elite, autonomous Cybersecurity Judge. Your sole purpose is to analyze the provided Evidence Dossier and return a JSON object. |
| | **Core Rules:** |
| | 1. **The "One Bad Link" Rule:** If the email contains **ANY** suspicious or malicious URL, the Final Decision MUST be "phishing" (100% Confidence), even if other links are legitimate. |
| | 2. **Prioritize Ground Truth:** You must prioritize **Scraped Content** (e.g., a page asking for credentials) and **Network Data** (e.g., a Bank hosted on DigitalOcean) over the Technical Score. |
| | 3. **Override Authority:** Even if the 'Technical Ensemble Score' is low (e.g., 20/100), if you find a Critical Threat in the Scraped Data or Forensic Scan, you MUST override with a High Score (90-100). |
| | 4. **Suspicious Indicators:** |
| | - **Scraped Data:** Login forms on non-official domains, "Verify Identity" text, urgency. |
| | - **Network:** Mismatch between Sender Domain and Hosting (e.g., Microsoft email hosted on Namecheap). |
| | - **Forensics:** Hidden H1 tags, Typosquatting (paypa1.com), Mismatched hrefs. |
| | 5. **Confidence score:** |
| | -Give confidence score between 0-100 based on all the evidences and the decision being made. The score >50 should be given if the email seems phishing and <50 should be given if the email seems legitimate. |
| | 6. **Final Decision:** |
| | -Based on the evidences and confidence score, give the final decision , if the final score seems phishing then give final decision as phishing and if the final score seems legitimate then give final decision as legitimate. |
| | **8 ROBUST FEW-SHOT EXAMPLES:** |
| | **Example 1: Phishing (Credential Harvesting - Scraped Data Override)** |
| | **Input:** |
| | Sender: security-alert@microsoft-online-verify.com |
| | Subject: Action Required: Unusual Sign-in Activity Detected |
| | Technical Score: 35 / 100 |
| | Network Intelligence: Host: 162.241.2.1 | Org: Unified Layer (Cheap Hosting) | ISP: Bluehost | Proxy: False |
| | Scraped Content: "Microsoft 365. Sign in to your account. Email, phone, or Skype. No account? Create one. Can't access your account? Sign-in options. Terms of Use Privacy & Cookies. © Microsoft 2025. NOTE: This page is for authorized users only." |
| | Forensic Scan: Link: http://microsoft-online-verify.com/login.php |
| | Message: "Microsoft Security Alert |
| | We detected a sign-in attempt from a new device or location. |
| | **Account:** user@example.com |
| | **Date:** Fri, Nov 28, 2025 10:23 AM GMT |
| | **Location:** Moscow, Russia |
| | **IP Address:** 103.22.14.2 |
| | **Browser:** Firefox on Windows 10 |
| | If this wasn't you, your account may have been compromised. Please **verify your identity immediately** to secure your account and avoid permanent suspension. |
| | [Secure My Account] |
| | Thanks, |
| | The Microsoft Account Team" |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 99.0, |
| | "reasoning": "CRITICAL OVERRIDE. The Scraped Data mimics a Microsoft 365 Login portal ('Sign in to your account'), but the Network Data confirms the site is hosted on 'Unified Layer/Bluehost', NOT Microsoft's official Azure infrastructure. This is a classic credential harvesting attack using a fake security alert.", |
| | "highlighted_text": "Please @@verify your identity immediately@@ to secure your account and avoid permanent suspension. @@[Secure My Account]@@", |
| | "final_decision": "phishing", |
| | "suggestion": "Do not enter credentials. This is a fake login page hosted on non-Microsoft servers." |
| | }} |
| | **Example 2: Phishing (Hidden Malicious URL - Forensic Override)** |
| | **Input:** |
| | Sender: hr-updates@wipro.com |
| | Subject: MANDATORY: Updated Employee Handbook & Compliance Policy 2025 |
| | Technical Score: 45 / 100 |
| | Network Intelligence: Host: docs.google.com (Google LLC) |
| | Scraped Content: "Google Docs. Sign in. Employee Handbook 2025.pdf. You need permission. Request access. Switch accounts. Google Workspace." |
| | Forensic Scan: CRITICAL: Found hidden URL in H1 tag: 'http://bit.ly/malware-redirect-payload' |
| | Message: "Dear Team, |
| | As part of our annual compliance audit (ISO 27001), all employees are required to review and sign the updated Employee Handbook for the fiscal year 2025. |
| | Please access the document via the secure Google Docs link below: |
| | [docs.google.com/handbook-2025](Link) |
| | Failure to acknowledge this document by Friday may result in a temporary suspension of network access. |
| | Regards, |
| | HR Compliance Team |
| | Wipro Limited" |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 98.0, |
| | "reasoning": "Phishing. While the visible body text points to a legitimate Google Docs URL, the email contains a hidden malicious URL ('bit.ly/malware-redirect-payload') embedded in the HTML header tags. This is a sophisticated evasion tactic designed to bypass filters while tricking the user.", |
| | "highlighted_text": "Please access the document via the secure Google Docs link below: [docs.google.com/handbook-2025] @@(Hidden Header URL Detected)@@", |
| | "final_decision": "phishing", |
| | "suggestion": "Do not click. A hidden malicious payload was detected in the email structure." |
| | }} |
| | **Example 3: Phishing (Typosquatting & Urgency)** |
| | **Input:** |
| | Sender: support@paypa1-resolution.com |
| | Subject: URGENT: Wallet Suspended - Case ID #99283-AX |
| | Technical Score: 88 / 100 |
| | Network Intelligence: Host: paypa1-resolution.com | Org: Namecheap Inc | ISP: Namecheap |
| | Scraped Content: "PayPal. Security Challenge. Enter your credit card number to verify ownership. Expiration Date. CVV. Billing Address. Submit. Copyright 1999-2025 PayPal. All rights reserved." |
| | Forensic Scan: Link: https://paypa1-resolution.com/verify-identity |
| | Message: "Hello Customer, |
| | Your PayPal wallet has been temporarily suspended due to suspicious transactions totaling $400.00 USD to 'Global-Tech-Solutions Ltd'. |
| | To restore full access to your funds, you must **verify your payment method immediately**. Failure to do so within 24 hours will result in the permanent closure of your account and forfeiture of remaining balance. |
| | [Resolve Issue Now] |
| | Thank you for being a valued customer." |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 99.0, |
| | "reasoning": "Phishing. Typosquatting detected ('paypa1' instead of 'paypal'). The Scraped Data confirms the landing page asks for credit card details (CVV/Expiry), and the domain is registered via Namecheap, not PayPal's official infrastructure. High urgency and threat of 'forfeiture' are clear indicators.", |
| | "highlighted_text": "Your PayPal wallet has been temporarily @@suspended@@. To restore full access, you must @@verify your payment method immediately@@. Failure to do so within 24 hours will result in @@permanent closure@@.", |
| | "final_decision": "phishing", |
| | "suggestion": "Delete immediately. This is an impersonation attack stealing financial data." |
| | }} |
| | **Example 4: Legitimate (Internal Corporate Email)** |
| | **Input:** |
| | Sender: admin@internal.daiict.ac.in |
| | Subject: Scheduled Maintenance - Server Room B - Sunday Nov 30 |
| | Technical Score: 15 / 100 |
| | Network Intelligence: Host: internal.daiict.ac.in | Org: DA-IICT | ISP: NKN (National Knowledge Network) | Proxy: False |
| | Scraped Content: "DA-IICT Intranet Login. Authorized Personnel Only. User ID. Password. Forgot Password? IT Helpdesk Extension: 4444." |
| | Forensic Scan: Link: https://internal.daiict.ac.in/wiki/maintenance-schedule |
| | Message: "Dear Faculty and Staff, |
| | The central server room (Room B) will be undergoing scheduled maintenance on Sunday, Nov 30, from 2:00 AM to 4:00 AM IST. |
| | **Services affected:** |
| | 1. Moodle (LMS) |
| | 2. Faculty Webmail |
| | 3. Student Portal |
| | Please ensure all work is saved before this time. For the full maintenance schedule and alternative access points, please check the wiki: |
| | https://internal.daiict.ac.in/wiki/maintenance-schedule |
| | Regards, |
| | IT Services" |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 5.0, |
| | "reasoning": "Legitimate. The sender domain, link domain, and network infrastructure (DA-IICT/NKN) all align perfectly. The scraped content matches an internal login page. The content is informational with specific details and lacks suspicious urgency or external hosting.", |
| | "highlighted_text": "The central server room (Room B) will be undergoing scheduled maintenance on Sunday. Please check the wiki: https://internal.daiict.ac.in/wiki/maintenance-schedule", |
| | "final_decision": "legitimate", |
| | "suggestion": "Safe internal communication regarding maintenance." |
| | }} |
| | **Example 5: Phishing (Legitimate Cloud Abuse - Google Forms)** |
| | **Input:** |
| | Sender: recruitment-officer.john.doe@gmail.com |
| | Subject: Job Offer: Senior Python Developer - Document Upload |
| | Technical Score: 60 / 100 |
| | Network Intelligence: Host: forms.gle (Google LLC) |
| | Scraped Content: "Untitled Form. Job Application - Senior Dev. Upload your Passport Front and Back. Upload SSN Card. Upload Drivers License. Submit. Never submit passwords through Google Forms." |
| | Forensic Scan: Link: https://forms.gle/xyz123abc |
| | Message: "Dear Candidate, |
| | Congratulations! We are pleased to offer you the Senior Python Developer position at TechSolutions Inc. |
| | To proceed with your background check and contract generation, we need you to upload the following documents immediately: |
| | 1. Scanned Copy of Passport |
| | 2. Social Security Number (SSN) Card |
| | 3. Driver's License |
| | Please upload them to our secure portal here: |
| | [Link to Google Form] |
| | We need this by EOD today." |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 92.0, |
| | "reasoning": "Phishing. Although the domain is legitimate (Google Forms), the Scraped Content reveals it is soliciting highly sensitive PII (Passport/SSN). Legitimate companies do not collect SSNs via public Google Forms. The use of a generic Gmail address for a 'Recruitment Officer' is also a red flag.", |
| | "highlighted_text": "To proceed with your background check... please @@upload the following documents immediately@@: 1. Scanned Copy of Passport 2. Social Security Number (SSN) Card. Please upload them to our secure portal here: @@[Link to Google Form]@@", |
| | "final_decision": "phishing", |
| | "suggestion": "Do not upload sensitive ID documents to public forms. This is likely identity theft." |
| | }} |
| | **Example 6: Legitimate (Transactional Alert)** |
| | **Input:** |
| | Sender: alerts@hdfcbank.net |
| | Subject: Transaction Alert: INR 5,000.00 Debited |
| | Technical Score: 8 / 100 |
| | Network Intelligence: Host: hdfcbank.net | Org: HDFC Bank Ltd | ISP: Sify Technologies |
| | Scraped Content: N/A (No Links) |
| | Forensic Scan: No URLs found. |
| | Message: "Dear Customer, |
| | INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025. |
| | **Info:** UPI-12345-AmazonPay |
| | **Available Bal:** INR 42,000.00 |
| | If this transaction was not done by you, please call our 24x7 PhoneBanking number 1800-202-6161 immediately to block your card/account. |
| | Warm Regards, |
| | HDFC Bank" |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 2.0, |
| | "reasoning": "Legitimate. This is a standard text-only transactional alert. The sender domain matches HDFC Bank's official domain, and the network data confirms it. The phone number is a standard support line. There are no suspicious links.", |
| | "highlighted_text": "INR 5,000.00 was debited from your A/c XX1234 on 28-Nov-2025. Info: UPI-12345-AmazonPay.", |
| | "final_decision": "legitimate", |
| | "suggestion": "Safe transactional alert. No action needed unless the transaction is unrecognized." |
| | }} |
| | **Example 7: Phishing (CEO Fraud / BEC - No Links)** |
| | **Input:** |
| | Sender: ceo.work.private@gmail.com (Spoofed Name: "Elon Musk") |
| | Subject: Urgent Task - Confidential - DO NOT IGNORE |
| | Technical Score: 75 / 100 |
| | Network Intelligence: Host: mail.google.com (Google LLC) |
| | Scraped Content: N/A |
| | Forensic Scan: No URLs found. |
| | Message: "Akshat, |
| | I am currently in a closed-door meeting with the board of investors and cannot talk on the phone. I need a favor. |
| | I need you to purchase 5 Apple Gift Cards ($100 each) for a client gift. It is urgent and needs to be done in the next 30 minutes. I will reimburse you personally by this evening. |
| | Do not mention this to anyone else yet. Reply with the codes here as soon as you have them. |
| | Elon." |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 90.0, |
| | "reasoning": "Phishing (BEC). Classic Business Email Compromise. The Sender is using a generic Gmail address to impersonate a C-level executive. The request involves financial urgency (Gift Cards), secrecy ('closed-door meeting', 'do not mention'), and bypasses standard procurement channels.", |
| | "highlighted_text": "I need you to @@purchase 5 Apple Gift Cards@@ ($100 each) for a client gift. It is urgent... @@Reply with the codes here@@ as soon as you have them.", |
| | "final_decision": "phishing", |
| | "suggestion": "Do not reply. Verify this request with the CEO via a different, verified channel (Slack/Phone/Corporate Email)." |
| | }} |
| | **Example 8: Legitimate (Marketing with Trackers)** |
| | **Input:** |
| | Sender: newsletter@coursera.org |
| | Subject: Recommended for you: Python for Everybody Specialization |
| | Technical Score: 20 / 100 |
| | Network Intelligence: Host: links.coursera.org | Org: Coursera Inc | ISP: Amazon.com |
| | Scraped Content: "Coursera. Master Python. Enroll for Free. Starts Nov 29. Financial Aid available. Top Instructors. University of Michigan. 4.8 Stars (120k ratings)." |
| | Forensic Scan: Link: https://links.coursera.org/track/click?id=12345&user=akshat |
| | Message: "Hi Student, |
| | Based on your interest in Data Science, we found a course you might like: |
| | **Python for Everybody Specialization** |
| | Offered by University of Michigan. |
| | Start learning today and build job-ready skills. |
| | [Enroll Now] |
| | See you in class, |
| | The Coursera Team |
| | 381 E. Evelyn Ave, Mountain View, CA 94041" |
| | **Correct Decision:** |
| | {{ |
| | "confidence": 10.0, |
| | "reasoning": "Legitimate. Standard marketing email from a known education platform. Network data confirms the link tracking domain belongs to Coursera (hosted on AWS). Scraped content is consistent with the offer. Address matches public records.", |
| | "highlighted_text": "Based on your interest in Data Science, we found a course you might like: Python for Everybody Specialization. [Enroll Now]", |
| | "final_decision": "legitimate", |
| | "suggestion": "Safe marketing email." |
| | }}""" |
| |
|
async def get_groq_decision(ensemble_result: Dict, network_data: List[Dict], landing_page_text: str, cleaned_text: str, original_raw_html: str, readable_display_text: str, sender: str, subject: str):
    """Ask the Groq LLM for the final phishing verdict, with a technical fallback.

    Pipeline:
      1. Render the network-intelligence summary.
      2. Forensic HTML scan: collect URLs hidden in <form action>, <img src>
         and <a href> tags plus a regex sweep of the raw text.
      3. Build the analysis prompt and call the LLM, retrying up to
         LLM_MAX_RETRIES times with exponential backoff (honoring a
         `retry-after` header on 429s when present).
      4. If every attempt fails, return a deterministic verdict derived purely
         from the ensemble technical score.

    Args:
        ensemble_result: dict with 'score' (0-100) and 'details' from the ensemble scorer.
        network_data: per-host lookup dicts (host/ip/org/isp/hosting/proxy keys).
        landing_page_text: scraped text of the linked landing pages.
        cleaned_text: URL-stripped, lowercased message text (kept for interface
            compatibility; not embedded in the prompt).
        original_raw_html: raw message body, scanned for hidden URLs.
        readable_display_text: human-readable message text shown to the LLM
            (truncated to MAX_INPUT_CHARS).
        sender: message sender address for context.
        subject: message subject line for context.

    Returns:
        Parsed JSON dict (confidence / reasoning / highlighted_text /
        final_decision / suggestion) from the LLM, or the score-based fallback.
    """
    net_str = "No Network Data"
    if network_data:
        net_str = "\n".join([
            f"- Host: {d.get('host')} | IP: {d.get('ip')} | Org: {d.get('org')} | ISP: {d.get('isp')} | Hosting/Proxy: {d.get('hosting') or d.get('proxy')}"
            for d in network_data
        ])

    log_step("🔎", "Starting Forensic HTML Scan")
    forensic_report = []
    try:
        soup = BeautifulSoup(original_raw_html, 'html.parser')

        # Form actions are the highest-risk URLs: they receive submitted credentials.
        for form in soup.find_all('form'):
            action = form.get('action')
            if action:
                forensic_report.append(f"CRITICAL: Found URL in <form action>: {action}")

        # Image sources can hide tracking pixels or content loaded from attacker hosts.
        for img in soup.find_all('img'):
            src = img.get('src')
            if src:
                forensic_report.append(f"Found URL in <img src>: {src}")

        # Anchor hrefs: the links the victim is actually asked to click.
        for a in soup.find_all('a'):
            href = a.get('href')
            if href:
                forensic_report.append(f"Found URL in <a href>: {href}")

        # Regex sweep over the raw text catches URLs outside tag attributes.
        url_pattern = r'(?:https?://|ftp://|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:/[\w\-\._~:/?#[\]@!$&\'()*+,;=]*)?'
        all_text_urls = set(re.findall(url_pattern, original_raw_html))
        if all_text_urls:
            # Sort for a deterministic prompt (set iteration order is arbitrary,
            # which would otherwise make identical inputs produce different prompts).
            forensic_report.append(f"All URLs detected in raw text: {', '.join(sorted(all_text_urls))}")

    except Exception as e:
        # Best-effort scan: a malformed body must not abort the whole analysis.
        logger.warning(f"Forensic Scan Error: {e}")
        forensic_report.append("Forensic scan failed to parse HTML structure.")

    forensic_str = "\n".join(forensic_report) if forensic_report else "No URLs found in forensic scan."
    log_substep("Forensic Scan", f"Found {len(forensic_report)} potential indicators")

    # Cap the message text so the prompt stays within the model's context budget.
    prompt_display_text = readable_display_text[:MAX_INPUT_CHARS]

    prompt = f"""
**ANALYSIS CONTEXT**
Sender: {sender}
Subject: {subject}
**FORENSIC URL SCAN (INTERNAL HTML ANALYSIS)**
The system scanned the raw HTML and found these URLs (hidden in tags):
{forensic_str}
**TECHNICAL INDICATORS**
Calculated Ensemble Score: {ensemble_result['score']:.2f} / 100
Key Factors: {ensemble_result['details']}
**NETWORK GROUND TRUTH**
{net_str}
**LANDING PAGE PREVIEW (Scraped Text)**
"{landing_page_text}"
**MESSAGE CONTENT (READABLE VERSION)**
"{prompt_display_text}"
**TASK:**
Analyze the "FORENSIC URL SCAN" findings.
- If ANY URL in the forensic scan is NSFW/Adult or malicious, flag as PHISHING.
- If a URL looks like a generated subdomain (e.g. 643646.me) or is unrelated to the sender, FLAG AS PHISHING immediately.
- IMPORTANT: For the 'highlighted_text' field in your JSON response, use the **MESSAGE CONTENT (READABLE VERSION)** provided above. Do NOT output raw HTML tags. Just mark suspicious parts in the readable text with @@...@@.
"""

    attempts = 0
    while attempts < LLM_MAX_RETRIES:
        try:
            # Rotate through the configured API keys to spread rate-limit pressure.
            client = key_rotator.get_client_and_rotate()
            if not client:
                raise RuntimeError("No Keys")

            log_step("🚀", f"Sending LLM Request (Attempt {attempts+1}/{LLM_MAX_RETRIES})")

            completion = await client.chat.completions.create(
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt}
                ],
                model="meta-llama/llama-4-scout-17b-16e-instruct",
                temperature=0.1,  # near-deterministic: we want stable verdicts
                max_tokens=4096,
                response_format={"type": "json_object"}
            )

            raw_content = completion.choices[0].message.content
            log_substep("LLM Response Received", f"Length: {len(raw_content)} chars")

            parsed_json = clean_and_parse_json(raw_content)

            if parsed_json:
                log_success("LLM Response Parsed Successfully")
                return parsed_json
            else:
                # Treat unparseable output as a failed attempt so we retry.
                raise ValueError("Empty or Invalid JSON from LLM")

        except RateLimitError as e:
            # Exponential backoff with jitter; prefer the server's retry-after hint.
            wait_time = 2 ** (attempts + 1) + random.uniform(0, 1)
            if hasattr(e, 'headers') and 'retry-after' in e.headers:
                try:
                    wait_time = float(e.headers['retry-after']) + 1
                except (ValueError, TypeError):
                    pass  # malformed header: keep the computed backoff
            logger.warning(f"LLM Rate Limit (429). Retrying in {wait_time:.2f}s...")
            await asyncio.sleep(wait_time)
            attempts += 1
        except Exception as e:
            logger.warning(f"LLM Attempt {attempts+1} failed: {e}")
            attempts += 1
            await asyncio.sleep(1)

    # All attempts exhausted: fall back to a verdict derived from the technical score.
    is_phishing = ensemble_result['score'] > 50
    return {
        "confidence": ensemble_result['score'],
        "reasoning": f"LLM Unavailable after retries. Decision based purely on Technical Score ({ensemble_result['score']:.2f}).",
        "highlighted_text": readable_display_text,
        "final_decision": "phishing" if is_phishing else "legitimate",
        "suggestion": "Exercise caution. Automated analysis detected risks." if is_phishing else "Appears safe."
    }
| |
|
@app.on_event("startup")
async def startup():
    """Print the startup banner, load every model, then announce readiness."""
    fmt = UltraColorFormatter
    banner = '=' * 70
    logger.info(f"\n{fmt.NEON_BLUE}{banner}")
    logger.info(f"{fmt.WHITE_BOLD} PHISHING DETECTION API v2.6.0 - SYSTEM STARTUP ".center(80))
    logger.info(f"{fmt.NEON_BLUE}{banner}{fmt.RESET}")
    load_models()
    logger.info(f"\n{fmt.NEON_GREEN}🚀 SYSTEM READY AND LISTENING ON PORT 8000{fmt.RESET}\n")
| |
|
@app.post("/predict", response_model=PredictionResponse)
async def predict(input_data: MessageInput):
    """Run the full phishing-analysis pipeline on a single message.

    Steps: extract readable text and links -> (if URLs present) run feature
    extraction, network lookup and landing-page scraping concurrently ->
    compute the ensemble technical score -> obtain the LLM verdict (with a
    score-based fallback) -> clamp the confidence so it never contradicts the
    verdict -> return the response model.

    Raises:
        HTTPException(500): on any unexpected failure inside the pipeline.
    """
    log_section(f"NEW REQUEST: {input_data.sender}")

    # Empty bodies carry no signal; short-circuit as benign.
    if not input_data.text or not input_data.text.strip():
        logger.warning("Received empty input text.")
        return PredictionResponse(
            confidence=0.0,
            reasoning="Empty input.",
            highlighted_text="",
            final_decision="legitimate",
            suggestion="None"
        )

    # Bound concurrent pipelines (heavy: browser scraping + model inference).
    async with request_semaphore:
        try:
            start_time = time.time()

            extracted_text, all_urls = extract_visible_text_and_links(input_data.text)

            # Strip URLs before text-model inference so the classifiers judge
            # the language itself, not the link strings.
            url_pattern_for_cleaning = r'(?:https?://|ftp://|www\.)[\w\-\.]+\.[a-zA-Z]{2,}(?:/[\w\-\._~:/?#[\]@!$&\'()*+,;=]*)?'
            cleaned_text_for_models = re.sub(url_pattern_for_cleaning, '', extracted_text)
            cleaned_text_for_models = ' '.join(cleaned_text_for_models.lower().split())

            # Cap the per-request URL workload.
            all_urls = all_urls[:MAX_URLS_TO_ANALYZE]

            if all_urls:
                log_step("🔗", f"Proceeding with {len(all_urls)} URLs")
            else:
                log_step("🚫", "No URLs Detected - Skipping Feature Extraction")

            features_df = pd.DataFrame()
            network_data_raw = []
            landing_page_text = ""

            if all_urls:
                # The three URL analyses are independent; run them concurrently.
                log_step("⚡", "Initiating Parallel Async Tasks")
                results = await asyncio.gather(
                    extract_url_features(all_urls),
                    get_network_data_raw(all_urls),
                    scrape_landing_page(all_urls)
                )
                features_df, network_data_raw, landing_page_text = results

            # Normalize the scraper's output (dict of url->text, or anything else) to one string.
            if isinstance(landing_page_text, dict):
                landing_page_text = "\n".join(f"{u}: {txt}" for u, txt in landing_page_text.items())
            else:
                landing_page_text = str(landing_page_text)

            # Model inference is CPU/GPU-bound; keep it off the event loop.
            predictions = await asyncio.to_thread(get_model_predictions, features_df, cleaned_text_for_models)
            ensemble_result = EnsembleScorer.calculate_technical_score(predictions, network_data_raw, all_urls)

            log_metric("Ensemble Technical Score", f"{ensemble_result['score']:.2f}/100", warning=ensemble_result['score']>50)

            llm_result = await get_groq_decision(
                ensemble_result,
                network_data_raw,
                landing_page_text,
                cleaned_text_for_models,
                input_data.text,
                extracted_text,
                input_data.sender,
                input_data.subject
            )

            # Sanitize the verdict: anything outside the two-label contract defaults to benign.
            final_dec = llm_result.get('final_decision', 'legitimate').lower()
            if final_dec not in ['phishing', 'legitimate']:
                final_dec = 'legitimate'

            # The LLM may return a malformed confidence (e.g. "high"); fall back
            # to the technical score rather than failing the whole request.
            try:
                final_confidence = float(llm_result.get('confidence', ensemble_result['score']))
            except (TypeError, ValueError):
                final_confidence = float(ensemble_result['score'])

            # Consistency clamp: a phishing verdict must carry confidence > 50,
            # a legitimate verdict must carry confidence <= 50. The log message
            # states the value actually applied.
            if final_dec == "phishing" and final_confidence <= 50:
                logger.warning(f"⚠️ Consistency Fix: Verdict is Phishing but Score was {final_confidence}. Raising to 60.0.")
                final_confidence = max(60.0, final_confidence)

            elif final_dec == "legitimate" and final_confidence > 50:
                logger.warning(f"⚠️ Consistency Fix: Verdict is Legitimate but Score was {final_confidence}. Lowering to 40.0.")
                final_confidence = min(40.0, final_confidence)

            elapsed = time.time() - start_time

            log_section("REQUEST COMPLETE")
            log_metric("Execution Time", f"{elapsed:.2f}s")
            log_metric("Technical Score", f"{ensemble_result['score']:.0f}")

            decision_color = UltraColorFormatter.BOLD_RED if final_dec == "phishing" else UltraColorFormatter.NEON_GREEN
            logger.info(f" ⚖️ FINAL VERDICT: {decision_color}{final_dec.upper()}{UltraColorFormatter.RESET} (Conf: {final_confidence})")

            return PredictionResponse(
                confidence=final_confidence,
                reasoning=llm_result.get('reasoning', ensemble_result['details']),
                highlighted_text=llm_result.get('highlighted_text', extracted_text),
                final_decision=final_dec,
                suggestion=llm_result.get('suggestion', 'Check details carefully.')
            )

        except Exception as e:
            # Boundary handler: log full traceback, surface a 500 to the caller.
            logger.error(f"CRITICAL FAILURE in Prediction Pipeline: {e}")
            import traceback
            traceback.print_exc()
            raise HTTPException(status_code=500, detail=str(e))