| """ |
| Phishing Detection API Routes |
| """ |
|
|
| import logging |
| from typing import Optional |
| from fastapi import APIRouter, HTTPException, Query |
| from pydantic import BaseModel |
| from ..services.phishing_analyzer import full_analysis |
| from ..services.email_analyzer import get_email_analyzer |
| from ..services.threats_manager import threats_manager |
| from ..config import settings |
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router on which the endpoint decorators in this module register routes.
router = APIRouter()
|
|
|
|
class AnalyzeURLRequest(BaseModel):
    """Request body for the URL analysis endpoint."""

    # URL to analyze; the endpoint rejects an empty value with HTTP 400.
    url: str
|
|
|
|
class AnalyzeEmailRequest(BaseModel):
    """Request body for the email analysis endpoint."""

    # Required string fields. The endpoint rejects the request only when
    # both subject and body_text are empty.
    subject: str
    sender: str
    body_text: str

    # Links supplied with the email; an empty list when omitted.
    links: list[str] = []

    # Optional header mapping; None when the client sends no headers.
    headers: dict | None = None
|
|
|
|
class EmailAnalysisResponse(BaseModel):
    """Response schema returned by the email analysis endpoint.

    Instances are built by unpacking the dict returned from the email
    analyzer, so every field below must be present in that dict.
    """

    # Overall verdict and scoring.
    safe: bool
    riskScore: int
    confidence: float

    # Explanations and classification attached to the verdict.
    reasons: list[str]
    category: str
    timestamp: str

    # Free-form analysis details; structure is defined by the analyzer.
    analysis: dict
|
|
|
|
class PhishingResponse(BaseModel):
    """Response schema returned by the URL analysis endpoint.

    Instances are built by unpacking the threat record returned from
    ``threats_manager.add_threat``.
    """

    # Required fields: the analyzed URL plus verdict, scoring and context.
    url: str
    safe: bool
    riskScore: int
    confidence: float
    reasons: list[str]
    category: str
    timestamp: str

    # Optional fields; each stays None when the record does not supply it.
    id: int | None = None
    phishTank: dict | None = None
    external_checks_failed: bool | None = None
|
|
|
|
@router.post("/analyze-url", response_model=PhishingResponse, tags=["Phishing"])
async def analyze_url(request: AnalyzeURLRequest):
    """
    Analyze a URL for phishing indicators.

    The URL is passed to ``full_analysis`` together with the configured
    PhishTank API key, the result is stored through
    ``threats_manager.add_threat``, and the stored record is returned.

    Args:
        request: Request body carrying the URL to analyze.

    Returns:
        PhishingResponse built from the stored threat record (risk score,
        reasons, category, ...).

    Raises:
        HTTPException: 400 when the URL is empty; 500 when analysis or
            storage raises an unexpected error.
    """
    if not request.url:
        raise HTTPException(status_code=400, detail="URL is required")

    try:
        result = await full_analysis(request.url, settings.PHISHTANK_API_KEY)
        threat = threats_manager.add_threat(result)
        return PhishingResponse(**threat)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error message
        # would drop and which is needed to diagnose a 500 afterwards.
        logger.exception("Analysis error: %s", e)
        # The client only sees a generic message; the cause stays chained
        # for server-side debugging.
        raise HTTPException(
            status_code=500,
            detail="Internal server error during analysis",
        ) from e
|
|
|
|
@router.post("/analyze-email", response_model=EmailAnalysisResponse, tags=["Phishing"])
async def analyze_email(request: AnalyzeEmailRequest):
    """
    Analyze an email for phishing, malicious links, and header anomalies.

    The request fields are forwarded to the email analyzer, which is also
    handed the URL analyzer module so it can check embedded links. As
    described by this endpoint's original documentation, the analysis covers:
    - Intent & Sentiment: Detects phishing language and urgency tactics
    - URL Analysis: Checks all embedded links for malware/phishing
    - Header Anomaly: Detects domain spoofing and suspicious sender patterns

    Args:
        request: Request body with subject, sender, body text, links and
            optional headers.

    Returns:
        EmailAnalysisResponse with the combined risk score and the reasons
        for detection.

    Raises:
        HTTPException: 400 when both subject and body are empty; 500 when
            the analysis raises an unexpected error.
    """
    if not request.subject and not request.body_text:
        raise HTTPException(status_code=400, detail="Email subject or body is required")

    try:
        # Imported here, as in the original code, rather than at module level.
        from ..services import phishing_analyzer as url_analyzer

        analyzer = get_email_analyzer()
        result = await analyzer.analyze_email(
            subject=request.subject,
            sender=request.sender,
            body_text=request.body_text,
            links=request.links,
            headers=request.headers,
            phishing_analyzer=url_analyzer,
        )

        # Goes through the module logger instead of print() so the line
        # follows the application's logging configuration.
        logger.info(
            "[API] Email Analysis - Score: %s, Category: %s, Sender: %s",
            result.get("riskScore"),
            result.get("category"),
            request.sender,
        )
        return EmailAnalysisResponse(**result)
    except Exception as e:
        # logger.exception keeps the traceback for diagnosing the failure.
        logger.exception("Email analysis error: %s", e)
        raise HTTPException(
            status_code=500,
            detail="Internal server error during analysis",
        ) from e
|
|
|
|
@router.get("/threats", tags=["Phishing"])
async def get_threats(
    limit: int = 50,
    offset: int = 0,
    category: Optional[str] = None,
    search: Optional[str] = None,
):
    """
    Get all stored threats with filtering, pagination, and stats.

    Query params:
    - limit: Number of threats to return (1-500, default 50); values outside
      that range are clamped to it
    - offset: Starting offset (default 0); negative values are treated as 0
    - category: Filter by category (high_risk, medium_risk, low_risk, trusted, invalid, malformed)
    - search: Search by URL

    Returns:
        Whatever ``threats_manager.get_threats`` returns for the given
        parameters.

    Raises:
        HTTPException: 500 when the threats manager raises an error.
    """
    # Enforce the documented bounds so an oversized or negative value from
    # the query string never reaches the threats manager.
    limit = max(1, min(limit, 500))
    offset = max(0, offset)

    try:
        return threats_manager.get_threats(
            limit=limit,
            offset=offset,
            category=category,
            search=search,
        )
    except Exception as e:
        import traceback

        tb = traceback.format_exc()
        logger.error("Threats fetch error: %s\n%s", e, tb)

        # Best-effort debug dump of the most recent failure. A failure to
        # write it (read-only filesystem, permissions) must not replace the
        # 500 response below with an unrelated OSError.
        try:
            with open("error_log.txt", "w", encoding="utf-8") as f:
                f.write(f"Error: {e}\n\nTraceback:\n{tb}\n\nParams: limit={limit} offset={offset} category={category} search={search}")
        except OSError:
            logger.warning("Could not write error_log.txt", exc_info=True)

        raise HTTPException(status_code=500, detail="Failed to fetch threats") from e
|
|
|
|
@router.get("/threats/{threat_id}", tags=["Phishing"])
async def get_threat(threat_id: int):
    """
    Get a single threat by ID.

    Returns the record supplied by ``threats_manager.get_threat``; responds
    with HTTP 404 when that lookup yields a falsy result.
    """
    record = threats_manager.get_threat(threat_id)
    if record:
        return record

    raise HTTPException(status_code=404, detail="Threat not found")
|
|