| import pytest |
| from fastapi.testclient import TestClient |
| from uuid import uuid4, UUID |
| from datetime import datetime, timedelta |
|
|
| |
| from tensorus.api import app |
| from tensorus.metadata.storage import InMemoryStorage |
| from tensorus.metadata.storage_abc import MetadataStorage |
| from tensorus.metadata.schemas import ( |
| TensorDescriptor, LineageMetadata, UsageMetadata, DataType, |
| ParentTensorLink, TransformationStep |
| ) |
| from tensorus.config import settings as global_settings |
| from tensorus.metadata import storage_instance as global_app_storage_instance |
|
|
| |
|
|
@pytest.fixture(scope="function")
def client_with_clean_storage_analytics(monkeypatch):
    """
    Provide a TestClient backed by an emptied in-memory metadata store.

    The imported module-level storage instance is wiped before the test body
    runs and again afterwards. The test is skipped when that instance is not
    an ``InMemoryStorage``.
    """
    monkeypatch.setattr(global_settings, "STORAGE_BACKEND", "in_memory")
    if not isinstance(global_app_storage_instance, InMemoryStorage):
        pytest.skip("Skipping Analytics API tests: Requires InMemoryStorage for clean state.")
    global_app_storage_instance.clear_all_data()

    try:
        with TestClient(app) as c:
            yield c
    finally:
        # Runs even if closing the TestClient raises, so leftover records
        # cannot leak into whichever test touches the storage next.
        global_app_storage_instance.clear_all_data()
|
|
|
|
@pytest.fixture
def analytics_setup_data(client_with_clean_storage_analytics: TestClient):
    """
    Seed the storage with descriptors, usage records and lineage records.

    Records are written straight to ``global_app_storage_instance`` rather
    than through the HTTP API. Five descriptors are created; usage metadata
    is attached to indices 0, 1 and 2, lineage metadata to indices 0, 1 and 3.

    Returns:
        The created ``TensorDescriptor`` objects in insertion order, so tests
        can refer to them by index.
    """
    storage = global_app_storage_instance

    # A single reference instant keeps every relative age below consistent.
    now = datetime.utcnow()

    # (age in days, owner, tags) for each descriptor; the age is used for both
    # the creation and the last-modified timestamp.
    descriptor_specs = [
        (10, "user1", ["tagA", "tagB", "tagC"]),
        (100, "user2", ["tagB", "tagC", "tagD"]),
        (5, "user1", ["tagA", "tagD", "tagE"]),
        (200, "user3", ["tagX", "tagY"]),
        (1, "user4", ["tagA", "tagB", "tagD"]),
    ]

    created_tds = []
    for days_ago, owner, tags in descriptor_specs:
        ts = now - timedelta(days=days_ago)
        td = TensorDescriptor(
            dimensionality=1,
            shape=[1],
            data_type=DataType.FLOAT32,
            byte_size=4,
            tensor_id=uuid4(),
            owner=owner,
            tags=tags,
            creation_timestamp=ts,
            last_modified_timestamp=ts,
        )
        storage.add_tensor_descriptor(td)
        created_tds.append(td)

    # (descriptor index, days since last access); indices 3 and 4 get no usage record.
    for index, days_since_access in ((0, 1), (1, 150), (2, 300)):
        storage.add_usage_metadata(
            UsageMetadata(
                tensor_id=created_tds[index].tensor_id,
                last_accessed_at=now - timedelta(days=days_since_access),
            )
        )

    # Index 0: one parent, one transformation step.
    storage.add_lineage_metadata(
        LineageMetadata(
            tensor_id=created_tds[0].tensor_id,
            parent_tensors=[ParentTensorLink(tensor_id=uuid4())],
            transformation_history=[TransformationStep(operation="op1")],
        )
    )
    # Index 1: two parents, three transformation steps.
    storage.add_lineage_metadata(
        LineageMetadata(
            tensor_id=created_tds[1].tensor_id,
            parent_tensors=[
                ParentTensorLink(tensor_id=uuid4()),
                ParentTensorLink(tensor_id=uuid4()),
            ],
            transformation_history=[
                TransformationStep(operation="op1"),
                TransformationStep(operation="op2"),
                TransformationStep(operation="op3"),
            ],
        )
    )
    # Index 3: no parents passed, five transformation steps (op0..op4).
    storage.add_lineage_metadata(
        LineageMetadata(
            tensor_id=created_tds[3].tensor_id,
            transformation_history=[
                TransformationStep(operation=f"op{step}") for step in range(5)
            ],
        )
    )

    return created_tds
