Spaces:
Running
Running
| from datetime import datetime | |
| from uuid import UUID, uuid4 | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| from fastapi import HTTPException, status | |
| from app.main import app | |
| from app.dependencies.auth import TokenUser, get_current_user | |
| from app.documents.services import DocumentRBAC, DocumentRepository, DocumentService, MinioStorageAdapter | |
| from app.documents.controllers.router import get_document_service | |
class InMemoryObject:
    """Plain attribute container used by the fake repository as a stored record."""

    def __init__(self, **kwargs):
        """Copy the known record fields out of ``kwargs``.

        Fields missing from ``kwargs`` fall back to the defaults below, or to
        ``None`` when no default is listed. Keyword arguments that are not
        known fields are ignored.
        """
        # Defaults are computed up front, so a fresh UUID and timestamp are
        # generated on every construction even when the caller supplies them.
        fallbacks = {
            "id": uuid4(),
            "bucket_name": "cutra-scm-documents",
            "visibility": "private",
            "created_at": datetime.utcnow(),
            "legal_hold": False,
        }
        field_names = (
            "id",
            "tenant_id",
            "domain",
            "entity_id",
            "category",
            "bucket_name",
            "object_key",
            "file_name",
            "mime_type",
            "file_size",
            "visibility",
            "created_by",
            "created_at",
            "checksum_sha256",
            "deleted_at",
            "legal_hold",
        )
        for field_name in field_names:
            setattr(self, field_name, kwargs.get(field_name, fallbacks.get(field_name)))
class FakeDocumentRepository(DocumentRepository):
    """Dictionary-backed stand-in for ``DocumentRepository`` used by these tests."""

    def __init__(self):
        """Start with an empty store; records are keyed by their ``id``."""
        self.records = {}

    async def create_upload_placeholder(self, **kwargs):
        """Build an ``InMemoryObject`` from ``kwargs``, store it, and return it."""
        record = InMemoryObject(**kwargs)
        self.records[record.id] = record
        return record

    async def finalize_upload(self, object_id: UUID, tenant_id: str, checksum: str):
        """Set ``checksum_sha256`` on the active record and return the record.

        Raises the same 404 as ``get_active`` when the record is unavailable.
        """
        record = await self.get_active(object_id, tenant_id)
        record.checksum_sha256 = checksum
        return record

    async def get_upload(self, object_id: UUID, tenant_id: str):
        """Return the record for ``object_id`` owned by ``tenant_id``.

        Raises:
            HTTPException: 404 when the record is missing or belongs to a
                different tenant; 410 when it has been soft-deleted.
        """
        record = self.records.get(object_id)
        owned = bool(record) and record.tenant_id == tenant_id
        if not owned:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
        if record.deleted_at:
            raise HTTPException(status_code=status.HTTP_410_GONE, detail="Object deleted")
        return record

    async def get_active(self, object_id: UUID, tenant_id: str):
        """Return the non-deleted record for ``object_id`` owned by ``tenant_id``.

        Raises:
            HTTPException: 404 when the record is missing, belongs to a
                different tenant, or has been soft-deleted.
        """
        record = self.records.get(object_id)
        if record and record.tenant_id == tenant_id and not record.deleted_at:
            return record
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")

    async def resolve_object(self, **kwargs):
        """Look a record up by ``object_id`` or by its descriptive fields.

        When ``object_id`` is supplied (and truthy) the lookup is delegated to
        ``get_active``. Otherwise the first non-deleted record of the tenant
        whose ``domain``, ``entity_id``, ``category`` and ``file_name`` all
        equal the supplied values is returned.

        Raises:
            HTTPException: 404 when nothing matches.
        """
        tenant_id = kwargs.get("tenant_id")
        object_id = kwargs.get("object_id")
        if object_id:
            return await self.get_active(object_id, tenant_id)

        match_fields = ("domain", "entity_id", "category", "file_name")
        for candidate in self.records.values():
            if candidate.tenant_id != tenant_id:
                continue
            fields_match = all(
                getattr(candidate, name) == kwargs.get(name) for name in match_fields
            )
            if fields_match and not candidate.deleted_at:
                return candidate
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")

    async def find_by_checksum(self, tenant_id: str, checksum: str):
        """Return the tenant's first non-deleted record with ``checksum``, else ``None``."""
        return next(
            (
                candidate
                for candidate in self.records.values()
                if candidate.tenant_id == tenant_id
                and candidate.checksum_sha256 == checksum
                and not candidate.deleted_at
            ),
            None,
        )

    async def soft_delete(self, object_id: UUID, tenant_id: str):
        """Stamp ``deleted_at`` on the active record; returns nothing.

        Raises the same 404 as ``get_active`` when the record is unavailable.
        """
        record = await self.get_active(object_id, tenant_id)
        record.deleted_at = datetime.utcnow()
