import pytest from uuid import uuid4, UUID from datetime import datetime, timedelta from tensorus.metadata.schemas import ( TensorDescriptor, SemanticMetadata, DataType, StorageFormat, LineageMetadata, LineageSource, LineageSourceType, ParentTensorLink, TransformationStep, ComputationalMetadata, QualityMetadata, QualityStatistics, MissingValuesInfo, RelationalMetadata, RelatedTensorLink, UsageMetadata, UsageAccessRecord ) from tensorus.metadata.storage import InMemoryStorage # Fixture for a clean InMemoryStorage instance for each test @pytest.fixture def mem_storage() -> InMemoryStorage: storage = InMemoryStorage() storage.clear_all_data() return storage # Fixture for a sample TensorDescriptor, ensuring it's added to the test's storage instance @pytest.fixture def base_td(mem_storage: InMemoryStorage) -> TensorDescriptor: td = TensorDescriptor( tensor_id=uuid4(), dimensionality=2, shape=[10, 20], data_type=DataType.FLOAT32, owner="test_owner", byte_size=800, tags=["base_tag"], metadata={"domain": "vision"} ) mem_storage.add_tensor_descriptor(td) return td # --- Extended Metadata Storage Tests --- # Helper to create and add a sample TensorDescriptor def _add_sample_td(storage: InMemoryStorage, **kwargs) -> TensorDescriptor: defaults = { "tensor_id": uuid4(), "dimensionality": 1, "shape": [1], "data_type": DataType.FLOAT32, "owner": "owner", "byte_size": 4 } defaults.update(kwargs) td = TensorDescriptor(**defaults) storage.add_tensor_descriptor(td) return td # --- LineageMetadata Storage Tests --- @pytest.fixture def sample_lm(base_td: TensorDescriptor) -> LineageMetadata: return LineageMetadata( tensor_id=base_td.tensor_id, source=LineageSource(type=LineageSourceType.SYNTHETIC, identifier="test_script.py"), version="1.0" ) def test_add_get_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata): mem_storage.add_lineage_metadata(sample_lm) retrieved = mem_storage.get_lineage_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.version == "1.0" assert retrieved.source.identifier == "test_script.py" def test_add_lineage_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata): mem_storage.add_lineage_metadata(sample_lm) # First add updated_lm_data = sample_lm.model_dump() updated_lm_data["version"] = "2.0" updated_lm = LineageMetadata(**updated_lm_data) mem_storage.add_lineage_metadata(updated_lm) # This should replace due to upsert logic retrieved = mem_storage.get_lineage_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.version == "2.0" def test_update_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata): mem_storage.add_lineage_metadata(sample_lm) updates = {"version": "1.1", "provenance": {"author": "updater"}} updated = mem_storage.update_lineage_metadata(base_td.tensor_id, **updates) assert updated is not None assert updated.version == "1.1" assert updated.provenance["author"] == "updater" assert updated.source.identifier == "test_script.py" # Check unchanged field def test_delete_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata): mem_storage.add_lineage_metadata(sample_lm) assert mem_storage.delete_lineage_metadata(base_td.tensor_id) is True assert mem_storage.get_lineage_metadata(base_td.tensor_id) is None assert mem_storage.delete_lineage_metadata(base_td.tensor_id) is False # Already deleted # --- ComputationalMetadata Storage Tests --- @pytest.fixture def sample_cm(base_td: TensorDescriptor) -> ComputationalMetadata: return ComputationalMetadata( tensor_id=base_td.tensor_id, algorithm="CNN", computation_time_seconds=5.0, parameters={"lr": 0.01} ) def test_add_get_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata): mem_storage.add_computational_metadata(sample_cm) retrieved = mem_storage.get_computational_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.algorithm == "CNN" assert retrieved.parameters["lr"] == 0.01 def test_add_computational_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata): mem_storage.add_computational_metadata(sample_cm) new_cm = ComputationalMetadata(**{**sample_cm.model_dump(), "algorithm": "RNN"}) mem_storage.add_computational_metadata(new_cm) retrieved = mem_storage.get_computational_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.algorithm == "RNN" def test_update_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata): mem_storage.add_computational_metadata(sample_cm) updated = mem_storage.update_computational_metadata(base_td.tensor_id, algorithm="Updated", parameters={"dropout": 0.2}) assert updated is not None assert updated.algorithm == "Updated" assert updated.parameters["dropout"] == 0.2 def test_delete_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata): mem_storage.add_computational_metadata(sample_cm) assert mem_storage.delete_computational_metadata(base_td.tensor_id) is True assert mem_storage.get_computational_metadata(base_td.tensor_id) is None assert mem_storage.delete_computational_metadata(base_td.tensor_id) is False # --- QualityMetadata Storage Tests --- @pytest.fixture def sample_qm(base_td: TensorDescriptor) -> QualityMetadata: return QualityMetadata( tensor_id=base_td.tensor_id, statistics=QualityStatistics(mean=0.5), missing_values=MissingValuesInfo(count=0, percentage=0.0), confidence_score=0.9 ) def test_add_get_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata): mem_storage.add_quality_metadata(sample_qm) retrieved = mem_storage.get_quality_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.confidence_score == 0.9 def test_add_quality_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata): mem_storage.add_quality_metadata(sample_qm) new_qm = QualityMetadata(**{**sample_qm.model_dump(), "noise_level": 0.1}) mem_storage.add_quality_metadata(new_qm) retrieved = mem_storage.get_quality_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.noise_level == 0.1 def test_update_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata): mem_storage.add_quality_metadata(sample_qm) updated = mem_storage.update_quality_metadata(base_td.tensor_id, noise_level=0.2) assert updated is not None assert updated.noise_level == 0.2 assert updated.confidence_score == 0.9 def test_delete_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata): mem_storage.add_quality_metadata(sample_qm) assert mem_storage.delete_quality_metadata(base_td.tensor_id) is True assert mem_storage.get_quality_metadata(base_td.tensor_id) is None assert mem_storage.delete_quality_metadata(base_td.tensor_id) is False # --- RelationalMetadata Storage Tests --- @pytest.fixture def sample_rm(base_td: TensorDescriptor) -> RelationalMetadata: return RelationalMetadata( tensor_id=base_td.tensor_id, related_tensors=[RelatedTensorLink(related_tensor_id=uuid4(), relationship_type="related")], collections=["setA"], dependencies=[uuid4()], dataset_context="dataset1" ) def test_add_get_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata): mem_storage.add_relational_metadata(sample_rm) retrieved = mem_storage.get_relational_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.dataset_context == "dataset1" def test_add_relational_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata): mem_storage.add_relational_metadata(sample_rm) new_rm = RelationalMetadata(**{**sample_rm.model_dump(), "collections": ["setB"]}) mem_storage.add_relational_metadata(new_rm) retrieved = mem_storage.get_relational_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.collections == ["setB"] def test_update_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata): mem_storage.add_relational_metadata(sample_rm) updated = mem_storage.update_relational_metadata(base_td.tensor_id, dataset_context="dataset2") assert updated is not None assert updated.dataset_context == "dataset2" def test_delete_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata): mem_storage.add_relational_metadata(sample_rm) assert mem_storage.delete_relational_metadata(base_td.tensor_id) is True assert mem_storage.get_relational_metadata(base_td.tensor_id) is None assert mem_storage.delete_relational_metadata(base_td.tensor_id) is False # --- UsageMetadata Storage Tests --- @pytest.fixture def sample_um(base_td: TensorDescriptor) -> UsageMetadata: now = datetime.utcnow() return UsageMetadata( tensor_id=base_td.tensor_id, access_history=[UsageAccessRecord(user_or_service="tester", operation_type="read", accessed_at=now)], application_references=["app1"], purpose={"training": "modelA"} ) def test_add_get_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata): mem_storage.add_usage_metadata(sample_um) retrieved = mem_storage.get_usage_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.usage_frequency == 1 assert retrieved.application_references == ["app1"] def test_add_usage_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata): mem_storage.add_usage_metadata(sample_um) new_record = UsageAccessRecord(user_or_service="tester2", operation_type="write", accessed_at=datetime.utcnow()) new_um = UsageMetadata(**{**sample_um.model_dump(), "access_history": [new_record]}) mem_storage.add_usage_metadata(new_um) retrieved = mem_storage.get_usage_metadata(base_td.tensor_id) assert retrieved is not None assert retrieved.usage_frequency == 1 assert retrieved.access_history[0].user_or_service == "tester2" def test_update_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata): mem_storage.add_usage_metadata(sample_um) new_access = UsageAccessRecord(user_or_service="user_x", operation_type="read", accessed_at=datetime.utcnow()) updated = mem_storage.update_usage_metadata(base_td.tensor_id, access_history=sample_um.access_history + [new_access]) assert updated is not None assert updated.usage_frequency == 2 assert updated.access_history[-1].user_or_service == "user_x" def test_delete_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata): mem_storage.add_usage_metadata(sample_um) assert mem_storage.delete_usage_metadata(base_td.tensor_id) is True assert mem_storage.get_usage_metadata(base_td.tensor_id) is None assert mem_storage.delete_usage_metadata(base_td.tensor_id) is False # --- Test Cascade Delete --- def test_delete_tensor_descriptor_cascades_extended_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata): mem_storage.add_lineage_metadata(sample_lm) # Add other types of extended metadata here too if testing comprehensively assert mem_storage.get_lineage_metadata(base_td.tensor_id) is not None mem_storage.delete_tensor_descriptor(base_td.tensor_id) assert mem_storage.get_tensor_descriptor(base_td.tensor_id) is None assert mem_storage.get_lineage_metadata(base_td.tensor_id) is None # Add asserts for other extended metadata types being None # --- Versioning and Lineage Specific Storage Methods --- def test_get_parent_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor): parent1_id = uuid4() parent2_id = uuid4() _add_sample_td(mem_storage, tensor_id=parent1_id, owner="parent1_owner") _add_sample_td(mem_storage, tensor_id=parent2_id, owner="parent2_owner") lineage = LineageMetadata( tensor_id=base_td.tensor_id, parent_tensors=[ ParentTensorLink(tensor_id=parent1_id, relationship="derived"), ParentTensorLink(tensor_id=parent2_id, relationship="copied") ] ) mem_storage.add_lineage_metadata(lineage) parent_ids = mem_storage.get_parent_tensor_ids(base_td.tensor_id) assert len(parent_ids) == 2 assert parent1_id in parent_ids assert parent2_id in parent_ids assert mem_storage.get_parent_tensor_ids(uuid4()) == [] # Non-existent tensor def test_get_child_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor): child1_td = _add_sample_td(mem_storage, owner="child1_owner") child2_td = _add_sample_td(mem_storage, owner="child2_owner") # Non-child tensor _add_sample_td(mem_storage, owner="other_owner") # Child1 lists base_td as parent lm_child1 = LineageMetadata(tensor_id=child1_td.tensor_id, parent_tensors=[ParentTensorLink(tensor_id=base_td.tensor_id)]) mem_storage.add_lineage_metadata(lm_child1) # Child2 lists base_td as parent lm_child2 = LineageMetadata(tensor_id=child2_td.tensor_id, parent_tensors=[ParentTensorLink(tensor_id=base_td.tensor_id)]) mem_storage.add_lineage_metadata(lm_child2) child_ids = mem_storage.get_child_tensor_ids(base_td.tensor_id) assert len(child_ids) == 2 assert child1_td.tensor_id in child_ids assert child2_td.tensor_id in child_ids assert mem_storage.get_child_tensor_ids(uuid4()) == [] # Non-existent tensor # --- Search and Aggregation Storage Methods --- @pytest.fixture def search_setup(mem_storage: InMemoryStorage): td1 = _add_sample_td(mem_storage, owner="user_alpha", tags=["raw", "image_data"], metadata={"project": "skyfall"}) td2 = _add_sample_td(mem_storage, owner="user_beta", tags=["processed", "image_data"], metadata={"project": "pegasus"}) td3 = _add_sample_td(mem_storage, owner="user_alpha", tags=["text", "document"], metadata={"project": "skyfall", "language": "EN"}) sm1 = SemanticMetadata(tensor_id=td1.tensor_id, name="Raw Image", description="This is a raw image from sensor X.") mem_storage.add_semantic_metadata(sm1) sm2 = SemanticMetadata(tensor_id=td2.tensor_id, name="Processed Image", description="Processed image after cleanup.") mem_storage.add_semantic_metadata(sm2) sm3 = SemanticMetadata(tensor_id=td3.tensor_id, name="Document Alpha", description="Text document for project skyfall.") mem_storage.add_semantic_metadata(sm3) lm1 = LineageMetadata(tensor_id=td1.tensor_id, source=LineageSource(type=LineageSourceType.FILE, identifier="/data/raw/img1.tiff")) mem_storage.add_lineage_metadata(lm1) return td1, td2, td3 def test_search_tensor_descriptors(mem_storage: InMemoryStorage, search_setup): td1, td2, td3 = search_setup # Search by owner (direct TD field) results = mem_storage.search_tensor_descriptors("user_alpha", ["owner"]) assert len(results) == 2 assert td1 in results and td3 in results # Search by tag (list field in TD) results = mem_storage.search_tensor_descriptors("image_data", ["tags"]) assert len(results) == 2 assert td1 in results and td2 in results # Search by metadata (dict field in TD) results = mem_storage.search_tensor_descriptors("skyfall", ["metadata"]) # Searches values in the metadata dict assert len(results) == 2 assert td1 in results and td3 in results # Search by semantic description results = mem_storage.search_tensor_descriptors("sensor X", ["semantic.description"]) assert len(results) == 1 assert td1 in results # Search by lineage source identifier results = mem_storage.search_tensor_descriptors("/data/raw/img1.tiff", ["lineage.source.identifier"]) assert len(results) == 1 assert td1 in results # Case-insensitive search results = mem_storage.search_tensor_descriptors("SKYFALL", ["metadata.project"]) # Assuming metadata.project path works assert len(results) == 2 # No results results = mem_storage.search_tensor_descriptors("non_existent_term", ["tags", "owner"]) assert len(results) == 0 # Search multiple fields results = mem_storage.search_tensor_descriptors("alpha", ["owner", "semantic.name"]) assert len(results) == 2 # td1 (owner), td3 (owner, semantic.name) @pytest.fixture def agg_setup(mem_storage: InMemoryStorage): td1 = _add_sample_td(mem_storage, owner="user_x", data_type=DataType.FLOAT32, byte_size=100, tags=["A", "B"]) td2 = _add_sample_td(mem_storage, owner="user_y", data_type=DataType.INT64, byte_size=200, tags=["B", "C"]) td3 = _add_sample_td(mem_storage, owner="user_x", data_type=DataType.FLOAT32, byte_size=150, tags=["A"]) cm1 = ComputationalMetadata(tensor_id=td1.tensor_id, computation_time_seconds=10.0) mem_storage.add_computational_metadata(cm1) cm2 = ComputationalMetadata(tensor_id=td2.tensor_id, computation_time_seconds=20.0) mem_storage.add_computational_metadata(cm2) cm3 = ComputationalMetadata(tensor_id=td3.tensor_id, computation_time_seconds=12.0) mem_storage.add_computational_metadata(cm3) return td1, td2, td3 def test_aggregate_tensor_descriptors_count(mem_storage: InMemoryStorage, agg_setup): # Group by owner (direct TD field) result = mem_storage.aggregate_tensor_descriptors("owner", "count") assert result == {"user_x": 2, "user_y": 1} # Group by data_type (direct TD field) result = mem_storage.aggregate_tensor_descriptors("data_type", "count") assert result == {DataType.FLOAT32: 2, DataType.INT64: 1} def test_aggregate_tensor_descriptors_sum_avg(mem_storage: InMemoryStorage, agg_setup): # Sum of byte_size grouped by owner result_sum = mem_storage.aggregate_tensor_descriptors("owner", "sum", "byte_size") assert result_sum == {"user_x": 250, "user_y": 200} # 100 + 150 for user_x # Average of byte_size grouped by owner result_avg = mem_storage.aggregate_tensor_descriptors("owner", "avg", "byte_size") assert result_avg == {"user_x": 125.0, "user_y": 200.0} # Average of computation_time_seconds grouped by owner result_avg_time = mem_storage.aggregate_tensor_descriptors("owner", "avg", "computational.computation_time_seconds") assert result_avg_time == {"user_x": 11.0, "user_y": 20.0} # (10+12)/2 for user_x def test_aggregate_min_max(mem_storage: InMemoryStorage, agg_setup): result_min = mem_storage.aggregate_tensor_descriptors("owner", "min", "computational.computation_time_seconds") assert result_min == {"user_x": 10.0, "user_y": 20.0} result_max = mem_storage.aggregate_tensor_descriptors("owner", "max", "byte_size") assert result_max == {"user_x": 150, "user_y": 200} def test_aggregate_group_by_nested_missing(mem_storage: InMemoryStorage, agg_setup): # Add one TD that doesn't have computational metadata _add_sample_td(mem_storage, owner="user_z", data_type=DataType.BOOLEAN, byte_size=1) result = mem_storage.aggregate_tensor_descriptors("owner", "avg", "computational.computation_time_seconds") assert result["user_z"] == 0 # Or handle as None depending on desired behavior for missing agg_field result_count = mem_storage.aggregate_tensor_descriptors("computational.algorithm", "count") # All current agg_setup items have no algorithm set in their ComputationalMetadata assert result_count.get("N/A", 0) >= 3 # Expecting 3 from agg_setup + any others without algorithm def test_aggregate_invalid_function(mem_storage: InMemoryStorage, agg_setup): with pytest.raises(NotImplementedError): mem_storage.aggregate_tensor_descriptors("owner", "median", "byte_size") # Original SemanticMetadata storage tests from Phase 1 (abbreviated) @pytest.fixture def sample_td_for_semantic(mem_storage: InMemoryStorage): # Renamed to avoid conflict with base_td td = _add_sample_td(mem_storage, owner="semantic_test_owner") return td @pytest.fixture def sample_sm(sample_td_for_semantic: TensorDescriptor): return SemanticMetadata( name="test_semantic_data", description="A piece of semantic info", tensor_id=sample_td_for_semantic.tensor_id ) def test_add_and_get_semantic_metadata(mem_storage: InMemoryStorage, sample_td_for_semantic: TensorDescriptor, sample_sm: SemanticMetadata): mem_storage.add_semantic_metadata(sample_sm) retrieved_sms = mem_storage.get_semantic_metadata(sample_td_for_semantic.tensor_id) assert len(retrieved_sms) == 1 assert retrieved_sms[0].name == sample_sm.name # (Include other semantic metadata tests: add duplicate name, get empty, get by name, update, delete) # (Include original TensorDescriptor storage tests: add_td, get_td, update_td, list_td, delete_td) # These are omitted for brevity as the focus is on new Phase 2 functionality tests. # Ensure they are present and pass in the full test suite. # Example: test_add_and_get_tensor_descriptor (from earlier phase, using base_td now) def test_add_and_get_tensor_descriptor(mem_storage: InMemoryStorage, base_td: TensorDescriptor): # base_td is already added by its fixture retrieved_td = mem_storage.get_tensor_descriptor(base_td.tensor_id) assert retrieved_td is not None assert retrieved_td.tensor_id == base_td.tensor_id assert retrieved_td.owner == "test_owner"