# Deepfake_detection/tests/test_api.py
# tanishq93: "Upload folder using huggingface_hub" (commit 2b30a24, verified)
from __future__ import annotations
import hashlib
import hmac
import json
from io import BytesIO
import pytest
from fastapi.testclient import TestClient
from PIL import Image
import api
from deepfake_detector.auth_store import AuthStore
from deepfake_detector.billing import LemonSqueezyBilling
from deepfake_detector.google_auth import GoogleIdentity
from deepfake_detector.types import ImageDetectionReport, VideoDetectionReport
# Module-level test client bound to ``api.app``; every test and fixture in
# this module issues its HTTP requests through this single instance.
client = TestClient(api.app)
def _png_bytes() -> bytes:
    """Return the PNG-encoded bytes of a 16x16 solid-colour RGB image."""
    with BytesIO() as stream:
        # Encode straight into the in-memory stream; nothing touches disk.
        Image.new("RGB", (16, 16), color=(240, 120, 90)).save(stream, format="PNG")
        return stream.getvalue()
@pytest.fixture()
def auth_headers(monkeypatch: pytest.MonkeyPatch, tmp_path) -> dict[str, str]:
    """Log in a stubbed Google user and return bearer-token request headers.

    The fixture sets the Google client id and free-tier limit environment
    variables (2 images, 1 video), replaces ``api.auth_store`` with an
    ``AuthStore`` whose database file lives under ``tmp_path``, and swaps
    ``api.verify_google_id_token`` for a stub that always yields the same
    test identity. It then posts to ``/auth/google`` and wraps the returned
    session token in an ``Authorization`` header.
    """
    environment = (
        ("GOOGLE_CLIENT_ID", "test-client-id.apps.googleusercontent.com"),
        ("FREE_TIER_IMAGE_LIMIT", "2"),
        ("FREE_TIER_VIDEO_LIMIT", "1"),
    )
    for variable, value in environment:
        monkeypatch.setenv(variable, value)

    # The store is created only after the env vars are in place
    # (presumably it reads the limits at construction time — confirm).
    database_file = tmp_path / "verilens-test.db"
    monkeypatch.setattr(api, "auth_store", AuthStore(db_path=str(database_file)))

    def _stub_verify(_token: str) -> GoogleIdentity:
        # Any token is accepted; a fresh identity object is built per call.
        return GoogleIdentity(
            email="tester@example.com",
            subject="google-sub-123",
            name="Verifier",
            picture_url="https://example.com/avatar.png",
        )

    monkeypatch.setattr(api, "verify_google_id_token", _stub_verify)

    login = client.post("/auth/google", json={"id_token": "fake-google-id-token"})
    assert login.status_code == 200
    session_token = login.json()["session_token"]
    return {"Authorization": f"Bearer {session_token}"}
@pytest.fixture()
def billing_enabled(monkeypatch: pytest.MonkeyPatch) -> LemonSqueezyBilling:
    """Set Lemon Squeezy test configuration and install a new billing object.

    After the environment variables are set, a ``LemonSqueezyBilling`` is
    constructed, patched in as ``api.billing``, and returned to the test.
    """
    lemon_settings = {
        "LEMON_SQUEEZY_API_KEY": "test-api-key",
        "LEMON_SQUEEZY_STORE_ID": "12345",
        "LEMON_SQUEEZY_WEBHOOK_SECRET": "webhook-secret",
        "LEMON_STARTER_VARIANT_ID": "111",
        "LEMON_PRO_VARIANT_ID": "222",
        "LEMON_BUSINESS_VARIANT_ID": "333",
    }
    # Dict order is insertion order, so variables are set in the listed order.
    for variable, value in lemon_settings.items():
        monkeypatch.setenv(variable, value)

    configured = LemonSqueezyBilling()
    monkeypatch.setattr(api, "billing", configured)
    return configured
def test_health() -> None:
    """``GET /health`` answers 200 with an ``ok`` status body."""
    reply = client.get("/health")
    assert reply.status_code == 200
    expected_body = {"status": "ok"}
    assert reply.json() == expected_body
def test_metadata() -> None:
    """``GET /metadata`` names the service and includes model and limits keys."""
    reply = client.get("/metadata")
    assert reply.status_code == 200

    body = reply.json()
    assert body["service"] == "media-authenticity-detector"
    # Only the presence of these sections is checked, not their contents.
    for section in ("model", "limits"):
        assert section in body
def test_version() -> None:
    """``GET /version`` answers 200 and the body carries a ``version`` key."""
    reply = client.get("/version")
    assert reply.status_code == 200
    body = reply.json()
    assert "version" in body
