# personabot-api / tests/test_loki_sink.py
# GitHub Actions
# Deploy a9a1f6e
# 5941cd9
import asyncio
import json
import sqlite3
import httpx
import pytest
from app.pipeline.nodes.log_eval import make_log_eval_node
from app.services import loki_sink
class _FakeResponse:
def raise_for_status(self) -> None:
return None
@pytest.mark.asyncio
async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
    """Raw query and answer text must never reach the Loki push payload.

    The HTTP client is swapped for a double that records the URL and JSON body
    handed to ``post``. The test then checks three things:

    - the private strings are absent from the entire request body, labels
      included, not just from the first log line;
    - they are also absent from the first log line itself;
    - the shipped log line carries a ``query_hash``.
    """
    captured: dict = {}

    class _FakeAsyncClient:
        def __init__(self, *args, **kwargs):
            # No-op test double constructor; arguments kept only for debugging.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            # Parameter name ``json`` presumably mirrors the caller's
            # ``post(url, json=...)`` keyword (httpx-style); keep it.
            await asyncio.sleep(0)
            captured["url"] = url
            captured["payload"] = json
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
    monkeypatch.setattr(loki_sink, "_LOKI_URL", "https://example.grafana.net/loki/api/v1/push")
    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FakeAsyncClient)

    await loki_sink.ship_to_loki(
        {
            "timestamp": "2026-04-21T00:00:00Z",
            "session_id": "session-123",
            "query": "my private query",
            "answer": "long text",
            "path": "rag",
            "cached": False,
            "latency_ms": 100,
            "retrieval_attempts": 1,
            "top_rerank_score": -1.0,
            "focused_source_type": "resume",
            "sibling_expansion_count": 0,
            "critic_groundedness": 3,
            "critic_completeness": 3,
            "critic_specificity": 3,
            "critic_quality": "high",
            "is_enumeration_query": False,
            "guard_passed": True,
            "query_complexity": "simple",
            "is_followup": False,
            "is_audio_mode": False,
            "follow_ups": [],
            "reranked_chunks": [],
        }
    )

    # Fail with a clear message instead of a KeyError if nothing was shipped.
    assert "payload" in captured, "ship_to_loki did not POST anything to Loki"
    payload = captured["payload"]

    # The body was passed as JSON, so it is serialisable. Check the WHOLE body,
    # because PII could leak through stream labels or later entries too.
    full_body = json.dumps(payload)
    assert "my private query" not in full_body
    assert "long text" not in full_body

    values = payload["streams"][0]["values"]
    assert values
    serialized = values[0][1]
    assert "my private query" not in serialized
    assert "long text" not in serialized
    payload_record = json.loads(serialized)
    assert payload_record.get("query_hash")
@pytest.mark.asyncio
async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
    """With Loki disabled and no push URL configured, nothing is POSTed.

    Both the enable flag and the URL are cleared. This makes the scenario match
    the test name regardless of any ``_LOKI_URL`` value picked up from the
    environment when ``loki_sink`` was imported.
    """
    called = {"post": False}

    class _NeverCalledClient:
        def __init__(self, *args, **kwargs):
            # No-op test double constructor.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            called["post"] = True
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
    # Actually exercise the "no URL" case the test name promises.
    monkeypatch.setattr(loki_sink, "_LOKI_URL", "")
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _NeverCalledClient)

    await loki_sink.ship_to_loki({"query": "x", "answer": "y"})

    assert called["post"] is False
@pytest.mark.asyncio
async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
    """A transport error from the Loki POST must not propagate to the caller.

    The fake client's ``post`` raises ``httpx.ConnectError``. The test proves
    two things:

    - ``post`` was actually reached, otherwise the test would pass vacuously;
    - awaiting ``ship_to_loki`` completes without raising.
    """
    attempts = {"post": 0}

    class _FailingClient:
        def __init__(self, *args, **kwargs):
            # No-op test double constructor.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            attempts["post"] += 1
            raise httpx.ConnectError("connection failed")

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
    monkeypatch.setattr(loki_sink, "_LOKI_URL", "https://example.grafana.net/loki/api/v1/push")
    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FailingClient)

    # Completing this await without an exception is the "swallowed" assertion.
    await loki_sink.ship_to_loki({"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []})

    # Guard against a vacuous pass: the failing POST must actually have run.
    assert attempts["post"] >= 1, "ship_to_loki never attempted the POST"
@pytest.mark.asyncio
async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
    """The log-eval node persists ``source_hit_proxy == 1`` for this RAG state.

    The node is built against a fresh SQLite file under ``tmp_path`` and run
    on a state with three resume chunks. The test then checks two things:

    - the node returns an ``interaction_id``;
    - the stored row's ``source_hit_proxy`` is 1.
    """
    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)
    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    result = node(state)
    assert "interaction_id" in result

    # ``with sqlite3.connect(...)`` only commits or rolls back; it never closes
    # the connection. Close it explicitly so the handle does not leak
    # (ResourceWarning on newer Pythons, locked temp file on Windows).
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()
    finally:
        conn.close()

    assert row is not None
    assert row[0] == 1