File size: 5,542 Bytes
7c46845
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import asyncio
import io
import sys
import types
import uuid
from pathlib import Path

import pytest
from fastapi import UploadFile
from app.exceptions import ValidationException
from pypdf import PdfWriter
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.database import Base
from app.models import Document, User
from app.routes import documents


def _pdf_bytes() -> bytes:
    """Return the serialized bytes of a PDF holding one blank 72x72 page."""
    pdf = PdfWriter()
    pdf.add_blank_page(width=72, height=72)
    with io.BytesIO() as sink:
        pdf.write(sink)
        # Snapshot the contents before the buffer is closed on exit.
        return sink.getvalue()


def _upload_file(name: str, content: bytes) -> UploadFile:
    """Wrap *content* in an in-memory stream and expose it as an ``UploadFile`` called *name*."""
    stream = io.BytesIO(content)
    return UploadFile(file=stream, filename=name)


def _run(coro):
    """Execute the coroutine *coro* via ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome


@pytest.fixture(autouse=True)
def fake_magic(monkeypatch: pytest.MonkeyPatch) -> None:
    """Register a stand-in ``magic`` module in ``sys.modules`` for every test.

    The stand-in exposes a single ``from_file`` callable that ignores its
    arguments and always answers ``"application/pdf"``.
    """

    def _always_pdf(*_args, **_kwargs):
        return "application/pdf"

    stub_module = types.SimpleNamespace(from_file=_always_pdf)
    monkeypatch.setitem(sys.modules, "magic", stub_module)


def test_validate_upload_accepts_valid_pdf() -> None:
    """A well-formed PDF upload yields a path to an existing ``.pdf`` file."""
    saved_path = None
    upload = _upload_file("report.pdf", _pdf_bytes())

    try:
        saved_path = _run(documents.validate_upload(upload))
        stored = Path(saved_path)
        assert stored.exists()
        assert stored.suffix == ".pdf"
    finally:
        # Remove whatever the validator left behind, even if an assertion failed.
        if saved_path:
            Path(saved_path).unlink(missing_ok=True)


def test_validate_upload_rejects_invalid_file_type() -> None:
    """An upload named ``notes.exe`` is refused with a 400 mentioning "Only PDF"."""
    upload = _upload_file("notes.exe", b"not a document")

    with pytest.raises(ValidationException) as exc:
        _run(documents.validate_upload(upload))

    failure = exc.value
    assert failure.status_code == 400
    assert "Only PDF" in failure.message


def test_validate_upload_rejects_oversized_file_and_removes_temp_file(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """With the size limit set to 0 MB the upload is refused and no temp file survives."""
    real_factory = documents.tempfile.NamedTemporaryFile
    seen_paths: list[Path] = []

    def recording_factory(*args, **kwargs):
        # Default the directory to pytest's tmp_path unless the caller chose one,
        # and remember every file that gets created.
        if "dir" not in kwargs:
            kwargs["dir"] = tmp_path
        tmp = real_factory(*args, **kwargs)
        seen_paths.append(Path(tmp.name))
        return tmp

    monkeypatch.setattr(documents.tempfile, "NamedTemporaryFile", recording_factory)
    monkeypatch.setattr(documents.settings, "MAX_UPLOAD_SIZE_MB", 0)

    upload = _upload_file("too-large.pdf", _pdf_bytes())
    with pytest.raises(ValidationException) as exc:
        _run(documents.validate_upload(upload))

    failure = exc.value
    assert failure.status_code == 400
    assert failure.message == "File too large"
    # At least one temp file must have been created, and none may remain on disk.
    assert seen_paths
    assert not any(path.exists() for path in seen_paths)


def test_validate_upload_rejects_corrupted_pdf() -> None:
    """A ``.pdf`` upload whose body is not a parseable PDF is refused with a 400."""
    payload = b"%PDF-1.4\nnot really a pdf"
    upload = _upload_file("broken.pdf", payload)

    with pytest.raises(ValidationException) as exc:
        _run(documents.validate_upload(upload))

    failure = exc.value
    assert failure.status_code == 400
    assert failure.message == "Corrupted or invalid file"


@pytest.mark.parametrize(
    "first_hex,second_hex",
    [
        (
            "11111111111111111111111111111111",
            "22222222222222222222222222222222",
        )
    ],
)
def test_upload_document_handles_duplicate_original_names(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
    first_hex: str,
    second_hex: str,
) -> None:
    """Upload the same original filename twice and expect two distinct stored files.

    The test runs against an in-memory SQLite database and replaces three
    collaborators of ``documents``: ``validate_upload`` (writes a small PDF to a
    temp file and returns its path), ``uuid.uuid4`` (yields objects whose ``hex``
    is ``first_hex`` then ``second_hex``) and ``process_document.delay`` (returns
    a stub task with id ``"queued-task"``).

    It then checks that both ``Document`` rows keep the original name, that their
    stored filenames differ, that the files exist at
    ``UPLOAD_DIR/<user id>/<hex>.pdf`` and that the temp files are gone.

    The database session, the engine and any leftover temp files are released in
    a ``finally`` block so a failing assertion does not leak them.
    """
    engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
    session = sessionmaker(bind=engine)()
    temp_files: list[Path] = []

    try:
        Base.metadata.create_all(bind=engine)

        user = User(
            id=str(uuid.uuid4()),
            username="upload-tester",
            email="upload@example.com",
            hashed_password="hashed",
        )
        session.add(user)
        session.commit()
        session.refresh(user)

        async def fake_validate_upload(_file: UploadFile) -> str:
            handle = documents.tempfile.NamedTemporaryFile(delete=False, suffix=".pdf")
            with handle:
                # Record the path first so cleanup still sees it if the write fails.
                temp_files.append(Path(handle.name))
                handle.write(_pdf_bytes())
            return handle.name

        class FakeUUID:
            def __init__(self, value: str) -> None:
                self.hex = value

        # Exactly two values are available; a third uuid4() call would raise
        # StopIteration.
        uuid_values = iter([FakeUUID(first_hex), FakeUUID(second_hex)])

        monkeypatch.setattr(documents, "validate_upload", fake_validate_upload)
        monkeypatch.setattr(documents.settings, "UPLOAD_DIR", str(tmp_path / "uploads"))
        monkeypatch.setattr(documents.uuid, "uuid4", lambda: next(uuid_values))
        monkeypatch.setattr(
            documents.process_document,
            "delay",
            lambda **_kwargs: types.SimpleNamespace(id="queued-task"),
        )

        first = _run(
            documents.upload_document(
                file=_upload_file("same-name.pdf", b"first"),
                chunk_size=1000,
                chunk_overlap=200,
                user=user,
                db=session,
            )
        )
        second = _run(
            documents.upload_document(
                file=_upload_file("same-name.pdf", b"second"),
                chunk_size=1000,
                chunk_overlap=200,
                user=user,
                db=session,
            )
        )

        stored_docs = session.query(Document).order_by(Document.filename).all()

        assert [doc.original_name for doc in stored_docs] == ["same-name.pdf", "same-name.pdf"]
        assert len({doc.filename for doc in stored_docs}) == 2
        assert first.original_name == second.original_name == "same-name.pdf"
        assert first.task_id == second.task_id == "queued-task"
        assert (tmp_path / "uploads" / user.id / f"{first_hex}.pdf").exists()
        assert (tmp_path / "uploads" / user.id / f"{second_hex}.pdf").exists()
        assert all(not path.exists() for path in temp_files)
    finally:
        # Release database resources and remove temp files that were created
        # outside tmp_path, regardless of how the test body ended.
        session.close()
        engine.dispose()
        for leftover in temp_files:
            leftover.unlink(missing_ok=True)