def test_auth_config(monkeypatch: pytest.MonkeyPatch) -> None:
    """``GET /auth/config`` echoes the Google client id and free-tier limits.

    The limits are compared against whatever ``api.auth_store`` currently
    reports, so this test does not depend on specific limit values.
    """
    monkeypatch.setenv("GOOGLE_CLIENT_ID", "test-client-id.apps.googleusercontent.com")

    reply = client.get("/auth/config")
    assert reply.status_code == 200

    store = api.auth_store
    expected = {
        "enabled": True,
        "google_client_id": "test-client-id.apps.googleusercontent.com",
        "free_image_limit": store.free_image_limit,
        "free_video_limit": store.free_video_limit,
    }
    assert reply.json() == expected
def test_auth_login_and_me(auth_headers: dict[str, str]) -> None:
    """After login, ``GET /me`` shows the test user on the free plan.

    The expected limits (2 images, 1 video) match the free-tier environment
    variables set by the ``auth_headers`` fixture.
    """
    reply = client.get("/me", headers=auth_headers)
    assert reply.status_code == 200

    profile = reply.json()
    user = profile["user"]
    assert user["email"] == "tester@example.com"
    assert user["plan_name"] == "free"

    usage = profile["usage"]
    assert usage["image_limit"] == 2
    assert usage["video_limit"] == 1
def test_billing_config_lists_public_plans(billing_enabled: LemonSqueezyBilling) -> None:
    """``GET /billing/config`` reports Lemon Squeezy enabled with three plans."""
    reply = client.get("/billing/config")
    assert reply.status_code == 200

    config = reply.json()
    assert config["enabled"] is True
    assert config["provider"] == "lemonsqueezy"

    # Compare as sets: plan order in the response is not part of the check.
    listed_slugs = {entry["slug"] for entry in config["plans"]}
    assert listed_slugs == {"starter", "pro", "business"}
def test_billing_checkout(
    monkeypatch: pytest.MonkeyPatch,
    auth_headers: dict[str, str],
    billing_enabled: LemonSqueezyBilling,
) -> None:
    """``POST /billing/checkout`` returns the URL produced by the billing object.

    ``create_checkout_url`` on ``api.billing`` is replaced by an async stub
    that also checks the plan slug and email it is called with.
    """
    checkout_url = "https://checkout.example.com/session/starter"

    async def _stub_checkout_url(**kwargs):
        # The endpoint must pass the selected plan and the logged-in email.
        assert kwargs["plan"].slug == "starter"
        assert kwargs["email"] == "tester@example.com"
        return checkout_url

    monkeypatch.setattr(api.billing, "create_checkout_url", _stub_checkout_url)

    request_body = {"plan_slug": "starter"}
    reply = client.post("/billing/checkout", headers=auth_headers, json=request_body)

    assert reply.status_code == 200
    assert reply.json()["url"] == checkout_url
def test_lemonsqueezy_webhook_upgrades_plan(
    auth_headers: dict[str, str],
    billing_enabled: LemonSqueezyBilling,
) -> None:
    """A signed ``subscription_created`` webhook puts the test user on ``pro``.

    The request body is signed with HMAC-SHA256 and the hex digest is sent in
    the ``X-Signature`` header. Afterwards ``GET /me`` must show the pro plan,
    an active subscription, and the pro plan's image limit.
    """
    meta = {
        "event_name": "subscription_created",
        "custom_data": {
            "user_email": "tester@example.com",
            "plan_slug": "pro",
        },
    }
    subscription = {
        "id": "sub_123",
        "attributes": {
            "variant_id": 222,
            "status": "active",
            "customer_id": 999,
            "customer_email": "tester@example.com",
        },
    }
    body = json.dumps({"meta": meta, "data": subscription}).encode("utf-8")

    # Sign exactly the bytes that are posted. The key is the same value the
    # billing_enabled fixture sets as LEMON_SQUEEZY_WEBHOOK_SECRET.
    digest = hmac.new(
        key=b"webhook-secret",
        msg=body,
        digestmod=hashlib.sha256,
    ).hexdigest()

    webhook_reply = client.post(
        "/webhooks/lemonsqueezy",
        headers={"X-Signature": digest},
        content=body,
    )
    assert webhook_reply.status_code == 200

    me_reply = client.get("/me", headers=auth_headers)
    assert me_reply.status_code == 200

    profile = me_reply.json()
    assert profile["user"]["plan_name"] == "pro"
    assert profile["user"]["subscription_status"] == "active"
    pro_plan = api.billing.get_plan("pro")
    assert profile["usage"]["image_limit"] == pro_plan.image_limit
