File size: 13,640 Bytes
6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 1dfcfc7 6b659dc 3337c5d 6b659dc c9cd8bf 6c71866 20dd477 6c71866 3337c5d 6c71866 3337c5d 6c71866 20dd477 6c71866 3337c5d 6c71866 20dd477 6c71866 20dd477 6c71866 20dd477 6c71866 3337c5d 6c71866 3337c5d 7902802 6c71866 20dd477 6c71866 6b659dc 3337c5d 695c9b0 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 695c9b0 3df7b7b 695c9b0 3df7b7b 695c9b0 3df7b7b 695c9b0 3337c5d 6b659dc 3337c5d 6b659dc 3337c5d 6b659dc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 | """
Deepfake Authenticator - FastAPI Backend
"""
import os
import uuid
import logging
import shutil
import subprocess
from pathlib import Path
from fastapi import FastAPI, File, UploadFile, HTTPException, Header
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional
# ββ Logging (must be set up before any logger usage) βββββββββββββββββββββββββ
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
from detector import DeepfakeAuthenticator
from auth import validate_api_key, check_usage_limit, increment_usage
# ββ Video conversion helper βββββββββββββββββββββββββββββββββββββββββββββββββββ
def convert_to_mp4(src: Path):
"""
Convert a video file to mp4 using the bundled ffmpeg binary.
Returns the converted Path, or None if conversion failed.
"""
if src.suffix.lower() == ".mp4":
return None # already mp4
dst = src.with_suffix(".mp4")
# Use imageio-ffmpeg bundled binary (always available, no system install needed)
ffmpeg_bin = "ffmpeg"
try:
import imageio_ffmpeg
ffmpeg_bin = imageio_ffmpeg.get_ffmpeg_exe()
logger.info(f"Using bundled ffmpeg: {ffmpeg_bin}")
except Exception as e:
logger.warning(f"imageio_ffmpeg not available, trying system ffmpeg: {e}")
try:
result = subprocess.run(
[
ffmpeg_bin, "-y", "-i", str(src),
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
"-c:a", "aac", "-movflags", "+faststart",
str(dst),
],
capture_output=True,
timeout=120,
)
if result.returncode == 0 and dst.exists() and dst.stat().st_size > 1000:
logger.info(f"Converted {src.name} -> {dst.name} ({dst.stat().st_size // 1024} KB)")
return dst
stderr = result.stderr.decode(errors="ignore")[-500:]
logger.warning(f"ffmpeg exit {result.returncode}: {stderr}")
except Exception as e:
logger.warning(f"ffmpeg conversion failed: {e}")
# Fallback: moviepy
try:
try:
from moviepy import VideoFileClip
except ImportError:
from moviepy.editor import VideoFileClip
clip = VideoFileClip(str(src))
clip.write_videofile(
str(dst), codec="libx264", audio_codec="aac",
logger=None, preset="ultrafast",
)
clip.close()
if dst.exists() and dst.stat().st_size > 1000:
logger.info(f"moviepy converted {src.name} -> {dst.name}")
return dst
except Exception as e:
logger.warning(f"moviepy conversion also failed: {e}")
return None
# ββ App setup ββββββββββββββββββββββββββββββββ
app = FastAPI(
title="Deepfake Authenticator API",
description="AI-powered deepfake detection using MediaPipe + HuggingFace",
version="1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:5173",
"http://localhost:3000",
"https://*.vercel.app",
"https://authrix.vercel.app",
# Add your custom domain here if you have one
],
allow_origin_regex=r"https://.*\.vercel\.app",
allow_methods=["*"],
allow_headers=["*"],
)
# ββ Upload directory ββββββββββββββββββββββββββ
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
ALLOWED_EXTENSIONS = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".wmv"}
MAX_FILE_SIZE_MB = 100
# ββ Singleton authenticator βββββββββββββββββββ
authenticator = None
@app.on_event("startup")
async def startup_event():
global authenticator
logger.info("Initializing DeepfakeAuthenticator...")
authenticator = DeepfakeAuthenticator()
logger.info(
f"DeepfakeAuthenticator ready β model: "
f"{'HuggingFace' if authenticator.decision_agent.use_hf_model else 'Heuristic'}"
)
# ββ Routes ββββββββββββββββββββββββββββββββββββ
@app.get("/health")
async def health():
agent = authenticator.decision_agent if authenticator else None
if agent and agent.use_hf_model:
model_info = f"Ensemble ({len(agent.models)} ViT models)"
elif agent:
model_info = "Heuristic"
else:
model_info = "Loading"
return {
"status": "ok",
"model": model_info,
"ready": authenticator is not None,
}
@app.post("/clear-cache")
async def clear_cache():
"""Clear the result cache."""
try:
from detector import _result_cache
cache_size = len(_result_cache)
_result_cache.clear()
logger.info(f"Cache cleared: {cache_size} entries removed")
return {
"status": "success",
"message": f"Cleared {cache_size} cached results",
"entries_removed": cache_size
}
except Exception as e:
logger.error(f"Failed to clear cache: {e}")
raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@app.post("/analyze-url")
async def analyze_from_url(payload: dict):
"""Download a video from a URL and analyze it. Used by the browser extension."""
if not authenticator:
raise HTTPException(status_code=503, detail="Server is still initializing")
video_url = payload.get("url", "").strip()
if not video_url:
raise HTTPException(status_code=400, detail="No URL provided")
tmp_prefix = UPLOAD_DIR / f"ext_{uuid.uuid4().hex}"
actual_path = None
downloaded = False
try:
# yt-dlp: handles YouTube, Twitter, Instagram, TikTok
try:
import yt_dlp
ydl_opts = {
"format": "bestvideo[ext=mp4][height<=720]+bestaudio[ext=m4a]/best[ext=mp4][height<=720]/best",
"outtmpl": str(tmp_prefix) + ".%(ext)s",
"quiet": True,
"no_warnings": True,
"merge_output_format": "mp4",
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
for ext in (".mp4", ".webm", ".mkv", ".avi", ".mov"):
candidate = Path(str(tmp_prefix) + ext)
if candidate.exists() and candidate.stat().st_size > 1000:
actual_path = candidate
downloaded = True
logger.info(f"yt-dlp: {actual_path.name} ({actual_path.stat().st_size // 1024}KB)")
break
if not downloaded:
for f in sorted(UPLOAD_DIR.glob(f"{tmp_prefix.name}*")):
if f.stat().st_size > 1000:
actual_path = f
downloaded = True
logger.info(f"yt-dlp (glob): {actual_path.name}")
break
except ImportError:
logger.info("yt-dlp not installed β trying direct HTTP fetch")
except Exception as e:
logger.warning(f"yt-dlp failed ({e}) β trying direct fetch")
# Fallback: direct HTTP fetch
if not downloaded:
try:
import httpx
actual_path = Path(str(tmp_prefix) + ".mp4")
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
r = await client.get(video_url, headers={"User-Agent": "Mozilla/5.0"})
if r.status_code == 200 and len(r.content) > 1000:
actual_path.write_bytes(r.content)
downloaded = True
logger.info(f"Direct fetch: {len(r.content) // 1024}KB")
except Exception as e:
logger.warning(f"Direct fetch failed: {e}")
if not downloaded or actual_path is None:
raise HTTPException(
status_code=400,
detail="Could not download video. For YouTube, ensure yt-dlp is installed: pip install yt-dlp",
)
# Convert if needed
converted = convert_to_mp4(actual_path)
analyze_path = converted if converted else actual_path
result = authenticator.analyze(str(analyze_path)) # full mode for URL downloads
return result
except HTTPException:
raise
except Exception as e:
logger.exception(f"analyze-url failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
finally:
for f in UPLOAD_DIR.glob(f"{tmp_prefix.name}*"):
try:
f.unlink()
except Exception:
pass
@app.post("/analyze")
async def analyze_video(
file: UploadFile = File(...),
x_api_key: Optional[str] = Header(None, alias="X-API-Key")
):
"""Analyze an uploaded video for deepfake content."""
import asyncio
# Check API key (allow localhost without key for development)
if x_api_key:
key_data = validate_api_key(x_api_key)
if not key_data:
raise HTTPException(status_code=401, detail="Invalid API key")
allowed, used, limit = check_usage_limit(x_api_key)
if not allowed:
raise HTTPException(
status_code=429,
detail=f"Monthly limit exceeded ({used}/{limit}). Upgrade your plan at https://authrix.ai/pricing"
)
logger.info(f"API request from {key_data['email']} ({key_data['tier']}) - {used+1}/{limit}")
else:
logger.info("Local request (no API key)")
if not authenticator:
raise HTTPException(status_code=503, detail="Server is still initializing, please retry.")
suffix = Path(file.filename).suffix.lower()
if suffix not in ALLOWED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type '{suffix}'. Allowed: {', '.join(ALLOWED_EXTENSIONS)}",
)
unique_name = f"{uuid.uuid4().hex}{suffix}"
save_path = UPLOAD_DIR / unique_name
converted_path = None
try:
content = await file.read()
size_mb = len(content) / (1024 * 1024)
if size_mb > MAX_FILE_SIZE_MB:
raise HTTPException(
status_code=413,
detail=f"File too large ({size_mb:.1f} MB). Max allowed: {MAX_FILE_SIZE_MB} MB",
)
save_path.write_bytes(content)
logger.info(f"Saved upload: {unique_name} ({size_mb:.1f} MB)")
# Convert webm/mkv/etc to mp4 β OpenCV on Windows cannot decode webm natively
analyze_path = save_path
if suffix in (".webm", ".mkv", ".avi", ".wmv"):
logger.info(f"File has {suffix} extension β conversion needed")
converted_path = convert_to_mp4(save_path)
if converted_path:
analyze_path = converted_path
logger.info(f"β Conversion successful β using {analyze_path.name}")
else:
logger.error(f"β Conversion FAILED for {suffix} β will attempt direct analysis (likely to fail)")
else:
logger.info(f"File is {suffix} β no conversion needed")
logger.info(f"Calling authenticator.analyze({analyze_path})")
# Detect duration for fast_mode
try:
import cv2 as _cv2
_cap = _cv2.VideoCapture(str(analyze_path))
_fps = _cap.get(_cv2.CAP_PROP_FPS)
_tot = _cap.get(_cv2.CAP_PROP_FRAME_COUNT)
_cap.release()
duration = _tot / _fps if _fps > 0 else 999
except Exception:
duration = 999
fast = duration < 30
logger.info(f"Video duration: {duration:.1f}s β fast_mode={fast}")
# Run with 120s timeout β never hang forever
import asyncio, concurrent.futures as _cf
loop = asyncio.get_event_loop()
with _cf.ThreadPoolExecutor(max_workers=1) as pool:
try:
result = await asyncio.wait_for(
loop.run_in_executor(pool, lambda: authenticator.analyze(str(analyze_path), fast_mode=fast)),
timeout=120.0
)
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="Analysis timed out after 120s. Try a shorter video.")
# Increment usage counter if API key provided
if x_api_key:
increment_usage(x_api_key)
return result
except HTTPException:
raise
except Exception as e:
logger.exception(f"Analysis failed for {unique_name}: {e}")
# Write detailed error to file for debugging
error_log = UPLOAD_DIR / "last_error.txt"
import traceback
error_log.write_text(f"File: {unique_name}\nError: {e}\n\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
finally:
for p in [save_path, converted_path]:
if p is not None and p.exists():
try:
p.unlink()
logger.info(f"Cleaned up: {p.name}")
except Exception:
pass
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False)
|