KES-Hack / src /main.py
Meshyboi's picture
Update src/main.py
ec6454e verified
import os
import logging
from pathlib import Path
import tempfile
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .config import settings
from .api import webhooks, phishing, xai, prompt_injection, mitigation
from .services.deepfake_detection import detector
from .services.deepfake_reasoning_engine import analyze_with_reasoning
from .services.event_hub import event_hub
from fastapi.responses import StreamingResponse
import asyncio
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Cyber Defense Chatbot API",
description="Webhook ingest and analysis processing for Deepfake, Phishing and SMS intercepting WhatsApp bot.",
version="1.0.0",
)
# CORS middleware for Frontend to communicate with Backend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Models
class DeepfakeResponse(BaseModel):
is_deepfake: bool | None
confidence: float
message: str | None = None
error: str | None = None
details: dict | None = None
reasoning: str | None = None
key_factors: list | None = None
@app.on_event("startup")
async def startup_validate_llm():
"""Validate Featherless LLM model availability at startup."""
try:
from .services.featherless_llm import validate_model
await validate_model()
except Exception as e:
logger.error(f"LLM model validation error during startup: {e}")
# Routes - Webhooks
app.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"])
# Routes - Phishing Detection
app.include_router(phishing.router, prefix="/api", tags=["phishing"])
# Routes - XAI & Explainability
app.include_router(xai.router, prefix="/api", tags=["xai"])
# Routes - Prompt Injection Detection
app.include_router(prompt_injection.router, prefix="/api", tags=["prompt_injection"])
# Routes - Mitigation Reports
app.include_router(mitigation.router, prefix="/api", tags=["mitigation"])
# Routes - Authentication
from .api import auth
app.include_router(auth.router, prefix="/auth", tags=["auth"])
from .services.voice_history_manager import voice_history_manager
@app.get("/api/voice/history", tags=["Voice"])
async def get_voice_history():
"""Get recent voice call history."""
try:
return voice_history_manager.get_history()
except Exception as e:
logger.error(f"Error fetching voice history: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch voice history")
@app.get("/api/events", tags=["Real-time"])
async def stream_events():
"""
Server-Sent Events (SSE) endpoint to stream real-time threat alerts.
"""
return StreamingResponse(
event_hub.subscribe(),
media_type="text/event-stream"
)
# Routes - Deepfake Detection
@app.post("/detect/audio", response_model=DeepfakeResponse, tags=["Detection"])
async def detect_audio_deepfake(file: UploadFile = File(...)):
"""
Detect deepfake in audio file.
Supports: MP3, WAV, OGG, M4A, FLAC
"""
if detector is None:
raise HTTPException(
status_code=503, detail="Deepfake detector not initialized"
)
# Validate file type
valid_audio_types = {
"audio/mpeg",
"audio/wav",
"audio/ogg",
"audio/mp4",
"audio/flac",
"application/octet-stream",
}
if file.content_type not in valid_audio_types:
raise HTTPException(
status_code=400,
detail=f"Invalid audio format. Supported: MP3, WAV, OGG, M4A, FLAC",
)
temp_file = None
try:
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=".audio") as tmp:
contents = await file.read()
tmp.write(contents)
temp_file = tmp.name
# Detect deepfake
logger.info(f"Analyzing audio file: {file.filename}")
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, detector.detect_audio_deepfake, temp_file
)
return DeepfakeResponse(
is_deepfake=result.get("is_deepfake"),
confidence=result.get("confidence", 0.0),
message=result.get("message"),
error=result.get("error"),
details={
k: v
for k, v in result.items()
if k not in ["is_deepfake", "confidence", "message", "error"]
},
)
except Exception as e:
logger.error(f"Error processing audio: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception as e:
logger.warning(f"Could not delete temp file: {e}")
@app.post("/detect/video", response_model=DeepfakeResponse, tags=["Detection"])
async def detect_video_deepfake(file: UploadFile = File(...)):
"""
Detect deepfake in video file.
Supports: MP4, AVI, MKV, MOV, WEBM
"""
if detector is None:
raise HTTPException(
status_code=503, detail="Deepfake detector not initialized"
)
# Validate file type
valid_video_types = {
"video/mp4",
"video/x-msvideo",
"video/x-matroska",
"video/quicktime",
"video/webm",
"application/octet-stream",
}
if file.content_type not in valid_video_types:
raise HTTPException(
status_code=400,
detail=f"Invalid video format. Supported: MP4, AVI, MKV, MOV, WEBM",
)
temp_file = None
try:
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=".video") as tmp:
contents = await file.read()
tmp.write(contents)
temp_file = tmp.name
# Detect deepfake
logger.info(f"Analyzing video file: {file.filename}")
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, detector.detect_video_deepfake, temp_file
)
return DeepfakeResponse(
is_deepfake=result.get("is_deepfake"),
confidence=result.get("confidence", 0.0),
message=result.get("message"),
error=result.get("error"),
details={
k: v
for k, v in result.items()
if k not in ["is_deepfake", "confidence", "message", "error"]
},
)
except Exception as e:
logger.error(f"Error processing video: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception as e:
logger.warning(f"Could not delete temp file: {e}")
@app.post("/detect/image", response_model=DeepfakeResponse, tags=["Detection"])
async def detect_image_deepfake(file: UploadFile = File(...)):
"""
Detect deepfake in an image file.
Supports: JPG, JPEG, PNG, WEBP, AVIF
"""
if detector is None:
raise HTTPException(
status_code=503, detail="Deepfake detector not initialized"
)
# Validate file type
valid_image_types = {
"image/jpeg",
"image/png",
"image/webp",
"image/avif",
"application/octet-stream",
}
if file.content_type not in valid_image_types:
raise HTTPException(
status_code=400,
detail=f"Invalid image format. Supported: JPG, JPEG, PNG, WEBP, AVIF",
)
temp_file = None
try:
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix=".image") as tmp:
contents = await file.read()
tmp.write(contents)
temp_file = tmp.name
# Detect deepfake
logger.info(f"Analyzing image file: {file.filename}")
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, detector.detect_image_deepfake, temp_file
)
return DeepfakeResponse(
is_deepfake=result.get("is_deepfake"),
confidence=result.get("confidence", 0.0),
message=result.get("message"),
error=result.get("error"),
details={
k: v
for k, v in result.items()
if k not in ["is_deepfake", "confidence", "message", "error"]
},
)
except Exception as e:
logger.error(f"Error processing image: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception as e:
logger.warning(f"Could not delete temp file: {e}")
@app.post("/detect", response_model=DeepfakeResponse, tags=["Detection"])
async def detect_deepfake(file: UploadFile = File(...)):
"""
Auto-detect and analyze file (audio or video).
Automatically determines file type and applies appropriate detection.
"""
if detector is None:
raise HTTPException(
status_code=503, detail="Deepfake detector not initialized"
)
temp_file = None
try:
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False) as tmp:
contents = await file.read()
tmp.write(contents)
temp_file = tmp.name
# Auto-detect based on file extension
file_ext = Path(file.filename).suffix.lower()
logger.info(f"Detecting type for: {file.filename} ({file_ext})")
loop = asyncio.get_event_loop()
if file_ext in [".mp3", ".wav", ".ogg", ".m4a", ".flac"]:
result = await loop.run_in_executor(
None, detector.detect_audio_deepfake, temp_file
)
elif file_ext in [".mp4", ".avi", ".mkv", ".mov", ".webm"]:
result = await loop.run_in_executor(
None, detector.detect_video_deepfake, temp_file
)
elif file_ext in [".jpg", ".jpeg", ".png", ".webp", ".avif"]:
result = await loop.run_in_executor(
None, detector.detect_image_deepfake, temp_file
)
else:
return DeepfakeResponse(
is_deepfake=None,
confidence=0.0,
error=f"Unsupported file format: {file_ext}",
)
# Determine media type for reasoning engine
audio_exts = [".mp3", ".wav", ".ogg", ".m4a", ".flac"]
video_exts = [".mp4", ".avi", ".mkv", ".mov", ".webm"]
image_exts = [".jpg", ".jpeg", ".png", ".webp", ".avif"]
if file_ext in audio_exts:
media_type = "audio"
elif file_ext in video_exts:
media_type = "video"
elif file_ext in image_exts:
media_type = "image"
else:
media_type = "unknown"
# Run LLM reasoning on top of ML detection
result = await analyze_with_reasoning(result, media_type)
return DeepfakeResponse(
is_deepfake=result.get("is_deepfake"),
confidence=result.get("confidence", 0.0),
message=result.get("message"),
error=result.get("error"),
reasoning=result.get("reasoning"),
key_factors=result.get("key_factors"),
details={
k: v
for k, v in result.items()
if k not in ["is_deepfake", "confidence", "message", "error", "reasoning", "key_factors"]
},
)
except Exception as e:
logger.error(f"Error processing file: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
# Clean up temporary file
if temp_file and os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception as e:
logger.warning(f"Could not delete temp file: {e}")
# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
"""Health check endpoint."""
return {
"status": "ok",
"device": settings.DEVICE,
"environment": settings.API_HOST,
"detector_initialized": detector is not None,
}
# API information endpoint
@app.get("/", tags=["Info"])
async def root():
"""API information endpoint."""
return {
"name": "Cyber Defense Chatbot API",
"version": "1.0.0",
"description": "Unified API for deepfake detection, phishing detection, and webhook processing",
"endpoints": {
"health": "/health",
"detect_audio": "/detect/audio",
"detect_video": "/detect/video",
"detect_image": "/detect/image",
"auto_detect": "/detect",
"webhooks": "/webhooks",
},
"supported_formats": {
"audio": [".mp3", ".wav", ".ogg", ".m4a", ".flac"],
"video": [".mp4", ".avi", ".mkv", ".mov", ".webm"],
"image": [".jpg", ".jpeg", ".png", ".webp", ".avif"],
},
}
if __name__ == "__main__":
import uvicorn
uvicorn.run("src.main:app", host=settings.API_HOST, port=settings.API_PORT, reload=settings.DEBUG)