# Spaces:
# Running
# Running
# File size: 5,353 Bytes
# 5941cd9
import asyncio
import json
import sqlite3
import httpx
import pytest
from app.pipeline.nodes.log_eval import make_log_eval_node
from app.services import loki_sink
class _FakeResponse:
    """Minimal response double whose status check always succeeds."""

    def raise_for_status(self) -> None:
        """Do nothing and return ``None``; this double never raises."""
@pytest.mark.asyncio
async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
    """Check that raw query/answer text is absent from the shipped log line.

    ``loki_sink.httpx.AsyncClient`` is replaced by a recording double, a full
    interaction record is passed to ``ship_to_loki``, and the first serialised
    value of the first stream is inspected: it must not contain the raw query
    or answer strings and must carry a truthy ``query_hash`` entry.
    """
    recorded: dict = {}

    class _RecordingClient:
        """Async-client stand-in that remembers the arguments of ``post``."""

        def __init__(self, *args, **kwargs):
            # Accept whatever constructor arguments are supplied and keep them.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            # The parameter names are part of the double's call surface, so
            # ``json`` intentionally shadows the module inside this method.
            await asyncio.sleep(0)
            recorded.update(url=url, payload=json)
            return _FakeResponse()

    sink_settings = {
        "_LOKI_ENABLED": True,
        "_LOKI_URL": "https://example.grafana.net/loki/api/v1/push",
        "_LOKI_USERNAME": "123",
        "_LOKI_API_KEY": "glc_test",
    }
    for setting_name, setting_value in sink_settings.items():
        monkeypatch.setattr(loki_sink, setting_name, setting_value)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _RecordingClient)

    interaction = {
        "timestamp": "2026-04-21T00:00:00Z",
        "session_id": "session-123",
        "query": "my private query",
        "answer": "long text",
        "path": "rag",
        "cached": False,
        "latency_ms": 100,
        "retrieval_attempts": 1,
        "top_rerank_score": -1.0,
        "focused_source_type": "resume",
        "sibling_expansion_count": 0,
        "critic_groundedness": 3,
        "critic_completeness": 3,
        "critic_specificity": 3,
        "critic_quality": "high",
        "is_enumeration_query": False,
        "guard_passed": True,
        "query_complexity": "simple",
        "is_followup": False,
        "is_audio_mode": False,
        "follow_ups": [],
        "reranked_chunks": [],
    }
    await loki_sink.ship_to_loki(interaction)

    stream_entries = recorded["payload"]["streams"][0]["values"]
    assert stream_entries

    log_line = stream_entries[0][1]
    for raw_text in ("my private query", "long text"):
        assert raw_text not in log_line

    assert json.loads(log_line).get("query_hash")
@pytest.mark.asyncio
async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
    """Check that no ``post`` happens when ``_LOKI_ENABLED`` is patched to False.

    The client double records every ``post`` it receives; after awaiting
    ``ship_to_loki`` the record must still be empty.
    """
    post_calls: list = []

    class _SpyClient:
        """Async-client stand-in that notes each ``post`` invocation."""

        def __init__(self, *args, **kwargs):
            # Accept whatever constructor arguments are supplied and keep them.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            post_calls.append(url)
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _SpyClient)

    minimal_record = {"query": "x", "answer": "y"}
    await loki_sink.ship_to_loki(minimal_record)

    assert not post_calls
@pytest.mark.asyncio
async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
    """Check that ``ship_to_loki`` does not propagate a connection failure.

    A client double whose ``post`` raises ``httpx.ConnectError`` is installed.
    There is no explicit assertion: the test passes as long as the awaited
    call returns without raising.
    """

    class _BrokenClient:
        """Async-client stand-in whose ``post`` always fails."""

        def __init__(self, *args, **kwargs):
            # Accept whatever constructor arguments are supplied and keep them.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            raise httpx.ConnectError("connection failed")

    sink_settings = {
        "_LOKI_ENABLED": True,
        "_LOKI_URL": "https://example.grafana.net/loki/api/v1/push",
        "_LOKI_USERNAME": "123",
        "_LOKI_API_KEY": "glc_test",
    }
    for setting_name, setting_value in sink_settings.items():
        monkeypatch.setattr(loki_sink, setting_name, setting_value)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _BrokenClient)

    record = {"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []}
    await loki_sink.ship_to_loki(record)
@pytest.mark.asyncio
async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
    """Check that the log-eval node writes ``source_hit_proxy == 1`` to SQLite.

    A node built by ``make_log_eval_node`` is pointed at a fresh database file
    under ``tmp_path`` and called with a state carrying three reranked chunks
    whose ``source_type`` is ``"resume"``. The node's result must contain an
    ``interaction_id``, and the first row of the ``interactions`` table must
    store ``1`` in ``source_hit_proxy``.
    """
    # Imported locally so this block does not rely on a file-level import.
    from contextlib import closing

    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)
    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    result = node(state)
    assert "interaction_id" in result

    # ``with sqlite3.connect(...)`` only manages the transaction and leaves the
    # connection open; ``closing`` guarantees the handle is released so the
    # temporary database file is not kept open after the test.
    with closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()

    assert row is not None
    assert row[0] == 1
|