cuatrolabs-scm-ms / tests /test_documents_storage.py
MukeshKapoor25's picture
minio doc store
9b11567
from datetime import datetime
from uuid import UUID, uuid4
import pytest
from fastapi.testclient import TestClient
from fastapi import HTTPException, status
from app.main import app
from app.dependencies.auth import TokenUser, get_current_user
from app.documents.services import DocumentRBAC, DocumentRepository, DocumentService, MinioStorageAdapter
from app.documents.controllers.router import get_document_service
class InMemoryObject:
    """Plain attribute holder standing in for a stored document record.

    All fields are taken from keyword arguments. Keywords that are not one of
    the known fields are ignored, and omitted fields fall back to ``None``
    unless a specific default is assigned in ``__init__``.
    """

    # Fields whose value is ``None`` when the caller does not supply them.
    _NULLABLE_FIELDS = (
        "tenant_id",
        "domain",
        "entity_id",
        "category",
        "object_key",
        "file_name",
        "mime_type",
        "file_size",
        "created_by",
        "checksum_sha256",
        "deleted_at",
    )

    def __init__(self, **kwargs):
        """Populate the record attributes from ``kwargs``."""
        for field_name in self._NULLABLE_FIELDS:
            setattr(self, field_name, kwargs.get(field_name))

        # Fields with a non-None fallback; an explicitly passed value
        # (even ``None``) always wins over the fallback.
        self.id = kwargs["id"] if "id" in kwargs else uuid4()
        self.created_at = kwargs["created_at"] if "created_at" in kwargs else datetime.utcnow()
        self.bucket_name = kwargs.get("bucket_name", "cutra-scm-documents")
        self.visibility = kwargs.get("visibility", "private")
        self.legal_hold = kwargs.get("legal_hold", False)
class FakeDocumentRepository(DocumentRepository):
    """Dictionary-backed stand-in for ``DocumentRepository``.

    Records are ``InMemoryObject`` instances stored in ``self.records`` and
    keyed by their ``id``. The parent initialiser is not invoked.
    """

    def __init__(self):
        """Start with an empty record store."""
        self.records = {}

    async def create_upload_placeholder(self, **kwargs):
        """Build a record from ``kwargs``, store it under its id and return it."""
        record = InMemoryObject(**kwargs)
        self.records[record.id] = record
        return record

    async def finalize_upload(self, object_id: UUID, tenant_id: str, checksum: str):
        """Set the checksum on an active record and return that record.

        Raises:
            HTTPException: 404 (via ``get_active``) when the record is missing,
                owned by another tenant, or deleted.
        """
        record = await self.get_active(object_id, tenant_id)
        record.checksum_sha256 = checksum
        return record

    async def get_upload(self, object_id: UUID, tenant_id: str):
        """Return the tenant's record for ``object_id``.

        Raises:
            HTTPException: 404 when the record is missing or belongs to a
                different tenant; 410 when it has been deleted.
        """
        record = self.records.get(object_id)
        owned_by_tenant = bool(record) and record.tenant_id == tenant_id
        if not owned_by_tenant:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
        if record.deleted_at:
            raise HTTPException(status_code=status.HTTP_410_GONE, detail="Object deleted")
        return record

    async def get_active(self, object_id: UUID, tenant_id: str):
        """Return the tenant's non-deleted record for ``object_id``.

        Raises:
            HTTPException: 404 when the record is missing, belongs to a
                different tenant, or has been deleted.
        """
        record = self.records.get(object_id)
        if record and record.tenant_id == tenant_id and not record.deleted_at:
            return record
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")

    async def resolve_object(self, **kwargs):
        """Find a record by ``object_id`` or, failing that, by its attributes.

        When ``object_id`` is truthy the lookup is delegated to ``get_active``.
        Otherwise the first non-deleted record whose tenant, domain, entity,
        category and file name all equal the corresponding keyword is returned.

        Raises:
            HTTPException: 404 when nothing matches.
        """
        requested_id = kwargs.get("object_id")
        if requested_id:
            return await self.get_active(requested_id, kwargs.get("tenant_id"))

        match_fields = ("tenant_id", "domain", "entity_id", "category", "file_name")
        for candidate in self.records.values():
            same_attributes = all(
                getattr(candidate, field_name) == kwargs.get(field_name)
                for field_name in match_fields
            )
            if same_attributes and not candidate.deleted_at:
                return candidate
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Object not found")

    async def find_by_checksum(self, tenant_id: str, checksum: str):
        """Return the first non-deleted tenant record with ``checksum``, else ``None``."""
        matches = (
            candidate
            for candidate in self.records.values()
            if candidate.tenant_id == tenant_id
            and candidate.checksum_sha256 == checksum
            and not candidate.deleted_at
        )
        return next(matches, None)

    async def soft_delete(self, object_id: UUID, tenant_id: str):
        """Stamp ``deleted_at`` on an active record; the record stays in the store.

        Raises:
            HTTPException: 404 (via ``get_active``) when no active record exists.
        """
        record = await self.get_active(object_id, tenant_id)
        record.deleted_at = datetime.utcnow()
