""" Deepfake Authenticator - FastAPI Backend """ import os import uuid import logging import shutil import subprocess from pathlib import Path from fastapi import FastAPI, File, UploadFile, HTTPException, Header from fastapi.middleware.cors import CORSMiddleware from typing import Optional # ── Logging (must be set up before any logger usage) ───────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger(__name__) from detector import DeepfakeAuthenticator from auth import validate_api_key, check_usage_limit, increment_usage # ── Video conversion helper ─────────────────────────────────────────────────── def convert_to_mp4(src: Path): """ Convert a video file to mp4 using the bundled ffmpeg binary. Returns the converted Path, or None if conversion failed. """ if src.suffix.lower() == ".mp4": return None # already mp4 dst = src.with_suffix(".mp4") # Use imageio-ffmpeg bundled binary (always available, no system install needed) ffmpeg_bin = "ffmpeg" try: import imageio_ffmpeg ffmpeg_bin = imageio_ffmpeg.get_ffmpeg_exe() logger.info(f"Using bundled ffmpeg: {ffmpeg_bin}") except Exception as e: logger.warning(f"imageio_ffmpeg not available, trying system ffmpeg: {e}") try: result = subprocess.run( [ ffmpeg_bin, "-y", "-i", str(src), "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28", "-c:a", "aac", "-movflags", "+faststart", str(dst), ], capture_output=True, timeout=120, ) if result.returncode == 0 and dst.exists() and dst.stat().st_size > 1000: logger.info(f"Converted {src.name} -> {dst.name} ({dst.stat().st_size // 1024} KB)") return dst stderr = result.stderr.decode(errors="ignore")[-500:] logger.warning(f"ffmpeg exit {result.returncode}: {stderr}") except Exception as e: logger.warning(f"ffmpeg conversion failed: {e}") # Fallback: moviepy try: try: from moviepy import VideoFileClip except ImportError: from moviepy.editor import VideoFileClip clip = VideoFileClip(str(src)) clip.write_videofile( str(dst), codec="libx264", audio_codec="aac", logger=None, preset="ultrafast", ) clip.close() if dst.exists() and dst.stat().st_size > 1000: logger.info(f"moviepy converted {src.name} -> {dst.name}") return dst except Exception as e: logger.warning(f"moviepy conversion also failed: {e}") return None # ── App setup ──────────────────────────────── app = FastAPI( title="Deepfake Authenticator API", description="AI-powered deepfake detection using MediaPipe + HuggingFace", version="1.0.0", ) app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:5173", "http://localhost:3000", "https://*.vercel.app", "https://authrix.vercel.app", # Add your custom domain here if you have one ], allow_origin_regex=r"https://.*\.vercel\.app", allow_methods=["*"], allow_headers=["*"], ) # ── Upload directory ────────────────────────── UPLOAD_DIR = Path("uploads") UPLOAD_DIR.mkdir(exist_ok=True) ALLOWED_EXTENSIONS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".wmv"} MAX_FILE_SIZE_MB = 100 # ── Singleton authenticator ─────────────────── authenticator = None @app.on_event("startup") async def startup_event(): global authenticator logger.info("Initializing DeepfakeAuthenticator...") authenticator = DeepfakeAuthenticator() logger.info( f"DeepfakeAuthenticator ready — model: " f"{'HuggingFace' if authenticator.decision_agent.use_hf_model else 'Heuristic'}" ) # ── Routes ──────────────────────────────────── @app.get("/health") async def health(): agent = authenticator.decision_agent if authenticator else None if agent and agent.use_hf_model: model_info = f"Ensemble ({len(agent.models)} ViT models)" elif agent: model_info = "Heuristic" else: model_info = "Loading" return { "status": "ok", "model": model_info, "ready": authenticator is not None, } @app.post("/clear-cache") async def clear_cache(): """Clear the result cache.""" try: from detector import _result_cache cache_size = len(_result_cache) _result_cache.clear() logger.info(f"Cache cleared: {cache_size} entries removed") return { "status": "success", "message": f"Cleared {cache_size} cached results", "entries_removed": cache_size } except Exception as e: logger.error(f"Failed to clear cache: {e}") raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}") @app.post("/analyze-url") async def analyze_from_url(payload: dict): """Download a video from a URL and analyze it. Used by the browser extension.""" if not authenticator: raise HTTPException(status_code=503, detail="Server is still initializing") video_url = payload.get("url", "").strip() if not video_url: raise HTTPException(status_code=400, detail="No URL provided") tmp_prefix = UPLOAD_DIR / f"ext_{uuid.uuid4().hex}" actual_path = None downloaded = False try: # yt-dlp: handles YouTube, Twitter, Instagram, TikTok try: import yt_dlp ydl_opts = { "format": "bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/best[ext=mp4][height<=720]/best", "outtmpl": str(tmp_prefix) + ".%(ext)s", "quiet": True, "no_warnings": True, "merge_output_format": "mp4", } with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([video_url]) for ext in (".mp4", ".webm", ".mkv", ".avi", ".mov"): candidate = Path(str(tmp_prefix) + ext) if candidate.exists() and candidate.stat().st_size > 1000: actual_path = candidate downloaded = True logger.info(f"yt-dlp: {actual_path.name} ({actual_path.stat().st_size // 1024}KB)") break if not downloaded: for f in sorted(UPLOAD_DIR.glob(f"{tmp_prefix.name}*")): if f.stat().st_size > 1000: actual_path = f downloaded = True logger.info(f"yt-dlp (glob): {actual_path.name}") break except ImportError: logger.info("yt-dlp not installed — trying direct HTTP fetch") except Exception as e: logger.warning(f"yt-dlp failed ({e}) — trying direct fetch") # Fallback: direct HTTP fetch if not downloaded: try: import httpx actual_path = Path(str(tmp_prefix) + ".mp4") async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client: r = await client.get(video_url, headers={"User-Agent": "Mozilla/5.0"}) if r.status_code == 200 and len(r.content) > 1000: actual_path.write_bytes(r.content) downloaded = True logger.info(f"Direct fetch: {len(r.content) // 1024}KB") except Exception as e: logger.warning(f"Direct fetch failed: {e}") if not downloaded or actual_path is None: raise HTTPException( status_code=400, detail="Could not download video. For YouTube, ensure yt-dlp is installed: pip install yt-dlp", ) # Convert if needed converted = convert_to_mp4(actual_path) analyze_path = converted if converted else actual_path result = authenticator.analyze(str(analyze_path)) # full mode for URL downloads return result except HTTPException: raise except Exception as e: logger.exception(f"analyze-url failed: {e}") raise HTTPException(status_code=500, detail=str(e)) finally: for f in UPLOAD_DIR.glob(f"{tmp_prefix.name}*"): try: f.unlink() except Exception: pass @app.post("/analyze") async def analyze_video( file: UploadFile = File(...), x_api_key: Optional[str] = Header(None, alias="X-API-Key") ): """Analyze an uploaded video for deepfake content.""" import asyncio # Check API key (allow localhost without key for development) if x_api_key: key_data = validate_api_key(x_api_key) if not key_data: raise HTTPException(status_code=401, detail="Invalid API key") allowed, used, limit = check_usage_limit(x_api_key) if not allowed: raise HTTPException( status_code=429, detail=f"Monthly limit exceeded ({used}/{limit}). Upgrade your plan at https://authrix.ai/pricing" ) logger.info(f"API request from {key_data['email']} ({key_data['tier']}) - {used+1}/{limit}") else: logger.info("Local request (no API key)") if not authenticator: raise HTTPException(status_code=503, detail="Server is still initializing, please retry.") suffix = Path(file.filename).suffix.lower() if suffix not in ALLOWED_EXTENSIONS: raise HTTPException( status_code=400, detail=f"Unsupported file type '{suffix}'. Allowed: {', '.join(ALLOWED_EXTENSIONS)}", ) unique_name = f"{uuid.uuid4().hex}{suffix}" save_path = UPLOAD_DIR / unique_name converted_path = None try: content = await file.read() size_mb = len(content) / (1024 * 1024) if size_mb > MAX_FILE_SIZE_MB: raise HTTPException( status_code=413, detail=f"File too large ({size_mb:.1f} MB). Max allowed: {MAX_FILE_SIZE_MB} MB", ) save_path.write_bytes(content) logger.info(f"Saved upload: {unique_name} ({size_mb:.1f} MB)") # Convert webm/mkv/etc to mp4 — OpenCV on Windows cannot decode webm natively analyze_path = save_path if suffix in (".webm", ".mkv", ".avi", ".wmv"): logger.info(f"File has {suffix} extension — conversion needed") converted_path = convert_to_mp4(save_path) if converted_path: analyze_path = converted_path logger.info(f"✓ Conversion successful — using {analyze_path.name}") else: logger.error(f"✗ Conversion FAILED for {suffix} — will attempt direct analysis (likely to fail)") else: logger.info(f"File is {suffix} — no conversion needed") logger.info(f"Calling authenticator.analyze({analyze_path})") # Detect duration for fast_mode try: import cv2 as _cv2 _cap = _cv2.VideoCapture(str(analyze_path)) _fps = _cap.get(_cv2.CAP_PROP_FPS) _tot = _cap.get(_cv2.CAP_PROP_FRAME_COUNT) _cap.release() duration = _tot / _fps if _fps > 0 else 999 except Exception: duration = 999 fast = duration < 30 logger.info(f"Video duration: {duration:.1f}s → fast_mode={fast}") # Run with 120s timeout — never hang forever import asyncio, concurrent.futures as _cf loop = asyncio.get_event_loop() with _cf.ThreadPoolExecutor(max_workers=1) as pool: try: result = await asyncio.wait_for( loop.run_in_executor(pool, lambda: authenticator.analyze(str(analyze_path), fast_mode=fast)), timeout=120.0 ) except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="Analysis timed out after 120s. Try a shorter video.") # Increment usage counter if API key provided if x_api_key: increment_usage(x_api_key) return result except HTTPException: raise except Exception as e: logger.exception(f"Analysis failed for {unique_name}: {e}") # Write detailed error to file for debugging error_log = UPLOAD_DIR / "last_error.txt" import traceback error_log.write_text(f"File: {unique_name}\nError: {e}\n\n{traceback.format_exc()}") raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") finally: for p in [save_path, converted_path]: if p is not None and p.exists(): try: p.unlink() logger.info(f"Cleaned up: {p.name}") except Exception: pass if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)