| """ |
| Deepfake Authenticator - FastAPI Backend |
| """ |
|
|
| import os |
| import uuid |
| import logging |
| import shutil |
| import subprocess |
| from pathlib import Path |
|
|
| from fastapi import FastAPI, File, UploadFile, HTTPException, Header |
| from fastapi.middleware.cors import CORSMiddleware |
| from typing import Optional |
|
|
| |
# Process-wide logging: INFO and above, each record stamped with time,
# level and the emitting logger's name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by the functions in this file.
logger = logging.getLogger(__name__)
|
|
| from detector import DeepfakeAuthenticator |
| from auth import validate_api_key, check_usage_limit, increment_usage |
|
|
| |
def convert_to_mp4(src: Path) -> Optional[Path]:
    """
    Convert a video file to mp4, trying ffmpeg first and moviepy second.

    The output is written next to ``src`` with the suffix replaced by
    ``.mp4``.

    Args:
        src: Path of the video file to convert.

    Returns:
        The Path of the converted file, or None when ``src`` already has an
        ``.mp4`` suffix (nothing to convert) or when every conversion attempt
        failed.
    """
    if src.suffix.lower() == ".mp4":
        return None

    dst = src.with_suffix(".mp4")

    # Prefer the ffmpeg binary shipped with imageio_ffmpeg; otherwise rely on
    # whatever "ffmpeg" resolves to on PATH.
    ffmpeg_bin = "ffmpeg"
    try:
        import imageio_ffmpeg
        ffmpeg_bin = imageio_ffmpeg.get_ffmpeg_exe()
        logger.info(f"Using bundled ffmpeg: {ffmpeg_bin}")
    except Exception as e:
        logger.warning(f"imageio_ffmpeg not available, trying system ffmpeg: {e}")

    # Attempt 1: ffmpeg subprocess (argument list, no shell).
    try:
        result = subprocess.run(
            [
                ffmpeg_bin, "-y", "-i", str(src),
                "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
                "-c:a", "aac", "-movflags", "+faststart",
                str(dst),
            ],
            capture_output=True,
            timeout=120,
        )
        # Outputs of 1000 bytes or less are treated as a failed conversion.
        if result.returncode == 0 and dst.exists() and dst.stat().st_size > 1000:
            logger.info(f"Converted {src.name} -> {dst.name} ({dst.stat().st_size // 1024} KB)")
            return dst
        stderr = result.stderr.decode(errors="ignore")[-500:]
        logger.warning(f"ffmpeg exit {result.returncode}: {stderr}")
    except Exception as e:
        logger.warning(f"ffmpeg conversion failed: {e}")

    # Attempt 2: moviepy (import path differs between moviepy releases).
    try:
        try:
            from moviepy import VideoFileClip
        except ImportError:
            from moviepy.editor import VideoFileClip
        clip = VideoFileClip(str(src))
        try:
            clip.write_videofile(
                str(dst), codec="libx264", audio_codec="aac",
                logger=None, preset="ultrafast",
            )
        finally:
            # Release the reader even when encoding raises.
            clip.close()
        if dst.exists() and dst.stat().st_size > 1000:
            logger.info(f"moviepy converted {src.name} -> {dst.name}")
            return dst
    except Exception as e:
        logger.warning(f"moviepy conversion also failed: {e}")

    # Every attempt failed: do not leave a truncated output behind, since the
    # caller only learns "None" and cannot know a partial file exists.
    try:
        dst.unlink()
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"Could not remove partial output {dst.name}: {e}")

    return None
|
|
|
|
| |
# The ASGI application object that all route decorators below attach to.
app = FastAPI(
    title="Deepfake Authenticator API",
    description="AI-powered deepfake detection using MediaPipe + HuggingFace",
    version="1.0.0",
)

# CORS policy.
# Entries in allow_origins are compared literally, so a glob such as
# "https://*.vercel.app" never matches a real origin; wildcard subdomains are
# handled by allow_origin_regex instead. The non-functional glob entry has
# therefore been dropped from the list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://localhost:3000",
        "https://authrix.vercel.app",
    ],
    allow_origin_regex=r"https://.*\.vercel\.app",
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Scratch directory for uploaded and downloaded videos; created at import
# time, relative to the current working directory.
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

