| import pytest |
| from uuid import uuid4, UUID |
| from datetime import datetime, timedelta |
|
|
| from tensorus.metadata.schemas import ( |
| TensorDescriptor, SemanticMetadata, DataType, StorageFormat, |
| LineageMetadata, LineageSource, LineageSourceType, ParentTensorLink, TransformationStep, |
| ComputationalMetadata, |
| QualityMetadata, QualityStatistics, MissingValuesInfo, |
| RelationalMetadata, RelatedTensorLink, |
| UsageMetadata, UsageAccessRecord |
| ) |
| from tensorus.metadata.storage import InMemoryStorage |
|
|
| |
@pytest.fixture
def mem_storage() -> InMemoryStorage:
    """Provide an ``InMemoryStorage`` for a single test.

    ``clear_all_data()`` is called on the new instance before it is returned.
    """
    fresh_store = InMemoryStorage()
    fresh_store.clear_all_data()
    return fresh_store
|
|
| |
@pytest.fixture
def base_td(mem_storage: InMemoryStorage) -> TensorDescriptor:
    """Register a 10x20 FLOAT32 descriptor in ``mem_storage`` and return it."""
    descriptor_fields = dict(
        tensor_id=uuid4(),
        dimensionality=2,
        shape=[10, 20],
        data_type=DataType.FLOAT32,
        owner="test_owner",
        byte_size=800,
        tags=["base_tag"],
        metadata={"domain": "vision"},
    )
    descriptor = TensorDescriptor(**descriptor_fields)
    mem_storage.add_tensor_descriptor(descriptor)
    return descriptor
|
|
| |
|
|
| |
def _add_sample_td(storage: InMemoryStorage, **kwargs) -> TensorDescriptor:
    """Create a minimal ``TensorDescriptor``, add it to ``storage`` and return it.

    Args:
        storage: Storage the new descriptor is added to.
        **kwargs: Field values that override or extend the minimal defaults.

    Returns:
        The descriptor that was added.
    """
    # Caller-supplied values win over the defaults listed before them.
    field_values = {
        "tensor_id": uuid4(),
        "dimensionality": 1,
        "shape": [1],
        "data_type": DataType.FLOAT32,
        "owner": "owner",
        "byte_size": 4,
        **kwargs,
    }
    descriptor = TensorDescriptor(**field_values)
    storage.add_tensor_descriptor(descriptor)
    return descriptor
|
|
| |
@pytest.fixture
def sample_lm(base_td: TensorDescriptor) -> LineageMetadata:
    """Build (without storing) lineage metadata for ``base_td`` with a synthetic source."""
    origin = LineageSource(
        type=LineageSourceType.SYNTHETIC,
        identifier="test_script.py",
    )
    return LineageMetadata(tensor_id=base_td.tensor_id, source=origin, version="1.0")
|
|
def test_add_get_lineage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_lm: LineageMetadata,
):
    """Lineage metadata that was added can be fetched back by tensor id."""
    mem_storage.add_lineage_metadata(sample_lm)

    fetched = mem_storage.get_lineage_metadata(base_td.tensor_id)

    assert fetched is not None
    assert fetched.version == "1.0"
    assert fetched.source.identifier == "test_script.py"
|
|
def test_add_lineage_metadata_upsert(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_lm: LineageMetadata,
):
    """Adding lineage metadata twice for one tensor keeps the second record."""
    mem_storage.add_lineage_metadata(sample_lm)

    # Same record as the fixture, differing only in its version.
    replacement = LineageMetadata(**{**sample_lm.model_dump(), "version": "2.0"})
    mem_storage.add_lineage_metadata(replacement)

    fetched = mem_storage.get_lineage_metadata(base_td.tensor_id)
    assert fetched is not None
    assert fetched.version == "2.0"
|
|
def test_update_lineage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_lm: LineageMetadata,
):
    """A partial update changes the given fields and leaves the source untouched."""
    mem_storage.add_lineage_metadata(sample_lm)

    result = mem_storage.update_lineage_metadata(
        base_td.tensor_id,
        version="1.1",
        provenance={"author": "updater"},
    )

    assert result is not None
    assert result.version == "1.1"
    assert result.provenance["author"] == "updater"
    # The source was not part of the update, so it must keep its original value.
    assert result.source.identifier == "test_script.py"
