GitHub Actions committed on
Commit
fd63c95
·
1 Parent(s): 523b45a

Deploy daa0049

Browse files
app/pipeline/nodes/log_eval.py CHANGED
@@ -7,7 +7,7 @@ from datetime import datetime, timezone
7
  from typing import Callable
8
 
9
  from app.models.pipeline import PipelineState
10
- from app.services.loki_sink import ship_to_loki
11
 
12
  logger = logging.getLogger(__name__)
13
  _PENDING_TASKS: set[asyncio.Task[None]] = set()
@@ -73,7 +73,7 @@ def _source_hit_proxy(state: PipelineState) -> int:
73
  def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
74
  """
75
  Writes interaction to SQLite synchronously (<5ms) inside the request lifespan.
76
- Ships a sanitised observability record to Grafana Loki asynchronously.
77
 
78
  The `path` field tags which pipeline branch produced the answer:
79
  "cache_hit" — served from semantic cache, no LLM called.
@@ -141,7 +141,7 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
141
  )
142
  return cursor.lastrowid # type: ignore[return-value]
143
 
144
- def _build_loki_record(state: PipelineState) -> dict:
145
  reranked_chunks = state.get("reranked_chunks", [])
146
  return {
147
  "timestamp": datetime.now(tz=timezone.utc).isoformat(),
@@ -183,13 +183,13 @@ def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
183
  row_id = _write_to_sqlite(state)
184
  try:
185
  loop = asyncio.get_running_loop()
186
- task = loop.create_task(ship_to_loki(_build_loki_record(state)))
187
  _PENDING_TASKS.add(task)
188
  task.add_done_callback(_PENDING_TASKS.discard)
189
  except RuntimeError:
190
  # Called outside an event loop (for example, synchronous unit tests).
191
- # SQLite write still succeeds; Loki shipping is skipped.
192
- logger.debug("No running event loop; skipping async Loki ship for this call.")
193
 
194
  return {"interaction_id": row_id}
195
  except Exception as e:
 
7
  from typing import Callable
8
 
9
  from app.models.pipeline import PipelineState
10
+ from app.services.axiom_sink import ship_to_axiom
11
 
12
  logger = logging.getLogger(__name__)
13
  _PENDING_TASKS: set[asyncio.Task[None]] = set()
 
73
  def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
74
  """
75
  Writes interaction to SQLite synchronously (<5ms) inside the request lifespan.
76
+ Ships a sanitised observability record to Axiom asynchronously.
77
 
78
  The `path` field tags which pipeline branch produced the answer:
79
  "cache_hit" — served from semantic cache, no LLM called.
 
141
  )
142
  return cursor.lastrowid # type: ignore[return-value]
143
 
144
+ def _build_axiom_record(state: PipelineState) -> dict:
145
  reranked_chunks = state.get("reranked_chunks", [])
