Spaces:
Running
Running
| from __future__ import annotations | |
| import logging | |
| import os | |
| import sys | |
| import time | |
| from datetime import UTC, datetime | |
| from io import BytesIO | |
| from pathlib import Path | |
| from tempfile import NamedTemporaryFile | |
| from typing import Annotated | |
| from uuid import uuid4 | |
| from fastapi import Depends, FastAPI, File, Header, HTTPException, Request, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from PIL import Image, UnidentifiedImageError | |
| from pydantic import BaseModel, Field | |
| ROOT = Path(__file__).resolve().parent | |
| SRC = ROOT / "src" | |
| if str(SRC) not in sys.path: | |
| sys.path.insert(0, str(SRC)) | |
| from deepfake_detector.auth_store import AuthStore, UserProfile | |
| from deepfake_detector.billing import LemonSqueezyBilling | |
| from deepfake_detector.google_auth import google_auth_enabled, verify_google_id_token | |
| from deepfake_detector.image_detector import ImageDetector | |
| from deepfake_detector.security import ( | |
| InMemoryRateLimiter, | |
| UploadLimits, | |
| normalize_suffix, | |
| validate_upload_metadata, | |
| validate_video_file, | |
| ) | |
| from deepfake_detector.video_detector import VideoDetector | |
| logging.basicConfig( | |
| level=os.getenv("LOG_LEVEL", "INFO").upper(), | |
| format="%(asctime)s %(levelname)s %(name)s %(message)s", | |
| ) | |
| logger = logging.getLogger("verilens.api") | |
| APP_VERSION = os.getenv("APP_VERSION", "1.0.0") | |
| FRONTEND_URL = os.getenv( | |
| "FRONTEND_URL", "https://tanishqkolhatkar93.github.io/Deep_Fake_Detection/" | |
| ) | |
| DEFAULT_ALLOWED_ORIGINS = ( | |
| "http://localhost:3000", | |
| "http://127.0.0.1:3000", | |
| "http://localhost:5500", | |
| "http://127.0.0.1:5500", | |
| "https://tanishqkolhatkar93.github.io", | |
| "https://tanishq93-deepfake-detection.hf.space", | |
| ) | |
| ALLOWED_ORIGINS = [ | |
| origin.strip() | |
| for origin in os.getenv("ALLOWED_ORIGINS", ",".join(DEFAULT_ALLOWED_ORIGINS)).split(",") | |
| if origin.strip() | |
| ] | |
| class ServiceInfoResponse(BaseModel): | |
| service: str | |
| version: str | |
| frontend_url: str | |
| docs: str | |
| openapi: str | |
| endpoints: list[str] | |
| class HealthResponse(BaseModel): | |
| status: str | |
| class VersionResponse(BaseModel): | |
| version: str | |
| class ModelMetadata(BaseModel): | |
| name: str | |
| threshold: float | |
| positive_label: str | |
| device: str | |
| class LimitsMetadata(BaseModel): | |
| max_image_bytes: int | |
| max_video_bytes: int | |
| max_video_duration_seconds: float | |
| rate_limit_max_requests: int | |
| rate_limit_window_seconds: int | |
| class MetadataResponse(BaseModel): | |
| service: str | |
| version: str | |
| frontend_url: str | |
| model: ModelMetadata | |
| limits: LimitsMetadata | |
| class DetectionEnvelope(BaseModel): | |
| request_id: str | |
| processed_at: str | |
| processing_ms: float = Field(..., ge=0) | |
| filename: str | |
| media_type: str | |
| model: str | |
| report: dict[str, object] | |
| usage: dict[str, int | str] | None = None | |
| class AuthConfigResponse(BaseModel): | |
| enabled: bool | |
| google_client_id: str | None | |
| free_image_limit: int | |
| free_video_limit: int | |
| class GoogleAuthRequest(BaseModel): | |
| id_token: str | |
| class UserResponse(BaseModel): | |
| email: str | |
| name: str | |
| picture_url: str | |
| plan_name: str | |
| subscription_status: str | None | |
| class AuthSessionResponse(BaseModel): | |
| session_token: str | |
| user: UserResponse | |
| usage: dict[str, int | str] | |
| class MeResponse(BaseModel): | |
| user: UserResponse | |
| usage: dict[str, int | str] | |
| class BillingPlanResponse(BaseModel): | |
| slug: str | |
| name: str | |
| price_label: str | |
| description: str | |
| image_limit: int | |
| video_limit: int | |
| featured: bool | |
| checkout_available: bool | |
| class BillingConfigResponse(BaseModel): | |
| enabled: bool | |
| provider: str | |
| plans: list[BillingPlanResponse] | |
| class BillingCheckoutRequest(BaseModel): | |
| plan_slug: str | |
| class BillingRedirectResponse(BaseModel): | |
| url: str | |
| class BillingPortalResponse(BaseModel): | |
| url: str | |
| class BillingWebhookResponse(BaseModel): | |
| status: str | |
| def _extract_bearer_token(authorization: str | None) -> str: | |
| if not authorization: | |
| raise HTTPException(status_code=401, detail="Authentication required.") | |
| scheme, _, token = authorization.partition(" ") | |
| if scheme.lower() != "bearer" or not token.strip(): | |
| raise HTTPException(status_code=401, detail="Invalid authorization header.") | |
| return token.strip() | |
| def _serialize_user(user: UserProfile) -> UserResponse: | |
| return { | |
| "email": user.email, | |
| "name": user.name, | |
| "picture_url": user.picture_url, | |
| "plan_name": user.plan_name, | |
| "subscription_status": user.subscription_status, | |
| } | |
| def _require_user( | |
| authorization: Annotated[str | None, Header(alias="Authorization")] = None, | |
| ) -> UserProfile: | |
| token = _extract_bearer_token(authorization) | |
| try: | |
| return auth_store.get_session_user(token) | |
| except KeyError as exc: | |
| raise HTTPException(status_code=401, detail="Session expired or invalid.") from exc | |
| app = FastAPI( | |
| title="Media Authenticity Detector API", | |
| description=( | |
| "HTTP service for binary Yes/No image and video checks using a local " | |
| "xRayon checkpoint-based model with hardened upload validation." | |
| ), | |
| version=APP_VERSION, | |
| ) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=ALLOWED_ORIGINS, | |
| allow_credentials=False, | |
| allow_methods=["GET", "POST", "OPTIONS"], | |
| allow_headers=["*"], | |
| expose_headers=["X-Request-ID", "X-Process-Time-MS"], | |
| ) | |
| image_detector = ImageDetector() | |
| video_detector = VideoDetector(image_detector=image_detector) | |
| upload_limits = UploadLimits() | |
| rate_limiter = InMemoryRateLimiter() | |
| auth_store = AuthStore() | |
| billing = LemonSqueezyBilling() | |
| async def add_request_context(request: Request, call_next): | |
| request.state.request_id = uuid4().hex | |
| started = time.perf_counter() | |
| response = await call_next(request) | |
| duration_ms = (time.perf_counter() - started) * 1000 | |
| response.headers["X-Request-ID"] = request.state.request_id | |
| response.headers["X-Process-Time-MS"] = f"{duration_ms:.2f}" | |
| logger.info( | |
| "request_complete request_id=%s method=%s path=%s status=%s duration_ms=%.2f", | |
| request.state.request_id, | |
| request.method, | |
| request.url.path, | |
| response.status_code, | |
| duration_ms, | |
| ) | |
| return response | |
| def root() -> ServiceInfoResponse: | |
| return { | |
| "service": "media-authenticity-detector", | |
| "version": APP_VERSION, | |
| "frontend_url": FRONTEND_URL, | |
| "docs": "/docs", | |
| "openapi": "/openapi.json", | |
| "endpoints": [ | |
| "/health", | |
| "/version", | |
| "/metadata", | |
| "/auth/config", | |
| "/auth/google", | |
| "/me", | |
| "/billing/config", | |
| "/billing/checkout", | |
| "/billing/portal", | |
| "/detect/image", | |
| "/detect/video", | |
| ], | |
| } | |
| def health() -> HealthResponse: | |
| return {"status": "ok"} | |
| def version() -> VersionResponse: | |
| return {"version": APP_VERSION} | |
| def metadata() -> MetadataResponse: | |
| return { | |
| "service": "media-authenticity-detector", | |
| "version": APP_VERSION, | |
| "frontend_url": FRONTEND_URL, | |
| "model": { | |
| "name": image_detector.model_id, | |
| "threshold": image_detector.threshold, | |
| "positive_label": image_detector.fake_label, | |
| "device": image_detector.device, | |
| }, | |
| "limits": { | |
| "max_image_bytes": upload_limits.max_image_bytes, | |
| "max_video_bytes": upload_limits.max_video_bytes, | |
| "max_video_duration_seconds": upload_limits.max_video_duration_seconds, | |
| "rate_limit_max_requests": rate_limiter.max_requests, | |
| "rate_limit_window_seconds": rate_limiter.window_seconds, | |
| }, | |
| } | |
| def auth_config() -> AuthConfigResponse: | |
| client_id = os.getenv("GOOGLE_CLIENT_ID", "").strip() | |
| return { | |
| "enabled": google_auth_enabled(), | |
| "google_client_id": client_id or None, | |
| "free_image_limit": auth_store.free_image_limit, | |
| "free_video_limit": auth_store.free_video_limit, | |
| } | |
| def billing_config() -> BillingConfigResponse: | |
| return billing.public_config() | |
| def auth_google(payload: GoogleAuthRequest) -> AuthSessionResponse: | |
| try: | |
| identity = verify_google_id_token(payload.id_token) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) from exc | |
| user = auth_store.upsert_google_user( | |
| email=identity.email, | |
| google_sub=identity.subject, | |
| name=identity.name, | |
| picture_url=identity.picture_url, | |
| ) | |
| session_token = auth_store.create_session(user.email) | |
| usage = auth_store.get_usage(user.email) | |
| return { | |
| "session_token": session_token, | |
| "user": _serialize_user(user), | |
| "usage": usage.to_dict(), | |
| } | |
| def auth_logout( | |
| _: Annotated[UserProfile, Depends(_require_user)], | |
| authorization: Annotated[str | None, Header(alias="Authorization")] = None, | |
| ) -> dict[str, str]: | |
| token = _extract_bearer_token(authorization) | |
| auth_store.delete_session(token) | |
| return {"status": "ok"} | |
| def me(current_user: Annotated[UserProfile, Depends(_require_user)]) -> MeResponse: | |
| usage = auth_store.get_usage(current_user.email) | |
| return { | |
| "user": _serialize_user(current_user), | |
| "usage": usage.to_dict(), | |
| } | |
| async def create_billing_checkout( | |
| payload: BillingCheckoutRequest, | |
| current_user: Annotated[UserProfile, Depends(_require_user)], | |
| ) -> BillingRedirectResponse: | |
| if current_user.lemon_subscription_id: | |
| raise HTTPException( | |
| status_code=409, | |
| detail="You already have an active subscription. Use the billing portal to manage it.", | |
| ) | |
| if payload.plan_slug == "free": | |
| raise HTTPException(status_code=400, detail="The free plan does not require checkout.") | |
| try: | |
| plan = billing.get_plan(payload.plan_slug) | |
| checkout_url = await billing.create_checkout_url( | |
| plan=plan, | |
| email=current_user.email, | |
| name=current_user.name, | |
| redirect_url=FRONTEND_URL, | |
| ) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) from exc | |
| except RuntimeError as exc: | |
| raise HTTPException(status_code=503, detail=str(exc)) from exc | |
| except Exception as exc: | |
| logger.exception( | |
| "billing_checkout_failed email=%s plan=%s", | |
| current_user.email, | |
| payload.plan_slug, | |
| ) | |
| raise HTTPException(status_code=502, detail="Unable to start checkout right now.") from exc | |
| return {"url": checkout_url} | |
| async def billing_portal( | |
| current_user: Annotated[UserProfile, Depends(_require_user)], | |
| ) -> BillingPortalResponse: | |
| if not current_user.lemon_subscription_id: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="No paid subscription is linked to this account yet.", | |
| ) | |
| try: | |
| urls = await billing.fetch_subscription_urls(current_user.lemon_subscription_id) | |
| except RuntimeError as exc: | |
| raise HTTPException(status_code=503, detail=str(exc)) from exc | |
| except Exception as exc: | |
| logger.exception( | |
| "billing_portal_failed email=%s subscription_id=%s", | |
| current_user.email, | |
| current_user.lemon_subscription_id, | |
| ) | |
| raise HTTPException(status_code=502, detail="Unable to open the billing portal right now.") from exc | |
| portal_url = ( | |
| urls.get("customer_portal_update_subscription") | |
| or urls.get("customer_portal") | |
| or urls.get("update_payment_method") | |
| ) | |
| if not portal_url: | |
| raise HTTPException(status_code=404, detail="No billing portal URL was available.") | |
| return {"url": portal_url} | |
| async def lemonsqueezy_webhook( | |
| request: Request, | |
| x_signature: Annotated[str | None, Header(alias="X-Signature")] = None, | |
| ) -> BillingWebhookResponse: | |
| payload = await request.body() | |
| if not billing.verify_signature(payload, x_signature): | |
| raise HTTPException(status_code=401, detail="Invalid webhook signature.") | |
| event = await request.json() | |
| resource = event.get("data") or event | |
| attributes = resource.get("attributes") or {} | |
| meta = event.get("meta") or {} | |
| event_name = str(meta.get("event_name") or "").strip() | |
| custom_data = meta.get("custom_data") or {} | |
| user_email = ( | |
| str(custom_data.get("user_email") or "") | |
| or str(attributes.get("user_email") or "") | |
| or str(attributes.get("customer_email") or "") | |
| ).strip().lower() | |
| if not user_email: | |
| logger.warning("billing_webhook_missing_email event=%s", event_name) | |
| return {"status": "ignored"} | |
| try: | |
| current_user = auth_store.get_user(user_email) | |
| except KeyError: | |
| logger.warning("billing_webhook_unknown_user email=%s event=%s", user_email, event_name) | |
| return {"status": "ignored"} | |
| variant_id_raw = attributes.get("variant_id") | |
| variant_id = int(variant_id_raw) if variant_id_raw is not None else None | |
| plan = billing.plan_from_variant(variant_id) | |
| subscription_status = str(attributes.get("status") or "").strip().lower() or None | |
| resource_id = resource.get("id") | |
| subscription_id = str(resource_id).strip() if resource_id is not None else None | |
| customer_id = str(attributes.get("customer_id") or "").strip() or None | |
| keep_paid_access = event_name not in { | |
| "subscription_expired", | |
| "subscription_refunded", | |
| } and subscription_status not in {"expired"} | |
| if plan and keep_paid_access: | |
| auth_store.apply_plan( | |
| email=current_user.email, | |
| plan_name=plan.slug, | |
| image_limit=plan.image_limit, | |
| video_limit=plan.video_limit, | |
| lemon_customer_id=customer_id, | |
| lemon_subscription_id=subscription_id, | |
| lemon_variant_id=variant_id, | |
| subscription_status=subscription_status, | |
| ) | |
| elif event_name in { | |
| "subscription_expired", | |
| "subscription_refunded", | |
| } or subscription_status in {"expired"}: | |
| auth_store.reset_to_free_plan(current_user.email) | |
| else: | |
| logger.info( | |
| "billing_webhook_ignored email=%s event=%s variant_id=%s", | |
| current_user.email, | |
| event_name, | |
| variant_id, | |
| ) | |
| return {"status": "ok"} | |
| async def detect_image( | |
| request: Request, | |
| current_user: Annotated[UserProfile, Depends(_require_user)], | |
| file: UploadFile = File(...), | |
| ) -> DetectionEnvelope: | |
| started = time.perf_counter() | |
| _check_rate_limit(request) | |
| try: | |
| auth_store.ensure_usage_available(current_user.email, "image") | |
| except ValueError as exc: | |
| raise HTTPException(status_code=402, detail=str(exc)) from exc | |
| payload = await file.read() | |
| try: | |
| validate_upload_metadata( | |
| filename=file.filename, | |
| content_type=file.content_type, | |
| payload=payload, | |
| media_type="image", | |
| limits=upload_limits, | |
| ) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) from exc | |
| try: | |
| image = Image.open(BytesIO(payload)).convert("RGB") | |
| except UnidentifiedImageError as exc: | |
| raise HTTPException(status_code=400, detail="Invalid image payload") from exc | |
| try: | |
| report = image_detector.detect_pil(image) | |
| except RuntimeError as exc: | |
| raise HTTPException(status_code=503, detail=str(exc)) from exc | |
| usage = auth_store.consume_usage(current_user.email, "image") | |
| return DetectionEnvelope( | |
| request_id=request.state.request_id, | |
| processed_at=datetime.now(UTC).isoformat(), | |
| processing_ms=round((time.perf_counter() - started) * 1000, 2), | |
| filename=file.filename or "upload", | |
| media_type="image", | |
| model=image_detector.model_id, | |
| report=report.to_dict(), | |
| usage=usage.to_dict(), | |
| ) | |
| async def detect_video( | |
| request: Request, | |
| current_user: Annotated[UserProfile, Depends(_require_user)], | |
| file: UploadFile = File(...), | |
| ) -> DetectionEnvelope: | |
| started = time.perf_counter() | |
| _check_rate_limit(request) | |
| try: | |
| auth_store.ensure_usage_available(current_user.email, "video") | |
| except ValueError as exc: | |
| raise HTTPException(status_code=402, detail=str(exc)) from exc | |
| payload = await file.read() | |
| try: | |
| validate_upload_metadata( | |
| filename=file.filename, | |
| content_type=file.content_type, | |
| payload=payload, | |
| media_type="video", | |
| limits=upload_limits, | |
| ) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) from exc | |
| temp_dir = ROOT / ".tmp" | |
| temp_dir.mkdir(exist_ok=True) | |
| with NamedTemporaryFile( | |
| delete=False, | |
| suffix=normalize_suffix(file.filename) or ".mp4", | |
| dir=temp_dir, | |
| ) as tmp: | |
| tmp.write(payload) | |
| temp_path = Path(tmp.name) | |
| try: | |
| validate_video_file(temp_path, limits=upload_limits) | |
| report = video_detector.detect_file(temp_path) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=400, detail=str(exc)) from exc | |
| except RuntimeError as exc: | |
| raise HTTPException(status_code=503, detail=str(exc)) from exc | |
| finally: | |
| temp_path.unlink(missing_ok=True) | |
| usage = auth_store.consume_usage(current_user.email, "video") | |
| return DetectionEnvelope( | |
| request_id=request.state.request_id, | |
| processed_at=datetime.now(UTC).isoformat(), | |
| processing_ms=round((time.perf_counter() - started) * 1000, 2), | |
| filename=file.filename or "upload", | |
| media_type="video", | |
| model=video_detector.image_detector.model_id, | |
| report=report.to_dict(), | |
| usage=usage.to_dict(), | |
| ) | |
| def _check_rate_limit(request: Request) -> None: | |
| forwarded_for = request.headers.get("x-forwarded-for", "") | |
| client_ip = forwarded_for.split(",")[0].strip() if forwarded_for else "" | |
| if not client_ip and request.client is not None: | |
| client_ip = request.client.host | |
| if not client_ip: | |
| client_ip = "unknown" | |
| try: | |
| rate_limiter.check(client_ip) | |
| except ValueError as exc: | |
| raise HTTPException(status_code=429, detail=str(exc)) from exc | |