|
|
def test_delete_lineage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_lm: LineageMetadata,
):
    """Delete reports True once, removes the record, then reports False."""
    target_id = base_td.tensor_id
    mem_storage.add_lineage_metadata(sample_lm)

    first_attempt = mem_storage.delete_lineage_metadata(target_id)
    assert first_attempt is True
    assert mem_storage.get_lineage_metadata(target_id) is None

    # Nothing is left to delete the second time.
    second_attempt = mem_storage.delete_lineage_metadata(target_id)
    assert second_attempt is False
|
|
| |
@pytest.fixture
def sample_cm(base_td: TensorDescriptor) -> ComputationalMetadata:
    """Build (without storing) computational metadata for ``base_td``."""
    cm_fields = {
        "tensor_id": base_td.tensor_id,
        "algorithm": "CNN",
        "computation_time_seconds": 5.0,
        "parameters": {"lr": 0.01},
    }
    return ComputationalMetadata(**cm_fields)
|
|
def test_add_get_computational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_cm: ComputationalMetadata,
):
    """Computational metadata that was added can be fetched back by tensor id."""
    mem_storage.add_computational_metadata(sample_cm)

    fetched = mem_storage.get_computational_metadata(base_td.tensor_id)

    assert fetched is not None
    assert fetched.algorithm == "CNN"
    assert fetched.parameters["lr"] == 0.01
|
|
def test_add_computational_metadata_upsert(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_cm: ComputationalMetadata,
):
    """Adding computational metadata twice for one tensor keeps the second record."""
    mem_storage.add_computational_metadata(sample_cm)

    # Same record as the fixture, differing only in its algorithm.
    payload = sample_cm.model_dump()
    payload["algorithm"] = "RNN"
    mem_storage.add_computational_metadata(ComputationalMetadata(**payload))

    fetched = mem_storage.get_computational_metadata(base_td.tensor_id)
    assert fetched is not None
    assert fetched.algorithm == "RNN"
|
|
def test_update_computational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_cm: ComputationalMetadata,
):
    """An update returns a record carrying the new algorithm and parameters."""
    mem_storage.add_computational_metadata(sample_cm)

    changes = {"algorithm": "Updated", "parameters": {"dropout": 0.2}}
    result = mem_storage.update_computational_metadata(base_td.tensor_id, **changes)

    assert result is not None
    assert result.algorithm == "Updated"
    assert result.parameters["dropout"] == 0.2
|
|
def test_delete_computational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_cm: ComputationalMetadata,
):
    """Delete reports True once, removes the record, then reports False."""
    target_id = base_td.tensor_id
    mem_storage.add_computational_metadata(sample_cm)

    first_attempt = mem_storage.delete_computational_metadata(target_id)
    assert first_attempt is True
    assert mem_storage.get_computational_metadata(target_id) is None

    # Nothing is left to delete the second time.
    second_attempt = mem_storage.delete_computational_metadata(target_id)
    assert second_attempt is False
|
|
| |
@pytest.fixture
def sample_qm(base_td: TensorDescriptor) -> QualityMetadata:
    """Build (without storing) quality metadata for ``base_td``."""
    stats = QualityStatistics(mean=0.5)
    missing = MissingValuesInfo(count=0, percentage=0.0)
    return QualityMetadata(
        tensor_id=base_td.tensor_id,
        statistics=stats,
        missing_values=missing,
        confidence_score=0.9,
    )
|
|
def test_add_get_quality_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_qm: QualityMetadata,
):
    """Quality metadata that was added can be fetched back by tensor id."""
    mem_storage.add_quality_metadata(sample_qm)

    fetched = mem_storage.get_quality_metadata(base_td.tensor_id)

    assert fetched is not None
    assert fetched.confidence_score == 0.9
|
|
def test_add_quality_metadata_upsert(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_qm: QualityMetadata,
):
    """Adding quality metadata twice for one tensor keeps the second record."""
    mem_storage.add_quality_metadata(sample_qm)

    # Same record as the fixture, with a noise level set on top.
    payload = sample_qm.model_dump()
    payload["noise_level"] = 0.1
    mem_storage.add_quality_metadata(QualityMetadata(**payload))

    fetched = mem_storage.get_quality_metadata(base_td.tensor_id)
    assert fetched is not None
    assert fetched.noise_level == 0.1
|
|
def test_update_quality_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_qm: QualityMetadata,
):
    """Updating the noise level leaves the confidence score unchanged."""
    mem_storage.add_quality_metadata(sample_qm)

    result = mem_storage.update_quality_metadata(
        base_td.tensor_id,
        noise_level=0.2,
    )

    assert result is not None
    assert result.noise_level == 0.2
    # The confidence score was not part of the update.
    assert result.confidence_score == 0.9
