File size: 7,435 Bytes
aacd162
 
 
 
 
dba1a8e
aacd162
 
 
 
 
 
 
 
 
 
 
 
dba1a8e
aacd162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dba1a8e
aacd162
 
dba1a8e
 
aacd162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dba1a8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
Integration tests for notebook rename/delete management endpoints.
"""
from __future__ import annotations

import os
import pathlib
import sys
from unittest.mock import patch

import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

ROOT = pathlib.Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

import app as app_module
from app import app
from data.db import Base, get_db


@pytest.fixture()
def db_engine(tmp_path):
    db_file = tmp_path / "test_notebook_mgmt.db"
    engine = create_engine(
        f"sqlite:///{db_file}",
        connect_args={"check_same_thread": False},
    )
    import data.models  # noqa: F401

    Base.metadata.create_all(bind=engine)
    yield engine
    Base.metadata.drop_all(bind=engine)
    engine.dispose()


@pytest.fixture()
def db_session(db_engine):
    Session = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)
    session = Session()
    yield session
    session.close()


@pytest.fixture()
def client(db_session, monkeypatch, tmp_path):
    monkeypatch.setenv("AUTH_MODE", "dev")
    monkeypatch.setenv("APP_SESSION_SECRET", "notebook-mgmt-test-secret")
    monkeypatch.setenv("STORAGE_BASE_DIR", str(tmp_path / "storage"))
    monkeypatch.setattr("app.UPLOADS_ROOT", tmp_path / "uploads")

    def _override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    with TestClient(app, raise_server_exceptions=True) as c:
        yield c
    app.dependency_overrides.clear()


def test_rename_notebook_success(client):
    create_resp = client.post("/notebooks", json={"title": "Original title"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    rename_resp = client.patch(f"/notebooks/{notebook_id}", json={"title": "Renamed title"})
    assert rename_resp.status_code == 200
    payload = rename_resp.json()
    assert payload["id"] == notebook_id
    assert payload["title"] == "Renamed title"

    list_resp = client.get("/notebooks")
    assert list_resp.status_code == 200
    titles = [n["title"] for n in list_resp.json()]
    assert "Renamed title" in titles


def test_rename_notebook_unknown_returns_404(client):
    rename_resp = client.patch("/notebooks/999999", json={"title": "No notebook"})
    assert rename_resp.status_code == 404


def test_delete_notebook_success(client):
    create_resp = client.post("/notebooks", json={"title": "Delete me"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    delete_resp = client.delete(f"/notebooks/{notebook_id}")
    assert delete_resp.status_code == 200
    assert delete_resp.json()["status"] == "deleted"

    list_resp = client.get("/notebooks")
    assert list_resp.status_code == 200
    ids = [n["id"] for n in list_resp.json()]
    assert notebook_id not in ids


def test_delete_notebook_other_user_returns_404(client):
    create_resp = client.post("/notebooks", json={"title": "User1 notebook"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    logout_resp = client.post("/auth/logout")
    assert logout_resp.status_code == 200
    login_user2 = client.post(
        "/auth/dev-login",
        json={"email": "user2@example.com", "display_name": "User Two"},
    )
    assert login_user2.status_code == 200

    delete_resp = client.delete(f"/notebooks/{notebook_id}")
    assert delete_resp.status_code == 404


def test_create_url_source_rejects_localhost_target(client):
    create_resp = client.post("/notebooks", json={"title": "URL validation"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    source_resp = client.post(
        f"/notebooks/{notebook_id}/sources",
        json={
            "type": "url",
            "title": "Bad URL",
            "url": "http://127.0.0.1:8000/health",
            "status": "pending",
        },
    )
    assert source_resp.status_code == 400
    assert "restricted IP" in source_resp.json()["detail"]


def test_create_url_source_requires_url_field(client):
    create_resp = client.post("/notebooks", json={"title": "Missing URL"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    source_resp = client.post(
        f"/notebooks/{notebook_id}/sources",
        json={
            "type": "url",
            "title": "No URL",
            "status": "pending",
        },
    )
    assert source_resp.status_code == 400
    assert "URL is required" in source_resp.json()["detail"]


def test_create_url_source_accepts_public_url(client):
    create_resp = client.post("/notebooks", json={"title": "Good URL"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    with patch("src.ingestion.extractors.socket.getaddrinfo") as mock_getaddrinfo, patch(
        "app.ingest_source", return_value=3
    ) as mock_ingest:
        mock_getaddrinfo.return_value = [
            (
                2,
                1,
                6,
                "",
                ("93.184.216.34", 0),
            )
        ]
        source_resp = client.post(
            f"/notebooks/{notebook_id}/sources",
            json={
                "type": "url",
                "title": "Example URL",
                "url": "https://example.com/article",
                "status": "pending",
            },
        )

    assert source_resp.status_code == 200
    payload = source_resp.json()
    assert payload["type"] == "url"
    assert payload["url"] == "https://example.com/article"
    assert payload["status"] == "ready"
    assert payload["ingested_at"] is not None
    mock_ingest.assert_called_once()


def test_upload_source_sanitizes_filename(client):
    create_resp = client.post("/notebooks", json={"title": "Uploads"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    with patch("app.ingest_source", return_value=1):
        upload_resp = client.post(
            f"/notebooks/{notebook_id}/sources/upload",
            data={"status": "pending"},
            files={"file": ("../../../../evil.txt", b"hello world", "text/plain")},
        )

    assert upload_resp.status_code == 200
    payload = upload_resp.json()
    assert payload["original_name"] == "evil.txt"
    assert payload["storage_path"] is not None
    assert ".." not in payload["storage_path"]
    assert f"notebook_{notebook_id}" in payload["storage_path"]
    assert pathlib.Path(payload["storage_path"]).exists()


def test_delete_notebook_removes_notebook_storage_and_uploads(client):
    create_resp = client.post("/notebooks", json={"title": "Delete storage"})
    assert create_resp.status_code == 200
    notebook_id = create_resp.json()["id"]

    storage_root = pathlib.Path(os.environ["STORAGE_BASE_DIR"]) / "users" / "1" / "notebooks" / str(notebook_id)
    storage_root.mkdir(parents=True, exist_ok=True)
    (storage_root / "marker.txt").write_text("x", encoding="utf-8")

    upload_root = pathlib.Path(app_module.UPLOADS_ROOT) / f"notebook_{notebook_id}"
    upload_root.mkdir(parents=True, exist_ok=True)
    (upload_root / "upload.txt").write_text("x", encoding="utf-8")

    delete_resp = client.delete(f"/notebooks/{notebook_id}")
    assert delete_resp.status_code == 200

    assert not storage_root.exists()
    assert not upload_root.exists()