# Tests for the Tensorus analytics API endpoints.
from datetime import datetime, timedelta
from uuid import uuid4, UUID

import pytest
from fastapi.testclient import TestClient

# FastAPI application under test
from tensorus.api import app
from tensorus.config import settings as global_settings
from tensorus.metadata import storage_instance as global_app_storage_instance
from tensorus.metadata.schemas import (
    TensorDescriptor, LineageMetadata, UsageMetadata, DataType,
    ParentTensorLink, TransformationStep
)
from tensorus.metadata.storage import InMemoryStorage
from tensorus.metadata.storage_abc import MetadataStorage
# --- Fixtures ---
@pytest.fixture
def client_with_clean_storage_analytics(monkeypatch):
    """
    Provides a TestClient with a fresh InMemoryStorage for analytics tests.

    Yields:
        TestClient: client bound to the FastAPI app, with the global
        in-memory storage cleared before and after the test.
    """
    # Force the in-memory backend so the app never touches a real database.
    monkeypatch.setattr(global_settings, "STORAGE_BACKEND", "in_memory")
    if not isinstance(global_app_storage_instance, InMemoryStorage):
        pytest.skip("Skipping Analytics API tests: Requires InMemoryStorage for clean state.")
    global_app_storage_instance.clear_all_data()
    with TestClient(app) as c:
        yield c
    # Post-test cleanup so later tests start from an empty store.
    global_app_storage_instance.clear_all_data()
@pytest.fixture
def analytics_setup_data(client_with_clean_storage_analytics: TestClient):
    """
    Populates storage with diverse data for testing analytics endpoints.
    Uses the global_app_storage_instance directly for setup simplicity.

    Returns:
        list[TensorDescriptor]: the five created descriptors, in creation order.
    """
    storage = global_app_storage_instance
    tds_data = []
    for days_ago, owner, tags in [
        (10, "user1", ["tagA", "tagB", "tagC"]),
        (100, "user2", ["tagB", "tagC", "tagD"]),
        (5, "user1", ["tagA", "tagD", "tagE"]),
        (200, "user3", ["tagX", "tagY"]),
        (1, "user4", ["tagA", "tagB", "tagD"]),  # Recent; includes tagD for co-occurrence tests
    ]:
        ts = datetime.utcnow() - timedelta(days=days_ago)
        tds_data.append(
            {
                "tensor_id": uuid4(),
                "owner": owner,
                "tags": tags,
                "creation_timestamp": ts,
                "last_modified_timestamp": ts,
            }
        )
    created_tds = []
    for data in tds_data:  # (enumerate index was unused, dropped)
        td = TensorDescriptor(
            dimensionality=1, shape=[1], data_type=DataType.FLOAT32, byte_size=4,
            **data
        )
        storage.add_tensor_descriptor(td)
        created_tds.append(td)
    # Add UsageMetadata for some
    # td1 (index 0) -> accessed recently
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[0].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=1)))
    # td2 (index 1) -> last_accessed_at is older than last_modified, but still stale by last_modified
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[1].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=150)))
    # td3 (index 2) -> accessed very long ago, but modified recently
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[2].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=300)))
    # Add LineageMetadata for some
    # td1: 1 parent, 1 step
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[0].tensor_id, parent_tensors=[ParentTensorLink(tensor_id=uuid4())], transformation_history=[TransformationStep(operation="op1")]))
    # td2: 2 parents, 3 steps
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[1].tensor_id, parent_tensors=[ParentTensorLink(tensor_id=uuid4()), ParentTensorLink(tensor_id=uuid4())], transformation_history=[TransformationStep(operation="op1"), TransformationStep(operation="op2"), TransformationStep(operation="op3")]))
    # td4 (index 3): 0 parents, 5 steps
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[3].tensor_id, transformation_history=[TransformationStep(operation=f"op{i}") for i in range(5)]))
    return created_tds