|
|
def test_delete_quality_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_qm: QualityMetadata,
):
    """Delete reports True once, removes the record, then reports False."""
    target_id = base_td.tensor_id
    mem_storage.add_quality_metadata(sample_qm)

    first_attempt = mem_storage.delete_quality_metadata(target_id)
    assert first_attempt is True
    assert mem_storage.get_quality_metadata(target_id) is None

    # Nothing is left to delete the second time.
    second_attempt = mem_storage.delete_quality_metadata(target_id)
    assert second_attempt is False
|
|
| |
@pytest.fixture
def sample_rm(base_td: TensorDescriptor) -> RelationalMetadata:
    """Build (without storing) relational metadata for ``base_td``.

    The related tensor and the dependency are fresh random ids; no descriptors
    are created for them here.
    """
    related_link = RelatedTensorLink(
        related_tensor_id=uuid4(),
        relationship_type="related",
    )
    dependency_id = uuid4()
    return RelationalMetadata(
        tensor_id=base_td.tensor_id,
        related_tensors=[related_link],
        collections=["setA"],
        dependencies=[dependency_id],
        dataset_context="dataset1",
    )
|
|
def test_add_get_relational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_rm: RelationalMetadata,
):
    """Relational metadata that was added can be fetched back by tensor id."""
    mem_storage.add_relational_metadata(sample_rm)

    fetched = mem_storage.get_relational_metadata(base_td.tensor_id)

    assert fetched is not None
    assert fetched.dataset_context == "dataset1"
|
|
def test_add_relational_metadata_upsert(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_rm: RelationalMetadata,
):
    """Adding relational metadata twice for one tensor keeps the second record."""
    mem_storage.add_relational_metadata(sample_rm)

    # Same record as the fixture, differing only in its collections.
    payload = sample_rm.model_dump()
    payload["collections"] = ["setB"]
    mem_storage.add_relational_metadata(RelationalMetadata(**payload))

    fetched = mem_storage.get_relational_metadata(base_td.tensor_id)
    assert fetched is not None
    assert fetched.collections == ["setB"]
|
|
def test_update_relational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_rm: RelationalMetadata,
):
    """Updating the dataset context returns a record with the new value."""
    mem_storage.add_relational_metadata(sample_rm)

    result = mem_storage.update_relational_metadata(
        base_td.tensor_id,
        dataset_context="dataset2",
    )

    assert result is not None
    assert result.dataset_context == "dataset2"
|
|
def test_delete_relational_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_rm: RelationalMetadata,
):
    """Delete reports True once, removes the record, then reports False."""
    target_id = base_td.tensor_id
    mem_storage.add_relational_metadata(sample_rm)

    first_attempt = mem_storage.delete_relational_metadata(target_id)
    assert first_attempt is True
    assert mem_storage.get_relational_metadata(target_id) is None

    # Nothing is left to delete the second time.
    second_attempt = mem_storage.delete_relational_metadata(target_id)
    assert second_attempt is False
|
|
| |
@pytest.fixture
def sample_um(base_td: TensorDescriptor) -> UsageMetadata:
    """Build (without storing) usage metadata for ``base_td`` with one read access."""
    # Naive UTC timestamp, captured before the access record is built.
    accessed_at = datetime.utcnow()
    first_access = UsageAccessRecord(
        user_or_service="tester",
        operation_type="read",
        accessed_at=accessed_at,
    )
    return UsageMetadata(
        tensor_id=base_td.tensor_id,
        access_history=[first_access],
        application_references=["app1"],
        purpose={"training": "modelA"},
    )
|
|
def test_add_get_usage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_um: UsageMetadata,
):
    """Usage metadata that was added can be fetched back by tensor id."""
    mem_storage.add_usage_metadata(sample_um)

    fetched = mem_storage.get_usage_metadata(base_td.tensor_id)

    assert fetched is not None
    # The fixture holds exactly one access record.
    assert fetched.usage_frequency == 1
    assert fetched.application_references == ["app1"]
|
|
def test_add_usage_metadata_upsert(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_um: UsageMetadata,
):
    """Adding usage metadata twice for one tensor keeps the second record."""
    mem_storage.add_usage_metadata(sample_um)

    write_access = UsageAccessRecord(
        user_or_service="tester2",
        operation_type="write",
        accessed_at=datetime.utcnow(),
    )
    # Same record as the fixture, with its access history swapped for one write.
    payload = sample_um.model_dump()
    payload["access_history"] = [write_access]
    mem_storage.add_usage_metadata(UsageMetadata(**payload))

    fetched = mem_storage.get_usage_metadata(base_td.tensor_id)
    assert fetched is not None
    assert fetched.usage_frequency == 1
    assert fetched.access_history[0].user_or_service == "tester2"
