import asyncio
import json
import sqlite3
from contextlib import closing

import httpx
import pytest

from app.pipeline.nodes.log_eval import make_log_eval_node
from app.services import loki_sink

# Push endpoint used by the tests that enable the sink; never actually contacted
# because httpx.AsyncClient is replaced with a fake in those tests.
_TEST_LOKI_URL = "https://example.grafana.net/loki/api/v1/push"


class _FakeResponse:
    """Stand-in for an HTTP response whose status check always succeeds."""

    def raise_for_status(self) -> None:
        return None


class _BaseFakeAsyncClient:
    """Shared scaffolding for the fake ``httpx.AsyncClient`` test doubles.

    Accepts any constructor arguments and supports ``async with``; each test
    subclasses it and supplies its own ``post`` coroutine.
    """

    def __init__(self, *args, **kwargs):
        # No-op test double constructor: just remember what was passed.
        self._args = args
        self._kwargs = kwargs

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning None means exceptions raised inside the block propagate.
        return None


def _enable_loki(monkeypatch) -> None:
    """Patch ``loki_sink`` module settings so the sink is enabled for a test."""
    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
    monkeypatch.setattr(loki_sink, "_LOKI_URL", _TEST_LOKI_URL)
    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")


@pytest.mark.asyncio
async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
    """The pushed log line omits raw query/answer text and has a query hash."""
    captured: dict = {}

    class _CapturingClient(_BaseFakeAsyncClient):
        # The parameter is named ``json`` to mirror httpx's ``post`` signature;
        # it shadows the ``json`` module only inside this method.
        async def post(self, url, json):
            await asyncio.sleep(0)
            captured["url"] = url
            captured["payload"] = json
            return _FakeResponse()

    _enable_loki(monkeypatch)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _CapturingClient)

    await loki_sink.ship_to_loki(
        {
            "timestamp": "2026-04-21T00:00:00Z",
            "session_id": "session-123",
            "query": "my private query",
            "answer": "long text",
            "path": "rag",
            "cached": False,
            "latency_ms": 100,
            "retrieval_attempts": 1,
            "top_rerank_score": -1.0,
            "focused_source_type": "resume",
            "sibling_expansion_count": 0,
            "critic_groundedness": 3,
            "critic_completeness": 3,
            "critic_specificity": 3,
            "critic_quality": "high",
            "is_enumeration_query": False,
            "guard_passed": True,
            "query_complexity": "simple",
            "is_followup": False,
            "is_audio_mode": False,
            "follow_ups": [],
            "reranked_chunks": [],
        }
    )

    values = captured["payload"]["streams"][0]["values"]
    assert values

    # Each entry is indexed as [0] / [1]; the serialized record sits at [1].
    serialized = values[0][1]
    assert "my private query" not in serialized
    assert "long text" not in serialized

    payload_record = json.loads(serialized)
    assert payload_record.get("query_hash")


@pytest.mark.asyncio
async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
    """No HTTP post is attempted when the sink is switched off.

    NOTE(review): despite the test name, the sink is disabled here through
    ``_LOKI_ENABLED`` rather than by clearing ``_LOKI_URL``.
    """
    called = {"post": False}

    class _NeverCalledClient(_BaseFakeAsyncClient):
        async def post(self, url, json):
            await asyncio.sleep(0)
            called["post"] = True
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _NeverCalledClient)

    await loki_sink.ship_to_loki({"query": "x", "answer": "y"})

    assert called["post"] is False


@pytest.mark.asyncio
async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
    """A connection failure during the push does not escape ``ship_to_loki``."""

    class _FailingClient(_BaseFakeAsyncClient):
        async def post(self, url, json):
            await asyncio.sleep(0)
            raise httpx.ConnectError("connection failed")

    _enable_loki(monkeypatch)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FailingClient)

    # No explicit assert: the test passes only if this call does not raise.
    await loki_sink.ship_to_loki(
        {"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []}
    )


@pytest.mark.asyncio
async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
    """The log/eval node stores ``source_hit_proxy == 1`` for this state."""
    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)
    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {
                "text": "a",
                "metadata": {
                    "doc_id": "d1",
                    "source_type": "resume",
                    "rerank_score": -1.0,
                },
            },
            {
                "text": "b",
                "metadata": {
                    "doc_id": "d2",
                    "source_type": "resume",
                    "rerank_score": -1.2,
                },
            },
            {
                "text": "c",
                "metadata": {
                    "doc_id": "d3",
                    "source_type": "resume",
                    "rerank_score": -1.3,
                },
            },
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    # NOTE(review): the node is called synchronously inside an async test;
    # presumably it needs a running event loop -- confirm before making this
    # a plain sync test.
    result = node(state)
    assert "interaction_id" in result

    # sqlite3's own context manager only commits/rolls back; ``closing``
    # guarantees the connection is actually closed when the block exits.
    with closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute(
            "SELECT source_hit_proxy FROM interactions LIMIT 1"
        ).fetchone()

    assert row is not None
    assert row[0] == 1