def test_detect_image(monkeypatch: pytest.MonkeyPatch, auth_headers: dict[str, str]) -> None:
    """``POST /detect/image`` returns the stubbed report and records one use.

    ``api.image_detector.detect_pil`` is replaced by a stub, so no real model
    runs. With the fixture's free image limit of 2, one upload leaves 1.
    """

    def _stub_detect(_image):
        # Fixed "not fake" report, independent of the uploaded image.
        return ImageDetectionReport(
            verdict="No",
            fake_probability=0.12,
            synthetic_likelihood=0.12,
            deepfake_likelihood=0.12,
            face_count=0,
            evidence={"real": 0.88, "fake": 0.12},
            summary="Synthetic signal not detected.",
            model_name="mock-model",
        )

    monkeypatch.setattr(api.image_detector, "detect_pil", _stub_detect)

    upload = {"file": ("sample.png", _png_bytes(), "image/png")}
    reply = client.post("/detect/image", headers=auth_headers, files=upload)
    assert reply.status_code == 200

    body = reply.json()
    assert body["media_type"] == "image"
    assert body["report"]["verdict"] == "No"
    assert body["usage"]["images_used"] == 1
    assert body["usage"]["image_remaining"] == 1
def test_detect_video(monkeypatch: pytest.MonkeyPatch, auth_headers: dict[str, str]) -> None:
    """``POST /detect/video`` returns the stubbed report and records one use.

    Both ``api.validate_video_file`` and ``api.video_detector.detect_file``
    are stubbed, which is why the uploaded bytes need not be a real video.
    With the fixture's free video limit of 1, one upload leaves 0.
    """

    def _stub_validate(_path, limits=None):
        # Accept any file; 4.0 is presumably a duration in seconds — confirm.
        return 4.0

    def _stub_detect_file(_path):
        # Fixed "fake" report, independent of the uploaded file.
        return VideoDetectionReport(
            verdict="Yes",
            fake_probability=0.77,
            synthetic_likelihood=0.77,
            deepfake_likelihood=0.77,
            frames_sampled=6,
            evidence={"mean_frame_fake_probability": 0.77},
            summary="Likely manipulated.",
            model_name="mock-model",
        )

    monkeypatch.setattr(api, "validate_video_file", _stub_validate)
    monkeypatch.setattr(api.video_detector, "detect_file", _stub_detect_file)

    upload = {"file": ("sample.mp4", b"not-a-real-video", "video/mp4")}
    reply = client.post("/detect/video", headers=auth_headers, files=upload)
    assert reply.status_code == 200

    body = reply.json()
    assert body["media_type"] == "video"
    assert body["report"]["frames_sampled"] == 6
    assert body["usage"]["videos_used"] == 1
    assert body["usage"]["video_remaining"] == 0
def test_quota_limit_returns_payment_required(
    monkeypatch: pytest.MonkeyPatch,
    auth_headers: dict[str, str],
) -> None:
    """The third image upload on the free tier is rejected with HTTP 402.

    The ``auth_headers`` fixture sets the free image limit to 2, so two
    uploads succeed and the next one must return the upgrade message.
    """

    def _stub_detect(_image):
        # Fixed report so that only the quota logic is exercised.
        return ImageDetectionReport(
            verdict="No",
            fake_probability=0.12,
            synthetic_likelihood=0.12,
            deepfake_likelihood=0.12,
            face_count=0,
            evidence={"real": 0.88, "fake": 0.12},
            summary="Synthetic signal not detected.",
            model_name="mock-model",
        )

    monkeypatch.setattr(api.image_detector, "detect_pil", _stub_detect)

    # All three uploads are sent, in order, before any status is checked.
    replies = [
        client.post(
            "/detect/image",
            headers=auth_headers,
            files={"file": (filename, _png_bytes(), "image/png")},
        )
        for filename in ("sample-1.png", "sample-2.png", "sample-3.png")
    ]
    first, second, third = replies

    assert first.status_code == 200
    assert second.status_code == 200
    assert third.status_code == 402
    assert third.json()["detail"] == "Free image limit reached. Upgrade your plan to continue."
def test_logout_invalidates_session(auth_headers: dict[str, str]) -> None:
    """After ``POST /auth/logout`` the same headers get 401 from ``GET /me``."""
    logout_reply = client.post("/auth/logout", headers=auth_headers)
    assert logout_reply.status_code == 200

    # Reuse the now logged-out token; it must no longer authenticate.
    me_reply = client.get("/me", headers=auth_headers)
    assert me_reply.status_code == 401