# --- /analytics/co_occurring_tags Tests ---
def test_get_co_occurring_tags_default_params(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """Default min_co_occurrence (2) surfaces the tag pairs present in the setup data."""
    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    data = response.json()
    assert "tagA" in data
    assert "tagB" in data
    assert "tagC" in data
    assert "tagD" in data
    # Check for tagA: expects tagB (2 times with default min_co_occurrence=2)
    # td0: A,B,C -> (A,B), (A,C)
    # td2: A,D,E -> (A,D), (A,E)
    # td4: A,B,D -> (A,B), (A,D)
    # So, (A,B) occurs 2 times.
    tag_a_co = {item["tag"]: item["count"] for item in data.get("tagA", [])}
    assert tag_a_co.get("tagB") == 2
    tag_b_co = {item["tag"]: item["count"] for item in data.get("tagB", [])}
    assert tag_b_co.get("tagA") == 2
    assert tag_b_co.get("tagC") == 2
def test_get_co_occurring_tags_custom_params(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """min_co_occurrence filters out infrequent pairs; limit caps results per tag."""
    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags?min_co_occurrence=3&limit=5")
    assert response.status_code == 200
    data = response.json()
    # With min_co_occurrence=3, only pairs occurring 3+ times.
    # In setup: (tagA,tagB):2, (tagA,tagC):1, (tagA,tagD):2, (tagA,tagE):1
    #           (tagB,tagC):2, (tagB,tagD):2
    #           (tagC,tagD):1
    # No pair occurs 3 times with the current setup data.
    assert len(data) == 0  # Expect empty if no tags meet min_co_occurrence of 3
    # Test with min_co_occurrence=1 to get more results
    response_min1 = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags?min_co_occurrence=1&limit=1")
    assert response_min1.status_code == 200
    data_min1 = response_min1.json()
    assert "tagA" in data_min1
    if data_min1.get("tagA"):  # If tagA has co-occurring tags
        assert len(data_min1["tagA"]) <= 1  # Limit is 1
def test_get_co_occurring_tags_no_tags_or_no_cooccurrence(client_with_clean_storage_analytics: TestClient):
    """Tensors with zero or one tag produce no co-occurrence pairs -> empty mapping."""
    # Storage is cleared by fixture. Add a tensor with no tags, or one tag.
    storage = global_app_storage_instance
    storage.add_tensor_descriptor(TensorDescriptor(dimensionality=1, shape=[1], data_type=DataType.FLOAT32, owner="u", byte_size=4, tags=[]))
    storage.add_tensor_descriptor(TensorDescriptor(dimensionality=1, shape=[1], data_type=DataType.FLOAT32, owner="u", byte_size=4, tags=["single"]))
    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    assert response.json() == {}
# --- /analytics/stale_tensors Tests ---
def test_get_stale_tensors_default_threshold(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """Default threshold (90 days) flags only td[1] and td[3] as stale."""
    # Default threshold_days = 90
    # analytics_setup_data:
    # td[0] ("tagA", "tagB", "tagC"): modified 10d ago, accessed 1d ago -> NOT STALE
    # td[1] ("tagB", "tagC", "tagD"): modified 100d ago, accessed 150d ago -> STALE (last_relevant is 100d ago)
    # td[2] ("tagA", "tagD", "tagE"): modified 5d ago, accessed 300d ago -> NOT STALE
    # td[3] ("tagX", "tagY"): modified 200d ago, no usage data -> STALE
    # td[4] ("tagA", "tagB", "tagD"): modified 1d ago, no usage data -> NOT STALE
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2
    stale_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in stale_ids  # td[1]
    assert str(analytics_setup_data[3].tensor_id) in stale_ids  # td[3]
def test_get_stale_tensors_custom_threshold(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """A tighter threshold (3 days) additionally flags td[2]."""
    # Threshold 0 days (everything not touched/modified "today" is stale)
    # Depending on exact timing, this could include many. Let's use a small number like 3 days.
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors?threshold_days=3")
    assert response.status_code == 200
    data = response.json()
    # td[0]: modified 10d, accessed 1d -> NOT STALE by 3d rule
    # td[1]: modified 100d, accessed 150d -> STALE
    # td[2]: modified 5d, accessed 300d -> STALE by 3d rule (last_relevant is 5d ago)
    # td[3]: modified 200d -> STALE
    # td[4]: modified 1d -> NOT STALE
    assert len(data) == 3
    stale_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in stale_ids
    assert str(analytics_setup_data[2].tensor_id) in stale_ids
    assert str(analytics_setup_data[3].tensor_id) in stale_ids
# --- /analytics/complex_tensors Tests ---
def test_get_complex_tensors_by_parents(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """min_parent_count=2 matches only td[1] (2 parents)."""
    # td[0]: 1 parent
    # td[1]: 2 parents
    # td[3]: 0 parents
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=2")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["tensor_id"] == str(analytics_setup_data[1].tensor_id)
def test_get_complex_tensors_by_transformations(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """min_transformation_steps=4 matches only td[3] (5 steps)."""
    # td[0]: 1 step
    # td[1]: 3 steps
    # td[3]: 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_transformation_steps=4")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["tensor_id"] == str(analytics_setup_data[3].tensor_id)
def test_get_complex_tensors_by_either_criterion(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """When both criteria are given, a tensor matching either one is returned (OR semantics)."""
    # td[1]: 2 parents, 3 steps
    # td[3]: 0 parents, 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=2&min_transformation_steps=4")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2  # Both td[1] (by parents) and td[3] (by steps) should match if logic is OR
    # Current InMemoryStorage logic is OR, so this is correct.
    complex_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in complex_ids
    assert str(analytics_setup_data[3].tensor_id) in complex_ids
def test_get_complex_tensors_no_criteria(client_with_clean_storage_analytics: TestClient):
    """Calling without any criterion is a client error (400)."""
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors")
    assert response.status_code == 400  # Bad Request
    assert "At least one criterion" in response.json()["detail"]
def test_get_complex_tensors_limit(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    """The limit query parameter caps the number of returned tensors."""
    # All of td[0], td[1], td[3] have some lineage.
    # td[0]: 1 parent, 1 step
    # td[1]: 2 parents, 3 steps
    # td[3]: 0 parents, 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=0&limit=1")  # min_parent_count=0 should include those with lineage
    assert response.status_code == 200
    assert len(response.json()) == 1
    response_steps = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_transformation_steps=1&limit=2")
    assert response_steps.status_code == 200
    assert len(response_steps.json()) == 2
# Conceptual tests for Postgres (mocking storage methods) could be added here
# if specific API error handling or data transformation for these endpoints needed testing
# independent of InMemoryStorage logic. For now, covered by storage tests.