core / tests /test_storage.py
tensorus's picture
Upload 83 files
edfa748 verified
import pytest
from uuid import uuid4, UUID
from datetime import datetime, timedelta
from tensorus.metadata.schemas import (
TensorDescriptor, SemanticMetadata, DataType, StorageFormat,
LineageMetadata, LineageSource, LineageSourceType, ParentTensorLink, TransformationStep,
ComputationalMetadata,
QualityMetadata, QualityStatistics, MissingValuesInfo,
RelationalMetadata, RelatedTensorLink,
UsageMetadata, UsageAccessRecord
)
from tensorus.metadata.storage import InMemoryStorage
# Fixture for a clean InMemoryStorage instance for each test
@pytest.fixture
def mem_storage() -> InMemoryStorage:
storage = InMemoryStorage()
storage.clear_all_data()
return storage
# Fixture for a sample TensorDescriptor, ensuring it's added to the test's storage instance
@pytest.fixture
def base_td(mem_storage: InMemoryStorage) -> TensorDescriptor:
td = TensorDescriptor(
tensor_id=uuid4(),
dimensionality=2,
shape=[10, 20],
data_type=DataType.FLOAT32,
owner="test_owner",
byte_size=800,
tags=["base_tag"],
metadata={"domain": "vision"}
)
mem_storage.add_tensor_descriptor(td)
return td
# --- Extended Metadata Storage Tests ---
# Helper to create and add a sample TensorDescriptor
def _add_sample_td(storage: InMemoryStorage, **kwargs) -> TensorDescriptor:
defaults = {
"tensor_id": uuid4(), "dimensionality": 1, "shape": [1],
"data_type": DataType.FLOAT32, "owner": "owner", "byte_size": 4
}
defaults.update(kwargs)
td = TensorDescriptor(**defaults)
storage.add_tensor_descriptor(td)
return td
# --- LineageMetadata Storage Tests ---
@pytest.fixture
def sample_lm(base_td: TensorDescriptor) -> LineageMetadata:
return LineageMetadata(
tensor_id=base_td.tensor_id,
source=LineageSource(type=LineageSourceType.SYNTHETIC, identifier="test_script.py"),
version="1.0"
)
def test_add_get_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata):
mem_storage.add_lineage_metadata(sample_lm)
retrieved = mem_storage.get_lineage_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.version == "1.0"
assert retrieved.source.identifier == "test_script.py"
def test_add_lineage_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata):
mem_storage.add_lineage_metadata(sample_lm) # First add
updated_lm_data = sample_lm.model_dump()
updated_lm_data["version"] = "2.0"
updated_lm = LineageMetadata(**updated_lm_data)
mem_storage.add_lineage_metadata(updated_lm) # This should replace due to upsert logic
retrieved = mem_storage.get_lineage_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.version == "2.0"
def test_update_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata):
mem_storage.add_lineage_metadata(sample_lm)
updates = {"version": "1.1", "provenance": {"author": "updater"}}
updated = mem_storage.update_lineage_metadata(base_td.tensor_id, **updates)
assert updated is not None
assert updated.version == "1.1"
assert updated.provenance["author"] == "updater"
assert updated.source.identifier == "test_script.py" # Check unchanged field
def test_delete_lineage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata):
mem_storage.add_lineage_metadata(sample_lm)
assert mem_storage.delete_lineage_metadata(base_td.tensor_id) is True
assert mem_storage.get_lineage_metadata(base_td.tensor_id) is None
assert mem_storage.delete_lineage_metadata(base_td.tensor_id) is False # Already deleted
# --- ComputationalMetadata Storage Tests ---
@pytest.fixture
def sample_cm(base_td: TensorDescriptor) -> ComputationalMetadata:
return ComputationalMetadata(
tensor_id=base_td.tensor_id,
algorithm="CNN",
computation_time_seconds=5.0,
parameters={"lr": 0.01}
)
def test_add_get_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata):
mem_storage.add_computational_metadata(sample_cm)
retrieved = mem_storage.get_computational_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.algorithm == "CNN"
assert retrieved.parameters["lr"] == 0.01
def test_add_computational_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata):
mem_storage.add_computational_metadata(sample_cm)
new_cm = ComputationalMetadata(**{**sample_cm.model_dump(), "algorithm": "RNN"})
mem_storage.add_computational_metadata(new_cm)
retrieved = mem_storage.get_computational_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.algorithm == "RNN"
def test_update_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata):
mem_storage.add_computational_metadata(sample_cm)
updated = mem_storage.update_computational_metadata(base_td.tensor_id, algorithm="Updated", parameters={"dropout": 0.2})
assert updated is not None
assert updated.algorithm == "Updated"
assert updated.parameters["dropout"] == 0.2
def test_delete_computational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_cm: ComputationalMetadata):
mem_storage.add_computational_metadata(sample_cm)
assert mem_storage.delete_computational_metadata(base_td.tensor_id) is True
assert mem_storage.get_computational_metadata(base_td.tensor_id) is None
assert mem_storage.delete_computational_metadata(base_td.tensor_id) is False
# --- QualityMetadata Storage Tests ---
@pytest.fixture
def sample_qm(base_td: TensorDescriptor) -> QualityMetadata:
return QualityMetadata(
tensor_id=base_td.tensor_id,
statistics=QualityStatistics(mean=0.5),
missing_values=MissingValuesInfo(count=0, percentage=0.0),
confidence_score=0.9
)
def test_add_get_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata):
mem_storage.add_quality_metadata(sample_qm)
retrieved = mem_storage.get_quality_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.confidence_score == 0.9
def test_add_quality_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata):
mem_storage.add_quality_metadata(sample_qm)
new_qm = QualityMetadata(**{**sample_qm.model_dump(), "noise_level": 0.1})
mem_storage.add_quality_metadata(new_qm)
retrieved = mem_storage.get_quality_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.noise_level == 0.1
def test_update_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata):
mem_storage.add_quality_metadata(sample_qm)
updated = mem_storage.update_quality_metadata(base_td.tensor_id, noise_level=0.2)
assert updated is not None
assert updated.noise_level == 0.2
assert updated.confidence_score == 0.9
def test_delete_quality_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_qm: QualityMetadata):
mem_storage.add_quality_metadata(sample_qm)
assert mem_storage.delete_quality_metadata(base_td.tensor_id) is True
assert mem_storage.get_quality_metadata(base_td.tensor_id) is None
assert mem_storage.delete_quality_metadata(base_td.tensor_id) is False
# --- RelationalMetadata Storage Tests ---
@pytest.fixture
def sample_rm(base_td: TensorDescriptor) -> RelationalMetadata:
return RelationalMetadata(
tensor_id=base_td.tensor_id,
related_tensors=[RelatedTensorLink(related_tensor_id=uuid4(), relationship_type="related")],
collections=["setA"],
dependencies=[uuid4()],
dataset_context="dataset1"
)
def test_add_get_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata):
mem_storage.add_relational_metadata(sample_rm)
retrieved = mem_storage.get_relational_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.dataset_context == "dataset1"
def test_add_relational_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata):
mem_storage.add_relational_metadata(sample_rm)
new_rm = RelationalMetadata(**{**sample_rm.model_dump(), "collections": ["setB"]})
mem_storage.add_relational_metadata(new_rm)
retrieved = mem_storage.get_relational_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.collections == ["setB"]
def test_update_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata):
mem_storage.add_relational_metadata(sample_rm)
updated = mem_storage.update_relational_metadata(base_td.tensor_id, dataset_context="dataset2")
assert updated is not None
assert updated.dataset_context == "dataset2"
def test_delete_relational_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_rm: RelationalMetadata):
mem_storage.add_relational_metadata(sample_rm)
assert mem_storage.delete_relational_metadata(base_td.tensor_id) is True
assert mem_storage.get_relational_metadata(base_td.tensor_id) is None
assert mem_storage.delete_relational_metadata(base_td.tensor_id) is False
# --- UsageMetadata Storage Tests ---
@pytest.fixture
def sample_um(base_td: TensorDescriptor) -> UsageMetadata:
now = datetime.utcnow()
return UsageMetadata(
tensor_id=base_td.tensor_id,
access_history=[UsageAccessRecord(user_or_service="tester", operation_type="read", accessed_at=now)],
application_references=["app1"],
purpose={"training": "modelA"}
)
def test_add_get_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata):
mem_storage.add_usage_metadata(sample_um)
retrieved = mem_storage.get_usage_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.usage_frequency == 1
assert retrieved.application_references == ["app1"]
def test_add_usage_metadata_upsert(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata):
mem_storage.add_usage_metadata(sample_um)
new_record = UsageAccessRecord(user_or_service="tester2", operation_type="write", accessed_at=datetime.utcnow())
new_um = UsageMetadata(**{**sample_um.model_dump(), "access_history": [new_record]})
mem_storage.add_usage_metadata(new_um)
retrieved = mem_storage.get_usage_metadata(base_td.tensor_id)
assert retrieved is not None
assert retrieved.usage_frequency == 1
assert retrieved.access_history[0].user_or_service == "tester2"
def test_update_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata):
mem_storage.add_usage_metadata(sample_um)
new_access = UsageAccessRecord(user_or_service="user_x", operation_type="read", accessed_at=datetime.utcnow())
updated = mem_storage.update_usage_metadata(base_td.tensor_id, access_history=sample_um.access_history + [new_access])
assert updated is not None
assert updated.usage_frequency == 2
assert updated.access_history[-1].user_or_service == "user_x"
def test_delete_usage_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_um: UsageMetadata):
mem_storage.add_usage_metadata(sample_um)
assert mem_storage.delete_usage_metadata(base_td.tensor_id) is True
assert mem_storage.get_usage_metadata(base_td.tensor_id) is None
assert mem_storage.delete_usage_metadata(base_td.tensor_id) is False
# --- Test Cascade Delete ---
def test_delete_tensor_descriptor_cascades_extended_metadata(mem_storage: InMemoryStorage, base_td: TensorDescriptor, sample_lm: LineageMetadata):
mem_storage.add_lineage_metadata(sample_lm)
# Add other types of extended metadata here too if testing comprehensively
assert mem_storage.get_lineage_metadata(base_td.tensor_id) is not None
mem_storage.delete_tensor_descriptor(base_td.tensor_id)
assert mem_storage.get_tensor_descriptor(base_td.tensor_id) is None
assert mem_storage.get_lineage_metadata(base_td.tensor_id) is None
# Add asserts for other extended metadata types being None
# --- Versioning and Lineage Specific Storage Methods ---
def test_get_parent_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
parent1_id = uuid4()
parent2_id = uuid4()
_add_sample_td(mem_storage, tensor_id=parent1_id, owner="parent1_owner")
_add_sample_td(mem_storage, tensor_id=parent2_id, owner="parent2_owner")
lineage = LineageMetadata(
tensor_id=base_td.tensor_id,
parent_tensors=[
ParentTensorLink(tensor_id=parent1_id, relationship="derived"),
ParentTensorLink(tensor_id=parent2_id, relationship="copied")
]
)
mem_storage.add_lineage_metadata(lineage)
parent_ids = mem_storage.get_parent_tensor_ids(base_td.tensor_id)
assert len(parent_ids) == 2
assert parent1_id in parent_ids
assert parent2_id in parent_ids
assert mem_storage.get_parent_tensor_ids(uuid4()) == [] # Non-existent tensor
def test_get_child_tensor_ids(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
child1_td = _add_sample_td(mem_storage, owner="child1_owner")
child2_td = _add_sample_td(mem_storage, owner="child2_owner")
# Non-child tensor
_add_sample_td(mem_storage, owner="other_owner")
# Child1 lists base_td as parent
lm_child1 = LineageMetadata(tensor_id=child1_td.tensor_id, parent_tensors=[ParentTensorLink(tensor_id=base_td.tensor_id)])
mem_storage.add_lineage_metadata(lm_child1)
# Child2 lists base_td as parent
lm_child2 = LineageMetadata(tensor_id=child2_td.tensor_id, parent_tensors=[ParentTensorLink(tensor_id=base_td.tensor_id)])
mem_storage.add_lineage_metadata(lm_child2)
child_ids = mem_storage.get_child_tensor_ids(base_td.tensor_id)
assert len(child_ids) == 2
assert child1_td.tensor_id in child_ids
assert child2_td.tensor_id in child_ids
assert mem_storage.get_child_tensor_ids(uuid4()) == [] # Non-existent tensor
# --- Search and Aggregation Storage Methods ---
@pytest.fixture
def search_setup(mem_storage: InMemoryStorage):
td1 = _add_sample_td(mem_storage, owner="user_alpha", tags=["raw", "image_data"], metadata={"project": "skyfall"})
td2 = _add_sample_td(mem_storage, owner="user_beta", tags=["processed", "image_data"], metadata={"project": "pegasus"})
td3 = _add_sample_td(mem_storage, owner="user_alpha", tags=["text", "document"], metadata={"project": "skyfall", "language": "EN"})
sm1 = SemanticMetadata(tensor_id=td1.tensor_id, name="Raw Image", description="This is a raw image from sensor X.")
mem_storage.add_semantic_metadata(sm1)
sm2 = SemanticMetadata(tensor_id=td2.tensor_id, name="Processed Image", description="Processed image after cleanup.")
mem_storage.add_semantic_metadata(sm2)
sm3 = SemanticMetadata(tensor_id=td3.tensor_id, name="Document Alpha", description="Text document for project skyfall.")
mem_storage.add_semantic_metadata(sm3)
lm1 = LineageMetadata(tensor_id=td1.tensor_id, source=LineageSource(type=LineageSourceType.FILE, identifier="/data/raw/img1.tiff"))
mem_storage.add_lineage_metadata(lm1)
return td1, td2, td3
def test_search_tensor_descriptors(mem_storage: InMemoryStorage, search_setup):
td1, td2, td3 = search_setup
# Search by owner (direct TD field)
results = mem_storage.search_tensor_descriptors("user_alpha", ["owner"])
assert len(results) == 2
assert td1 in results and td3 in results
# Search by tag (list field in TD)
results = mem_storage.search_tensor_descriptors("image_data", ["tags"])
assert len(results) == 2
assert td1 in results and td2 in results
# Search by metadata (dict field in TD)
results = mem_storage.search_tensor_descriptors("skyfall", ["metadata"]) # Searches values in the metadata dict
assert len(results) == 2
assert td1 in results and td3 in results
# Search by semantic description
results = mem_storage.search_tensor_descriptors("sensor X", ["semantic.description"])
assert len(results) == 1
assert td1 in results
# Search by lineage source identifier
results = mem_storage.search_tensor_descriptors("/data/raw/img1.tiff", ["lineage.source.identifier"])
assert len(results) == 1
assert td1 in results
# Case-insensitive search
results = mem_storage.search_tensor_descriptors("SKYFALL", ["metadata.project"]) # Assuming metadata.project path works
assert len(results) == 2
# No results
results = mem_storage.search_tensor_descriptors("non_existent_term", ["tags", "owner"])
assert len(results) == 0
# Search multiple fields
results = mem_storage.search_tensor_descriptors("alpha", ["owner", "semantic.name"])
assert len(results) == 2 # td1 (owner), td3 (owner, semantic.name)
@pytest.fixture
def agg_setup(mem_storage: InMemoryStorage):
td1 = _add_sample_td(mem_storage, owner="user_x", data_type=DataType.FLOAT32, byte_size=100, tags=["A", "B"])
td2 = _add_sample_td(mem_storage, owner="user_y", data_type=DataType.INT64, byte_size=200, tags=["B", "C"])
td3 = _add_sample_td(mem_storage, owner="user_x", data_type=DataType.FLOAT32, byte_size=150, tags=["A"])
cm1 = ComputationalMetadata(tensor_id=td1.tensor_id, computation_time_seconds=10.0)
mem_storage.add_computational_metadata(cm1)
cm2 = ComputationalMetadata(tensor_id=td2.tensor_id, computation_time_seconds=20.0)
mem_storage.add_computational_metadata(cm2)
cm3 = ComputationalMetadata(tensor_id=td3.tensor_id, computation_time_seconds=12.0)
mem_storage.add_computational_metadata(cm3)
return td1, td2, td3
def test_aggregate_tensor_descriptors_count(mem_storage: InMemoryStorage, agg_setup):
# Group by owner (direct TD field)
result = mem_storage.aggregate_tensor_descriptors("owner", "count")
assert result == {"user_x": 2, "user_y": 1}
# Group by data_type (direct TD field)
result = mem_storage.aggregate_tensor_descriptors("data_type", "count")
assert result == {DataType.FLOAT32: 2, DataType.INT64: 1}
def test_aggregate_tensor_descriptors_sum_avg(mem_storage: InMemoryStorage, agg_setup):
# Sum of byte_size grouped by owner
result_sum = mem_storage.aggregate_tensor_descriptors("owner", "sum", "byte_size")
assert result_sum == {"user_x": 250, "user_y": 200} # 100 + 150 for user_x
# Average of byte_size grouped by owner
result_avg = mem_storage.aggregate_tensor_descriptors("owner", "avg", "byte_size")
assert result_avg == {"user_x": 125.0, "user_y": 200.0}
# Average of computation_time_seconds grouped by owner
result_avg_time = mem_storage.aggregate_tensor_descriptors("owner", "avg", "computational.computation_time_seconds")
assert result_avg_time == {"user_x": 11.0, "user_y": 20.0} # (10+12)/2 for user_x
def test_aggregate_min_max(mem_storage: InMemoryStorage, agg_setup):
result_min = mem_storage.aggregate_tensor_descriptors("owner", "min", "computational.computation_time_seconds")
assert result_min == {"user_x": 10.0, "user_y": 20.0}
result_max = mem_storage.aggregate_tensor_descriptors("owner", "max", "byte_size")
assert result_max == {"user_x": 150, "user_y": 200}
def test_aggregate_group_by_nested_missing(mem_storage: InMemoryStorage, agg_setup):
# Add one TD that doesn't have computational metadata
_add_sample_td(mem_storage, owner="user_z", data_type=DataType.BOOLEAN, byte_size=1)
result = mem_storage.aggregate_tensor_descriptors("owner", "avg", "computational.computation_time_seconds")
assert result["user_z"] == 0 # Or handle as None depending on desired behavior for missing agg_field
result_count = mem_storage.aggregate_tensor_descriptors("computational.algorithm", "count")
# All current agg_setup items have no algorithm set in their ComputationalMetadata
assert result_count.get("N/A", 0) >= 3 # Expecting 3 from agg_setup + any others without algorithm
def test_aggregate_invalid_function(mem_storage: InMemoryStorage, agg_setup):
with pytest.raises(NotImplementedError):
mem_storage.aggregate_tensor_descriptors("owner", "median", "byte_size")
# Original SemanticMetadata storage tests from Phase 1 (abbreviated)
@pytest.fixture
def sample_td_for_semantic(mem_storage: InMemoryStorage): # Renamed to avoid conflict with base_td
td = _add_sample_td(mem_storage, owner="semantic_test_owner")
return td
@pytest.fixture
def sample_sm(sample_td_for_semantic: TensorDescriptor):
return SemanticMetadata(
name="test_semantic_data",
description="A piece of semantic info",
tensor_id=sample_td_for_semantic.tensor_id
)
def test_add_and_get_semantic_metadata(mem_storage: InMemoryStorage, sample_td_for_semantic: TensorDescriptor, sample_sm: SemanticMetadata):
mem_storage.add_semantic_metadata(sample_sm)
retrieved_sms = mem_storage.get_semantic_metadata(sample_td_for_semantic.tensor_id)
assert len(retrieved_sms) == 1
assert retrieved_sms[0].name == sample_sm.name
# (Include other semantic metadata tests: add duplicate name, get empty, get by name, update, delete)
# (Include original TensorDescriptor storage tests: add_td, get_td, update_td, list_td, delete_td)
# These are omitted for brevity as the focus is on new Phase 2 functionality tests.
# Ensure they are present and pass in the full test suite.
# Example: test_add_and_get_tensor_descriptor (from earlier phase, using base_td now)
def test_add_and_get_tensor_descriptor(mem_storage: InMemoryStorage, base_td: TensorDescriptor):
# base_td is already added by its fixture
retrieved_td = mem_storage.get_tensor_descriptor(base_td.tensor_id)
assert retrieved_td is not None
assert retrieved_td.tensor_id == base_td.tensor_id
assert retrieved_td.owner == "test_owner"