| """Turn inline [n] markers in an answer into Citation objects. |
| |
| We only emit citations for markers the model actually used AND that map to a real |
| retrieved chunk. Markers that point past the context are dropped (defensive |
| against a model inventing [9] when only 4 passages exist). |
| """ |
| from __future__ import annotations |
|
|
| import re |
| from typing import List |
|
|
| from app.schemas import Citation, RetrievedChunk |
|
|
# Inline citation marker: a run of digits in square brackets, e.g. "[3]".
# Group 1 captures the digits without the brackets.
_MARKER_RE = re.compile(r"\[(\d+)\]")
|
|
|
|
| def _snippet(text: str, limit: int = 320) -> str: |
| text = " ".join(text.split()) |
| return text if len(text) <= limit else text[: limit - 1].rstrip() + "…" |
|
|
|
|
def used_marker_indices(answer: str) -> List[int]:
    """Return the distinct marker numbers cited in *answer*, in first-seen order.

    Every ``[n]`` occurrence matched by ``_MARKER_RE`` is read as the integer
    ``n``. A number that appears several times is reported once, at the
    position of its first use. Numbers are returned as written; no range
    check is applied here.
    """
    numbers = (int(m.group(1)) for m in _MARKER_RE.finditer(answer))
    # dict keys are unique and keep insertion order, which gives an ordered
    # de-duplication in one pass instead of a linear list scan per marker.
    return list(dict.fromkeys(numbers))
|
|
|
|
def build_citations(answer: str, context: List[RetrievedChunk]) -> List[Citation]:
    """Build one Citation for each in-range marker used in *answer*.

    Marker ``n`` refers to ``context[n - 1]``. Markers are processed in the
    order ``used_marker_indices`` reports them; a marker below 1 or beyond the
    end of *context* is skipped.
    """
    total = len(context)
    result: List[Citation] = []
    for marker_no in used_marker_indices(answer):
        if marker_no < 1 or marker_no > total:
            # Points outside the retrieved passages: nothing to cite.
            continue
        chunk = context[marker_no - 1]
        citation = Citation(
            marker=str(marker_no),
            source_id=chunk.source_id,
            chunk_id=chunk.chunk_id,
            title=chunk.title,
            url=chunk.url,
            publisher=chunk.publisher,
            retrieved=chunk.retrieved,
            snippet=_snippet(chunk.text),
        )
        result.append(citation)
    return result
|
|
|
|
def strip_unsupported_markers(answer: str, context_len: int) -> str:
    """Remove any [n] where n is out of range, so the UI never shows a dead cite.

    A marker is kept only when ``1 <= n <= context_len``. When a dropped
    marker is followed by whitespace, closing punctuation, or the end of the
    text, the spaces/tabs directly before it are dropped as well, so
    ``"claim [9]."`` becomes ``"claim."`` rather than ``"claim ."``.
    Finally, every run of two or more spaces/tabs in the text is collapsed
    to a single space and the result is stripped of surrounding whitespace.
    """
    closers = ".,;:!?)"
    kept = ""
    cursor = 0
    for m in _MARKER_RE.finditer(answer):
        if 1 <= int(m.group(1)) <= context_len:
            # Supported marker: it is copied along with the next text span.
            continue
        kept += answer[cursor:m.start()]
        cursor = m.end()
        following = answer[cursor:cursor + 1]
        if not following or following.isspace() or following in closers:
            # The gap left by the marker would otherwise show up as a space
            # dangling before punctuation or at the end of a line.
            kept = kept.rstrip(" \t")
    kept += answer[cursor:]

    # Retained from the original behaviour: normalizes horizontal whitespace
    # across the whole answer, not only around removed markers.
    return re.sub(r"[ \t]{2,}", " ", kept).strip()
|
|