"""
Integration tests for admin endpoint security.
Tests that admin endpoints properly enforce authorization.
"""
import pytest
from fastapi.testclient import TestClient
from app.main import app
from app.dependencies import get_current_user
from app.db_models import User
client = TestClient(app)
def test_admin_endpoints_require_authentication():
    """Anonymous requests to the admin endpoints must be refused.

    No credentials are sent; each endpoint has to answer with either
    401 or 403.
    """
    refused = (401, 403)

    # Validation statistics, requested with no credentials at all.
    stats = client.get("/api/v1/admin/validation-stats")
    assert stats.status_code in refused

    # Quality distribution.
    distribution = client.get("/api/v1/admin/quality-distribution")
    assert distribution.status_code in refused

    # Recent validations.
    recent = client.get("/api/v1/admin/recent-validations")
    assert recent.status_code in refused
def test_admin_endpoints_reject_regular_users():
    """Test that admin endpoints return 403 when accessed by non-admin users.

    ``get_current_user`` is overridden on the app so that every request is
    made as a user whose role is ``"user"``. The first endpoint is also
    checked for the "Admin access required" detail message.
    """

    def get_mock_user():
        # A regular account: role is "user", NOT "admin".
        return User(
            id=1,
            oauth_provider="test",
            oauth_id="test123",
            email="user@test.com",
            username="testuser",
            role="user",
        )

    # Remember whatever override (if any) was installed before this test so
    # that cleanup restores it instead of wiping every override on the app.
    missing = object()
    previous = app.dependency_overrides.get(get_current_user, missing)
    app.dependency_overrides[get_current_user] = get_mock_user
    try:
        response = client.get("/api/v1/admin/validation-stats")
        assert response.status_code == 403
        assert "Admin access required" in response.json().get("detail", "")

        response = client.get("/api/v1/admin/quality-distribution")
        assert response.status_code == 403

        response = client.get("/api/v1/admin/recent-validations")
        assert response.status_code == 403
    finally:
        # Undo only our own override; calling .clear() here would also drop
        # unrelated overrides that other fixtures may have registered.
        if previous is missing:
            app.dependency_overrides.pop(get_current_user, None)
        else:
            app.dependency_overrides[get_current_user] = previous
def test_scrape_endpoints_require_admin():
    """Scrape endpoints must refuse requests that carry no credentials.

    Each POST is sent unauthenticated and has to come back as 401 or 403.
    """
    refused = (401, 403)

    # Single-source scrape, with the same payload a real caller would send.
    scrape_payload = {"url": "https://example.com/proxies.txt", "type": "github_raw"}
    scrape = client.post("/api/v1/proxies/scrape", json=scrape_payload)
    assert scrape.status_code in refused

    # Demo endpoint.
    demo = client.post("/api/v1/proxies/demo")
    assert demo.status_code in refused

    # Scrape-all endpoint.
    scrape_all = client.post("/api/v1/proxies/scrape-all")
    assert scrape_all.status_code in refused
def test_scrape_endpoints_reject_regular_users():
    """Test that scrape endpoints reject regular users.

    ``get_current_user`` is overridden on the app so that every request is
    made as a user whose role is ``"user"``; each scrape endpoint must then
    answer 403.
    """

    def get_mock_user():
        # A regular account: role is "user", not "admin".
        return User(
            id=1,
            oauth_provider="test",
            oauth_id="test123",
            email="user@test.com",
            username="testuser",
            role="user",
        )

    # Remember whatever override (if any) was installed before this test so
    # that cleanup restores it instead of wiping every override on the app.
    missing = object()
    previous = app.dependency_overrides.get(get_current_user, missing)
    app.dependency_overrides[get_current_user] = get_mock_user
    try:
        response = client.post(
            "/api/v1/proxies/scrape",
            json={"url": "https://example.com/proxies.txt", "type": "github_raw"},
        )
        assert response.status_code == 403

        response = client.post("/api/v1/proxies/demo")
        assert response.status_code == 403

        response = client.post("/api/v1/proxies/scrape-all")
        assert response.status_code == 403
    finally:
        # Undo only our own override; calling .clear() here would also drop
        # unrelated overrides that other fixtures may have registered.
        if previous is missing:
            app.dependency_overrides.pop(get_current_user, None)
        else:
            app.dependency_overrides[get_current_user] = previous
def test_admin_user_can_access_admin_endpoints():
    """Test that admin users CAN access admin endpoints.

    ``get_current_user`` is overridden on the app so that every request is
    made as a user whose role is ``"admin"``. The endpoints may still fail
    for other reasons (for example missing data), so the test only asserts
    that the response is not an authentication/authorization refusal.
    """

    def get_mock_admin():
        # An administrator account: role IS "admin".
        return User(
            id=1,
            oauth_provider="test",
            oauth_id="admin123",
            email="admin@test.com",
            username="admin",
            role="admin",
        )

    # 401 is rejected alongside 403: with an admin injected, either status
    # would mean the request was refused rather than served.
    refused = (401, 403)

    # Remember whatever override (if any) was installed before this test so
    # that cleanup restores it instead of wiping every override on the app.
    missing = object()
    previous = app.dependency_overrides.get(get_current_user, missing)
    app.dependency_overrides[get_current_user] = get_mock_admin
    try:
        response = client.get("/api/v1/admin/validation-stats")
        assert response.status_code not in refused

        response = client.get("/api/v1/admin/quality-distribution")
        assert response.status_code not in refused

        response = client.get("/api/v1/admin/recent-validations")
        assert response.status_code not in refused
    finally:
        # Undo only our own override; calling .clear() here would also drop
        # unrelated overrides that other fixtures may have registered.
        if previous is missing:
            app.dependency_overrides.pop(get_current_user, None)
        else:
            app.dependency_overrides[get_current_user] = previous
def test_public_endpoints_remain_accessible():
    """Public endpoints must answer 200 without any authentication."""
    ok = 200

    # Root endpoint.
    root = client.get("/")
    assert root.status_code == ok

    # Health check.
    health = client.get("/health")
    assert health.status_code == ok

    # Source listing.
    sources = client.get("/api/v1/sources")
    assert sources.status_code == ok

    # The proxy listing is a public service, so it must also work
    # without credentials.
    proxies = client.get("/api/v1/proxies?limit=10")
    assert proxies.status_code == ok
|