|
|
def test_update_usage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_um: UsageMetadata,
):
    """Updating with a longer access history raises the usage frequency to match."""
    mem_storage.add_usage_metadata(sample_um)

    extra_access = UsageAccessRecord(
        user_or_service="user_x",
        operation_type="read",
        accessed_at=datetime.utcnow(),
    )
    extended_history = [*sample_um.access_history, extra_access]
    result = mem_storage.update_usage_metadata(
        base_td.tensor_id,
        access_history=extended_history,
    )

    assert result is not None
    assert result.usage_frequency == 2
    assert result.access_history[-1].user_or_service == "user_x"
|
|
def test_delete_usage_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_um: UsageMetadata,
):
    """Delete reports True once, removes the record, then reports False."""
    target_id = base_td.tensor_id
    mem_storage.add_usage_metadata(sample_um)

    first_attempt = mem_storage.delete_usage_metadata(target_id)
    assert first_attempt is True
    assert mem_storage.get_usage_metadata(target_id) is None

    # Nothing is left to delete the second time.
    second_attempt = mem_storage.delete_usage_metadata(target_id)
    assert second_attempt is False
|
|
|
|
| |
def test_delete_tensor_descriptor_cascades_extended_metadata(
    mem_storage: InMemoryStorage,
    base_td: TensorDescriptor,
    sample_lm: LineageMetadata,
):
    """Deleting a tensor descriptor also removes its lineage metadata.

    Only lineage metadata is attached and checked here.
    """
    target_id = base_td.tensor_id
    mem_storage.add_lineage_metadata(sample_lm)

    # Precondition: the lineage record exists before the descriptor is deleted.
    assert mem_storage.get_lineage_metadata(target_id) is not None

    mem_storage.delete_tensor_descriptor(target_id)

    assert mem_storage.get_tensor_descriptor(target_id) is None
    assert mem_storage.get_lineage_metadata(target_id) is None
| |
|
|
|
|
| |
def test_get_parent_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
    """Parent ids listed in a tensor's lineage are returned; unknown tensors give ``[]``."""
    derived_from, copied_from = uuid4(), uuid4()
    for parent_id, parent_owner in (
        (derived_from, "parent1_owner"),
        (copied_from, "parent2_owner"),
    ):
        _add_sample_td(mem_storage, tensor_id=parent_id, owner=parent_owner)

    parent_links = [
        ParentTensorLink(tensor_id=derived_from, relationship="derived"),
        ParentTensorLink(tensor_id=copied_from, relationship="copied"),
    ]
    mem_storage.add_lineage_metadata(
        LineageMetadata(tensor_id=base_td.tensor_id, parent_tensors=parent_links)
    )

    found = mem_storage.get_parent_tensor_ids(base_td.tensor_id)
    assert len(found) == 2
    assert derived_from in found
    assert copied_from in found

    # A random id that was never stored yields an empty list.
    assert mem_storage.get_parent_tensor_ids(uuid4()) == []
|
|
def test_get_child_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
    """Tensors whose lineage names ``base_td`` as a parent are returned as its children."""
    first_child = _add_sample_td(mem_storage, owner="child1_owner")
    second_child = _add_sample_td(mem_storage, owner="child2_owner")
    # A third descriptor with no lineage link to base_td; it must not be reported.
    _add_sample_td(mem_storage, owner="other_owner")

    for child in (first_child, second_child):
        parent_link = ParentTensorLink(tensor_id=base_td.tensor_id)
        child_lineage = LineageMetadata(
            tensor_id=child.tensor_id,
            parent_tensors=[parent_link],
        )
        mem_storage.add_lineage_metadata(child_lineage)

    found = mem_storage.get_child_tensor_ids(base_td.tensor_id)
    assert len(found) == 2
    assert first_child.tensor_id in found
    assert second_child.tensor_id in found

    # A random id that was never stored yields an empty list.
    assert mem_storage.get_child_tensor_ids(uuid4()) == []
