han-na's picture
human in the loop and other changes
6910834
"""
evaluation/logger.py
Structured logger for every query that passes through the pipeline.
Writes to:
1. The `query_logs` table in PostgreSQL (Railway) — for dashboarding and
SQL analysis. Returns the inserted row's id so callers can attach
downstream feedback events.
2. A local CSV file (for portability and DeepEval input).
Usage:
from evaluation.logger import log_query
query_id = log_query(intent, retrieved_docs, generation_result,
latency_ms=240, session_id="...", parent_query_id=None)
"""
from __future__ import annotations
import csv
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional
from database.schema import get_conn, get_cursor
from retrieval.query_understanding import QueryIntent
from retrieval.retriever import RetrievedDocument
from generation.generator import GenerationResult
# Local CSV mirror of the query log; relative path, so it resolves against the
# process's current working directory.
LOG_CSV_PATH = Path("logs") / "query_log.csv"

# Column order for the CSV log (used as the DictWriter field list when the
# header is written and when each row is appended).
CSV_HEADERS = [
    # row identity and session threading
    "query_id", "queried_at", "session_id", "parent_query_id",
    # query text and filter fields
    "raw_query", "rewritten_query", "year_min", "year_max", "geography", "topics",
    # retrieval / generation output
    "retrieved_ark_ids", "response", "latency_ms",
    # evaluation scores
    "relevancy_score", "faithfulness_score",
]
def _ensure_csv(path: Path, *, fieldnames: Optional[List[str]] = None) -> None:
    """Make sure *path* exists and begins with a CSV header row.

    Missing parent directories are created. The header is written when the
    file does not exist **or** exists but is empty (e.g. truncated or touched
    by another tool), so rows appended afterwards always sit under column
    names. A non-empty file is left untouched.

    Args:
        path: CSV file to prepare.
        fieldnames: Header columns to write; defaults to ``CSV_HEADERS``.

    NOTE(review): a non-empty file whose existing header differs from
    ``fieldnames`` (e.g. written before columns were added) is not detected;
    rows appended to it will not line up with its header.
    """
    if fieldnames is None:
        fieldnames = CSV_HEADERS
    path.parent.mkdir(parents=True, exist_ok=True)
    # An empty file has no header yet, so treat it the same as a missing one.
    if path.exists() and path.stat().st_size > 0:
        return
    with open(path, "w", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=fieldnames).writeheader()
def _serialize_docs(docs: List[RetrievedDocument]) -> str:
    """Encode the retrieved documents as a JSON array for replay/audit.

    Each element holds the 1-based rank, the identifying fields (ark_id,
    title, source_url, institution, issue_date) and the three scores
    (rrf_score, metadata_sim, final_score) rounded to 6 decimal places.
    Non-ASCII text is emitted verbatim (``ensure_ascii=False``).
    """
    records = [
        {
            "rank": position,
            "ark_id": doc.ark_id,
            "title": doc.title,
            "source_url": doc.source_url,
            "institution": doc.institution,
            "issue_date": doc.issue_date,
            "rrf_score": round(doc.rrf_score, 6),
            "metadata_sim": round(doc.metadata_sim, 6),
            "final_score": round(doc.final_score, 6),
        }
        for position, doc in enumerate(docs, 1)
    ]
    return json.dumps(records, ensure_ascii=False)
