| import asyncio |
| from io import BytesIO |
| from pathlib import Path |
| from unittest.mock import AsyncMock, MagicMock, patch |
|
|
| from fastapi import UploadFile |
|
|
| from src.gateway.routers import uploads |
|
|
|
|
| def test_upload_files_writes_thread_storage_and_skips_local_sandbox_sync(tmp_path): |
| thread_uploads_dir = tmp_path / "uploads" |
| thread_uploads_dir.mkdir(parents=True) |
|
|
| provider = MagicMock() |
| provider.acquire.return_value = "local" |
| sandbox = MagicMock() |
| provider.get.return_value = sandbox |
|
|
| with ( |
| patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), |
| patch.object(uploads, "get_sandbox_provider", return_value=provider), |
| ): |
|
|
| file = UploadFile(filename="notes.txt", file=BytesIO(b"hello uploads")) |
| result = asyncio.run(uploads.upload_files("thread-local", files=[file])) |
|
|
| assert result.success is True |
| assert len(result.files) == 1 |
| assert result.files[0]["filename"] == "notes.txt" |
| assert (thread_uploads_dir / "notes.txt").read_bytes() == b"hello uploads" |
|
|
| sandbox.update_file.assert_not_called() |
|
|
|
|
| def test_upload_files_syncs_non_local_sandbox_and_marks_markdown_file(tmp_path): |
| thread_uploads_dir = tmp_path / "uploads" |
| thread_uploads_dir.mkdir(parents=True) |
|
|
| provider = MagicMock() |
| provider.acquire.return_value = "aio-1" |
| sandbox = MagicMock() |
| provider.get.return_value = sandbox |
|
|
| async def fake_convert(file_path: Path) -> Path: |
| md_path = file_path.with_suffix(".md") |
| md_path.write_text("converted", encoding="utf-8") |
| return md_path |
|
|
| with ( |
| patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), |
| patch.object(uploads, "get_sandbox_provider", return_value=provider), |
| patch.object(uploads, "convert_file_to_markdown", AsyncMock(side_effect=fake_convert)), |
| ): |
|
|
| file = UploadFile(filename="report.pdf", file=BytesIO(b"pdf-bytes")) |
| result = asyncio.run(uploads.upload_files("thread-aio", files=[file])) |
|
|
| assert result.success is True |
| assert len(result.files) == 1 |
| file_info = result.files[0] |
| assert file_info["filename"] == "report.pdf" |
| assert file_info["markdown_file"] == "report.md" |
|
|
| assert (thread_uploads_dir / "report.pdf").read_bytes() == b"pdf-bytes" |
| assert (thread_uploads_dir / "report.md").read_text(encoding="utf-8") == "converted" |
|
|
| sandbox.update_file.assert_any_call("/mnt/user-data/uploads/report.pdf", b"pdf-bytes") |
| sandbox.update_file.assert_any_call("/mnt/user-data/uploads/report.md", b"converted") |
|
|
|
|
| def test_upload_files_rejects_dotdot_and_dot_filenames(tmp_path): |
| thread_uploads_dir = tmp_path / "uploads" |
| thread_uploads_dir.mkdir(parents=True) |
|
|
| provider = MagicMock() |
| provider.acquire.return_value = "local" |
| sandbox = MagicMock() |
| provider.get.return_value = sandbox |
|
|
| with ( |
| patch.object(uploads, "get_uploads_dir", return_value=thread_uploads_dir), |
| patch.object(uploads, "get_sandbox_provider", return_value=provider), |
| ): |
| |
| for bad_name in ["..", "."]: |
| file = UploadFile(filename=bad_name, file=BytesIO(b"data")) |
| result = asyncio.run(uploads.upload_files("thread-local", files=[file])) |
| assert result.success is True |
| assert result.files == [], f"Expected no files for unsafe filename {bad_name!r}" |
|
|
| |
| file = UploadFile(filename="../etc/passwd", file=BytesIO(b"data")) |
| result = asyncio.run(uploads.upload_files("thread-local", files=[file])) |
| assert result.success is True |
| assert len(result.files) == 1 |
| assert result.files[0]["filename"] == "passwd" |
|
|
| |
| assert [f.name for f in thread_uploads_dir.iterdir()] == ["passwd"] |
|
|