File size: 9,021 Bytes
35dc52d
49e9f9d
 
 
 
 
 
35dc52d
49e9f9d
 
 
 
35dc52d
49e9f9d
 
 
35dc52d
49e9f9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35dc52d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""Tests for the security hardening + reliability layer.

Covers:
  - Security-headers middleware applies on every response
  - Strengthened password rules (length, letter+digit complexity)
  - Per-IP rate limit on login + register + write endpoints
  - python-jose deprecation warning is silenced via pytest config
  - DB connect fallback when the Atlas URL omits the database segment
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from bson import ObjectId
from pymongo.errors import ConfigurationError


# ──────────────────────────────────────────────────────────────────────
# Security-headers middleware
# ──────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_security_headers_present_on_health(client):
    c, _ = client
    resp = await c.get("/health")
    assert resp.status_code == 200
    assert resp.headers.get("x-content-type-options") == "nosniff"
    assert resp.headers.get("x-frame-options") == "DENY"
    assert "max-age" in (resp.headers.get("strict-transport-security") or "")
    assert "geolocation" in (resp.headers.get("permissions-policy") or "")


@pytest.mark.asyncio
async def test_security_headers_present_on_404(client):
    """Headers must apply even to error responses, otherwise a clever
    attacker could probe a non-existent path to bypass them."""
    c, _ = client
    resp = await c.get("/api/this-does-not-exist")
    # FastAPI returns 404 with HTML/JSON; headers must still be set
    assert resp.headers.get("x-content-type-options") == "nosniff"


# ──────────────────────────────────────────────────────────────────────
# Password complexity
# ──────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_register_rejects_short_password(client):
    c, _ = client
    payload = {
        "name": "Test User",
        "email": "test@example.com",
        "password": "short1",  # 6 chars β€” below the new 8-char floor
        "role": "reporter",
        "location": {"type": "Point", "coordinates": [76.7794, 30.7333]},
    }
    resp = await c.post("/api/auth/register", json=payload)
    assert resp.status_code == 422


@pytest.mark.asyncio
async def test_register_rejects_letters_only_password(client):
    c, _ = client
    payload = {
        "name": "Test User",
        "email": "test@example.com",
        "password": "alllettersnodigits",
        "role": "reporter",
        "location": {"type": "Point", "coordinates": [76.7794, 30.7333]},
    }
    resp = await c.post("/api/auth/register", json=payload)
    assert resp.status_code == 422


@pytest.mark.asyncio
async def test_register_rejects_digits_only_password(client):
    c, _ = client
    payload = {
        "name": "Test User",
        "email": "test@example.com",
        "password": "12345678",
        "role": "reporter",
        "location": {"type": "Point", "coordinates": [76.7794, 30.7333]},
    }
    resp = await c.post("/api/auth/register", json=payload)
    assert resp.status_code == 422


@pytest.mark.asyncio
async def test_register_accepts_strong_password(client):
    # Reset the rate limiter so a previous test's burst doesn't bleed in
    from app.services import ratelimit as rl

    rl.register_limiter.reset()
    c, db = client
    db.users.find_one = AsyncMock(return_value=None)
    db.users.insert_one = AsyncMock(
        return_value=MagicMock(inserted_id=ObjectId())
    )
    payload = {
        "name": "Test User",
        "email": "fresh@example.com",
        "password": "secret-pass-1",
        "role": "reporter",
        "location": {"type": "Point", "coordinates": [76.7794, 30.7333]},
    }
    resp = await c.post("/api/auth/register", json=payload)
    assert resp.status_code == 201
    rl.register_limiter.reset()


# ──────────────────────────────────────────────────────────────────────
# Rate limiters
# ──────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_login_rate_limit_after_burst(client):
    """20 logins in a minute is the cap β€” the 21st should be 429.
    Tests the limiter end-to-end through the FastAPI dependency."""
    from app.services import ratelimit as rl

    rl.login_limiter.reset()
    c, db = client
    db.users.find_one = AsyncMock(return_value=None)  # always 401

    last_status = 200
    for _ in range(25):
        resp = await c.post(
            "/api/auth/login",
            json={"email": "x@example.com", "password": "secret-1"},
        )
        last_status = resp.status_code
        if last_status == 429:
            break
    assert last_status == 429
    rl.login_limiter.reset()


@pytest.mark.asyncio
async def test_register_rate_limit(client):
    """5 registrations per IP per hour. 6th should be 429."""
    from app.services import ratelimit as rl

    rl.register_limiter.reset()
    c, db = client
    db.users.find_one = AsyncMock(return_value=None)
    db.users.insert_one = AsyncMock(
        return_value=MagicMock(inserted_id=ObjectId())
    )
    payload_template = {
        "name": "Test User",
        "password": "secret-pass-1",
        "role": "reporter",
        "location": {"type": "Point", "coordinates": [76.7794, 30.7333]},
    }

    last_status = 200
    for i in range(8):
        resp = await c.post(
            "/api/auth/register",
            json={**payload_template, "email": f"user{i}@example.com"},
        )
        last_status = resp.status_code
        if last_status == 429:
            break
    assert last_status == 429
    rl.register_limiter.reset()


@pytest.mark.asyncio
async def test_rate_limiter_dependency_returns_429_message():
    """Direct unit on the dep helper to confirm the exception payload."""
    from fastapi import HTTPException

    from app.core.limits import _make_dep
    from app.services.ratelimit import RateLimiter

    rl = RateLimiter(max_per_window=1, window_seconds=60)
    dep = _make_dep(rl, "unit")

    class _FakeReq:
        headers = {}
        client = type("Client", (), {"host": "1.2.3.4"})()

    await dep(_FakeReq())  # first call OK
    with pytest.raises(HTTPException) as exc:
        await dep(_FakeReq())
    assert exc.value.status_code == 429
    assert "unit" in exc.value.detail


# ──────────────────────────────────────────────────────────────────────
# DB connect fallback β€” the most common HF Spaces deploy crash
# ──────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_connect_falls_back_to_default_db_name():
    """Atlas users routinely paste a connection string with no /dbname
    segment, which makes `get_default_database()` raise. Our connect()
    should fall back to the `neighbouraid` database silently."""
    from app.db import client as db_client

    # Build a fake Motor client that:
    #   - raises ConfigurationError on get_default_database (the bug shape)
    #   - returns a mock DB when subscripted by name (the fallback path)
    fake_db = MagicMock()
    fake_db.alerts.create_index = AsyncMock()
    fake_db.users.create_index = AsyncMock()

    fake_client = MagicMock()
    fake_client.get_default_database = MagicMock(
        side_effect=ConfigurationError("no default db")
    )
    fake_client.__getitem__ = MagicMock(return_value=fake_db)

    with patch.object(
        db_client, "AsyncIOMotorClient", return_value=fake_client
    ):
        # Cache the original to restore afterwards so we don't pollute
        # other tests (the global fixture swaps _db too).
        original = db_client._db
        try:
            await db_client.connect()
            # The fallback should have indexed `neighbouraid`
            fake_client.__getitem__.assert_called_with("neighbouraid")
            assert db_client._db is fake_db
        finally:
            db_client._db = original