class FakeStorageAdapter(MinioStorageAdapter):
    """Stand-in for ``MinioStorageAdapter`` that returns fixed-format values.

    Set ``force_mismatch`` to ``True`` to make ``verify_checksum`` report a
    failure regardless of the checksum supplied. The parent initialiser is
    not invoked.
    """

    def __init__(self):
        """Start with checksum verification succeeding for non-empty checksums."""
        self.force_mismatch = False

    async def create_upload_urls(self, bucket: str, object_key: str, size: int, mime: str):
        """Return a single-element list with a URL built from bucket and key."""
        upload_url = f"http://minio/{bucket}/{object_key}"
        return [upload_url]

    async def complete_multipart(self, bucket: str, object_key: str, parts=None):
        """Do nothing and return ``None``."""
        return None

    async def verify_checksum(self, bucket: str, object_key: str, checksum: str):
        """Return ``False`` when a mismatch is forced, else the truthiness of ``checksum``."""
        if self.force_mismatch:
            return False
        return bool(checksum)

    async def create_download_url(self, bucket: str, object_key: str):
        """Return a download URL built from bucket and key."""
        download_url = f"http://minio/{bucket}/{object_key}?download=true"
        return download_url
@pytest.fixture()
def client():
    """Provide a ``TestClient`` whose document service uses in-memory fakes.

    Yields:
        A 4-tuple ``(api, repo, storage, override_user)``:

        * ``api`` -- ``TestClient`` bound to ``app``.
        * ``repo`` -- the ``FakeDocumentRepository`` backing the service.
        * ``storage`` -- the ``FakeStorageAdapter`` backing the service.
        * ``override_user`` -- factory ``(role, tenant) -> dependency`` that a
          test can assign to ``app.dependency_overrides[get_current_user]``
          to act as a different role or tenant.
    """
    repo = FakeDocumentRepository()
    storage = FakeStorageAdapter()
    service = DocumentService(repo=repo, storage=storage, rbac=DocumentRBAC(), cache=None)

    def override_service():
        return service

    def override_user(role: str = "buyer", tenant: str = "tenant-1"):
        def _user():
            return TokenUser(user_id="user-1", username="u", role=role, merchant_id=tenant)

        return _user

    # Snapshot the overrides that existed before this fixture ran so teardown
    # puts back exactly that state instead of wiping overrides installed by
    # other fixtures or conftest.
    saved_overrides = dict(app.dependency_overrides)
    app.dependency_overrides[get_document_service] = override_service
    app.dependency_overrides[get_current_user] = override_user()
    try:
        yield TestClient(app), repo, storage, override_user
    finally:
        # Drop everything set here or by the test, then restore the snapshot.
        app.dependency_overrides.clear()
        app.dependency_overrides.update(saved_overrides)