|
|
|
|
| |
|
|
@pytest.fixture
def search_setup(mem_storage: InMemoryStorage):
    """Populate ``mem_storage`` with three descriptors for the search tests.

    Each descriptor gets semantic metadata; only the first also gets lineage
    metadata (a FILE source). Returns the three descriptors in creation order.
    """
    td1 = _add_sample_td(
        mem_storage,
        owner="user_alpha",
        tags=["raw", "image_data"],
        metadata={"project": "skyfall"},
    )
    td2 = _add_sample_td(
        mem_storage,
        owner="user_beta",
        tags=["processed", "image_data"],
        metadata={"project": "pegasus"},
    )
    td3 = _add_sample_td(
        mem_storage,
        owner="user_alpha",
        tags=["text", "document"],
        metadata={"project": "skyfall", "language": "EN"},
    )

    semantic_rows = (
        (td1, "Raw Image", "This is a raw image from sensor X."),
        (td2, "Processed Image", "Processed image after cleanup."),
        (td3, "Document Alpha", "Text document for project skyfall."),
    )
    for descriptor, name, description in semantic_rows:
        mem_storage.add_semantic_metadata(
            SemanticMetadata(
                tensor_id=descriptor.tensor_id,
                name=name,
                description=description,
            )
        )

    file_source = LineageSource(
        type=LineageSourceType.FILE,
        identifier="/data/raw/img1.tiff",
    )
    mem_storage.add_lineage_metadata(
        LineageMetadata(tensor_id=td1.tensor_id, source=file_source)
    )

    return td1, td2, td3
|
|
|
|
def test_search_tensor_descriptors(mem_storage: InMemoryStorage, search_setup):
    """Text search matches descriptor fields and fields of attached metadata."""
    raw_img, processed_img, document = search_setup
    search = mem_storage.search_tensor_descriptors

    # Descriptor field: owner.
    owner_hits = search("user_alpha", ["owner"])
    assert len(owner_hits) == 2
    assert raw_img in owner_hits
    assert document in owner_hits

    # Descriptor field: tags.
    tag_hits = search("image_data", ["tags"])
    assert len(tag_hits) == 2
    assert raw_img in tag_hits
    assert processed_img in tag_hits

    # Descriptor field: the free-form metadata dict as a whole.
    metadata_hits = search("skyfall", ["metadata"])
    assert len(metadata_hits) == 2
    assert raw_img in metadata_hits
    assert document in metadata_hits

    # Field of the attached semantic metadata.
    description_hits = search("sensor X", ["semantic.description"])
    assert len(description_hits) == 1
    assert raw_img in description_hits

    # Nested field of the attached lineage metadata.
    source_hits = search("/data/raw/img1.tiff", ["lineage.source.identifier"])
    assert len(source_hits) == 1
    assert raw_img in source_hits

    # Upper-case query against values stored in lower case, on a nested key.
    project_hits = search("SKYFALL", ["metadata.project"])
    assert len(project_hits) == 2

    # A term that appears nowhere in the searched fields.
    no_hits = search("non_existent_term", ["tags", "owner"])
    assert len(no_hits) == 0

    # Several fields at once; the expected total across both fields is 2.
    multi_field_hits = search("alpha", ["owner", "semantic.name"])
    assert len(multi_field_hits) == 2
|
|
|
|
@pytest.fixture
def agg_setup(mem_storage: InMemoryStorage):
    """Populate ``mem_storage`` with three descriptors for the aggregation tests.

    Two descriptors belong to ``user_x`` and one to ``user_y``. Each gets
    computational metadata carrying only a computation time. Returns the
    three descriptors in creation order.
    """
    td1 = _add_sample_td(
        mem_storage,
        owner="user_x",
        data_type=DataType.FLOAT32,
        byte_size=100,
        tags=["A", "B"],
    )
    td2 = _add_sample_td(
        mem_storage,
        owner="user_y",
        data_type=DataType.INT64,
        byte_size=200,
        tags=["B", "C"],
    )
    td3 = _add_sample_td(
        mem_storage,
        owner="user_x",
        data_type=DataType.FLOAT32,
        byte_size=150,
        tags=["A"],
    )

    for descriptor, seconds in ((td1, 10.0), (td2, 20.0), (td3, 12.0)):
        mem_storage.add_computational_metadata(
            ComputationalMetadata(
                tensor_id=descriptor.tensor_id,
                computation_time_seconds=seconds,
            )
        )

    return td1, td2, td3
