| import os |
| import logging |
| from pathlib import Path |
| import tempfile |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Depends, status |
| from fastapi.responses import JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel |
| from .config import settings |
| from .api import webhooks, phishing, xai, prompt_injection, mitigation |
| from .services.deepfake_detection import detector |
| from .services.deepfake_reasoning_engine import analyze_with_reasoning |
| from .services.event_hub import event_hub |
| from fastapi.responses import StreamingResponse |
| import asyncio |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| app = FastAPI( |
| title="Cyber Defense Chatbot API", |
| description="Webhook ingest and analysis processing for Deepfake, Phishing and SMS intercepting WhatsApp bot.", |
| version="1.0.0", |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| |
| class DeepfakeResponse(BaseModel): |
| is_deepfake: bool | None |
| confidence: float |
| message: str | None = None |
| error: str | None = None |
| details: dict | None = None |
| reasoning: str | None = None |
| key_factors: list | None = None |
|
|
|
|
| @app.on_event("startup") |
| async def startup_validate_llm(): |
| """Validate Featherless LLM model availability at startup.""" |
| try: |
| from .services.featherless_llm import validate_model |
| await validate_model() |
| except Exception as e: |
| logger.error(f"LLM model validation error during startup: {e}") |
|
|
|
|
| |
| app.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"]) |
|
|
| |
| app.include_router(phishing.router, prefix="/api", tags=["phishing"]) |
|
|
| |
| app.include_router(xai.router, prefix="/api", tags=["xai"]) |
|
|
| |
| app.include_router(prompt_injection.router, prefix="/api", tags=["prompt_injection"]) |
|
|
| |
| app.include_router(mitigation.router, prefix="/api", tags=["mitigation"]) |
|
|
| |
| from .api import auth |
| app.include_router(auth.router, prefix="/auth", tags=["auth"]) |
|
|
| from .services.voice_history_manager import voice_history_manager |
|
|
| @app.get("/api/voice/history", tags=["Voice"]) |
| async def get_voice_history(): |
| """Get recent voice call history.""" |
| try: |
| return voice_history_manager.get_history() |
| except Exception as e: |
| logger.error(f"Error fetching voice history: {e}") |
| raise HTTPException(status_code=500, detail="Failed to fetch voice history") |
|
|
|
|
| @app.get("/api/events", tags=["Real-time"]) |
| async def stream_events(): |
| """ |
| Server-Sent Events (SSE) endpoint to stream real-time threat alerts. |
| """ |
| return StreamingResponse( |
| event_hub.subscribe(), |
| media_type="text/event-stream" |
| ) |
|
|
|
|
| |
| @app.post("/detect/audio", response_model=DeepfakeResponse, tags=["Detection"]) |
| async def detect_audio_deepfake(file: UploadFile = File(...)): |
| """ |
| Detect deepfake in audio file. |
| |
| Supports: MP3, WAV, OGG, M4A, FLAC |
| """ |
| if detector is None: |
| raise HTTPException( |
| status_code=503, detail="Deepfake detector not initialized" |
| ) |
|
|
| |
| valid_audio_types = { |
| "audio/mpeg", |
| "audio/wav", |
| "audio/ogg", |
| "audio/mp4", |
| "audio/flac", |
| "application/octet-stream", |
| } |
| if file.content_type not in valid_audio_types: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid audio format. Supported: MP3, WAV, OGG, M4A, FLAC", |
| ) |
|
|
| temp_file = None |
| try: |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".audio") as tmp: |
| contents = await file.read() |
| tmp.write(contents) |
| temp_file = tmp.name |
|
|
| |
| logger.info(f"Analyzing audio file: {file.filename}") |
| loop = asyncio.get_event_loop() |
| result = await loop.run_in_executor( |
| None, detector.detect_audio_deepfake, temp_file |
| ) |
|
|
| return DeepfakeResponse( |
| is_deepfake=result.get("is_deepfake"), |
| confidence=result.get("confidence", 0.0), |
| message=result.get("message"), |
| error=result.get("error"), |
| details={ |
| k: v |
| for k, v in result.items() |
| if k not in ["is_deepfake", "confidence", "message", "error"] |
| }, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error processing audio: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| finally: |
| |
| if temp_file and os.path.exists(temp_file): |
| try: |
| os.remove(temp_file) |
| except Exception as e: |
| logger.warning(f"Could not delete temp file: {e}") |
|
|
|
|
| @app.post("/detect/video", response_model=DeepfakeResponse, tags=["Detection"]) |
| async def detect_video_deepfake(file: UploadFile = File(...)): |
| """ |
| Detect deepfake in video file. |
| |
| Supports: MP4, AVI, MKV, MOV, WEBM |
| """ |
| if detector is None: |
| raise HTTPException( |
| status_code=503, detail="Deepfake detector not initialized" |
| ) |
|
|
| |
| valid_video_types = { |
| "video/mp4", |
| "video/x-msvideo", |
| "video/x-matroska", |
| "video/quicktime", |
| "video/webm", |
| "application/octet-stream", |
| } |
| if file.content_type not in valid_video_types: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid video format. Supported: MP4, AVI, MKV, MOV, WEBM", |
| ) |
|
|
| temp_file = None |
| try: |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".video") as tmp: |
| contents = await file.read() |
| tmp.write(contents) |
| temp_file = tmp.name |
|
|
| |
| logger.info(f"Analyzing video file: {file.filename}") |
| loop = asyncio.get_event_loop() |
| result = await loop.run_in_executor( |
| None, detector.detect_video_deepfake, temp_file |
| ) |
|
|
| return DeepfakeResponse( |
| is_deepfake=result.get("is_deepfake"), |
| confidence=result.get("confidence", 0.0), |
| message=result.get("message"), |
| error=result.get("error"), |
| details={ |
| k: v |
| for k, v in result.items() |
| if k not in ["is_deepfake", "confidence", "message", "error"] |
| }, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error processing video: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| finally: |
| |
| if temp_file and os.path.exists(temp_file): |
| try: |
| os.remove(temp_file) |
| except Exception as e: |
| logger.warning(f"Could not delete temp file: {e}") |
|
|
|
|
| @app.post("/detect/image", response_model=DeepfakeResponse, tags=["Detection"]) |
| async def detect_image_deepfake(file: UploadFile = File(...)): |
| """ |
| Detect deepfake in an image file. |
| |
| Supports: JPG, JPEG, PNG, WEBP, AVIF |
| """ |
| if detector is None: |
| raise HTTPException( |
| status_code=503, detail="Deepfake detector not initialized" |
| ) |
|
|
| |
| valid_image_types = { |
| "image/jpeg", |
| "image/png", |
| "image/webp", |
| "image/avif", |
| "application/octet-stream", |
| } |
| if file.content_type not in valid_image_types: |
| raise HTTPException( |
| status_code=400, |
| detail=f"Invalid image format. Supported: JPG, JPEG, PNG, WEBP, AVIF", |
| ) |
|
|
| temp_file = None |
| try: |
| |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".image") as tmp: |
| contents = await file.read() |
| tmp.write(contents) |
| temp_file = tmp.name |
|
|
| |
| logger.info(f"Analyzing image file: {file.filename}") |
| loop = asyncio.get_event_loop() |
| result = await loop.run_in_executor( |
| None, detector.detect_image_deepfake, temp_file |
| ) |
|
|
| return DeepfakeResponse( |
| is_deepfake=result.get("is_deepfake"), |
| confidence=result.get("confidence", 0.0), |
| message=result.get("message"), |
| error=result.get("error"), |
| details={ |
| k: v |
| for k, v in result.items() |
| if k not in ["is_deepfake", "confidence", "message", "error"] |
| }, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error processing image: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| finally: |
| |
| if temp_file and os.path.exists(temp_file): |
| try: |
| os.remove(temp_file) |
| except Exception as e: |
| logger.warning(f"Could not delete temp file: {e}") |
|
|
|
|
|
|
| @app.post("/detect", response_model=DeepfakeResponse, tags=["Detection"]) |
| async def detect_deepfake(file: UploadFile = File(...)): |
| """ |
| Auto-detect and analyze file (audio or video). |
| |
| Automatically determines file type and applies appropriate detection. |
| """ |
| if detector is None: |
| raise HTTPException( |
| status_code=503, detail="Deepfake detector not initialized" |
| ) |
|
|
| temp_file = None |
| try: |
| |
| with tempfile.NamedTemporaryFile(delete=False) as tmp: |
| contents = await file.read() |
| tmp.write(contents) |
| temp_file = tmp.name |
|
|
| |
| file_ext = Path(file.filename).suffix.lower() |
| logger.info(f"Detecting type for: {file.filename} ({file_ext})") |
|
|
| loop = asyncio.get_event_loop() |
| if file_ext in [".mp3", ".wav", ".ogg", ".m4a", ".flac"]: |
| result = await loop.run_in_executor( |
| None, detector.detect_audio_deepfake, temp_file |
| ) |
| elif file_ext in [".mp4", ".avi", ".mkv", ".mov", ".webm"]: |
| result = await loop.run_in_executor( |
| None, detector.detect_video_deepfake, temp_file |
| ) |
| elif file_ext in [".jpg", ".jpeg", ".png", ".webp", ".avif"]: |
| result = await loop.run_in_executor( |
| None, detector.detect_image_deepfake, temp_file |
| ) |
| else: |
| return DeepfakeResponse( |
| is_deepfake=None, |
| confidence=0.0, |
| error=f"Unsupported file format: {file_ext}", |
| ) |
|
|
| |
| audio_exts = [".mp3", ".wav", ".ogg", ".m4a", ".flac"] |
| video_exts = [".mp4", ".avi", ".mkv", ".mov", ".webm"] |
| image_exts = [".jpg", ".jpeg", ".png", ".webp", ".avif"] |
| if file_ext in audio_exts: |
| media_type = "audio" |
| elif file_ext in video_exts: |
| media_type = "video" |
| elif file_ext in image_exts: |
| media_type = "image" |
| else: |
| media_type = "unknown" |
|
|
| |
| result = await analyze_with_reasoning(result, media_type) |
|
|
| return DeepfakeResponse( |
| is_deepfake=result.get("is_deepfake"), |
| confidence=result.get("confidence", 0.0), |
| message=result.get("message"), |
| error=result.get("error"), |
| reasoning=result.get("reasoning"), |
| key_factors=result.get("key_factors"), |
| details={ |
| k: v |
| for k, v in result.items() |
| if k not in ["is_deepfake", "confidence", "message", "error", "reasoning", "key_factors"] |
| }, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Error processing file: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| finally: |
| |
| if temp_file and os.path.exists(temp_file): |
| try: |
| os.remove(temp_file) |
| except Exception as e: |
| logger.warning(f"Could not delete temp file: {e}") |
|
|
|
|
| |
| @app.get("/health", tags=["Health"]) |
| async def health_check(): |
| """Health check endpoint.""" |
| return { |
| "status": "ok", |
| "device": settings.DEVICE, |
| "environment": settings.API_HOST, |
| "detector_initialized": detector is not None, |
| } |
|
|
|
|
| |
| @app.get("/", tags=["Info"]) |
| async def root(): |
| """API information endpoint.""" |
| return { |
| "name": "Cyber Defense Chatbot API", |
| "version": "1.0.0", |
| "description": "Unified API for deepfake detection, phishing detection, and webhook processing", |
| "endpoints": { |
| "health": "/health", |
| "detect_audio": "/detect/audio", |
| "detect_video": "/detect/video", |
| "detect_image": "/detect/image", |
| "auto_detect": "/detect", |
| "webhooks": "/webhooks", |
| }, |
| "supported_formats": { |
| "audio": [".mp3", ".wav", ".ogg", ".m4a", ".flac"], |
| "video": [".mp4", ".avi", ".mkv", ".mov", ".webm"], |
| "image": [".jpg", ".jpeg", ".png", ".webp", ".avif"], |
| }, |
|
|
| } |
|
|
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run("src.main:app", host=settings.API_HOST, port=settings.API_PORT, reload=settings.DEBUG) |
|
|