def test_presign_and_upload_flow(client):
    """Init an upload, complete it, and check the checksum lands on the record."""
    api, repo, *_ = client

    init_body = dict(
        domain="po",
        entity_id="po-1",
        category="invoice",
        file_name="invoice.pdf",
        mime_type="application/pdf",
        file_size=1024,
        visibility="private",
    )
    init_res = api.post("/scm/storage/upload/init", json=init_body)
    assert init_res.status_code == 200

    init_data = init_res.json()
    assert init_data["presigned_urls"]
    upload_id = init_data["upload_id"]

    complete_body = {"upload_id": upload_id, "checksum_sha256": "abc123", "parts": None}
    complete_res = api.post("/scm/storage/upload/complete", json=complete_body)
    assert complete_res.status_code == 200

    # The fake repository keys records by UUID, while the API returns a string.
    stored = repo.records[UUID(upload_id)]
    assert stored.checksum_sha256 == "abc123"
def test_rbac_blocks_unauthorized_role(client):
    """A ``supplier`` initiating a promotions banner upload gets 403."""
    api, *_, make_user = client
    # Swap the authenticated user for one with the supplier role.
    app.dependency_overrides[get_current_user] = make_user(role="supplier")

    init_body = dict(
        domain="promotions",
        entity_id="promo-1",
        category="banner",
        file_name="banner.png",
        mime_type="image/png",
        file_size=1000,
        visibility="private",
    )
    init_res = api.post("/scm/storage/upload/init", json=init_body)

    assert init_res.status_code == status.HTTP_403_FORBIDDEN
def test_tenant_isolation_on_fetch(client):
    """An object created by one tenant is reported as 404 to another tenant."""
    api, _, _, override_user = client
    payload = {
        "domain": "po",
        "entity_id": "po-2",
        "category": "invoice",
        "file_name": "invoice2.pdf",
        "mime_type": "application/pdf",
        "file_size": 2048,
        "visibility": "private",
    }
    res = api.post("/scm/storage/upload/init", json=payload)
    # Fail here with a clear status mismatch if setup breaks, rather than with
    # a KeyError on the missing "upload_id" below.
    assert res.status_code == 200
    object_id = res.json()["upload_id"]

    # Re-authenticate as a user from a different tenant before fetching.
    app.dependency_overrides[get_current_user] = override_user(tenant="tenant-2")
    meta = api.get(f"/scm/storage/{object_id}")
    assert meta.status_code == status.HTTP_404_NOT_FOUND
def test_checksum_mismatch_raises_conflict(client):
    """Completing an upload whose checksum fails verification returns 409."""
    api, _, storage, _ = client
    payload = {
        "domain": "po",
        "entity_id": "po-3",
        "category": "invoice",
        "file_name": "invoice3.pdf",
        "mime_type": "application/pdf",
        "file_size": 2048,
        "visibility": "private",
    }
    init_res = api.post("/scm/storage/upload/init", json=payload)
    # Fail here with a clear status mismatch if setup breaks, rather than with
    # a KeyError on the missing "upload_id" below.
    assert init_res.status_code == 200
    upload_id = init_res.json()["upload_id"]

    # Make the fake storage adapter report a verification failure.
    storage.force_mismatch = True
    complete = api.post(
        "/scm/storage/upload/complete",
        json={"upload_id": upload_id, "checksum_sha256": "bad", "parts": None},
    )
    assert complete.status_code == status.HTTP_409_CONFLICT
def test_soft_delete_hides_object(client):
    """After a successful delete, fetching the object returns 404."""
    api, _, _, override_user = client
    # Use ops role which has delete permissions
    app.dependency_overrides[get_current_user] = override_user(role="ops")
    payload = {
        "domain": "po",
        "entity_id": "po-4",
        "category": "invoice",
        "file_name": "invoice4.pdf",
        "mime_type": "application/pdf",
        "file_size": 2048,
        "visibility": "private",
    }
    init_res = api.post("/scm/storage/upload/init", json=payload)
    # Fail here with a clear status mismatch if setup breaks, rather than with
    # a KeyError on the missing "upload_id" below.
    assert init_res.status_code == 200
    upload_id = init_res.json()["upload_id"]

    api.post(
        "/scm/storage/upload/complete",
        json={"upload_id": upload_id, "checksum_sha256": "abc", "parts": None},
    )

    delete_res = api.delete(f"/scm/storage/{upload_id}")
    assert delete_res.status_code == 200

    meta = api.get(f"/scm/storage/{upload_id}")
    assert meta.status_code == status.HTTP_404_NOT_FOUND