|
|
def test_aggregate_tensor_descriptors_count(mem_storage: InMemoryStorage, agg_setup):
    """The count aggregation groups descriptors by owner and by data type."""
    aggregate = mem_storage.aggregate_tensor_descriptors

    per_owner = aggregate("owner", "count")
    assert per_owner == {"user_x": 2, "user_y": 1}

    per_dtype = aggregate("data_type", "count")
    assert per_dtype == {DataType.FLOAT32: 2, DataType.INT64: 1}
|
|
def test_aggregate_tensor_descriptors_sum_avg(mem_storage: InMemoryStorage, agg_setup):
    """Sum and average work per owner on descriptor and computational fields."""
    aggregate = mem_storage.aggregate_tensor_descriptors

    # user_x: 100 + 150 bytes; user_y: 200 bytes.
    total_bytes = aggregate("owner", "sum", "byte_size")
    assert total_bytes == {"user_x": 250, "user_y": 200}

    mean_bytes = aggregate("owner", "avg", "byte_size")
    assert mean_bytes == {"user_x": 125.0, "user_y": 200.0}

    # The aggregated field lives on the attached computational metadata.
    # user_x: (10.0 + 12.0) / 2.
    mean_seconds = aggregate("owner", "avg", "computational.computation_time_seconds")
    assert mean_seconds == {"user_x": 11.0, "user_y": 20.0}
|
|
def test_aggregate_min_max(mem_storage: InMemoryStorage, agg_setup):
    """Min and max work per owner on computation time and byte size."""
    aggregate = mem_storage.aggregate_tensor_descriptors

    fastest = aggregate("owner", "min", "computational.computation_time_seconds")
    assert fastest == {"user_x": 10.0, "user_y": 20.0}

    largest = aggregate("owner", "max", "byte_size")
    assert largest == {"user_x": 150, "user_y": 200}
|
|
def test_aggregate_group_by_nested_missing(mem_storage: InMemoryStorage, agg_setup):
    """Aggregation copes with missing nested values in both the field and the group key."""
    aggregate = mem_storage.aggregate_tensor_descriptors

    # user_z has a descriptor but no computational metadata at all.
    _add_sample_td(
        mem_storage,
        owner="user_z",
        data_type=DataType.BOOLEAN,
        byte_size=1,
    )
    mean_seconds = aggregate("owner", "avg", "computational.computation_time_seconds")
    assert mean_seconds["user_z"] == 0

    # No computational record in agg_setup sets an algorithm, so the test
    # expects those descriptors to be grouped under the "N/A" key.
    per_algorithm = aggregate("computational.algorithm", "count")
    assert per_algorithm.get("N/A", 0) >= 3
|
|
def test_aggregate_invalid_function(mem_storage: InMemoryStorage, agg_setup):
    """Requesting the ``median`` aggregation raises ``NotImplementedError``."""
    unsupported_function = "median"
    with pytest.raises(NotImplementedError):
        mem_storage.aggregate_tensor_descriptors(
            "owner",
            unsupported_function,
            "byte_size",
        )
|
|
| |
@pytest.fixture
def sample_td_for_semantic(mem_storage: InMemoryStorage):
    """Register and return a minimal descriptor for the semantic metadata tests."""
    return _add_sample_td(mem_storage, owner="semantic_test_owner")
|
|
@pytest.fixture
def sample_sm(sample_td_for_semantic: TensorDescriptor):
    """Build (without storing) semantic metadata for ``sample_td_for_semantic``."""
    sm_fields = {
        "name": "test_semantic_data",
        "description": "A piece of semantic info",
        "tensor_id": sample_td_for_semantic.tensor_id,
    }
    return SemanticMetadata(**sm_fields)
|
|
def test_add_and_get_semantic_metadata(
    mem_storage: InMemoryStorage,
    sample_td_for_semantic: TensorDescriptor,
    sample_sm: SemanticMetadata,
):
    """Semantic metadata that was added comes back as a one-element sequence."""
    mem_storage.add_semantic_metadata(sample_sm)

    stored_entries = mem_storage.get_semantic_metadata(sample_td_for_semantic.tensor_id)

    assert len(stored_entries) == 1
    only_entry = stored_entries[0]
    assert only_entry.name == sample_sm.name
|
|
| |
| |
| |
| |
| |
def test_add_and_get_tensor_descriptor(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
    """The descriptor stored by the ``base_td`` fixture can be fetched by id."""
    # base_td has already been added to mem_storage by its fixture.
    wanted_id = base_td.tensor_id

    fetched = mem_storage.get_tensor_descriptor(wanted_id)

    assert fetched is not None
    assert fetched.tensor_id == wanted_id
    assert fetched.owner == "test_owner"
|
|