# Upload constraints checked by the /analyze endpoint.
MAX_FILE_SIZE_MB = 100
ALLOWED_EXTENSIONS = {
    ".mp4",
    ".avi",
    ".mov",
    ".mkv",
    ".webm",
    ".wmv",
}

# Shared detector instance; stays None until the startup hook assigns it.
authenticator = None
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Construct the shared DeepfakeAuthenticator when the app starts."""
    global authenticator

    logger.info("Initializing DeepfakeAuthenticator...")
    authenticator = DeepfakeAuthenticator()

    # Report which decision backend the freshly built detector is using.
    if authenticator.decision_agent.use_hf_model:
        backend = "HuggingFace"
    else:
        backend = "Heuristic"
    logger.info(f"DeepfakeAuthenticator ready β model: {backend}")
|
|
|
|
| |
|
|
@app.get("/health")
async def health():
    """Report service status, the active model backend, and readiness."""
    agent = None
    if authenticator:
        agent = authenticator.decision_agent

    if not agent:
        # Detector (or its decision agent) is not available yet.
        model_info = "Loading"
    elif agent.use_hf_model:
        model_info = f"Ensemble ({len(agent.models)} ViT models)"
    else:
        model_info = "Heuristic"

    is_ready = authenticator is not None
    return {"status": "ok", "model": model_info, "ready": is_ready}
|
|
|
|
@app.post("/clear-cache")
async def clear_cache():
    """Empty the detector's result cache and report how many entries were dropped."""
    try:
        # Imported lazily, inside the guarded region, so an import problem is
        # reported as a 500 like any other failure here.
        from detector import _result_cache as cache

        removed = len(cache)
        cache.clear()
        logger.info(f"Cache cleared: {removed} entries removed")
    except Exception as e:
        logger.error(f"Failed to clear cache: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to clear cache: {e}")
    else:
        return {
            "status": "success",
            "message": f"Cleared {removed} cached results",
            "entries_removed": removed,
        }
