File size: 4,659 Bytes
214209a 3e9b7e0 214209a 3e9b7e0 214209a 3e9b7e0 214209a 3e9b7e0 214209a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | """
Phishing Detection API Routes
"""
import logging
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from ..services.phishing_analyzer import full_analysis
from ..services.email_analyzer import get_email_analyzer
from ..services.threats_manager import threats_manager
from ..config import settings
logger = logging.getLogger(__name__)
router = APIRouter()
class AnalyzeURLRequest(BaseModel):
url: str
class AnalyzeEmailRequest(BaseModel):
subject: str
sender: str
body_text: str
links: list[str] = []
headers: dict | None = None
class EmailAnalysisResponse(BaseModel):
safe: bool
riskScore: int
confidence: float
reasons: list[str]
category: str
timestamp: str
analysis: dict
class PhishingResponse(BaseModel):
url: str
safe: bool
riskScore: int
confidence: float
reasons: list[str]
category: str
timestamp: str
id: int | None = None
phishTank: dict | None = None
external_checks_failed: bool | None = None
@router.post("/analyze-url", response_model=PhishingResponse, tags=["Phishing"])
async def analyze_url(request: AnalyzeURLRequest):
"""
Analyze a URL for phishing indicators.
Returns: Phishing analysis result with risk score and reasons
"""
if not request.url:
raise HTTPException(status_code=400, detail="URL is required")
try:
result = await full_analysis(request.url, settings.PHISHTANK_API_KEY)
threat = threats_manager.add_threat(result)
return PhishingResponse(**threat)
except Exception as e:
logger.error(f"Analysis error: {e}")
raise HTTPException(status_code=500, detail="Internal server error during analysis")
@router.post("/analyze-email", response_model=EmailAnalysisResponse, tags=["Phishing"])
async def analyze_email(request: AnalyzeEmailRequest):
"""
Analyze an email for phishing, malicious links, and header anomalies.
Performs three parallel analyses:
- Intent & Sentiment: Detects phishing language and urgency tactics
- URL Analysis: Checks all embedded links for malware/phishing
- Header Anomaly: Detects domain spoofing and suspicious sender patterns
Returns: Combined risk score with detailed reasons for detection
"""
if not request.subject and not request.body_text:
raise HTTPException(status_code=400, detail="Email subject or body is required")
try:
from ..services import phishing_analyzer as url_analyzer
analyzer = get_email_analyzer()
result = await analyzer.analyze_email(
subject=request.subject,
sender=request.sender,
body_text=request.body_text,
links=request.links,
headers=request.headers,
phishing_analyzer=url_analyzer
)
print(f"[API] Email Analysis - Score: {result.get('riskScore')}, Category: {result.get('category')}, Sender: {request.sender}")
return EmailAnalysisResponse(**result)
except Exception as e:
logger.error(f"Email analysis error: {e}")
raise HTTPException(status_code=500, detail="Internal server error during analysis")
@router.get("/threats", tags=["Phishing"])
async def get_threats(
limit: int = 50,
offset: int = 0,
category: Optional[str] = None,
search: Optional[str] = None
):
"""
Get all stored threats with filtering, pagination, and stats.
Query params:
- limit: Number of threats to return (1-500, default 50)
- offset: Starting offset (default 0)
- category: Filter by category (high_risk, medium_risk, low_risk, trusted, invalid, malformed)
- search: Search by URL
"""
try:
return threats_manager.get_threats(
limit=limit,
offset=offset,
category=category,
search=search
)
except Exception as e:
import traceback
tb = traceback.format_exc()
logger.error(f"Threats fetch error: {e}\n{tb}")
# Write to file for debugging
with open("error_log.txt", "w") as f:
f.write(f"Error: {e}\n\nTraceback:\n{tb}\n\nParams: limit={limit} offset={offset} category={category} search={search}")
raise HTTPException(status_code=500, detail="Failed to fetch threats")
@router.get("/threats/{threat_id}", tags=["Phishing"])
async def get_threat(threat_id: int):
"""
Get a single threat by ID.
"""
threat = threats_manager.get_threat(threat_id)
if not threat:
raise HTTPException(status_code=404, detail="Threat not found")
return threat
|