class FakeStorageAdapter(MinioStorageAdapter):
    """Network-free stand-in for ``MinioStorageAdapter`` that returns canned values."""

    def __init__(self):
        """Initialise with checksum verification succeeding by default."""
        # Tests flip this to True to make verify_checksum report a mismatch.
        self.force_mismatch = False

    async def create_upload_urls(self, bucket: str, object_key: str, size: int, mime: str):
        """Return a one-element list with a fake URL; ``size`` and ``mime`` are unused."""
        upload_url = f"http://minio/{bucket}/{object_key}"
        return [upload_url]

    async def complete_multipart(self, bucket: str, object_key: str, parts=None):
        """Do nothing and return ``None``."""
        return None

    async def verify_checksum(self, bucket: str, object_key: str, checksum: str):
        """Return ``False`` when a mismatch is forced, else the truthiness of ``checksum``."""
        if self.force_mismatch:
            return False
        return bool(checksum)

    async def create_download_url(self, bucket: str, object_key: str):
        """Return a fake download URL for the object."""
        download_url = f"http://minio/{bucket}/{object_key}?download=true"
        return download_url
@pytest.fixture
def client():
    """Provide a ``TestClient`` wired to in-memory fakes.

    The tests in this module request ``client`` as an argument, so the
    generator must be registered with pytest as a fixture.

    Yields:
        A 4-tuple ``(api, repo, storage, override_user)`` where ``api`` is a
        ``TestClient`` for ``app``, ``repo`` and ``storage`` are the fakes
        backing the service, and ``override_user`` is a factory
        ``override_user(role=..., tenant=...)`` returning a dependency
        callable that tests can install over ``get_current_user``.

    All entries in ``app.dependency_overrides`` are cleared on teardown.
    """
    repo = FakeDocumentRepository()
    storage = FakeStorageAdapter()
    service = DocumentService(repo=repo, storage=storage, rbac=DocumentRBAC(), cache=None)

    def override_service():
        return service

    def override_user(role: str = "buyer", tenant: str = "tenant-1"):
        def _user():
            return TokenUser(user_id="user-1", username="u", role=role, merchant_id=tenant)

        return _user

    app.dependency_overrides[get_document_service] = override_service
    app.dependency_overrides[get_current_user] = override_user()
    try:
        yield TestClient(app), repo, storage, override_user
    finally:
        # Runs even if building the TestClient fails, so overrides never
        # leak into other tests.
        app.dependency_overrides.clear()
def test_presign_and_upload_flow(client):
    """Init then complete an upload and check the checksum is stored on the record."""
    api, repo, *_ = client

    init_response = api.post(
        "/scm/storage/upload/init",
        json={
            "domain": "po",
            "entity_id": "po-1",
            "category": "invoice",
            "file_name": "invoice.pdf",
            "mime_type": "application/pdf",
            "file_size": 1024,
            "visibility": "private",
        },
    )
    assert init_response.status_code == 200
    init_body = init_response.json()
    assert init_body["presigned_urls"]

    upload_id = init_body["upload_id"]
    completion_request = {"upload_id": upload_id, "checksum_sha256": "abc123", "parts": None}
    complete_response = api.post("/scm/storage/upload/complete", json=completion_request)
    assert complete_response.status_code == 200

    stored_record = repo.records[UUID(upload_id)]
    assert stored_record.checksum_sha256 == "abc123"