def log_query(
    intent: QueryIntent,
    retrieved_docs: List[RetrievedDocument],
    generation_result: GenerationResult,
    latency_ms: int = 0,
    relevancy_score: Optional[float] = None,
    faithfulness_score: Optional[float] = None,
    session_id: Optional[str] = None,
    parent_query_id: Optional[int] = None,
) -> Optional[int]:
    """
    Persist one query event to the DB and CSV log.

    Args:
        intent: Parsed query; ``raw_query``, ``rewritten_query`` and
            ``date_filter.year_min`` / ``date_filter.year_max`` are logged.
            A missing or None ``date_filter`` is logged as null years.
        retrieved_docs: Retrieved documents in rank order. Any iterable is
            accepted (it is materialized once); None is treated as empty.
        generation_result: Supplies ``response``, ``source_titles`` and
            ``source_urls``.
        latency_ms: Latency in milliseconds to record.
        relevancy_score: Optional evaluation score, stored as-is.
        faithfulness_score: Optional evaluation score, stored as-is.
        session_id: Optional session identifier.
        parent_query_id: Optional ``query_logs.id`` of a parent query,
            stored as-is.

    Returns:
        The inserted ``query_logs.id``, or None if the DB write (or building
        the payload) failed.

    Never raises. Any failure (building the payload, the DB insert, or the
    CSV append) is printed with a ``[logger]`` prefix and swallowed, so a
    logging failure never breaks the search UX. The CSV append is still
    attempted when the DB write fails; its ``query_id`` column is then empty.
    """
    queried_at = datetime.now(timezone.utc).isoformat()

    # ── 0. Build the payload ────────────────────────────────────────────────
    # Reading caller objects and JSON-encoding them can raise (missing
    # attributes, non-serializable values), so this stays inside the
    # best-effort boundary as well.
    try:
        date_filter = getattr(intent, "date_filter", None)
        year_min = getattr(date_filter, "year_min", None)
        year_max = getattr(date_filter, "year_max", None)
        # Materialize once: the docs are iterated twice below (ark ids and
        # serialization), which would silently drain a one-shot iterator.
        docs = list(retrieved_docs or [])
        retrieved_arks = [d.ark_id for d in docs]

        db_params = (
            queried_at,
            intent.raw_query,
            intent.rewritten_query,
            json.dumps({"year_min": year_min, "year_max": year_max}),
            retrieved_arks,
            generation_result.response,
            relevancy_score,
            faithfulness_score,
            latency_ms,
            session_id,
            _serialize_docs(docs),
            json.dumps(generation_result.source_titles or [], ensure_ascii=False),
            json.dumps(generation_result.source_urls or [], ensure_ascii=False),
            parent_query_id,
        )

        csv_row = {
            "queried_at": queried_at,
            "session_id": session_id or "",
            # Explicit None check so a real id of 0 is not blanked out.
            "parent_query_id": "" if parent_query_id is None else parent_query_id,
            "raw_query": intent.raw_query,
            "rewritten_query": intent.rewritten_query,
            "year_min": year_min,
            "year_max": year_max,
            # Always written empty by this function.
            "geography": "",
            "topics": "",
            "retrieved_ark_ids": "|".join(map(str, retrieved_arks)),
            "response": generation_result.response,
            "latency_ms": latency_ms,
            "relevancy_score": relevancy_score,
            "faithfulness_score": faithfulness_score,
        }
    except Exception as e:
        print(f"[logger] payload build failed (non-fatal): {e}")
        return None

    # ── 1. Write to PostgreSQL ──────────────────────────────────────────────
    query_id = _insert_query_log(db_params)

    # ── 2. Append to CSV ────────────────────────────────────────────────────
    # csv.DictWriter writes None as an empty field.
    csv_row["query_id"] = query_id
    _append_csv_row(csv_row)
    return query_id


def _insert_query_log(params: tuple) -> Optional[int]:
    """Insert one row into ``query_logs`` and return its new ``id``.

    *params* must follow the placeholder order of the INSERT below. Returns
    None when the insert fails or RETURNING yields no row; errors are printed
    and swallowed (logging is best-effort).
    """
    try:
        with get_conn() as conn:
            with get_cursor(conn) as cur:
                cur.execute(
                    """
                    INSERT INTO query_logs (
                        queried_at, raw_query, rewritten_query,
                        filters, retrieved_ark_ids, response,
                        relevancy_score, faithfulness_score, latency_ms,
                        session_id, retrieved_docs, source_titles, source_urls,
                        parent_query_id
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s::jsonb, %s::jsonb, %s::jsonb, %s
                    )
                    RETURNING id
                    """,
                    params,
                )
                inserted = cur.fetchone()
        # The id is read only after both context managers exit cleanly. If
        # leaving them fails (e.g. a commit error, assuming get_conn() commits
        # on exit; NOTE(review): confirm), we return None rather than the id
        # of a row that may not have been persisted.
        if not inserted:
            return None
        return inserted["id"] if isinstance(inserted, dict) else inserted[0]
    except Exception as e:
        print(f"[logger] DB write failed (non-fatal): {e}")
        return None


def _append_csv_row(row: dict) -> None:
    """Append *row* (keyed by ``CSV_HEADERS``) to ``LOG_CSV_PATH``.

    The file and its header are created on first use. Best-effort: any error
    is printed and swallowed.
    """
    try:
        _ensure_csv(LOG_CSV_PATH)
        with open(LOG_CSV_PATH, "a", newline="", encoding="utf-8") as f:
            csv.DictWriter(f, fieldnames=CSV_HEADERS).writerow(row)
    except Exception as e:
        print(f"[logger] CSV write failed (non-fatal): {e}")