feat(auth): login to Hugging Face Hub via HUGGINGFACE_HUB_TOKEN; set HF_TOKEN/TOKEN; default OMP_NUM_THREADS=1
80b99f5
| from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Depends, Security, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from pydantic import BaseModel | |
| from typing import Optional, List | |
| import os | |
| import uuid | |
| import asyncio | |
| from datetime import datetime | |
| import motor.motor_asyncio | |
| from bson import ObjectId | |
| import json | |
| import shutil | |
| from pathlib import Path | |
| from fastapi.responses import FileResponse, StreamingResponse, JSONResponse | |
| import logging | |
| from logging.handlers import BufferingHandler | |
| from firebase_app_check import verify_app_check_token, verify_firebase_id_token | |
| from huggingface_hub import login as hf_login | |
| # Import face swap functionality | |
| import sys | |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| import DeepFakeAI.globals as DF_G | |
| from DeepFakeAI import utilities as DF_U | |
| from DeepFakeAI.processors.frame.modules import face_swapper as DF_FS | |
# ASGI application object; middleware below is attached to it.
app = FastAPI(version="1.0.0", title="Face Swap Video API")

# Authentication
# NOTE(review): the fallback password is committed to source control. Anyone
# who can read this file can authenticate whenever API_PASSWORD is unset;
# consider rotating it and requiring the environment variable.
API_PASSWORD = os.environ.get("API_PASSWORD", "logicgo_videoswap@153")
# Bearer-token extractor used by verify_api_key.
security = HTTPBearer()
# Authenticate Hugging Face Hub for model downloads (private/rate-limited).
# HUGGINGFACE_HUB_TOKEN takes precedence over HF_TOKEN.
_hf_token = os.getenv("HUGGINGFACE_HUB_TOKEN") or os.getenv("HF_TOKEN")
if _hf_token:
    try:
        hf_login(token=_hf_token)  # nosec - token provided via env
    except Exception as _hf_login_error:
        # Login stays best-effort (startup must not fail because of it), but
        # the failure is no longer silent. Only the exception type is logged
        # so the token itself can never end up in the log output.
        logging.getLogger(__name__).warning(
            "Hugging Face Hub login failed (%s); continuing without a logged-in session",
            type(_hf_login_error).__name__,
        )
    # Also expose legacy env names some utils expect. setdefault never
    # overrides a value that is already present in the environment.
    os.environ.setdefault("HF_TOKEN", _hf_token)
    os.environ.setdefault("TOKEN", _hf_token)

