File size: 5,353 Bytes
5941cd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import json
import sqlite3

import httpx
import pytest

from app.pipeline.nodes.log_eval import make_log_eval_node
from app.services import loki_sink


class _FakeResponse:
    """Stand-in for an HTTP response whose status check never fails."""

    def raise_for_status(self) -> None:
        """Do nothing and return ``None``, as a response without an error status would."""


@pytest.mark.asyncio
async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
    """Check that the shipped log line omits the raw query and answer text.

    ``loki_sink.httpx.AsyncClient`` is swapped for a recording double, so the
    payload handed to ``post`` can be inspected without real network traffic.
    The serialized record must not contain the query or answer strings and
    must carry a truthy ``query_hash`` field.
    """
    recorded: dict = {}

    class _FakeAsyncClient:
        """Async-context-manager double that remembers what ``post`` received."""

        def __init__(self, *args, **kwargs):
            # Accept any constructor arguments; they are stored but never inspected.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            # Yield to the event loop once, then remember the call for the assertions.
            await asyncio.sleep(0)
            recorded.update(url=url, payload=json)
            return _FakeResponse()

    # Enable the sink and point it at a dummy endpoint with dummy credentials.
    loki_settings = (
        ("_LOKI_ENABLED", True),
        ("_LOKI_URL", "https://example.grafana.net/loki/api/v1/push"),
        ("_LOKI_USERNAME", "123"),
        ("_LOKI_API_KEY", "glc_test"),
    )
    for attr_name, attr_value in loki_settings:
        monkeypatch.setattr(loki_sink, attr_name, attr_value)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FakeAsyncClient)

    interaction = {
        "timestamp": "2026-04-21T00:00:00Z",
        "session_id": "session-123",
        "query": "my private query",
        "answer": "long text",
        "path": "rag",
        "cached": False,
        "latency_ms": 100,
        "retrieval_attempts": 1,
        "top_rerank_score": -1.0,
        "focused_source_type": "resume",
        "sibling_expansion_count": 0,
        "critic_groundedness": 3,
        "critic_completeness": 3,
        "critic_specificity": 3,
        "critic_quality": "high",
        "is_enumeration_query": False,
        "guard_passed": True,
        "query_complexity": "simple",
        "is_followup": False,
        "is_audio_mode": False,
        "follow_ups": [],
        "reranked_chunks": [],
    }
    await loki_sink.ship_to_loki(interaction)

    # The log line is the second element of the first entry in the first stream.
    stream_values = recorded["payload"]["streams"][0]["values"]
    assert stream_values
    log_line = stream_values[0][1]
    for raw_text in ("my private query", "long text"):
        assert raw_text not in log_line

    assert json.loads(log_line).get("query_hash")


@pytest.mark.asyncio
async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
    """Check that no POST is attempted while the sink is switched off.

    Despite the test name, the URL is not touched here: the sink is disabled
    by patching ``loki_sink._LOKI_ENABLED`` to ``False``.
    """
    post_attempted = False

    class _NeverCalledClient:
        """Client double whose ``post`` flags the test if it is ever reached."""

        def __init__(self, *args, **kwargs):
            # Accept any constructor arguments; they are stored but never inspected.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            nonlocal post_attempted
            await asyncio.sleep(0)
            post_attempted = True
            return _FakeResponse()

    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _NeverCalledClient)

    minimal_record = {"query": "x", "answer": "y"}
    await loki_sink.ship_to_loki(minimal_record)

    assert post_attempted is False


@pytest.mark.asyncio
async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
    """Check that a connection failure during ``post`` does not propagate.

    There is no explicit assertion: the test passes when awaiting
    ``ship_to_loki`` completes without raising.
    """

    class _FailingClient:
        """Client double whose ``post`` always raises ``httpx.ConnectError``."""

        def __init__(self, *args, **kwargs):
            # Accept any constructor arguments; they are stored but never inspected.
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            raise httpx.ConnectError("connection failed")

    # Enable the sink and point it at a dummy endpoint with dummy credentials.
    loki_settings = (
        ("_LOKI_ENABLED", True),
        ("_LOKI_URL", "https://example.grafana.net/loki/api/v1/push"),
        ("_LOKI_USERNAME", "123"),
        ("_LOKI_API_KEY", "glc_test"),
    )
    for attr_name, attr_value in loki_settings:
        monkeypatch.setattr(loki_sink, attr_name, attr_value)
    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FailingClient)

    record = {"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []}
    await loki_sink.ship_to_loki(record)


@pytest.mark.asyncio
async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
    """Check that the log-eval node stores ``source_hit_proxy == 1`` in SQLite.

    A node built by ``make_log_eval_node`` is run against a temporary database
    with a state holding three reranked resume chunks. The node's result must
    contain ``interaction_id`` and the first row of the ``interactions`` table
    must have ``source_hit_proxy`` equal to 1.
    """
    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)

    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    result = node(state)
    assert "interaction_id" in result

    # Using a sqlite3 connection as a context manager only commits or rolls
    # back; it does not close the connection. Close it explicitly so the handle
    # on the temporary database file is released before the fixture cleans up.
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()
    finally:
        conn.close()

    assert row is not None
    assert row[0] == 1