Spaces:
Running
Running
| import asyncio | |
| import json | |
| import sqlite3 | |
| import httpx | |
| import pytest | |
| from app.pipeline.nodes.log_eval import make_log_eval_node | |
| from app.services import loki_sink | |
| class _FakeResponse: | |
| def raise_for_status(self) -> None: | |
| return None | |
async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
    """Pushed Loki log lines must not contain the raw query or answer text.

    ``loki_sink.httpx.AsyncClient`` is swapped for an in-memory double that
    records the URL and JSON body given to ``post``. The first pushed log line
    is then checked for leaked text and for a non-empty ``query_hash`` field.
    """
    seen: dict = {}

    class _FakeAsyncClient:
        # Records the push request instead of performing any network I/O.
        def __init__(self, *args, **kwargs):
            self._args, self._kwargs = args, kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        # The parameter is named ``json`` so a ``post(url, json=...)`` keyword
        # call binds to it; it shadows the ``json`` module only in this method.
        async def post(self, url, json):
            await asyncio.sleep(0)
            seen["url"] = url
            seen["payload"] = json
            return _FakeResponse()

    sink_settings = {
        "_LOKI_ENABLED": True,
        "_LOKI_URL": "https://example.grafana.net/loki/api/v1/push",
        "_LOKI_USERNAME": "123",
        "_LOKI_API_KEY": "glc_test",
    }
    for attr_name, attr_value in sink_settings.items():
        monkeypatch.setattr(loki_sink, attr_name, attr_value)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FakeAsyncClient)

    interaction_record = {
        "timestamp": "2026-04-21T00:00:00Z",
        "session_id": "session-123",
        "query": "my private query",
        "answer": "long text",
        "path": "rag",
        "cached": False,
        "latency_ms": 100,
        "retrieval_attempts": 1,
        "top_rerank_score": -1.0,
        "focused_source_type": "resume",
        "sibling_expansion_count": 0,
        "critic_groundedness": 3,
        "critic_completeness": 3,
        "critic_specificity": 3,
        "critic_quality": "high",
        "is_enumeration_query": False,
        "guard_passed": True,
        "query_complexity": "simple",
        "is_followup": False,
        "is_audio_mode": False,
        "follow_ups": [],
        "reranked_chunks": [],
    }
    await loki_sink.ship_to_loki(interaction_record)

    pushed_values = seen["payload"]["streams"][0]["values"]
    assert pushed_values
    log_line = pushed_values[0][1]
    # Neither the raw query nor the raw answer may appear in the shipped line.
    for sensitive_text in ("my private query", "long text"):
        assert sensitive_text not in log_line
    assert json.loads(log_line).get("query_hash")
async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
    """``ship_to_loki`` must not push anything when Loki shipping is off.

    Both the enable flag and the push URL are cleared. The test therefore
    covers the "no URL" case its name promises, whatever default value
    ``_LOKI_URL`` happens to hold. A client double whose ``post`` flips a flag
    proves that no request was sent.
    """
    called = {"post": False}

    class _NeverCalledClient:
        # Test double: marks ``called["post"]`` if a push is ever attempted.
        def __init__(self, *args, **kwargs):
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            called["post"] = True
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
    # Clear the URL too, so the test matches its name and does not depend on
    # whatever value the module loaded for it.
    monkeypatch.setattr(loki_sink, "_LOKI_URL", "")
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _NeverCalledClient)

    await loki_sink.ship_to_loki({"query": "x", "answer": "y"})

    assert called["post"] is False
async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
    """A transport error during the Loki push must not propagate to the caller.

    The fake client's ``post`` raises ``httpx.ConnectError``. The test passes
    only if ``ship_to_loki`` returns normally AND the push was actually
    attempted. Without the attempt check, an early no-op return would let the
    test pass without exercising the error path at all.
    """
    attempts = {"post": 0}

    class _FailingClient:
        # Test double whose push always fails with a connection error.
        def __init__(self, *args, **kwargs):
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            attempts["post"] += 1
            raise httpx.ConnectError("connection failed")

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
    monkeypatch.setattr(loki_sink, "_LOKI_URL", "https://example.grafana.net/loki/api/v1/push")
    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FailingClient)

    # Must complete without raising even though every post() fails.
    await loki_sink.ship_to_loki({"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []})

    # >= 1 rather than == 1 so a sink that retries still satisfies the check.
    assert attempts["post"] >= 1, "ship_to_loki never attempted the HTTP push"
async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
    """The log/eval node writes a row with ``source_hit_proxy == 1`` to SQLite.

    A RAG interaction whose reranked chunks are all resume chunks is fed
    through the node built by ``make_log_eval_node``. The single stored row is
    then read back from the temporary database.
    """
    # NOTE(review): nothing here is awaited. The test is kept ``async``, presumably
    # so node() runs while an event loop is active; verify before making it sync.
    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)
    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    result = node(state)
    assert "interaction_id" in result

    # sqlite3's connection context manager only commits/rolls back; it does
    # not close the connection, so close it explicitly to avoid a leak.
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()
    finally:
        conn.close()

    assert row is not None
    assert row[0] == 1