Spaces:
Sleeping
Sleeping
| """ | |
| evaluation/logger.py | |
| Structured logger for every query that passes through the pipeline. | |
| Writes to: | |
| 1. The `query_logs` table in PostgreSQL (Railway) — for dashboarding and | |
| SQL analysis. Returns the inserted row's id so callers can attach | |
| downstream feedback events. | |
| 2. A local CSV file (for portability and DeepEval input). | |
| Usage: | |
| from evaluation.logger import log_query | |
| query_id = log_query(intent, retrieved_docs, generation_result, | |
| latency_ms=240, session_id="...", parent_query_id=None) | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import json | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from database.schema import get_conn, get_cursor | |
| from retrieval.query_understanding import QueryIntent | |
| from retrieval.retriever import RetrievedDocument | |
| from generation.generator import GenerationResult | |
# Location of the local CSV query log, relative to the process's current
# working directory. log_query() appends one row per logged query here.
LOG_CSV_PATH = Path("logs") / "query_log.csv"

# Column order of the CSV log. Passed as the csv.DictWriter fieldnames both
# when the header row is created and when each row is appended.
CSV_HEADERS = [
    # event identity and linkage
    "query_id", "queried_at", "session_id", "parent_query_id",
    # raw and rewritten query text
    "raw_query", "rewritten_query",
    # query filters/attributes ("geography" and "topics" are currently
    # always written blank by log_query)
    "year_min", "year_max", "geography", "topics",
    # retrieval and generation output
    "retrieved_ark_ids", "response",
    # timing and evaluation scores
    "latency_ms", "relevancy_score", "faithfulness_score",
]
| def _ensure_csv(path: Path): | |
| """Create CSV with headers if it doesn't exist.""" | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| if not path.exists(): | |
| with open(path, "w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=CSV_HEADERS) | |
| writer.writeheader() | |
| def _serialize_docs(docs: List[RetrievedDocument]) -> str: | |
| """JSON-encode the retrieved doc list with the fields that matter for | |
| replay/audit (rank, ark_id, title, url, score).""" | |
| payload = [] | |
| for rank, d in enumerate(docs, start=1): | |
| payload.append({ | |
| "rank": rank, | |
| "ark_id": d.ark_id, | |
| "title": d.title, | |
| "source_url": d.source_url, | |
| "institution": d.institution, | |
| "issue_date": d.issue_date, | |
| "rrf_score": round(d.rrf_score, 6), | |
| "metadata_sim": round(d.metadata_sim, 6), | |
| "final_score": round(d.final_score, 6), | |
| }) | |
| return json.dumps(payload, ensure_ascii=False) | |
def log_query(
    intent: QueryIntent,
    retrieved_docs: List[RetrievedDocument],
    generation_result: GenerationResult,
    latency_ms: int = 0,
    relevancy_score: Optional[float] = None,
    faithfulness_score: Optional[float] = None,
    session_id: Optional[str] = None,
    parent_query_id: Optional[int] = None,
) -> Optional[int]:
    """
    Persist one query event to the DB and CSV log.

    The PostgreSQL insert and the CSV append are attempted independently; a
    failure in one does not skip the other. Every step, including reading
    fields off ``intent``, ``retrieved_docs`` and ``generation_result``,
    runs inside a ``try``. Any ``Exception`` is reported with ``print`` and
    swallowed, so a logging failure never breaks the search UX.

    Args:
        intent: Parsed query; ``raw_query``, ``rewritten_query`` and
            ``date_filter.year_min`` / ``date_filter.year_max`` are logged.
        retrieved_docs: Retrieved documents in rank order.
        generation_result: Generator output; ``response``, ``source_titles``
            and ``source_urls`` are logged.
        latency_ms: Latency to record, in milliseconds.
        relevancy_score: Optional relevancy score to record.
        faithfulness_score: Optional faithfulness score to record.
        session_id: Optional session identifier.
        parent_query_id: Optional id of a parent query, stored in
            ``query_logs.parent_query_id``.

    Returns:
        The inserted ``query_logs.id``, or None if any part of the DB write
        raised (including leaving the connection context).
    """
    queried_at = datetime.now(timezone.utc).isoformat()
    query_id: Optional[int] = None

    # 1. PostgreSQL. Payload construction happens inside the helper, so bad
    # input degrades to a missing log row instead of an exception.
    try:
        query_id = _insert_query_log(
            intent,
            retrieved_docs,
            generation_result,
            queried_at=queried_at,
            latency_ms=latency_ms,
            relevancy_score=relevancy_score,
            faithfulness_score=faithfulness_score,
            session_id=session_id,
            parent_query_id=parent_query_id,
        )
    except Exception as e:
        print(f"[logger] DB write failed (non-fatal): {e}")

    # 2. CSV. Attempted even if the DB write failed (query_id is then blank).
    try:
        _append_query_csv(
            intent,
            retrieved_docs,
            generation_result,
            query_id=query_id,
            queried_at=queried_at,
            latency_ms=latency_ms,
            relevancy_score=relevancy_score,
            faithfulness_score=faithfulness_score,
            session_id=session_id,
            parent_query_id=parent_query_id,
        )
    except Exception as e:
        print(f"[logger] CSV write failed (non-fatal): {e}")

    return query_id


def _insert_query_log(
    intent: QueryIntent,
    retrieved_docs: List[RetrievedDocument],
    generation_result: GenerationResult,
    *,
    queried_at: str,
    latency_ms: int,
    relevancy_score: Optional[float],
    faithfulness_score: Optional[float],
    session_id: Optional[str],
    parent_query_id: Optional[int],
) -> Optional[int]:
    """Insert one ``query_logs`` row and return its id (None if no row came back).

    Propagates any exception from payload construction or the DB layer; the
    caller decides how failures are handled.
    """
    filters_json = json.dumps({
        "year_min": intent.date_filter.year_min,
        "year_max": intent.date_filter.year_max,
    })
    params = (
        queried_at,
        intent.raw_query,
        intent.rewritten_query,
        filters_json,
        [d.ark_id for d in retrieved_docs],
        generation_result.response,
        relevancy_score,
        faithfulness_score,
        latency_ms,
        session_id,
        _serialize_docs(retrieved_docs),
        json.dumps(generation_result.source_titles or [], ensure_ascii=False),
        json.dumps(generation_result.source_urls or [], ensure_ascii=False),
        parent_query_id,
    )
    # NOTE(review): if get_conn() returns a bare psycopg2 connection,
    # `with conn:` only commits/rolls back and does NOT close it. Confirm
    # get_conn() closes or pools connections; otherwise each call leaks one.
    with get_conn() as conn:
        with get_cursor(conn) as cur:
            cur.execute(
                """
                INSERT INTO query_logs (
                    queried_at, raw_query, rewritten_query,
                    filters, retrieved_ark_ids, response,
                    relevancy_score, faithfulness_score, latency_ms,
                    session_id, retrieved_docs, source_titles, source_urls,
                    parent_query_id
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    %s, %s::jsonb, %s::jsonb, %s::jsonb, %s
                )
                RETURNING id
                """,
                params,
            )
            row = cur.fetchone()
    # The id is read only after the connection context exited without error
    # (presumably where the commit happens, depending on get_conn()). An
    # error there therefore yields no id, rather than an id for a row that
    # may not exist.
    if not row:
        return None
    # Dict-style cursors return mappings; plain cursors return tuples.
    return row["id"] if isinstance(row, dict) else row[0]


def _append_query_csv(
    intent: QueryIntent,
    retrieved_docs: List[RetrievedDocument],
    generation_result: GenerationResult,
    *,
    query_id: Optional[int],
    queried_at: str,
    latency_ms: int,
    relevancy_score: Optional[float],
    faithfulness_score: Optional[float],
    session_id: Optional[str],
    parent_query_id: Optional[int],
) -> None:
    """Append one row to ``LOG_CSV_PATH``, first creating the file with headers if needed.

    Propagates any exception from row construction or file I/O.
    """
    csv_row = {
        "query_id": query_id,  # the csv module writes None as an empty cell
        "queried_at": queried_at,
        "session_id": session_id or "",
        "parent_query_id": "" if parent_query_id is None else parent_query_id,
        "raw_query": intent.raw_query,
        "rewritten_query": intent.rewritten_query,
        "year_min": intent.date_filter.year_min,
        "year_max": intent.date_filter.year_max,
        "geography": "",  # always blank: not populated by this logger
        "topics": "",  # always blank: not populated by this logger
        "retrieved_ark_ids": "|".join(d.ark_id for d in retrieved_docs),
        "response": generation_result.response,
        "latency_ms": latency_ms,
        "relevancy_score": relevancy_score,
        "faithfulness_score": faithfulness_score,
    }
    _ensure_csv(LOG_CSV_PATH)
    with open(LOG_CSV_PATH, "a", newline="", encoding="utf-8") as f:
        csv.DictWriter(f, fieldnames=CSV_HEADERS).writerow(csv_row)