File size: 11,720 Bytes
edfa748
 
 
 
 
011144d
 
edfa748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import pytest
from fastapi.testclient import TestClient
from uuid import uuid4, UUID
from datetime import datetime, timedelta

# FastAPI application under test
from tensorus.api import app
from tensorus.metadata.storage import InMemoryStorage
from tensorus.metadata.storage_abc import MetadataStorage
from tensorus.metadata.schemas import (
    TensorDescriptor, LineageMetadata, UsageMetadata, DataType,
    ParentTensorLink, TransformationStep
)
from tensorus.config import settings as global_settings
from tensorus.metadata import storage_instance as global_app_storage_instance

# --- Fixtures ---

@pytest.fixture(scope="function")
def client_with_clean_storage_analytics(monkeypatch):
    """

    Provides a TestClient with a fresh InMemoryStorage for analytics tests.

    """
    monkeypatch.setattr(global_settings, "STORAGE_BACKEND", "in_memory")
    if not isinstance(global_app_storage_instance, InMemoryStorage):
        pytest.skip("Skipping Analytics API tests: Requires InMemoryStorage for clean state.")
    global_app_storage_instance.clear_all_data()

    with TestClient(app) as c:
        yield c

    global_app_storage_instance.clear_all_data()


@pytest.fixture
def analytics_setup_data(client_with_clean_storage_analytics: TestClient):
    """

    Populates storage with diverse data for testing analytics endpoints.

    Uses the global_app_storage_instance directly for setup simplicity.

    """
    storage = global_app_storage_instance

    tds_data = []
    for days_ago, owner, tags in [
        (10, "user1", ["tagA", "tagB", "tagC"]),
        (100, "user2", ["tagB", "tagC", "tagD"]),
        (5, "user1", ["tagA", "tagD", "tagE"]),
        (200, "user3", ["tagX", "tagY"]),
        (1, "user4", ["tagA", "tagB", "tagD"]),  # Recent; includes tagD for co-occurrence tests
    ]:
        ts = datetime.utcnow() - timedelta(days=days_ago)
        tds_data.append(
            {
                "tensor_id": uuid4(),
                "owner": owner,
                "tags": tags,
                "creation_timestamp": ts,
                "last_modified_timestamp": ts,
            }
        )

    created_tds = []
    for i, data in enumerate(tds_data):
        td = TensorDescriptor(
            dimensionality=1, shape=[1], data_type=DataType.FLOAT32, byte_size=4,
            **data
        )
        storage.add_tensor_descriptor(td)
        created_tds.append(td)

    # Add UsageMetadata for some
    # td1 (index 0) -> accessed recently
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[0].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=1)))
    # td2 (index 1) -> last_accessed_at is older than last_modified, but still stale by last_modified
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[1].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=150)))
    # td3 (index 2) -> accessed very long ago, but modified recently
    storage.add_usage_metadata(UsageMetadata(tensor_id=created_tds[2].tensor_id, last_accessed_at=datetime.utcnow() - timedelta(days=300)))


    # Add LineageMetadata for some
    # td1: 1 parent, 1 step
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[0].tensor_id, parent_tensors=[ParentTensorLink(tensor_id=uuid4())], transformation_history=[TransformationStep(operation="op1")]))
    # td2: 2 parents, 3 steps
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[1].tensor_id, parent_tensors=[ParentTensorLink(tensor_id=uuid4()), ParentTensorLink(tensor_id=uuid4())], transformation_history=[TransformationStep(operation="op1"), TransformationStep(operation="op2"), TransformationStep(operation="op3")]))
    # td4 (index 3): 0 parents, 5 steps
    storage.add_lineage_metadata(LineageMetadata(tensor_id=created_tds[3].tensor_id, transformation_history=[TransformationStep(operation=f"op{i}") for i in range(5)]))

    return created_tds


# --- /analytics/co_occurring_tags Tests ---

