File size: 4,157 Bytes
6bef416
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from __future__ import annotations

import pandas as pd
import streamlit as st

from src.analytics import RETRIEVAL_OK_THRESHOLD, top_examples
from src.text_search import literal_text_mask
from src.ui import callout, section_title, trace_box


class TraceExplorerViewMixin:
    """Example-level trace review page.

    The methods read ``self.ctx.filtered_eval``, ``self.ctx.filtered_retrieval``,
    ``self.data.eval_runs``, ``self.data.retrieval_events`` and
    ``self.data.chunks``. None of these are defined in this class, so the
    host class that mixes it in must provide them.
    """

    def _page_trace_explorer(self) -> None:
        """Render the queue picker, the question search box and one example.

        Shows a warning callout and stops early when the chosen queue
        (after the optional search filter) contains no examples.
        """
        section_title("Trace Explorer", "Inspect examples, retrieved chunks, diagnosis, and metadata.")
        mode = st.radio("Example queue", ["High risk", "Incorrect", "Hallucination", "Low retrieval"], horizontal=True, key="trace_mode")
        examples = top_examples(self.ctx.filtered_eval, mode=mode, n=150, reference_df=self.data.eval_runs)
        search = st.text_input("Search questions", "", key="trace_search")
        if search and "query" in examples.columns:
            examples = examples[literal_text_mask(examples["query"], search)]
        if examples.empty:
            callout("warn", "No examples", "Try another queue or relax filters.")
            return

        # Fall back to the first column when the frame has no explicit id column.
        id_col = "example_id" if "example_id" in examples.columns else examples.columns[0]
        # Ids are compared as strings because the selectbox options are strings.
        ids_as_text = examples[id_col].astype(str)
        selected_id = st.selectbox("Select example", ids_as_text.tolist(), key="trace_example_id")
        ex = examples[ids_as_text == selected_id].iloc[0]
        self._render_trace_detail(ex, selected_id)

    def _render_trace_detail(self, ex: pd.Series, selected_id: str) -> None:
        """Lay out question/answer, metadata and diagnosis for one example.

        Args:
            ex: The selected example row.
            selected_id: String id of the example, used to look up its
                retrieved chunks below the three columns.
        """
        left, middle, right = st.columns([1.15, 1.0, 0.85], gap="large")
        with left:
            trace_box("Question", str(ex.get("query", "No query column available.")))
            trace_box("Gold answer", str(ex.get("gold_answer", "No gold answer available.")))
        with middle:
            st.json(
                {
                    "domain": ex.get("domain"),
                    "scenario_type": ex.get("scenario_type"),
                    "difficulty": ex.get("difficulty"),
                    "correct": ex.get("is_correct"),
                    "hallucination": ex.get("hallucination_flag"),
                    "recall_at_10": ex.get("recall_at_10"),
                    "mrr_at_10": ex.get("mrr_at_10"),
                }
            )
        with right:
            diagnosis, kind = self._diagnose_example(ex)
            callout(kind, diagnosis, "Use the retrieved chunks below to validate evidence and decide the next debugging action.")
        self._render_retrieved_chunks(selected_id)

    @staticmethod
    def _metric_or_zero(ex: pd.Series, column: str) -> float:
        """Return ``ex[column]`` as a float, treating absent or missing values as 0.0.

        ``None``, NaN and ``pd.NA`` all map to 0.0. A plain ``value or 0``
        is not enough: NaN is truthy (it would stay NaN and fail every
        comparison) and ``pd.NA`` raises ``TypeError`` when used in a
        boolean context.
        """
        raw = ex.get(column, 0)
        if raw is None or pd.isna(raw):
            return 0.0
        # ``or 0`` keeps the original handling of other falsy values such as "".
        return float(raw or 0)

    @staticmethod
    def _diagnose_example(ex: pd.Series) -> tuple[str, str]:
        """Classify an example into a ``(diagnosis, callout kind)`` pair.

        Reads ``is_correct``, ``hallucination_flag`` and ``recall_at_10``;
        absent or missing values count as 0. The hallucination check takes
        precedence over the retrieval/generation split.

        Returns:
            A human-readable diagnosis and one of ``"bad"``, ``"warn"`` or
            ``"good"``.
        """
        metric = TraceExplorerViewMixin._metric_or_zero
        correct = metric(ex, "is_correct")
        hallucination = metric(ex, "hallucination_flag")
        recall = metric(ex, "recall_at_10")

        if hallucination >= 0.5:
            return "Hallucination review needed", "bad"
        if correct < 0.5:
            # Incorrect answer: blame retrieval when recall is below the
            # threshold, otherwise the generation step.
            if recall < RETRIEVAL_OK_THRESHOLD:
                return "Likely retrieval failure", "bad"
            return "Likely generation failure", "warn"
        return "Healthy or recovered case", "good"

    def _render_retrieved_chunks(self, selected_id: str) -> None:
        """Show up to 12 retrieval rows for ``selected_id``, joined with chunk data.

        Shows an info callout instead of a table when the retrieval data
        lacks ``example_id``/``chunk_id`` or when no row matches the example
        under the current filters.
        """
        events = self.ctx.filtered_retrieval
        source_columns = self.data.retrieval_events.columns
        # Check the filtered frame too: it is the one actually indexed below.
        required = ("example_id", "chunk_id")
        if any(col not in source_columns or col not in events.columns for col in required):
            callout("info", "No retrieval table", "Retrieval events are unavailable in the packaged data.")
            return

        ret = events[events["example_id"].astype(str) == str(selected_id)].copy()
        if ret.empty:
            callout("info", "No retrieval rows", "This example has no matching retrieval event rows under the current filters.")
            return

        chunks = self.data.chunks
        if "chunk_id" in chunks.columns:
            # Only bring in columns the retrieval rows do not already have;
            # overlapping names would be suffixed by merge and then fall
            # out of the displayed column list.
            extra_cols = [c for c in ("doc_id", "chunk_text") if c in chunks.columns and c not in ret.columns]
            if extra_cols:
                ret = ret.merge(chunks[["chunk_id", *extra_cols]], on="chunk_id", how="left")

        show_cols = [c for c in ["rank", "chunk_id", "retrieval_score", "is_relevant", "doc_id", "chunk_text"] if c in ret.columns]
        table = ret[show_cols]
        # "rank" is optional in show_cols, so only sort when it is present.
        if "rank" in table.columns:
            table = table.sort_values("rank")
        st.dataframe(table.head(12), use_container_width=True, hide_index=True)