|
|
|
|
def _download_with_ytdlp(video_url: str, tmp_prefix: Path) -> Optional[Path]:
    """
    Download ``video_url`` with yt-dlp (blocking) and locate the output file.

    Returns the downloaded file, or None when yt-dlp produced nothing larger
    than 1000 bytes. Raises ImportError when yt-dlp is not installed; errors
    raised by yt-dlp itself propagate to the caller.
    """
    import yt_dlp

    ydl_opts = {
        "format": "bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/best[ext=mp4][height<=720]/best",
        "outtmpl": str(tmp_prefix) + ".%(ext)s",
        "quiet": True,
        "no_warnings": True,
        "merge_output_format": "mp4",
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([video_url])

    # First look for the expected container extensions, in preference order.
    for ext in (".mp4", ".webm", ".mkv", ".avi", ".mov"):
        candidate = Path(str(tmp_prefix) + ext)
        if candidate.exists() and candidate.stat().st_size > 1000:
            logger.info(f"yt-dlp: {candidate.name} ({candidate.stat().st_size // 1024}KB)")
            return candidate

    # Otherwise accept any sufficiently large file sharing the temp prefix.
    for f in sorted(UPLOAD_DIR.glob(f"{tmp_prefix.name}*")):
        if f.stat().st_size > 1000:
            logger.info(f"yt-dlp (glob): {f.name}")
            return f

    return None


@app.post("/analyze-url")
async def analyze_from_url(payload: dict):
    """
    Download a video from a URL and analyze it. Used by the browser extension.

    The download is attempted with yt-dlp first and with a plain HTTP GET
    second. Blocking work (yt-dlp, conversion, analysis) runs in the default
    thread pool so the event loop stays responsive.

    Args:
        payload: JSON object whose ``url`` key holds the video location.

    Returns:
        The value returned by ``authenticator.analyze`` for the downloaded file.

    Raises:
        HTTPException: 503 while the detector is not initialized; 400 when the
            URL is missing, is not an http(s) URL, or cannot be downloaded;
            413 when a direct download exceeds MAX_FILE_SIZE_MB; 500 for any
            other failure.
    """
    import asyncio
    from urllib.parse import urlparse

    if not authenticator:
        raise HTTPException(status_code=503, detail="Server is still initializing")

    # A non-string "url" (null, number, list) must be a 400, not a crash.
    raw_url = payload.get("url")
    video_url = raw_url.strip() if isinstance(raw_url, str) else ""
    if not video_url:
        raise HTTPException(status_code=400, detail="No URL provided")

    # Only fetch over http(s); this rejects file://, ftp:// and similar.
    # NOTE(review): redirects and internal hostnames are still followed, so
    # this is not a complete SSRF defence.
    try:
        parsed = urlparse(video_url)
        url_ok = parsed.scheme.lower() in ("http", "https") and bool(parsed.netloc)
    except ValueError:
        url_ok = False
    if not url_ok:
        raise HTTPException(status_code=400, detail="Only http(s) URLs are supported")

    tmp_prefix = UPLOAD_DIR / f"ext_{uuid.uuid4().hex}"
    actual_path = None
    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024
    loop = asyncio.get_running_loop()

    try:
        # Attempt 1: yt-dlp, off the event loop.
        try:
            actual_path = await loop.run_in_executor(
                None, _download_with_ytdlp, video_url, tmp_prefix
            )
        except ImportError:
            logger.info("yt-dlp not installed β trying direct HTTP fetch")
        except Exception as e:
            logger.warning(f"yt-dlp failed ({e}) β trying direct fetch")

        # Attempt 2: direct HTTP GET, streamed to disk with a size cap rather
        # than buffering the whole body in memory.
        if actual_path is None:
            too_large = False
            try:
                import httpx

                target = Path(str(tmp_prefix) + ".mp4")
                received = 0
                async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
                    async with client.stream(
                        "GET", video_url, headers={"User-Agent": "Mozilla/5.0"}
                    ) as r:
                        if r.status_code == 200:
                            with target.open("wb") as out:
                                async for chunk in r.aiter_bytes():
                                    received += len(chunk)
                                    if received > max_bytes:
                                        too_large = True
                                        break
                                    out.write(chunk)
                if not too_large and received > 1000:
                    actual_path = target
                    logger.info(f"Direct fetch: {received // 1024}KB")
            except Exception as e:
                logger.warning(f"Direct fetch failed: {e}")

            # Raised outside the inner try so it is not swallowed there.
            if too_large:
                raise HTTPException(
                    status_code=413,
                    detail=f"Video too large. Max allowed: {MAX_FILE_SIZE_MB} MB",
                )

        if actual_path is None:
            raise HTTPException(
                status_code=400,
                detail="Could not download video. For YouTube, ensure yt-dlp is installed: pip install yt-dlp",
            )

        # convert_to_mp4 returns None both for mp4 input and for failure; in
        # either case analyze the file as downloaded.
        converted = await loop.run_in_executor(None, convert_to_mp4, actual_path)
        analyze_path = converted if converted else actual_path

        result = await loop.run_in_executor(
            None, authenticator.analyze, str(analyze_path)
        )
        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"analyze-url failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Remove every file created under this request's unique prefix.
        for f in UPLOAD_DIR.glob(f"{tmp_prefix.name}*"):
            try:
                f.unlink()
            except Exception:
                pass
|
|
|
|
def _video_duration_seconds(path: Path) -> float:
    """Return the duration OpenCV reports for ``path``, or 999 when unknown."""
    try:
        import cv2

        cap = cv2.VideoCapture(str(path))
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        finally:
            cap.release()
        return total / fps if fps > 0 else 999
    except Exception:
        return 999


@app.post("/analyze")
async def analyze_video(
    file: UploadFile = File(...),
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
):
    """
    Analyze an uploaded video for deepfake content.

    Args:
        file: The uploaded video; its extension must be in ALLOWED_EXTENSIONS.
        x_api_key: Optional ``X-API-Key`` header. When present it is validated
            and checked against the usage limit, and usage is incremented
            after a successful analysis.

    Returns:
        The value returned by ``authenticator.analyze``.

    Raises:
        HTTPException: 401 invalid API key; 429 usage limit exceeded; 503
            detector not initialized; 400 unsupported extension; 413 upload
            larger than MAX_FILE_SIZE_MB; 504 analysis exceeded 120 s; 500 any
            other failure.
    """
    import asyncio
    import traceback

    if x_api_key:
        key_data = validate_api_key(x_api_key)
        if not key_data:
            raise HTTPException(status_code=401, detail="Invalid API key")

        allowed, used, limit = check_usage_limit(x_api_key)
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=f"Monthly limit exceeded ({used}/{limit}). Upgrade your plan at https://authrix.ai/pricing"
            )

        logger.info(f"API request from {key_data['email']} ({key_data['tier']}) - {used+1}/{limit}")
    else:
        logger.info("Local request (no API key)")

    if not authenticator:
        raise HTTPException(status_code=503, detail="Server is still initializing, please retry.")

    # filename can be None for some clients; treat that as "no extension".
    suffix = Path(file.filename or "").suffix.lower()
    if suffix not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type '{suffix}'. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}",
        )

    unique_name = f"{uuid.uuid4().hex}{suffix}"
    save_path = UPLOAD_DIR / unique_name
    converted_path = None
    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024

    try:
        # Stream the upload to disk in 1 MiB chunks and stop at the limit, so
        # an oversized upload is never held in memory in full.
        written = 0
        with save_path.open("wb") as out:
            while True:
                chunk = await file.read(1024 * 1024)
                if not chunk:
                    break
                written += len(chunk)
                if written > max_bytes:
                    raise HTTPException(
                        status_code=413,
                        detail=f"File too large (> {MAX_FILE_SIZE_MB} MB). Max allowed: {MAX_FILE_SIZE_MB} MB",
                    )
                out.write(chunk)
        size_mb = written / (1024 * 1024)
        logger.info(f"Saved upload: {unique_name} ({size_mb:.1f} MB)")

        loop = asyncio.get_running_loop()

        analyze_path = save_path
        if suffix in (".webm", ".mkv", ".avi", ".wmv"):
            logger.info(f"File has {suffix} extension β conversion needed")
            # Conversion shells out to ffmpeg; keep it off the event loop.
            converted_path = await loop.run_in_executor(None, convert_to_mp4, save_path)
            if converted_path:
                analyze_path = converted_path
                logger.info(f"β Conversion successful β using {analyze_path.name}")
            else:
                logger.error(f"β Conversion FAILED for {suffix} β will attempt direct analysis (likely to fail)")
        else:
            logger.info(f"File is {suffix} β no conversion needed")

        logger.info(f"Calling authenticator.analyze({analyze_path})")

        # Clips shorter than 30 s are analyzed with fast_mode=True.
        duration = _video_duration_seconds(analyze_path)
        fast = duration < 30
        logger.info(f"Video duration: {duration:.1f}s β fast_mode={fast}")

        # Run the analysis in the default executor. A dedicated pool used as a
        # context manager would call shutdown(wait=True) on exit and block the
        # event loop until the worker finished, defeating the timeout.
        # NOTE: the worker thread cannot be interrupted; after a timeout it
        # keeps running until analyze() returns or fails on the removed file.
        try:
            result = await asyncio.wait_for(
                loop.run_in_executor(
                    None,
                    lambda: authenticator.analyze(str(analyze_path), fast_mode=fast),
                ),
                timeout=120.0,
            )
        except asyncio.TimeoutError:
            raise HTTPException(status_code=504, detail="Analysis timed out after 120s. Try a shorter video.")

        # Only successful analyses count against the key's quota.
        if x_api_key:
            increment_usage(x_api_key)

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Analysis failed for {unique_name}: {e}")

        # Best-effort debug dump; a failure to write it must not replace the
        # 500 response below.
        try:
            error_log = UPLOAD_DIR / "last_error.txt"
            error_log.write_text(f"File: {unique_name}\nError: {e}\n\n{traceback.format_exc()}")
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
    finally:
        # Remove the upload, the converted file, and any partial conversion
        # output left at the upload's ".mp4" sibling path.
        leftovers = []
        for p in (save_path, converted_path, save_path.with_suffix(".mp4")):
            if p is not None and p not in leftovers:
                leftovers.append(p)
        for p in leftovers:
            if p.exists():
                try:
                    p.unlink()
                    logger.info(f"Cleaned up: {p.name}")
                except Exception:
                    pass
|
|
|
|
# Script entry point: serve the app with uvicorn on all interfaces, port 8000,
# without auto-reload.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        reload=False,
        port=8000,
        host="0.0.0.0",
    )
|
|