|
|
|
|
| |
|
|
def test_get_co_occurring_tags_default_params(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Query co-occurring tags with no parameters and check the tagA/tagB counts."""
    client = client_with_clean_storage_analytics
    response = client.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    data = response.json()

    for expected_tag in ("tagA", "tagB", "tagC", "tagD"):
        assert expected_tag in data

    def partner_counts(tag):
        # Flatten the list of {"tag": ..., "count": ...} entries into a lookup.
        return {entry["tag"]: entry["count"] for entry in data.get(tag, [])}

    # In the seeded data tagA and tagB share two descriptors (indices 0 and 4).
    partners_of_a = partner_counts("tagA")
    assert partners_of_a.get("tagB") == 2

    # tagB and tagC also share two descriptors (indices 0 and 1).
    partners_of_b = partner_counts("tagB")
    assert partners_of_b.get("tagA") == 2
    assert partners_of_b.get("tagC") == 2
|
|
|
|
def test_get_co_occurring_tags_custom_params(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Exercise the ``min_co_occurrence`` and ``limit`` query parameters."""
    client = client_with_clean_storage_analytics

    # No tag pair in the seeded data appears together three times.
    strict_response = client.get("/analytics/co_occurring_tags?min_co_occurrence=3&limit=5")
    assert strict_response.status_code == 200
    strict_payload = strict_response.json()
    assert len(strict_payload) == 0

    # With limit=1 a tag may list at most one partner.
    capped_response = client.get("/analytics/co_occurring_tags?min_co_occurrence=1&limit=1")
    assert capped_response.status_code == 200
    capped_payload = capped_response.json()
    assert "tagA" in capped_payload
    tag_a_entries = capped_payload.get("tagA")
    if tag_a_entries:
        assert len(tag_a_entries) <= 1
|
|
def test_get_co_occurring_tags_no_tags_or_no_cooccurrence(
    client_with_clean_storage_analytics: TestClient,
):
    """Expect an empty mapping when no descriptor carries more than one tag."""
    storage = global_app_storage_instance

    # One descriptor without tags and one with a single tag: no pairs exist.
    for tag_list in ([], ["single"]):
        descriptor = TensorDescriptor(
            dimensionality=1,
            shape=[1],
            data_type=DataType.FLOAT32,
            owner="u",
            byte_size=4,
            tags=tag_list,
        )
        storage.add_tensor_descriptor(descriptor)

    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    assert response.json() == {}
|
|
|
|
| |
|
|
def test_get_stale_tensors_default_threshold(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Without a threshold parameter, seeded descriptors 1 and 3 are reported as stale."""
    seeded = analytics_setup_data
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors")
    assert response.status_code == 200

    stale_items = response.json()
    assert len(stale_items) == 2

    reported_ids = {entry["tensor_id"] for entry in stale_items}
    for index in (1, 3):
        assert str(seeded[index].tensor_id) in reported_ids
|
|
def test_get_stale_tensors_custom_threshold(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """With ``threshold_days=3``, seeded descriptors 1, 2 and 3 are reported as stale."""
    seeded = analytics_setup_data
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors?threshold_days=3")
    assert response.status_code == 200

    stale_items = response.json()
    assert len(stale_items) == 3

    reported_ids = {entry["tensor_id"] for entry in stale_items}
    for index in (1, 2, 3):
        assert str(seeded[index].tensor_id) in reported_ids
|
|
|
|
| |
|
|
def test_get_complex_tensors_by_parents(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Only seeded descriptor 1 (two parents) matches ``min_parent_count=2``."""
    expected_id = str(analytics_setup_data[1].tensor_id)

    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=2")
    assert response.status_code == 200

    matches = response.json()
    assert len(matches) == 1
    assert matches[0]["tensor_id"] == expected_id
|
|
def test_get_complex_tensors_by_transformations(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Only seeded descriptor 3 (five steps) matches ``min_transformation_steps=4``."""
    expected_id = str(analytics_setup_data[3].tensor_id)

    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_transformation_steps=4")
    assert response.status_code == 200

    matches = response.json()
    assert len(matches) == 1
    assert matches[0]["tensor_id"] == expected_id
|
|
def test_get_complex_tensors_by_either_criterion(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """Supplying both criteria returns descriptors that satisfy either one."""
    seeded = analytics_setup_data
    response = client_with_clean_storage_analytics.get(
        "/analytics/complex_tensors?min_parent_count=2&min_transformation_steps=4"
    )
    assert response.status_code == 200

    matches = response.json()
    assert len(matches) == 2

    # Index 1 qualifies through its parents, index 3 through its steps.
    returned_ids = {entry["tensor_id"] for entry in matches}
    for index in (1, 3):
        assert str(seeded[index].tensor_id) in returned_ids
|
|
|
|
def test_get_complex_tensors_no_criteria(client_with_clean_storage_analytics: TestClient):
    """Calling the endpoint with no criterion is rejected with HTTP 400."""
    client = client_with_clean_storage_analytics
    response = client.get("/analytics/complex_tensors")
    assert response.status_code == 400

    error_detail = response.json()["detail"]
    assert "At least one criterion" in error_detail
|
|
def test_get_complex_tensors_limit(
    client_with_clean_storage_analytics: TestClient, analytics_setup_data
):
    """The ``limit`` parameter caps the number of returned descriptors."""
    client = client_with_clean_storage_analytics

    # More than one seeded lineage record satisfies min_parent_count=0.
    by_parents = client.get("/analytics/complex_tensors?min_parent_count=0&limit=1")
    assert by_parents.status_code == 200
    assert len(by_parents.json()) == 1

    # Three seeded lineage records have at least one step; limit trims to two.
    by_steps = client.get("/analytics/complex_tensors?min_transformation_steps=1&limit=2")
    assert by_steps.status_code == 200
    assert len(by_steps.json()) == 2
|
|
|
|
| |
| |
| |
|
|