def test_rbac_blocks_unauthorized_role(client):
    """A ``supplier`` user initiating a ``promotions`` upload is answered with 403."""
    api, *_, make_user_override = client
    app.dependency_overrides[get_current_user] = make_user_override(role="supplier")

    response = api.post(
        "/scm/storage/upload/init",
        json={
            "domain": "promotions",
            "entity_id": "promo-1",
            "category": "banner",
            "file_name": "banner.png",
            "mime_type": "image/png",
            "file_size": 1000,
            "visibility": "private",
        },
    )

    assert response.status_code == status.HTTP_403_FORBIDDEN
def test_tenant_isolation_on_fetch(client):
    """An object created under the default tenant is a 404 for ``tenant-2``."""
    api, _, _, override_user = client
    payload = {
        "domain": "po",
        "entity_id": "po-2",
        "category": "invoice",
        "file_name": "invoice2.pdf",
        "mime_type": "application/pdf",
        "file_size": 2048,
        "visibility": "private",
    }
    res = api.post("/scm/storage/upload/init", json=payload)
    # Check the setup step explicitly so a failed init is reported as such
    # rather than as a KeyError on "upload_id" below.
    assert res.status_code == 200
    object_id = res.json()["upload_id"]

    # Re-issue the request as a user from a different tenant.
    app.dependency_overrides[get_current_user] = override_user(tenant="tenant-2")
    meta = api.get(f"/scm/storage/{object_id}")
    assert meta.status_code == status.HTTP_404_NOT_FOUND
def test_checksum_mismatch_raises_conflict(client):
    """Completing an upload while the storage fake reports a mismatch yields 409."""
    api, _, storage, _ = client
    payload = {
        "domain": "po",
        "entity_id": "po-3",
        "category": "invoice",
        "file_name": "invoice3.pdf",
        "mime_type": "application/pdf",
        "file_size": 2048,
        "visibility": "private",
    }
    init_res = api.post("/scm/storage/upload/init", json=payload)
    # Check the setup step explicitly so a failed init is reported as such
    # rather than as a KeyError on "upload_id" below.
    assert init_res.status_code == 200
    upload_id = init_res.json()["upload_id"]

    # Make the fake adapter's verify_checksum return False.
    storage.force_mismatch = True
    complete = api.post(
        "/scm/storage/upload/complete",
        json={"upload_id": upload_id, "checksum_sha256": "bad", "parts": None},
    )
    assert complete.status_code == status.HTTP_409_CONFLICT
def test_soft_delete_hides_object(client):
    """After a successful DELETE, fetching the same object returns 404."""
    api, *_, make_user_override = client
    # Use ops role which has delete permissions
    app.dependency_overrides[get_current_user] = make_user_override(role="ops")

    init_response = api.post(
        "/scm/storage/upload/init",
        json={
            "domain": "po",
            "entity_id": "po-4",
            "category": "invoice",
            "file_name": "invoice4.pdf",
            "mime_type": "application/pdf",
            "file_size": 2048,
            "visibility": "private",
        },
    )
    upload_id = init_response.json()["upload_id"]
    object_url = f"/scm/storage/{upload_id}"

    # The completion response is intentionally not inspected here.
    completion_request = {"upload_id": upload_id, "checksum_sha256": "abc", "parts": None}
    api.post("/scm/storage/upload/complete", json=completion_request)

    delete_response = api.delete(object_url)
    assert delete_response.status_code == 200

    fetch_response = api.get(object_url)
    assert fetch_response.status_code == status.HTTP_404_NOT_FOUND