Spaces:
Sleeping
Sleeping
File size: 6,314 Bytes
c509b44 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
"""
Tests for AnalyticsStore - tenant-level analytics logging
"""
import sys
from pathlib import Path
# Add backend directory to Python path
backend_dir = Path(__file__).parent.parent
sys.path.insert(0, str(backend_dir))
import pytest
import time
import tempfile
import os
from api.storage.analytics_store import AnalyticsStore
@pytest.fixture
def temp_analytics_db():
    """Yield the path of a throwaway ``.db`` file and remove it afterwards.

    The file is created with ``delete=False`` and its handle is closed before
    the path is yielded, so the code under test can open the path itself.

    Yields:
        str: Filesystem path of the temporary database file.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as f:
        db_path = f.name

    yield db_path

    # Best-effort cleanup: a leftover temp file must never fail a test.
    try:
        if os.path.exists(db_path):
            # Brief pause so a handle that is still being released (a concern
            # on Windows) does not make the unlink fail.
            time.sleep(0.1)
            os.unlink(db_path)
    except OSError:
        # PermissionError is a subclass of OSError, so this covers both.
        # The file might still be in use; that's acceptable for temp files.
        pass
@pytest.fixture
def analytics_store(temp_analytics_db):
    """Provide an AnalyticsStore backed by the temporary database path."""
    store = AnalyticsStore(db_path=temp_analytics_db)
    return store
def test_analytics_store_init(analytics_store):
    """The store is constructed and its database file exists on disk."""
    store = analytics_store
    assert store is not None

    db_file = store.db_path
    assert db_file.exists()
def test_log_tool_usage(analytics_store):
    """A single logged tool call is reflected in the tenant's usage stats."""
    event = {
        "tenant_id": "test_tenant",
        "tool_name": "rag",
        "latency_ms": 150,
        "tokens_used": 500,
        "success": True,
        "user_id": "user123",
    }
    analytics_store.log_tool_usage(**event)

    usage = analytics_store.get_tool_usage_stats("test_tenant")
    assert "rag" in usage

    # With exactly one event, the aggregates equal that event's values.
    rag_entry = usage["rag"]
    assert rag_entry["count"] == 1
    assert rag_entry["avg_latency_ms"] == 150.0
    assert rag_entry["total_tokens"] == 500
def test_log_redflag_violation(analytics_store):
    """A logged red-flag violation can be read back with its fields intact."""
    violation = {
        "tenant_id": "test_tenant",
        "rule_id": "rule123",
        "rule_pattern": ".*password.*",
        "severity": "high",
        "matched_text": "password123",
        "confidence": 0.95,
        "message_preview": "User entered password123",
        "user_id": "user123",
    }
    analytics_store.log_redflag_violation(**violation)

    found = analytics_store.get_redflag_violations("test_tenant", limit=10)
    assert len(found) == 1

    first = found[0]
    assert first["severity"] == "high"
    assert first["confidence"] == 0.95
    assert first["matched_text"] == "password123"
def test_log_rag_search(analytics_store):
    """One logged RAG search produces matching quality metrics."""
    search = {
        "tenant_id": "test_tenant",
        "query": "What is the policy?",
        "hits_count": 5,
        "avg_score": 0.85,
        "top_score": 0.92,
        "latency_ms": 120,
    }
    analytics_store.log_rag_search(**search)

    quality = analytics_store.get_rag_quality_metrics("test_tenant")

    # A single search means every average equals the logged value.
    assert quality["total_searches"] == 1
    assert quality["avg_hits_per_search"] == 5.0
    assert quality["avg_score"] == 0.85
    assert quality["avg_top_score"] == 0.92
def test_log_agent_query(analytics_store):
    """One logged agent query counts as one query from one active user."""
    query_event = {
        "tenant_id": "test_tenant",
        "message_preview": "What is the company policy?",
        "intent": "rag",
        "tools_used": ["rag", "llm"],
        "total_tokens": 1000,
        "total_latency_ms": 250,
        "success": True,
        "user_id": "user123",
    }
    analytics_store.log_agent_query(**query_event)

    summary = analytics_store.get_activity_summary("test_tenant")
    assert summary["total_queries"] == 1
    assert summary["active_users"] == 1
def test_tool_usage_stats_filtered_by_time(analytics_store):
    """Test that a freshly logged event passes a recent-timestamp filter.

    The event is logged without an explicit timestamp (this test cannot
    back-date it), so only the inclusive side of the filter is exercised:
    an event logged just now must appear both in the unfiltered stats and
    in stats restricted to the last hour. Exclusion of older events is not
    covered here.
    """
    analytics_store.log_tool_usage(
        tenant_id="test_tenant",
        tool_name="web",
        latency_ms=100
    )

    # Get stats without time filter
    all_stats = analytics_store.get_tool_usage_stats("test_tenant")
    assert "web" in all_stats

    # Get stats with recent time filter
    recent_timestamp = int(time.time()) - 3600  # Last hour
    recent_stats = analytics_store.get_tool_usage_stats("test_tenant", recent_timestamp)
    assert "web" in recent_stats
def test_get_activity_summary(analytics_store):
    """Three queries from three distinct users are summarised correctly."""
    # Each iteration logs one query under its own user id.
    for idx in (0, 1, 2):
        query_event = {
            "tenant_id": "test_tenant",
            "message_preview": f"Query {idx}",
            "intent": "general",
            "tools_used": ["llm"],
            "user_id": f"user{idx}",
        }
        analytics_store.log_agent_query(**query_event)

    summary = analytics_store.get_activity_summary("test_tenant")
    assert summary["total_queries"] == 3
    assert summary["active_users"] == 3
def test_get_rag_quality_metrics(analytics_store):
    """Several logged RAG searches yield a count and positive averages."""
    # Hits, scores and latency all grow slightly with each search.
    for step in (0, 1, 2):
        search = {
            "tenant_id": "test_tenant",
            "query": f"Query {step}",
            "hits_count": 5 + step,
            "avg_score": 0.8 + step * 0.05,
            "top_score": 0.9 + step * 0.05,
            "latency_ms": 100 + step * 10,
        }
        analytics_store.log_rag_search(**search)

    quality = analytics_store.get_rag_quality_metrics("test_tenant")
    assert quality["total_searches"] == 3
    assert quality["avg_hits_per_search"] > 0
    assert quality["avg_score"] > 0
def test_multiple_tenants_isolation(analytics_store):
    """Each tenant's stats contain only that tenant's own tool usage."""
    # One event per tenant, each with a different tool.
    events = (
        ("tenant1", "rag", 100),
        ("tenant2", "web", 200),
    )
    for tenant, tool, latency in events:
        analytics_store.log_tool_usage(
            tenant_id=tenant,
            tool_name=tool,
            latency_ms=latency,
        )

    # tenant1 sees its own tool and not tenant2's.
    first_stats = analytics_store.get_tool_usage_stats("tenant1")
    assert "rag" in first_stats
    assert "web" not in first_stats

    # tenant2 sees its own tool and not tenant1's.
    second_stats = analytics_store.get_tool_usage_stats("tenant2")
    assert "web" in second_stats
    assert "rag" not in second_stats
|