146
  return {
147
  "timestamp": datetime.now(tz=timezone.utc).isoformat(),
 
183
  row_id = _write_to_sqlite(state)
184
  try:
185
  loop = asyncio.get_running_loop()
186
+ task = loop.create_task(ship_to_axiom(_build_axiom_record(state)))
187
  _PENDING_TASKS.add(task)
188
  task.add_done_callback(_PENDING_TASKS.discard)
189
  except RuntimeError:
190
  # Called outside an event loop (for example, synchronous unit tests).
191
+ # SQLite write still succeeds; Axiom shipping is skipped.
192
+ logger.debug("No running event loop; skipping async Axiom ship for this call.")
193
 
194
  return {"interaction_id": row_id}
195
  except Exception as e:
app/services/axiom_sink.py ADDED
@@ -0,0 +1,106 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import hashlib
4
+ import json
5
+ import logging
6
+ import os
7
+ from typing import Any
8
+
9
+ import httpx
10
+
11
logger = logging.getLogger(__name__)

# Axiom settings are read once, at import time. Surrounding whitespace is
# stripped so a value padded by a secrets manager still compares cleanly.
_AXIOM_TOKEN = os.getenv("AXIOM_TOKEN", "").strip()
_AXIOM_DATASET = os.getenv("AXIOM_DATASET", "").strip()
_AXIOM_ORG_ID = os.getenv("AXIOM_ORG_ID", "").strip()
_AXIOM_INGEST_URL = os.getenv("AXIOM_INGEST_URL", "").strip()

# An explicit ingest URL takes precedence; otherwise the URL is derived from
# the dataset name. With neither set the URL stays empty.
_AXIOM_URL = _AXIOM_INGEST_URL or (
    f"https://api.axiom.co/v1/datasets/{_AXIOM_DATASET}/ingest" if _AXIOM_DATASET else ""
)

# Shipping needs both a destination and a token.
_AXIOM_ENABLED = bool(_AXIOM_TOKEN) and bool(_AXIOM_URL)
if not _AXIOM_ENABLED:
    logger.warning("Axiom is not configured; Axiom shipping is disabled.")
28
+
29
+
30
def _sha_prefix(value: str, prefix_len: int) -> str:
    """Return the first ``prefix_len`` hex characters of SHA-256(``value``).

    ``value`` is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(value.encode("utf-8"))
    hex_digest = hasher.hexdigest()
    return hex_digest[:prefix_len]
32
+
33
+
34
def _to_float_or_none(value: Any) -> float | None:
    """Coerce a numeric score to ``float``, or return ``None`` if unusable.

    Args:
        value: Candidate score of any type.

    Returns:
        ``float(value)`` for a finite ``int`` or ``float``; ``None`` for
        everything else (non-numbers, booleans, NaN, +/-infinity, or an
        integer too large to represent as a float).
    """
    import math

    # bool is a subclass of int; a True/False flag is not a score.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        number = float(value)
    except OverflowError:
        # Integer magnitude exceeds the float range.
        return None
    # NaN and infinities have no JSON representation, so they are treated
    # as "no score" rather than forwarded into the ingest payload.
    return number if math.isfinite(number) else None
38
+
39
+
40
def _resolve_source_hit_proxy(record: dict[str, Any], top_rerank_score: float | None, chunk_count: int) -> int:
    """Return the source-hit proxy for ``record`` as an integer.

    A ``source_hit_proxy`` value already present in ``record`` (and not
    ``None``) wins and is converted with ``int()``. Otherwise the proxy is
    1 when a top rerank score exists, is above -1.5, and at least two
    chunks were retrieved; else 0.
    """
    supplied = record.get("source_hit_proxy")
    if supplied is None:
        if top_rerank_score is None:
            return 0
        return 1 if (top_rerank_score > -1.5 and chunk_count >= 2) else 0
    return int(supplied)
45
+
46
+
47
def _build_sanitized_record(record: dict[str, Any]) -> dict[str, Any]:
    """Reduce a raw pipeline record to the payload that is shipped to Axiom.

    Only the keys listed in the returned dict are forwarded; anything else
    in ``record`` (for example ``answer``) is dropped. ``query`` and
    ``session_id`` are replaced by truncated SHA-256 prefixes, and
    ``follow_ups`` is reduced to a count.

    Args:
        record: Raw observability record. Every key is optional.

    Returns:
        A new dict; ``record`` itself is not mutated.

    Raises:
        ValueError, TypeError: If a counter field (``chunk_count``,
            ``latency_ms``, ``retrieval_attempts``,
            ``sibling_expansion_count``) holds a value ``int()`` cannot
            convert.
    """
    # `or ""` (rather than a .get() default) treats an explicit None as
    # absent. str(None) would otherwise yield the text "None", which would
    # then be hashed as if it were a real query or session id.
    query = str(record.get("query") or "")
    session_id = str(record.get("session_id") or "")

    rerank_scores = record.get("rerank_scores") or []
    source_types_used = record.get("source_types_used") or []
    chunk_count = int(record.get("chunk_count", 0) or 0)
    top_chunk_doc_id = str(record.get("top_chunk_doc_id", "") or "")

    top_rerank_score = _to_float_or_none(record.get("top_rerank_score"))
    source_hit_proxy = _resolve_source_hit_proxy(record, top_rerank_score, chunk_count)

    return {
        "timestamp": str(record.get("timestamp") or ""),
        # Hash prefixes stand in for the raw identifiers; empty stays empty.
        "session_id": _sha_prefix(session_id, 12) if session_id else "",
        "path": str(record.get("path", "rag") or "rag"),
        "cached": bool(record.get("cached", False)),
        "latency_ms": int(record.get("latency_ms", 0) or 0),
        "retrieval_attempts": int(record.get("retrieval_attempts", 0) or 0),
        "top_rerank_score": top_rerank_score,
        "focused_source_type": str(record.get("focused_source_type", "") or ""),
        "sibling_expansion_count": int(record.get("sibling_expansion_count", 0) or 0),
        # Critic scores are forwarded unchanged (None when absent).
        "critic_groundedness": record.get("critic_groundedness"),
        "critic_completeness": record.get("critic_completeness"),
        "critic_specificity": record.get("critic_specificity"),
        "critic_quality": str(record.get("critic_quality", "") or ""),
        "is_enumeration_query": bool(record.get("is_enumeration_query", False)),
        "guard_passed": bool(record.get("guard_passed", False)),
        "query_complexity": str(record.get("query_complexity", "") or ""),
        "is_followup": bool(record.get("is_followup", False)),
        "is_audio_mode": bool(record.get("is_audio_mode", False)),
        "query_hash": _sha_prefix(query, 16) if query else "",
        "chunk_count": chunk_count,
        "top_chunk_doc_id": top_chunk_doc_id,
        # Sorted so the same set of source types always serialises identically.
        "source_types_used": sorted(str(source_type) for source_type in source_types_used if str(source_type)),
        # Only the number of follow-ups is shipped, never their text.
        "follow_up_count": len(record.get("follow_ups") or []),
        "rerank_scores": [float(score) for score in rerank_scores if isinstance(score, (int, float))],
        "source_hit_proxy": source_hit_proxy,
    }
86
+
87
+
88
async def ship_to_axiom(record: dict[str, Any]) -> None:
    """Send one sanitised record to the Axiom ingest endpoint, best-effort.

    Does nothing when Axiom is not configured. Any exception raised while
    sanitising, posting, or checking the response status is logged at
    WARNING level and swallowed, so this coroutine never raises an
    ``Exception`` to its caller.

    Args:
        record: Raw observability record; it is sanitised before sending.
    """
    if _AXIOM_ENABLED:
        try:
            # The endpoint is sent a JSON array holding a single event.
            events = [_build_sanitized_record(record)]

            request_headers = {
                "Authorization": f"Bearer {_AXIOM_TOKEN}",
                "Content-Type": "application/json",
            }
            # The org header is only sent when an org id is configured.
            if _AXIOM_ORG_ID:
                request_headers["X-Axiom-Org-Id"] = _AXIOM_ORG_ID

            async with httpx.AsyncClient(timeout=httpx.Timeout(3.0), headers=request_headers) as client:
                reply = await client.post(_AXIOM_URL, json=events)
                reply.raise_for_status()
        except Exception as exc:
            logger.warning("Axiom ship failed: %s", exc)
tests/test_axiom_sink.py ADDED
@@ -0,0 +1,166 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ import sqlite3
4
+
5
+ import httpx
6
+ import pytest
7
+
8
+ from app.pipeline.nodes.log_eval import make_log_eval_node
9
+ from app.services import axiom_sink
10
+
11
+
12
class _FakeResponse:
    """Minimal stand-in for an HTTP response whose status check always passes."""

    def raise_for_status(self) -> None:
        """Do nothing, mimicking a response with a non-error status."""
15
+
16
+
17
@pytest.mark.asyncio
async def test_ship_to_axiom_sanitises_pii(monkeypatch) -> None:
    """Raw query, answer, and session id must not appear in the posted payload."""
    captured: dict = {}
    ingest_url = "https://api.axiom.co/v1/datasets/personabot/ingest"

    class _FakeAsyncClient:
        """Records the URL and JSON body of the single expected POST."""

        def __init__(self, *args, **kwargs):
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            captured["url"] = url
            captured["payload"] = json
            return _FakeResponse()

    monkeypatch.setattr(axiom_sink, "_AXIOM_ENABLED", True)
    monkeypatch.setattr(axiom_sink, "_AXIOM_URL", ingest_url)
    monkeypatch.setattr(axiom_sink, "_AXIOM_TOKEN", "xaat-test-token")
    monkeypatch.setattr(axiom_sink, "_AXIOM_ORG_ID", "")
    monkeypatch.setattr(axiom_sink.httpx, "AsyncClient", _FakeAsyncClient)

    await axiom_sink.ship_to_axiom(
        {
            "timestamp": "2026-04-21T00:00:00Z",
            "session_id": "session-123",
            "query": "my private query",
            "answer": "long text",
            "path": "rag",
            "cached": False,
            "latency_ms": 100,
            "retrieval_attempts": 1,
            "top_rerank_score": -1.0,
            "focused_source_type": "resume",
            "sibling_expansion_count": 0,
            "critic_groundedness": 3,
            "critic_completeness": 3,
            "critic_specificity": 3,
            "critic_quality": "high",
            "is_enumeration_query": False,
            "guard_passed": True,
            "query_complexity": "simple",
            "is_followup": False,
            "is_audio_mode": False,
            "follow_ups": [],
            "chunk_count": 0,
            "top_chunk_doc_id": "",
            "source_types_used": [],
            "rerank_scores": [],
        }
    )

    assert captured["url"] == ingest_url

    # None of the raw user-supplied text may survive sanitisation.
    serialized = json.dumps(captured["payload"])
    assert "my private query" not in serialized
    assert "long text" not in serialized
    assert "session-123" not in serialized

    # The payload is a one-element batch carrying hashes in place of raw values.
    assert len(captured["payload"]) == 1
    payload_record = captured["payload"][0]
    assert payload_record.get("query_hash")
    assert payload_record.get("session_id")
    assert "answer" not in payload_record
80
+
81
+
82
@pytest.mark.asyncio
async def test_ship_to_axiom_noop_when_not_configured(monkeypatch) -> None:
    """With shipping disabled, no POST may be issued."""
    post_calls: list[str] = []

    class _NeverCalledClient:
        """Client double that records any POST it receives."""

        def __init__(self, *args, **kwargs):
            self._args, self._kwargs = args, kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            post_calls.append(url)
            return _FakeResponse()

    monkeypatch.setattr(axiom_sink, "_AXIOM_ENABLED", False)
    monkeypatch.setattr(axiom_sink.httpx, "AsyncClient", _NeverCalledClient)

    await axiom_sink.ship_to_axiom({"query": "x", "answer": "y"})

    assert not post_calls
108
+
109
+
110
@pytest.mark.asyncio
async def test_ship_to_axiom_swallows_http_error(monkeypatch) -> None:
    """A transport failure must be logged as a warning, not raised to the caller."""
    warnings_logged: list[tuple] = []

    class _FailingClient:
        """Client double whose POST always fails with a connection error."""

        def __init__(self, *args, **kwargs):
            self._args = args
            self._kwargs = kwargs

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return None

        async def post(self, url, json):
            await asyncio.sleep(0)
            raise httpx.ConnectError("connection failed")

    monkeypatch.setattr(axiom_sink, "_AXIOM_ENABLED", True)
    monkeypatch.setattr(axiom_sink, "_AXIOM_URL", "https://api.axiom.co/v1/datasets/personabot/ingest")
    monkeypatch.setattr(axiom_sink, "_AXIOM_TOKEN", "xaat-test-token")
    monkeypatch.setattr(axiom_sink, "_AXIOM_ORG_ID", "")
    monkeypatch.setattr(axiom_sink.httpx, "AsyncClient", _FailingClient)
    # Record warning calls directly so the check does not depend on how
    # logging handlers or propagation are configured in the test run.
    monkeypatch.setattr(
        axiom_sink.logger,
        "warning",
        lambda *args, **kwargs: warnings_logged.append(args),
    )

    # Must return normally even though the POST raised.
    await axiom_sink.ship_to_axiom({"query": "q", "answer": "a", "path": "rag", "chunk_count": 0})

    # Proves the failure path actually ran and was reported.
    assert len(warnings_logged) == 1
    assert "connection failed" in str(warnings_logged[0][-1])
134
+
135
+
136
@pytest.mark.asyncio
async def test_source_hit_proxy_logged_to_sqlite(monkeypatch, tmp_path) -> None:
    """The log-eval node should persist ``source_hit_proxy`` as 1 for this state."""
    # The node schedules a fire-and-forget ship task whenever an event loop
    # is running. Disable shipping so the test never reaches the network,
    # even if Axiom credentials are present in the environment.
    monkeypatch.setattr(axiom_sink, "_AXIOM_ENABLED", False)

    db_path = str(tmp_path / "interactions.db")
    node = make_log_eval_node(db_path)

    state = {
        "session_id": "s1",
        "query": "What work experience does Darshan have?",
        "answer": "He worked at VK Live.",
        "reranked_chunks": [
            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
        ],
        "latency_ms": 123,
        "cached": False,
        "path": "rag",
        "is_enumeration_query": False,
        "top_rerank_score": -1.0,
        "retrieval_attempts": 1,
        "follow_ups": [],
    }

    result = node(state)
    assert "interaction_id" in result

    # Yield once so the background ship task gets a turn and finishes
    # instead of still being pending when the test's event loop closes.
    await asyncio.sleep(0)

    with sqlite3.connect(db_path) as conn:
        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()

    assert row is not None
    assert row[0] == 1
tests/test_log_eval_privacy.py CHANGED
@@ -46,16 +46,16 @@ def test_log_eval_stores_chunk_metadata_without_text(tmp_path) -> None:
46
 
47
 
48
  @pytest.mark.asyncio
49
- async def test_log_eval_sends_sanitized_loki_payload(monkeypatch, tmp_path) -> None:
50
  db_path = str(tmp_path / "interactions.db")
51
  node = make_log_eval_node(db_path)
52
  captured: dict = {}
53
 
54
- async def _fake_ship_to_loki(record: dict) -> None:
55
  await asyncio.sleep(0)
56
  captured["record"] = record
57
 
58
- monkeypatch.setattr("app.pipeline.nodes.log_eval.ship_to_loki", _fake_ship_to_loki)
59
 
60
  node(
61
  {
 
46
 
47
 
48
  @pytest.mark.asyncio
49
+ async def test_log_eval_sends_sanitized_axiom_payload(monkeypatch, tmp_path) -> None:
50
  db_path = str(tmp_path / "interactions.db")
51
  node = make_log_eval_node(db_path)
52
  captured: dict = {}
53
 
54
+ async def _fake_ship_to_axiom(record: dict) -> None:
55
  await asyncio.sleep(0)
56
  captured["record"] = record
57
 
58
+ monkeypatch.setattr("app.pipeline.nodes.log_eval.ship_to_axiom", _fake_ship_to_axiom)
59
 
60
  node(
61
  {