def test_get_co_occurring_tags_default_params(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    data = response.json()

    assert "tagA" in data
    assert "tagB" in data
    assert "tagC" in data
    assert "tagD" in data

    # Check for tagA: expects tagB (2 times with default min_co_occurrence=2)
    # td0: A,B,C -> (A,B), (A,C)
    # td2: A,D,E -> (A,D), (A,E)
    # td4: A,B   -> (A,B)
    # So, (A,B) occurs 2 times.
    tag_a_co = {item["tag"]: item["count"] for item in data.get("tagA", [])}
    assert tag_a_co.get("tagB") == 2

    tag_b_co = {item["tag"]: item["count"] for item in data.get("tagB", [])}
    assert tag_b_co.get("tagA") == 2
    assert tag_b_co.get("tagC") == 2


def test_get_co_occurring_tags_custom_params(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags?min_co_occurrence=3&limit=5")
    assert response.status_code == 200
    data = response.json()
    # With min_co_occurrence=3, only pairs occurring 3+ times.
    # In setup: (tagA,tagB):2, (tagA,tagC):1, (tagA,tagD):1, (tagA,tagE):0
    # (tagB,tagC):2, (tagB,tagD):1
    # (tagC,tagD):1
    # No pairs occur 3 times with the current setup data.
    assert len(data) == 0 # Expect empty if no tags meet min_co_occurrence of 3

    # Test with min_co_occurrence=1 to get more results
    response_min1 = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags?min_co_occurrence=1&limit=1")
    assert response_min1.status_code == 200
    data_min1 = response_min1.json()
    assert "tagA" in data_min1
    if data_min1.get("tagA"): # If tagA has co-occurring tags
        assert len(data_min1["tagA"]) <= 1 # Limit is 1

def test_get_co_occurring_tags_no_tags_or_no_cooccurrence(client_with_clean_storage_analytics: TestClient):
    # Storage is cleared by fixture. Add a tensor with no tags, or one tag.
    storage = global_app_storage_instance
    storage.add_tensor_descriptor(TensorDescriptor(dimensionality=1, shape=[1], data_type=DataType.FLOAT32, owner="u", byte_size=4, tags=[]))
    storage.add_tensor_descriptor(TensorDescriptor(dimensionality=1, shape=[1], data_type=DataType.FLOAT32, owner="u", byte_size=4, tags=["single"]))

    response = client_with_clean_storage_analytics.get("/analytics/co_occurring_tags")
    assert response.status_code == 200
    assert response.json() == {}


# --- /analytics/stale_tensors Tests ---

def test_get_stale_tensors_default_threshold(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # Default threshold_days = 90
    # analytics_setup_data:
    # td[0] ("tagA", "tagB", "tagC"): modified 10d ago, accessed 1d ago -> NOT STALE
    # td[1] ("tagB", "tagC", "tagD"): modified 100d ago, accessed 150d ago -> STALE (last_relevant is 100d ago)
    # td[2] ("tagA", "tagD", "tagE"): modified 5d ago, accessed 300d ago -> NOT STALE
    # td[3] ("tagX", "tagY"): modified 200d ago, no usage data -> STALE
    # td[4] ("tagA", "tagB", "tagD"): modified 1d ago, no usage data -> NOT STALE
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2
    stale_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in stale_ids # td[1]
    assert str(analytics_setup_data[3].tensor_id) in stale_ids # td[3]

def test_get_stale_tensors_custom_threshold(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # Threshold 0 days (everything not touched/modified "today" is stale)
    # Depending on exact timing, this could include many. Let's use a small number like 3 days.
    response = client_with_clean_storage_analytics.get("/analytics/stale_tensors?threshold_days=3")
    assert response.status_code == 200
    data = response.json()
    # td[0]: modified 10d, accessed 1d -> NOT STALE by 3d rule
    # td[1]: modified 100d, accessed 150d -> STALE
    # td[2]: modified 5d, accessed 300d -> STALE by 3d rule (last_relevant is 5d ago)
    # td[3]: modified 200d -> STALE
    # td[4]: modified 1d -> NOT STALE
    assert len(data) == 3
    stale_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in stale_ids
    assert str(analytics_setup_data[2].tensor_id) in stale_ids
    assert str(analytics_setup_data[3].tensor_id) in stale_ids


# --- /analytics/complex_tensors Tests ---

def test_get_complex_tensors_by_parents(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # td[0]: 1 parent
    # td[1]: 2 parents
    # td[3]: 0 parents
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=2")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["tensor_id"] == str(analytics_setup_data[1].tensor_id)

def test_get_complex_tensors_by_transformations(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # td[0]: 1 step
    # td[1]: 3 steps
    # td[3]: 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_transformation_steps=4")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 1
    assert data[0]["tensor_id"] == str(analytics_setup_data[3].tensor_id)

def test_get_complex_tensors_by_either_criterion(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # td[1]: 2 parents, 3 steps
    # td[3]: 0 parents, 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=2&min_transformation_steps=4")
    assert response.status_code == 200
    data = response.json()
    assert len(data) == 2 # Both td[1] (by parents) and td[3] (by steps) should match if logic is OR
    # Current InMemoryStorage logic is OR, so this is correct.
    complex_ids = {item["tensor_id"] for item in data}
    assert str(analytics_setup_data[1].tensor_id) in complex_ids
    assert str(analytics_setup_data[3].tensor_id) in complex_ids


def test_get_complex_tensors_no_criteria(client_with_clean_storage_analytics: TestClient):
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors")
    assert response.status_code == 400 # Bad Request
    assert "At least one criterion" in response.json()["detail"]

def test_get_complex_tensors_limit(client_with_clean_storage_analytics: TestClient, analytics_setup_data):
    # All of td[0], td[1], td[3] have some lineage.
    # td[0]: 1 parent, 1 step
    # td[1]: 2 parents, 3 steps
    # td[3]: 0 parents, 5 steps
    response = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_parent_count=0&limit=1") # min_parent_count=0 should include those with lineage
    assert response.status_code == 200
    assert len(response.json()) == 1

    response_steps = client_with_clean_storage_analytics.get("/analytics/complex_tensors?min_transformation_steps=1&limit=2")
    assert response_steps.status_code == 200
    assert len(response_steps.json()) == 2


# Conceptual tests for Postgres (mocking storage methods) could be added here
# if specific API error handling or data transformation for these endpoints needed testing
# independent of InMemoryStorage logic. For now, covered by storage tests.