from __future__ import annotations import logging import os import sys import time from datetime import UTC, datetime from io import BytesIO from pathlib import Path from tempfile import NamedTemporaryFile from typing import Annotated from uuid import uuid4 from fastapi import Depends, FastAPI, File, Header, HTTPException, Request, UploadFile from fastapi.middleware.cors import CORSMiddleware from PIL import Image, UnidentifiedImageError from pydantic import BaseModel, Field ROOT = Path(__file__).resolve().parent SRC = ROOT / "src" if str(SRC) not in sys.path: sys.path.insert(0, str(SRC)) from deepfake_detector.auth_store import AuthStore, UserProfile from deepfake_detector.billing import LemonSqueezyBilling from deepfake_detector.google_auth import google_auth_enabled, verify_google_id_token from deepfake_detector.image_detector import ImageDetector from deepfake_detector.security import ( InMemoryRateLimiter, UploadLimits, normalize_suffix, validate_upload_metadata, validate_video_file, ) from deepfake_detector.video_detector import VideoDetector logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO").upper(), format="%(asctime)s %(levelname)s %(name)s %(message)s", ) logger = logging.getLogger("verilens.api") APP_VERSION = os.getenv("APP_VERSION", "1.0.0") FRONTEND_URL = os.getenv( "FRONTEND_URL", "https://tanishqkolhatkar93.github.io/Deep_Fake_Detection/" ) DEFAULT_ALLOWED_ORIGINS = ( "http://localhost:3000", "http://127.0.0.1:3000", "http://localhost:5500", "http://127.0.0.1:5500", "https://tanishqkolhatkar93.github.io", "https://tanishq93-deepfake-detection.hf.space", ) ALLOWED_ORIGINS = [ origin.strip() for origin in os.getenv("ALLOWED_ORIGINS", ",".join(DEFAULT_ALLOWED_ORIGINS)).split(",") if origin.strip() ] class ServiceInfoResponse(BaseModel): service: str version: str frontend_url: str docs: str openapi: str endpoints: list[str] class HealthResponse(BaseModel): status: str class VersionResponse(BaseModel): version: str class ModelMetadata(BaseModel): name: str threshold: float positive_label: str device: str class LimitsMetadata(BaseModel): max_image_bytes: int max_video_bytes: int max_video_duration_seconds: float rate_limit_max_requests: int rate_limit_window_seconds: int class MetadataResponse(BaseModel): service: str version: str frontend_url: str model: ModelMetadata limits: LimitsMetadata class DetectionEnvelope(BaseModel): request_id: str processed_at: str processing_ms: float = Field(..., ge=0) filename: str media_type: str model: str report: dict[str, object] usage: dict[str, int | str] | None = None class AuthConfigResponse(BaseModel): enabled: bool google_client_id: str | None free_image_limit: int free_video_limit: int class GoogleAuthRequest(BaseModel): id_token: str class UserResponse(BaseModel): email: str name: str picture_url: str plan_name: str subscription_status: str | None class AuthSessionResponse(BaseModel): session_token: str user: UserResponse usage: dict[str, int | str] class MeResponse(BaseModel): user: UserResponse usage: dict[str, int | str] class BillingPlanResponse(BaseModel): slug: str name: str price_label: str description: str image_limit: int video_limit: int featured: bool checkout_available: bool class BillingConfigResponse(BaseModel): enabled: bool provider: str plans: list[BillingPlanResponse] class BillingCheckoutRequest(BaseModel): plan_slug: str class BillingRedirectResponse(BaseModel): url: str class BillingPortalResponse(BaseModel): url: str class BillingWebhookResponse(BaseModel): status: str def _extract_bearer_token(authorization: str | None) -> str: if not authorization: raise HTTPException(status_code=401, detail="Authentication required.") scheme, _, token = authorization.partition(" ") if scheme.lower() != "bearer" or not token.strip(): raise HTTPException(status_code=401, detail="Invalid authorization header.") return token.strip() def _serialize_user(user: UserProfile) -> UserResponse: return { "email": user.email, "name": user.name, "picture_url": user.picture_url, "plan_name": user.plan_name, "subscription_status": user.subscription_status, } def _require_user( authorization: Annotated[str | None, Header(alias="Authorization")] = None, ) -> UserProfile: token = _extract_bearer_token(authorization) try: return auth_store.get_session_user(token) except KeyError as exc: raise HTTPException(status_code=401, detail="Session expired or invalid.") from exc app = FastAPI( title="Media Authenticity Detector API", description=( "HTTP service for binary Yes/No image and video checks using a local " "xRayon checkpoint-based model with hardened upload validation." ), version=APP_VERSION, ) app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=False, allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["*"], expose_headers=["X-Request-ID", "X-Process-Time-MS"], ) image_detector = ImageDetector() video_detector = VideoDetector(image_detector=image_detector) upload_limits = UploadLimits() rate_limiter = InMemoryRateLimiter() auth_store = AuthStore() billing = LemonSqueezyBilling() @app.middleware("http") async def add_request_context(request: Request, call_next): request.state.request_id = uuid4().hex started = time.perf_counter() response = await call_next(request) duration_ms = (time.perf_counter() - started) * 1000 response.headers["X-Request-ID"] = request.state.request_id response.headers["X-Process-Time-MS"] = f"{duration_ms:.2f}" logger.info( "request_complete request_id=%s method=%s path=%s status=%s duration_ms=%.2f", request.state.request_id, request.method, request.url.path, response.status_code, duration_ms, ) return response @app.get("/", response_model=ServiceInfoResponse) def root() -> ServiceInfoResponse: return { "service": "media-authenticity-detector", "version": APP_VERSION, "frontend_url": FRONTEND_URL, "docs": "/docs", "openapi": "/openapi.json", "endpoints": [ "/health", "/version", "/metadata", "/auth/config", "/auth/google", "/me", "/billing/config", "/billing/checkout", "/billing/portal", "/detect/image", "/detect/video", ], } @app.get("/health", response_model=HealthResponse) def health() -> HealthResponse: return {"status": "ok"} @app.get("/version", response_model=VersionResponse) def version() -> VersionResponse: return {"version": APP_VERSION} @app.get("/metadata", response_model=MetadataResponse) def metadata() -> MetadataResponse: return { "service": "media-authenticity-detector", "version": APP_VERSION, "frontend_url": FRONTEND_URL, "model": { "name": image_detector.model_id, "threshold": image_detector.threshold, "positive_label": image_detector.fake_label, "device": image_detector.device, }, "limits": { "max_image_bytes": upload_limits.max_image_bytes, "max_video_bytes": upload_limits.max_video_bytes, "max_video_duration_seconds": upload_limits.max_video_duration_seconds, "rate_limit_max_requests": rate_limiter.max_requests, "rate_limit_window_seconds": rate_limiter.window_seconds, }, } @app.get("/auth/config", response_model=AuthConfigResponse) def auth_config() -> AuthConfigResponse: client_id = os.getenv("GOOGLE_CLIENT_ID", "").strip() return { "enabled": google_auth_enabled(), "google_client_id": client_id or None, "free_image_limit": auth_store.free_image_limit, "free_video_limit": auth_store.free_video_limit, } @app.get("/billing/config", response_model=BillingConfigResponse) def billing_config() -> BillingConfigResponse: return billing.public_config() @app.post("/auth/google", response_model=AuthSessionResponse) def auth_google(payload: GoogleAuthRequest) -> AuthSessionResponse: try: identity = verify_google_id_token(payload.id_token) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc user = auth_store.upsert_google_user( email=identity.email, google_sub=identity.subject, name=identity.name, picture_url=identity.picture_url, ) session_token = auth_store.create_session(user.email) usage = auth_store.get_usage(user.email) return { "session_token": session_token, "user": _serialize_user(user), "usage": usage.to_dict(), } @app.post("/auth/logout") def auth_logout( _: Annotated[UserProfile, Depends(_require_user)], authorization: Annotated[str | None, Header(alias="Authorization")] = None, ) -> dict[str, str]: token = _extract_bearer_token(authorization) auth_store.delete_session(token) return {"status": "ok"} @app.get("/me", response_model=MeResponse) def me(current_user: Annotated[UserProfile, Depends(_require_user)]) -> MeResponse: usage = auth_store.get_usage(current_user.email) return { "user": _serialize_user(current_user), "usage": usage.to_dict(), } @app.post("/billing/checkout", response_model=BillingRedirectResponse) async def create_billing_checkout( payload: BillingCheckoutRequest, current_user: Annotated[UserProfile, Depends(_require_user)], ) -> BillingRedirectResponse: if current_user.lemon_subscription_id: raise HTTPException( status_code=409, detail="You already have an active subscription. Use the billing portal to manage it.", ) if payload.plan_slug == "free": raise HTTPException(status_code=400, detail="The free plan does not require checkout.") try: plan = billing.get_plan(payload.plan_slug) checkout_url = await billing.create_checkout_url( plan=plan, email=current_user.email, name=current_user.name, redirect_url=FRONTEND_URL, ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc except RuntimeError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except Exception as exc: logger.exception( "billing_checkout_failed email=%s plan=%s", current_user.email, payload.plan_slug, ) raise HTTPException(status_code=502, detail="Unable to start checkout right now.") from exc return {"url": checkout_url} @app.get("/billing/portal", response_model=BillingPortalResponse) async def billing_portal( current_user: Annotated[UserProfile, Depends(_require_user)], ) -> BillingPortalResponse: if not current_user.lemon_subscription_id: raise HTTPException( status_code=404, detail="No paid subscription is linked to this account yet.", ) try: urls = await billing.fetch_subscription_urls(current_user.lemon_subscription_id) except RuntimeError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except Exception as exc: logger.exception( "billing_portal_failed email=%s subscription_id=%s", current_user.email, current_user.lemon_subscription_id, ) raise HTTPException(status_code=502, detail="Unable to open the billing portal right now.") from exc portal_url = ( urls.get("customer_portal_update_subscription") or urls.get("customer_portal") or urls.get("update_payment_method") ) if not portal_url: raise HTTPException(status_code=404, detail="No billing portal URL was available.") return {"url": portal_url} @app.post("/webhooks/lemonsqueezy", response_model=BillingWebhookResponse) async def lemonsqueezy_webhook( request: Request, x_signature: Annotated[str | None, Header(alias="X-Signature")] = None, ) -> BillingWebhookResponse: payload = await request.body() if not billing.verify_signature(payload, x_signature): raise HTTPException(status_code=401, detail="Invalid webhook signature.") event = await request.json() resource = event.get("data") or event attributes = resource.get("attributes") or {} meta = event.get("meta") or {} event_name = str(meta.get("event_name") or "").strip() custom_data = meta.get("custom_data") or {} user_email = ( str(custom_data.get("user_email") or "") or str(attributes.get("user_email") or "") or str(attributes.get("customer_email") or "") ).strip().lower() if not user_email: logger.warning("billing_webhook_missing_email event=%s", event_name) return {"status": "ignored"} try: current_user = auth_store.get_user(user_email) except KeyError: logger.warning("billing_webhook_unknown_user email=%s event=%s", user_email, event_name) return {"status": "ignored"} variant_id_raw = attributes.get("variant_id") variant_id = int(variant_id_raw) if variant_id_raw is not None else None plan = billing.plan_from_variant(variant_id) subscription_status = str(attributes.get("status") or "").strip().lower() or None resource_id = resource.get("id") subscription_id = str(resource_id).strip() if resource_id is not None else None customer_id = str(attributes.get("customer_id") or "").strip() or None keep_paid_access = event_name not in { "subscription_expired", "subscription_refunded", } and subscription_status not in {"expired"} if plan and keep_paid_access: auth_store.apply_plan( email=current_user.email, plan_name=plan.slug, image_limit=plan.image_limit, video_limit=plan.video_limit, lemon_customer_id=customer_id, lemon_subscription_id=subscription_id, lemon_variant_id=variant_id, subscription_status=subscription_status, ) elif event_name in { "subscription_expired", "subscription_refunded", } or subscription_status in {"expired"}: auth_store.reset_to_free_plan(current_user.email) else: logger.info( "billing_webhook_ignored email=%s event=%s variant_id=%s", current_user.email, event_name, variant_id, ) return {"status": "ok"} @app.post("/detect/image", response_model=DetectionEnvelope) async def detect_image( request: Request, current_user: Annotated[UserProfile, Depends(_require_user)], file: UploadFile = File(...), ) -> DetectionEnvelope: started = time.perf_counter() _check_rate_limit(request) try: auth_store.ensure_usage_available(current_user.email, "image") except ValueError as exc: raise HTTPException(status_code=402, detail=str(exc)) from exc payload = await file.read() try: validate_upload_metadata( filename=file.filename, content_type=file.content_type, payload=payload, media_type="image", limits=upload_limits, ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc try: image = Image.open(BytesIO(payload)).convert("RGB") except UnidentifiedImageError as exc: raise HTTPException(status_code=400, detail="Invalid image payload") from exc try: report = image_detector.detect_pil(image) except RuntimeError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc usage = auth_store.consume_usage(current_user.email, "image") return DetectionEnvelope( request_id=request.state.request_id, processed_at=datetime.now(UTC).isoformat(), processing_ms=round((time.perf_counter() - started) * 1000, 2), filename=file.filename or "upload", media_type="image", model=image_detector.model_id, report=report.to_dict(), usage=usage.to_dict(), ) @app.post("/detect/video", response_model=DetectionEnvelope) async def detect_video( request: Request, current_user: Annotated[UserProfile, Depends(_require_user)], file: UploadFile = File(...), ) -> DetectionEnvelope: started = time.perf_counter() _check_rate_limit(request) try: auth_store.ensure_usage_available(current_user.email, "video") except ValueError as exc: raise HTTPException(status_code=402, detail=str(exc)) from exc payload = await file.read() try: validate_upload_metadata( filename=file.filename, content_type=file.content_type, payload=payload, media_type="video", limits=upload_limits, ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc temp_dir = ROOT / ".tmp" temp_dir.mkdir(exist_ok=True) with NamedTemporaryFile( delete=False, suffix=normalize_suffix(file.filename) or ".mp4", dir=temp_dir, ) as tmp: tmp.write(payload) temp_path = Path(tmp.name) try: validate_video_file(temp_path, limits=upload_limits) report = video_detector.detect_file(temp_path) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc except RuntimeError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc finally: temp_path.unlink(missing_ok=True) usage = auth_store.consume_usage(current_user.email, "video") return DetectionEnvelope( request_id=request.state.request_id, processed_at=datetime.now(UTC).isoformat(), processing_ms=round((time.perf_counter() - started) * 1000, 2), filename=file.filename or "upload", media_type="video", model=video_detector.image_detector.model_id, report=report.to_dict(), usage=usage.to_dict(), ) def _check_rate_limit(request: Request) -> None: forwarded_for = request.headers.get("x-forwarded-for", "") client_ip = forwarded_for.split(",")[0].strip() if forwarded_for else "" if not client_ip and request.client is not None: client_ip = request.client.host if not client_ip: client_ip = "unknown" try: rate_limiter.check(client_ip) except ValueError as exc: raise HTTPException(status_code=429, detail=str(exc)) from exc