# Ensure sane threads to avoid libgomp warnings (only when not already set).
os.environ.setdefault("OMP_NUM_THREADS", "1")
def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify API key from Bearer token.

    Supports two authentication methods:
    1. Static password (API_PASSWORD)
    2. Firebase ID token (from Firebase Auth)

    Returns:
        ``{"auth_mode": "static", "token": <token>}`` when the token equals
        API_PASSWORD, otherwise ``{"auth_mode": "firebase", "claims": <claims>}``
        when verify_firebase_id_token returns a truthy value.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            neither method accepts the token.
    """
    # Local import keeps this block self-contained (hmac is standard library).
    import hmac

    token = credentials.credentials

    # Try static password first. compare_digest runs in constant time, so the
    # response latency does not reveal how many leading characters matched.
    # Both sides are encoded to bytes because compare_digest rejects
    # non-ASCII str arguments.
    if hmac.compare_digest(token.encode("utf-8"), API_PASSWORD.encode("utf-8")):
        return {"auth_mode": "static", "token": token}

    # Try Firebase ID token
    firebase_claims = verify_firebase_id_token(token)
    if firebase_claims:
        return {"auth_mode": "firebase", "claims": firebase_claims}

    # If neither works, raise error
    raise HTTPException(
        status_code=401,
        detail="Invalid authentication credentials. Use static password or Firebase ID token.",
        headers={"WWW-Authenticate": "Bearer"},
    )
# CORS middleware
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled. Confirm this is intended for a token-protected API.
_cors_options = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
# Firebase App Check configuration: enforced only when the APP_CHECK_ENABLED
# environment variable is "true" (case-insensitive).
APP_CHECK_ENABLED = os.getenv("APP_CHECK_ENABLED", "false").lower() == "true"


def verify_app_check(request: Request):
    """Dependency that enforces Firebase App Check when it is enabled.

    The token is read from the 'X-Firebase-AppCheck' header, falling back to
    'X-Firebase-AppCheck-Token'.

    Returns:
        True when App Check is disabled or the token verifies.

    Raises:
        HTTPException: 401 when App Check is enabled and the token is missing
            or rejected by verify_app_check_token.
    """
    if APP_CHECK_ENABLED:
        incoming = request.headers
        app_check_token = incoming.get("X-Firebase-AppCheck") or incoming.get("X-Firebase-AppCheck-Token")
        # The verifier is only consulted when a token was actually supplied.
        if not (app_check_token and verify_app_check_token(app_check_token)):
            raise HTTPException(status_code=401, detail="Invalid or missing Firebase App Check token")
    return True
# MongoDB connection
# NOTE(review): the fallback URI below embeds a database username and password
# in source control. These credentials should be rotated and supplied only
# through the MONGODB_URL environment variable.
MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb+srv://itishalogicgo_db_user:HR837xi0B9yh2vZK@cluster0.jeeytpz.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0")
DATABASE_NAME = "face_swap_video"

try:
    client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)
    db = client[DATABASE_NAME]
    # Collections (api_logs holds the API request log entries)
    (
        source_images_collection,
        target_videos_collection,
        result_videos_collection,
        jobs_collection,
        api_logs_collection,
    ) = (
        db[collection_name]
        for collection_name in (
            "source_images",
            "target_videos",
            "result_videos",
            "processing_jobs",
            "api_logs",
        )
    )
except Exception as exc:
    print(f"Warning: MongoDB connection failed at startup: {exc}")
    print("App will continue but MongoDB operations will fail")
    # Every handle is set to None so code can test "is None" before use.
    client = db = None
    source_images_collection = target_videos_collection = None
    result_videos_collection = jobs_collection = None
    api_logs_collection = None
# Setup logging: INFO level, "time - logger - level - message" records.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
async def log_to_mongodb(level: str, message: str, endpoint: str = None, method: str = None,
                         status_code: int = None, user_ip: str = None, error: str = None):
    """Insert one log record into the api_logs collection.

    Does nothing when api_logs_collection is None. A failed insert is reported
    through the module logger and never raised to the caller.
    """
    if api_logs_collection is None:
        return

    try:
        # Keyword order fixes the field order of the stored document.
        record = dict(
            timestamp=datetime.utcnow(),
            level=level,
            message=message,
            endpoint=endpoint,
            method=method,
            status_code=status_code,
            user_ip=user_ip,
            error=error,
        )
        await api_logs_collection.insert_one(record)
    except Exception as exc:
        logger.error(f"Failed to log to MongoDB: {exc}")
# Middleware to log API requests
# NOTE(review): no registration of this function (e.g. a decorator) is visible
# in this view of the file; confirm it is attached to the app.
async def log_requests(request: Request, call_next):
    """Log every request handled by ``call_next`` to MongoDB.

    A response is logged at INFO level with its status code. If ``call_next``
    raises, the failure is logged at ERROR level with the exception text and
    the exception is re-raised unchanged.

    Args:
        request: The incoming request.
        call_next: Awaitable callable that produces the response.

    Returns:
        The response produced by ``call_next``.
    """
    user_ip = request.client.host if request.client else None
    method = request.method
    path = request.url.path

    # Only the downstream call is guarded; log_to_mongodb handles its own
    # insert failures. The previous start_time/process_time computation was
    # never used and has been removed.
    try:
        response = await call_next(request)
    except Exception as e:
        # Log error
        await log_to_mongodb(
            level="ERROR",
            message=f"Error processing {method} {path}",
            endpoint=str(path),
            method=method,
            user_ip=user_ip,
            error=str(e)
        )
        raise
    else:
        # Log successful request
        await log_to_mongodb(
            level="INFO",
            message=f"{method} {path} - {response.status_code}",
            endpoint=str(path),
            method=method,
            status_code=response.status_code,
            user_ip=user_ip
        )
        return response
# Upload directories: one root with a sub-folder per kind of stored file.
UPLOAD_DIR = Path("uploads")
SOURCE_IMAGES_DIR, TARGET_VIDEOS_DIR, RESULT_VIDEOS_DIR = (
    UPLOAD_DIR / folder_name
    for folder_name in ("source_images", "target_videos", "result_videos")
)

# Create directories at import time; existing ones are left untouched.
for dir_path in (UPLOAD_DIR, SOURCE_IMAGES_DIR, TARGET_VIDEOS_DIR, RESULT_VIDEOS_DIR):
    dir_path.mkdir(parents=True, exist_ok=True)
def _run_local_faceswap(source_image_path: str, target_video_path: str) -> Optional[str]:
    """Run the local DeepFakeAI face-swap pipeline on one image/video pair.

    Configures the pipeline through DeepFakeAI module globals, makes sure the
    inswapper model file is available, extracts the target video's frames,
    swaps faces on them, rebuilds the video and restores its audio.

    Args:
        source_image_path: Path of the image providing the face.
        target_video_path: Path of the video to process.

    Returns:
        The output video path, or None when frame extraction fails, no temp
        frames are found, or the video cannot be rebuilt.
    """
    # Configure defaults for local pipeline
    DF_G.source_path = source_image_path
    DF_G.target_path = target_video_path
    DF_G.output_video_encoder = 'libx264'
    DF_G.output_video_quality = 20
    DF_G.temp_frame_format = 'png'
    DF_G.temp_frame_quality = 95
    DF_G.keep_temp = False
    DF_G.skip_audio = False

    # Face processing options
    DF_G.face_recognition = ['many']
    DF_G.reference_frame_number = 0
    DF_G.execution_thread_count = 2
    DF_G.execution_queue_count = 2

    # Prefer CUDA (GPU) if available; fallback to CPU.
    # "except Exception" instead of a bare except so KeyboardInterrupt and
    # SystemExit are no longer swallowed.
    try:
        DF_G.execution_providers = DF_U.decode_execution_providers(['cuda', 'cpu'])
    except Exception:
        DF_G.execution_providers = DF_U.decode_execution_providers(['cpu'])

    # Fix invalid OMP thread settings
    os.environ["OMP_NUM_THREADS"] = "1"

    # Ensure model exists
    model_dir = DF_U.resolve_relative_path('../.assets/models')
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, 'inswapper_128.onnx')
    if not os.path.exists(model_path):
        from huggingface_hub import hf_hub_download
        token = os.environ.get('TOKEN') or os.environ.get('HF_TOKEN')
        # Try each mirror in turn; the first successful download wins.
        for repo_id in ['zihaomu/inswapper_128.onnx', 'linyi/inswapper_128.onnx', 'banodoco/inswapper_128.onnx']:
            try:
                model_path = hf_hub_download(repo_id=repo_id, filename='inswapper_128.onnx', token=token)
                break
            except Exception as download_error:
                # Record which mirror failed instead of skipping it silently.
                logger.warning(
                    "Could not download inswapper_128.onnx from %s (%s)",
                    repo_id,
                    type(download_error).__name__,
                )
                continue
    # When every download failed, model_path still names the missing local
    # file and INSWAPPER_PATH is left untouched.
    if os.path.exists(model_path):
        os.environ['INSWAPPER_PATH'] = model_path
    DF_FS.pre_check()

    # Extract frames
    fps = DF_U.detect_fps(target_video_path) or 12.0
    DF_U.create_temp(target_video_path)
    # Everything after create_temp runs under try/finally so the temporary
    # data is cleared on early returns and on exceptions as well as on
    # success. Previously it was only cleared on the success path.
    # NOTE(review): assumes DF_U.clear_temp is safe to call after a partial
    # run - confirm against the DeepFakeAI utilities.
    try:
        if not DF_U.extract_frames(target_video_path, fps):
            return None
        temp_frames = DF_U.get_temp_frame_paths(target_video_path)
        if not temp_frames:
            return None

        # Process frames
        DF_FS.process_video(source_image_path, temp_frames)

        # Rebuild video and restore audio
        if not DF_U.create_video(target_video_path, fps):
            return None
        out_path = DF_U.normalize_output_path(
            source_image_path,
            target_video_path,
            str(RESULT_VIDEOS_DIR / f"out_{uuid.uuid4().hex}.mp4"),
        )
        DF_U.restore_audio(target_video_path, out_path)
        return out_path
    finally:
        DF_U.clear_temp(target_video_path)
# Pydantic models
# Response payload describing one uploaded source image.
class SourceImageResponse(BaseModel):
    id: str  # filled with str() of the MongoDB document id by the endpoints
    filename: str  # name supplied by the uploading client
    file_path: str  # location of the saved copy on disk
    uploaded_at: datetime
    status: str  # "uploaded" when the document is first created
# Response payload describing one uploaded target video.
class TargetVideoResponse(BaseModel):
    id: str  # filled with str() of the MongoDB document id by the endpoints
    filename: str  # name supplied by the uploading client
    file_path: str  # location of the saved copy on disk
    uploaded_at: datetime
    status: str  # "uploaded" when the document is first created
# Response payload describing one produced face-swap video.
class ResultVideoResponse(BaseModel):
    id: str  # filled with str() of the MongoDB document id
    source_image_id: str
    target_video_id: str
    result_file_path: str  # location of the produced video on disk
    created_at: datetime
    status: str
    # Optional; None when the stored document has no processing_time field.
    processing_time: Optional[float] = None
# Request body for starting a face swap: the ids of two stored uploads.
class FaceSwapRequest(BaseModel):
    source_image_id: str  # looked up as an ObjectId in source_images
    target_video_id: str  # looked up as an ObjectId in target_videos
# Status snapshot of a processing job; the optional fields stay None until
# the job has produced the corresponding value.
class JobStatus(BaseModel):
    job_id: str
    status: str  # values written in this file: queued, processing, completed, failed
    progress: Optional[float] = None  # 0.0 when queued/processing, 100.0 when completed
    result_video_id: Optional[str] = None
    result_video_url: Optional[str] = None  # HTTPS download URL
    error: Optional[str] = None  # failure description for failed jobs
# Base URL for generating download links (override with the BASE_URL env var)
BASE_URL = os.environ.get("BASE_URL", "https://logicgoinfotechspaces-face-swap-video.hf.space")


def get_result_video_url(result_video_id: str) -> str:
    """Return the download URL of a result video.

    The URL is BASE_URL followed by ``/api/result-video/<result_video_id>``.
    """
    return "{}/api/result-video/{}".format(BASE_URL, result_video_id)
# Helper functions
def save_file_to_disk(file: UploadFile, directory: Path) -> str:
    """Save an uploaded file under a random name and return its path.

    Only the extension of the client-supplied filename is reused; the stored
    name itself is a fresh UUID hex string.

    Args:
        file: Upload object exposing ``filename`` and a readable ``file``.
        directory: Existing directory to write into.

    Returns:
        The path of the written file as a string.
    """
    # An upload may arrive without a filename (None); fall back to an empty
    # name so the file is stored without an extension instead of crashing
    # in Path(None).
    file_extension = Path(file.filename or "").suffix
    file_path = directory / f"{uuid.uuid4().hex}{file_extension}"

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    return str(file_path)
async def process_face_swap(job_id: str, source_image_path: str, target_video_path: str):
    """Background task to process face swap.

    Marks the job as processing, runs the local face-swap pipeline, stores a
    result document (including how long the pipeline took) and finally marks
    the job as completed or failed.

    Args:
        job_id: Value of the job's ``job_id`` field in the jobs collection.
        source_image_path: Path of the stored source image.
        target_video_path: Path of the stored target video.
    """
    try:
        # Update job status to processing
        await jobs_collection.update_one(
            {"job_id": job_id},
            {"$set": {"status": "processing", "progress": 0.0}}
        )

        # Run face swap and time it.
        # NOTE(review): _run_local_faceswap is a synchronous call made from
        # inside this coroutine, so nothing else on the event loop runs until
        # it returns. Moving it to a worker thread would need care because the
        # pipeline is configured through shared module globals.
        started_at = datetime.utcnow()
        result_path = _run_local_faceswap(source_image_path, target_video_path)
        processing_time = (datetime.utcnow() - started_at).total_seconds()

        if result_path and os.path.exists(result_path):
            # Save result to MongoDB. processing_time is stored so the
            # result listing, which reads that field, can report it.
            result_doc = {
                "source_image_path": source_image_path,
                "target_video_path": target_video_path,
                "result_file_path": result_path,
                "created_at": datetime.utcnow(),
                "status": "completed",
                "job_id": job_id,
                "processing_time": processing_time
            }
            result = await result_videos_collection.insert_one(result_doc)
            result_video_id = str(result.inserted_id)

            # Update job status to completed
            await jobs_collection.update_one(
                {"job_id": job_id},
                {"$set": {
                    "status": "completed",
                    "progress": 100.0,
                    "result_video_id": result_video_id,
                    "result_video_url": get_result_video_url(result_video_id)
                }}
            )
        else:
            # Update job status to failed
            await jobs_collection.update_one(
                {"job_id": job_id},
                {"$set": {
                    "status": "failed",
                    "error": "Face swap processing failed"
                }}
            )
    except Exception as e:
        # Keep the traceback in the server log; the job document only
        # receives the short message.
        logger.exception("Face swap job %s failed", job_id)
        try:
            # Update job status to failed
            await jobs_collection.update_one(
                {"job_id": job_id},
                {"$set": {
                    "status": "failed",
                    "error": str(e)
                }}
            )
        except Exception:
            # The status write can fail too (for example when the database
            # is the thing that is down); log it rather than raising a
            # second error out of the background task.
            logger.exception("Could not mark face swap job %s as failed", job_id)
# API Endpoints
# The endpoint docstring below is left unchanged because the framework may
# publish it as the operation description.
async def upload_source_image(
    file: UploadFile = File(...),
    api_key: dict = Depends(verify_api_key),
    _app_check_ok: bool = Depends(verify_app_check)
):
    """Upload and store source image in MongoDB"""
    # content_type is None when the client sends no Content-Type for the
    # file part; treat that as "not an image" (400) instead of failing with
    # an AttributeError.
    if not (file.content_type or "").startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")

    try:
        # Save file to disk
        file_path = save_file_to_disk(file, SOURCE_IMAGES_DIR)

        # Store metadata in MongoDB
        doc = {
            "filename": file.filename,
            "file_path": file_path,
            "uploaded_at": datetime.utcnow(),
            "status": "uploaded",
            "content_type": file.content_type,
            "file_size": os.path.getsize(file_path)
        }
        result = await source_images_collection.insert_one(doc)

        return SourceImageResponse(
            id=str(result.inserted_id),
            filename=file.filename,
            file_path=file_path,
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=f"Error uploading source image: {str(e)}") from e
# The endpoint docstring below is left unchanged because the framework may
# publish it as the operation description.
async def upload_target_video(
    file: UploadFile = File(...),
    api_key: dict = Depends(verify_api_key),
    _app_check_ok: bool = Depends(verify_app_check)
):
    """Upload and store target video in MongoDB"""
    # content_type is None when the client sends no Content-Type for the
    # file part; treat that as "not a video" (400) instead of failing with
    # an AttributeError.
    if not (file.content_type or "").startswith('video/'):
        raise HTTPException(status_code=400, detail="File must be a video")

    try:
        # Save file to disk
        file_path = save_file_to_disk(file, TARGET_VIDEOS_DIR)

        # Store metadata in MongoDB
        doc = {
            "filename": file.filename,
            "file_path": file_path,
            "uploaded_at": datetime.utcnow(),
            "status": "uploaded",
            "content_type": file.content_type,
            "file_size": os.path.getsize(file_path)
        }
        result = await target_videos_collection.insert_one(doc)

        return TargetVideoResponse(
            id=str(result.inserted_id),
            filename=file.filename,
            file_path=file_path,
            uploaded_at=doc["uploaded_at"],
            status=doc["status"]
        )
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=f"Error uploading target video: {str(e)}") from e
# The endpoint docstring below is left unchanged because the framework may
# publish it as the operation description.
async def start_face_swap(
    request: FaceSwapRequest,
    background_tasks: BackgroundTasks,
    api_key: dict = Depends(verify_api_key),
    _app_check_ok: bool = Depends(verify_app_check)
):
    """Start face swap processing"""
    # Reject malformed ids up front: ObjectId() raises on them, which used to
    # surface as a 500 instead of a client error.
    if not ObjectId.is_valid(request.source_image_id):
        raise HTTPException(status_code=400, detail="Invalid source image id")
    if not ObjectId.is_valid(request.target_video_id):
        raise HTTPException(status_code=400, detail="Invalid target video id")

    try:
        # Get source image and target video from MongoDB
        source_image = await source_images_collection.find_one({"_id": ObjectId(request.source_image_id)})
        target_video = await target_videos_collection.find_one({"_id": ObjectId(request.target_video_id)})

        if not source_image:
            raise HTTPException(status_code=404, detail="Source image not found")
        if not target_video:
            raise HTTPException(status_code=404, detail="Target video not found")

        # Create job record
        job_id = str(uuid.uuid4())
        job_doc = {
            "job_id": job_id,
            "source_image_id": request.source_image_id,
            "target_video_id": request.target_video_id,
            "status": "queued",
            "created_at": datetime.utcnow(),
            "progress": 0.0
        }
        await jobs_collection.insert_one(job_doc)

        # Start background processing
        background_tasks.add_task(
            process_face_swap,
            job_id,
            source_image["file_path"],
            target_video["file_path"]
        )

        return JobStatus(
            job_id=job_id,
            status="queued",
            progress=0.0
        )
    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # 404s raised above were caught below and re-sent as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error starting face swap: {str(e)}") from e
# Looks a job up by its job_id field and reports its current state. The
# download URL is rebuilt from result_video_id on every call.
async def get_job_status(job_id: str, api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """Get job status"""
    job = await jobs_collection.find_one({"job_id": job_id})
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    download_url = get_result_video_url(job["result_video_id"]) if job.get("result_video_id") else None

    return JobStatus(
        job_id=job["job_id"],
        status=job["status"],
        progress=job.get("progress"),
        result_video_id=job.get("result_video_id"),
        result_video_url=download_url,
        error=job.get("error"),
    )
# Serves the stored result file as an MP4 download. Responds 400 for a
# malformed id and 404 when either the document or the file is missing.
async def get_result_video(result_video_id: str, api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """Get result video file"""
    # ObjectId() raises on malformed ids, which used to surface as a 500;
    # answer with a client error instead.
    if not ObjectId.is_valid(result_video_id):
        raise HTTPException(status_code=400, detail="Invalid result video id")

    result = await result_videos_collection.find_one({"_id": ObjectId(result_video_id)})
    if not result:
        raise HTTPException(status_code=404, detail="Result video not found")

    # The document can outlive the file on disk, so check both.
    if not os.path.exists(result["result_file_path"]):
        raise HTTPException(status_code=404, detail="Result video file not found")

    return FileResponse(
        path=result["result_file_path"],
        media_type="video/mp4",
        filename=f"face_swap_result_{result_video_id}.mp4"
    )
# Returns every stored source image, newest upload first.
async def list_source_images(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """List all source images"""
    newest_first = source_images_collection.find().sort("uploaded_at", -1)
    return [
        SourceImageResponse(
            id=str(doc["_id"]),
            filename=doc["filename"],
            file_path=doc["file_path"],
            uploaded_at=doc["uploaded_at"],
            status=doc["status"],
        )
        async for doc in newest_first
    ]
# Returns every stored target video, newest upload first.
async def list_target_videos(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """List all target videos"""
    newest_first = target_videos_collection.find().sort("uploaded_at", -1)
    return [
        TargetVideoResponse(
            id=str(doc["_id"]),
            filename=doc["filename"],
            file_path=doc["file_path"],
            uploaded_at=doc["uploaded_at"],
            status=doc["status"],
        )
        async for doc in newest_first
    ]
# Returns every stored result video, newest first.
# NOTE(review): source_image_id / target_video_id are filled from the stored
# *_path fields, so they carry file paths rather than ids - confirm whether
# that is intended.
async def list_result_videos(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """List all result videos"""
    newest_first = result_videos_collection.find().sort("created_at", -1)
    return [
        ResultVideoResponse(
            id=str(doc["_id"]),
            source_image_id=doc.get("source_image_path", ""),
            target_video_id=doc.get("target_video_path", ""),
            result_file_path=doc["result_file_path"],
            created_at=doc["created_at"],
            status=doc["status"],
            processing_time=doc.get("processing_time"),
        )
        async for doc in newest_first
    ]
# Reports liveness plus the execution providers onnxruntime says are
# available; the GPU flag is true when CUDAExecutionProvider is among them.
async def api_health(api_key: dict = Depends(verify_api_key), _app_check_ok: bool = Depends(verify_app_check)):
    """Health check endpoint with GPU status (requires authentication)"""
    # Imported lazily, at call time rather than at module import.
    import onnxruntime

    providers = onnxruntime.get_available_providers()
    return dict(
        status="ok",
        time=datetime.utcnow().isoformat(),
        gpu_available='CUDAExecutionProvider' in providers,
        execution_providers=providers,
    )
# Static landing payload pointing at the docs and health endpoints.
async def root():
    """Root endpoint - shows API is running"""
    landing = dict(
        message="Face Swap Video API is running",
        version="1.0.0",
        docs="/docs",
        health="/api/health",
    )
    return landing
# Test endpoint to verify API is accessible (no auth required for testing).
# Returns a fixed payload listing the main routes.
async def test_endpoint():
    """Test endpoint to verify API is running (public endpoint)"""
    route_summary = dict(
        upload_source="POST /api/source-image",
        upload_target="POST /api/target-video",
        face_swap="POST /api/face-swap",
        health="GET /api/health",
    )
    return dict(
        status="ok",
        message="API is accessible",
        authentication="Bearer token required for all endpoints except /api/test",
        endpoints=route_summary,
    )
# API Logs endpoint
# Returns the newest log entries, optionally filtered by level (upper-cased
# before matching) and by exact endpoint path.
# NOTE(review): limit is passed to the cursor unchanged, so callers choose
# how many entries are loaded - consider bounding it.
async def get_api_logs(
    limit: int = 100,
    level: Optional[str] = None,
    endpoint: Optional[str] = None,
    api_key: dict = Depends(verify_api_key)
):
    """Get API logs from MongoDB"""
    if api_logs_collection is None:
        raise HTTPException(status_code=503, detail="MongoDB not available")

    try:
        filters = {}
        if level:
            filters["level"] = level.upper()
        if endpoint:
            filters["endpoint"] = endpoint

        newest_first = api_logs_collection.find(filters).sort("timestamp", -1).limit(limit)

        # Fields copied through unchanged (missing ones become None).
        copied_fields = ("level", "message", "endpoint", "method", "status_code", "user_ip", "error")
        logs = []
        async for doc in newest_first:
            stamp = doc.get("timestamp")
            entry = {"timestamp": doc["timestamp"].isoformat() if stamp else None}
            entry.update((field, doc.get(field)) for field in copied_fields)
            logs.append(entry)

        return {"total": len(logs), "logs": logs}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching logs: {str(e)}")
# User info endpoint
# Describes the caller from the dict produced by verify_api_key: Firebase
# callers get selected token claims, everyone else the static-auth notice.
async def get_current_user(api_key: dict = Depends(verify_api_key)):
    """Get current authenticated user info"""
    if api_key.get("auth_mode") != "firebase":
        return {
            "auth_mode": "static",
            "message": "Using static API password authentication"
        }

    claims = api_key.get("claims", {})
    # Token-level details, nested under the "firebase" key of the response.
    token_details = {
        "sign_in_provider": claims.get("firebase", {}).get("sign_in_provider"),
        "iss": claims.get("iss"),
        "aud": claims.get("aud"),
        "auth_time": claims.get("auth_time"),
        "exp": claims.get("exp"),
    }
    return {
        "auth_mode": "firebase",
        "user_id": claims.get("uid"),
        "email": claims.get("email"),
        "email_verified": claims.get("email_verified", False),
        "name": claims.get("name"),
        "picture": claims.get("picture"),
        "firebase": token_details,
    }
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 7860.
    import uvicorn

    uvicorn.run(app, port=7860, host="0.0.0.0")