NotebookLMClone / tests /test_notebook_management_api.py
github-actions[bot]
Sync from GitHub e2e802be5157aa05d1251459f529eb7eb4242ef2
dba1a8e
"""
Integration tests for notebook rename/delete management endpoints.
"""
from __future__ import annotations
import os
import pathlib
import sys
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
ROOT = pathlib.Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
import app as app_module
from app import app
from data.db import Base, get_db
@pytest.fixture()
def db_engine(tmp_path):
db_file = tmp_path / "test_notebook_mgmt.db"
engine = create_engine(
f"sqlite:///{db_file}",
connect_args={"check_same_thread": False},
)
import data.models # noqa: F401
Base.metadata.create_all(bind=engine)
yield engine
Base.metadata.drop_all(bind=engine)
engine.dispose()
@pytest.fixture()
def db_session(db_engine):
Session = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)
session = Session()
yield session
session.close()
@pytest.fixture()
def client(db_session, monkeypatch, tmp_path):
monkeypatch.setenv("AUTH_MODE", "dev")
monkeypatch.setenv("APP_SESSION_SECRET", "notebook-mgmt-test-secret")
monkeypatch.setenv("STORAGE_BASE_DIR", str(tmp_path / "storage"))
monkeypatch.setattr("app.UPLOADS_ROOT", tmp_path / "uploads")
def _override_get_db():
yield db_session
app.dependency_overrides[get_db] = _override_get_db
with TestClient(app, raise_server_exceptions=True) as c:
yield c
app.dependency_overrides.clear()
def test_rename_notebook_success(client):
create_resp = client.post("/notebooks", json={"title": "Original title"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
rename_resp = client.patch(f"/notebooks/{notebook_id}", json={"title": "Renamed title"})
assert rename_resp.status_code == 200
payload = rename_resp.json()
assert payload["id"] == notebook_id
assert payload["title"] == "Renamed title"
list_resp = client.get("/notebooks")
assert list_resp.status_code == 200
titles = [n["title"] for n in list_resp.json()]
assert "Renamed title" in titles
def test_rename_notebook_unknown_returns_404(client):
rename_resp = client.patch("/notebooks/999999", json={"title": "No notebook"})
assert rename_resp.status_code == 404
def test_delete_notebook_success(client):
create_resp = client.post("/notebooks", json={"title": "Delete me"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
delete_resp = client.delete(f"/notebooks/{notebook_id}")
assert delete_resp.status_code == 200
assert delete_resp.json()["status"] == "deleted"
list_resp = client.get("/notebooks")
assert list_resp.status_code == 200
ids = [n["id"] for n in list_resp.json()]
assert notebook_id not in ids
def test_delete_notebook_other_user_returns_404(client):
create_resp = client.post("/notebooks", json={"title": "User1 notebook"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
logout_resp = client.post("/auth/logout")
assert logout_resp.status_code == 200
login_user2 = client.post(
"/auth/dev-login",
json={"email": "user2@example.com", "display_name": "User Two"},
)
assert login_user2.status_code == 200
delete_resp = client.delete(f"/notebooks/{notebook_id}")
assert delete_resp.status_code == 404
def test_create_url_source_rejects_localhost_target(client):
create_resp = client.post("/notebooks", json={"title": "URL validation"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
source_resp = client.post(
f"/notebooks/{notebook_id}/sources",
json={
"type": "url",
"title": "Bad URL",
"url": "http://127.0.0.1:8000/health",
"status": "pending",
},
)
assert source_resp.status_code == 400
assert "restricted IP" in source_resp.json()["detail"]
def test_create_url_source_requires_url_field(client):
create_resp = client.post("/notebooks", json={"title": "Missing URL"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
source_resp = client.post(
f"/notebooks/{notebook_id}/sources",
json={
"type": "url",
"title": "No URL",
"status": "pending",
},
)
assert source_resp.status_code == 400
assert "URL is required" in source_resp.json()["detail"]
def test_create_url_source_accepts_public_url(client):
create_resp = client.post("/notebooks", json={"title": "Good URL"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
with patch("src.ingestion.extractors.socket.getaddrinfo") as mock_getaddrinfo, patch(
"app.ingest_source", return_value=3
) as mock_ingest:
mock_getaddrinfo.return_value = [
(
2,
1,
6,
"",
("93.184.216.34", 0),
)
]
source_resp = client.post(
f"/notebooks/{notebook_id}/sources",
json={
"type": "url",
"title": "Example URL",
"url": "https://example.com/article",
"status": "pending",
},
)
assert source_resp.status_code == 200
payload = source_resp.json()
assert payload["type"] == "url"
assert payload["url"] == "https://example.com/article"
assert payload["status"] == "ready"
assert payload["ingested_at"] is not None
mock_ingest.assert_called_once()
def test_upload_source_sanitizes_filename(client):
create_resp = client.post("/notebooks", json={"title": "Uploads"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
with patch("app.ingest_source", return_value=1):
upload_resp = client.post(
f"/notebooks/{notebook_id}/sources/upload",
data={"status": "pending"},
files={"file": ("../../../../evil.txt", b"hello world", "text/plain")},
)
assert upload_resp.status_code == 200
payload = upload_resp.json()
assert payload["original_name"] == "evil.txt"
assert payload["storage_path"] is not None
assert ".." not in payload["storage_path"]
assert f"notebook_{notebook_id}" in payload["storage_path"]
assert pathlib.Path(payload["storage_path"]).exists()
def test_delete_notebook_removes_notebook_storage_and_uploads(client):
create_resp = client.post("/notebooks", json={"title": "Delete storage"})
assert create_resp.status_code == 200
notebook_id = create_resp.json()["id"]
storage_root = pathlib.Path(os.environ["STORAGE_BASE_DIR"]) / "users" / "1" / "notebooks" / str(notebook_id)
storage_root.mkdir(parents=True, exist_ok=True)
(storage_root / "marker.txt").write_text("x", encoding="utf-8")
upload_root = pathlib.Path(app_module.UPLOADS_ROOT) / f"notebook_{notebook_id}"
upload_root.mkdir(parents=True, exist_ok=True)
(upload_root / "upload.txt").write_text("x", encoding="utf-8")
delete_resp = client.delete(f"/notebooks/{notebook_id}")
assert delete_resp.status_code == 200
assert not storage_root.exists()
assert not upload_root.exists()