Spaces:
Running
Running
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| import json | |
| from io import BytesIO | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| from PIL import Image | |
| import api | |
| from deepfake_detector.auth_store import AuthStore | |
| from deepfake_detector.billing import LemonSqueezyBilling | |
| from deepfake_detector.google_auth import GoogleIdentity | |
| from deepfake_detector.types import ImageDetectionReport, VideoDetectionReport | |
| client = TestClient(api.app) | |
| def _png_bytes() -> bytes: | |
| image = Image.new("RGB", (16, 16), color=(240, 120, 90)) | |
| buffer = BytesIO() | |
| image.save(buffer, format="PNG") | |
| return buffer.getvalue() | |
| def auth_headers(monkeypatch: pytest.MonkeyPatch, tmp_path) -> dict[str, str]: | |
| monkeypatch.setenv("GOOGLE_CLIENT_ID", "test-client-id.apps.googleusercontent.com") | |
| monkeypatch.setenv("FREE_TIER_IMAGE_LIMIT", "2") | |
| monkeypatch.setenv("FREE_TIER_VIDEO_LIMIT", "1") | |
| store = AuthStore(db_path=str(tmp_path / "verilens-test.db")) | |
| monkeypatch.setattr(api, "auth_store", store) | |
| def fake_verify_google_id_token(_token: str) -> GoogleIdentity: | |
| return GoogleIdentity( | |
| email="tester@example.com", | |
| subject="google-sub-123", | |
| name="Verifier", | |
| picture_url="https://example.com/avatar.png", | |
| ) | |
| monkeypatch.setattr(api, "verify_google_id_token", fake_verify_google_id_token) | |
| response = client.post("/auth/google", json={"id_token": "fake-google-id-token"}) | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| return {"Authorization": f"Bearer {payload['session_token']}"} | |
| def billing_enabled(monkeypatch: pytest.MonkeyPatch) -> LemonSqueezyBilling: | |
| monkeypatch.setenv("LEMON_SQUEEZY_API_KEY", "test-api-key") | |
| monkeypatch.setenv("LEMON_SQUEEZY_STORE_ID", "12345") | |
| monkeypatch.setenv("LEMON_SQUEEZY_WEBHOOK_SECRET", "webhook-secret") | |
| monkeypatch.setenv("LEMON_STARTER_VARIANT_ID", "111") | |
| monkeypatch.setenv("LEMON_PRO_VARIANT_ID", "222") | |
| monkeypatch.setenv("LEMON_BUSINESS_VARIANT_ID", "333") | |
| billing = LemonSqueezyBilling() | |
| monkeypatch.setattr(api, "billing", billing) | |
| return billing | |
| def test_health() -> None: | |
| response = client.get("/health") | |
| assert response.status_code == 200 | |
| assert response.json() == {"status": "ok"} | |
| def test_metadata() -> None: | |
| response = client.get("/metadata") | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["service"] == "media-authenticity-detector" | |
| assert "model" in payload | |
| assert "limits" in payload | |
| def test_version() -> None: | |
| response = client.get("/version") | |
| assert response.status_code == 200 | |
| assert "version" in response.json() | |
| def test_auth_config(monkeypatch: pytest.MonkeyPatch) -> None: | |
| monkeypatch.setenv("GOOGLE_CLIENT_ID", "test-client-id.apps.googleusercontent.com") | |
| response = client.get("/auth/config") | |
| assert response.status_code == 200 | |
| assert response.json() == { | |
| "enabled": True, | |
| "google_client_id": "test-client-id.apps.googleusercontent.com", | |
| "free_image_limit": api.auth_store.free_image_limit, | |
| "free_video_limit": api.auth_store.free_video_limit, | |
| } | |
| def test_auth_login_and_me(auth_headers: dict[str, str]) -> None: | |
| response = client.get("/me", headers=auth_headers) | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["user"]["email"] == "tester@example.com" | |
| assert payload["user"]["plan_name"] == "free" | |
| assert payload["usage"]["image_limit"] == 2 | |
| assert payload["usage"]["video_limit"] == 1 | |
| def test_billing_config_lists_public_plans(billing_enabled: LemonSqueezyBilling) -> None: | |
| response = client.get("/billing/config") | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["enabled"] is True | |
| assert payload["provider"] == "lemonsqueezy" | |
| assert {plan["slug"] for plan in payload["plans"]} == {"starter", "pro", "business"} | |
| def test_billing_checkout(monkeypatch: pytest.MonkeyPatch, auth_headers: dict[str, str], billing_enabled: LemonSqueezyBilling) -> None: | |
| async def fake_checkout_url(**kwargs): | |
| assert kwargs["plan"].slug == "starter" | |
| assert kwargs["email"] == "tester@example.com" | |
| return "https://checkout.example.com/session/starter" | |
| monkeypatch.setattr(api.billing, "create_checkout_url", fake_checkout_url) | |
| response = client.post( | |
| "/billing/checkout", | |
| headers=auth_headers, | |
| json={"plan_slug": "starter"}, | |
| ) | |
| assert response.status_code == 200 | |
| assert response.json()["url"] == "https://checkout.example.com/session/starter" | |
| def test_lemonsqueezy_webhook_upgrades_plan( | |
| auth_headers: dict[str, str], | |
| billing_enabled: LemonSqueezyBilling, | |
| ) -> None: | |
| event = { | |
| "meta": { | |
| "event_name": "subscription_created", | |
| "custom_data": { | |
| "user_email": "tester@example.com", | |
| "plan_slug": "pro", | |
| }, | |
| }, | |
| "data": { | |
| "id": "sub_123", | |
| "attributes": { | |
| "variant_id": 222, | |
| "status": "active", | |
| "customer_id": 999, | |
| "customer_email": "tester@example.com", | |
| }, | |
| }, | |
| } | |
| raw = json.dumps(event).encode("utf-8") | |
| signature = hmac.new( | |
| b"webhook-secret", | |
| raw, | |
| hashlib.sha256, | |
| ).hexdigest() | |
| response = client.post( | |
| "/webhooks/lemonsqueezy", | |
| headers={"X-Signature": signature}, | |
| content=raw, | |
| ) | |
| assert response.status_code == 200 | |
| me_response = client.get("/me", headers=auth_headers) | |
| assert me_response.status_code == 200 | |
| payload = me_response.json() | |
| assert payload["user"]["plan_name"] == "pro" | |
| assert payload["user"]["subscription_status"] == "active" | |
| assert payload["usage"]["image_limit"] == api.billing.get_plan("pro").image_limit | |
| def test_detect_image(monkeypatch: pytest.MonkeyPatch, auth_headers: dict[str, str]) -> None: | |
| def fake_detect(_image): | |
| return ImageDetectionReport( | |
| verdict="No", | |
| fake_probability=0.12, | |
| synthetic_likelihood=0.12, | |
| deepfake_likelihood=0.12, | |
| face_count=0, | |
| evidence={"real": 0.88, "fake": 0.12}, | |
| summary="Synthetic signal not detected.", | |
| model_name="mock-model", | |
| ) | |
| monkeypatch.setattr(api.image_detector, "detect_pil", fake_detect) | |
| response = client.post( | |
| "/detect/image", | |
| headers=auth_headers, | |
| files={"file": ("sample.png", _png_bytes(), "image/png")}, | |
| ) | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["media_type"] == "image" | |
| assert payload["report"]["verdict"] == "No" | |
| assert payload["usage"]["images_used"] == 1 | |
| assert payload["usage"]["image_remaining"] == 1 | |
| def test_detect_video(monkeypatch: pytest.MonkeyPatch, auth_headers: dict[str, str]) -> None: | |
| def fake_validate_video(_path, limits=None): | |
| return 4.0 | |
| def fake_detect_file(_path): | |
| return VideoDetectionReport( | |
| verdict="Yes", | |
| fake_probability=0.77, | |
| synthetic_likelihood=0.77, | |
| deepfake_likelihood=0.77, | |
| frames_sampled=6, | |
| evidence={"mean_frame_fake_probability": 0.77}, | |
| summary="Likely manipulated.", | |
| model_name="mock-model", | |
| ) | |
| monkeypatch.setattr(api, "validate_video_file", fake_validate_video) | |
| monkeypatch.setattr(api.video_detector, "detect_file", fake_detect_file) | |
| response = client.post( | |
| "/detect/video", | |
| headers=auth_headers, | |
| files={"file": ("sample.mp4", b"not-a-real-video", "video/mp4")}, | |
| ) | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["media_type"] == "video" | |
| assert payload["report"]["frames_sampled"] == 6 | |
| assert payload["usage"]["videos_used"] == 1 | |
| assert payload["usage"]["video_remaining"] == 0 | |
| def test_quota_limit_returns_payment_required( | |
| monkeypatch: pytest.MonkeyPatch, | |
| auth_headers: dict[str, str], | |
| ) -> None: | |
| def fake_detect(_image): | |
| return ImageDetectionReport( | |
| verdict="No", | |
| fake_probability=0.12, | |
| synthetic_likelihood=0.12, | |
| deepfake_likelihood=0.12, | |
| face_count=0, | |
| evidence={"real": 0.88, "fake": 0.12}, | |
| summary="Synthetic signal not detected.", | |
| model_name="mock-model", | |
| ) | |
| monkeypatch.setattr(api.image_detector, "detect_pil", fake_detect) | |
| first = client.post( | |
| "/detect/image", | |
| headers=auth_headers, | |
| files={"file": ("sample-1.png", _png_bytes(), "image/png")}, | |
| ) | |
| second = client.post( | |
| "/detect/image", | |
| headers=auth_headers, | |
| files={"file": ("sample-2.png", _png_bytes(), "image/png")}, | |
| ) | |
| third = client.post( | |
| "/detect/image", | |
| headers=auth_headers, | |
| files={"file": ("sample-3.png", _png_bytes(), "image/png")}, | |
| ) | |
| assert first.status_code == 200 | |
| assert second.status_code == 200 | |
| assert third.status_code == 402 | |
| assert third.json()["detail"] == "Free image limit reached. Upgrade your plan to continue." | |
| def test_logout_invalidates_session(auth_headers: dict[str, str]) -> None: | |
| response = client.post("/auth/logout", headers=auth_headers) | |
| assert response.status_code == 200 | |
| after_logout = client.get("/me", headers=auth_headers) | |
| assert after_logout.status_code == 401 | |