| """Text processing utilities for evidence handling.""" |
|
|
| from typing import TYPE_CHECKING |
|
|
| import numpy as np |
|
|
| if TYPE_CHECKING: |
| from src.services.embeddings import EmbeddingService |
| from src.utils.models import Evidence |
|
|
|
|
def truncate_at_sentence(text: str, max_chars: int = 300) -> str:
    """Truncate text at a sentence boundary, preserving meaning.

    Args:
        text: The text to truncate
        max_chars: Maximum characters (default 300)

    Returns:
        Text truncated at the last complete sentence within the limit
    """
    if len(text) <= max_chars:
        return text

    truncated = text[:max_chars]

    # Prefer a sentence boundary in the second half of the window so the
    # result is not cut unreasonably short.
    for sep in [". ", "! ", "? ", ".\n", "!\n", "?\n"]:
        last_sep = truncated.rfind(sep)
        if last_sep > max_chars // 2:
            return text[: last_sep + 1].strip()

    # Fall back to the last period even if it is not followed by whitespace.
    last_period = truncated.rfind(".")
    if last_period > max_chars // 2:
        return text[: last_period + 1].strip()

    # No usable sentence boundary: break at a word boundary and mark the cut.
    last_space = truncated.rfind(" ")
    if last_space > 0:
        return text[:last_space].strip() + "..."

    return truncated + "..."
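
# Illustrative behaviour of the fallback chain above (example values, not a spec):
#     truncate_at_sentence("First sentence. Second one is longer.", max_chars=20)
# returns "First sentence." because ". " occurs past the halfway point of the
# 20-character window; with no sentence or word boundary at all, the hard cut
# plus "..." applies.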


async def select_diverse_evidence(
    evidence: list["Evidence"], n: int, query: str, embeddings: "EmbeddingService | None" = None
) -> list["Evidence"]:
    """Select the n most diverse and relevant evidence items.

    Uses Maximal Marginal Relevance (MMR) when an embedding service is
    available, and falls back to sorting by relevance otherwise.

    Args:
        evidence: All available evidence
        n: Number of items to select
        query: Original query for relevance scoring
        embeddings: Optional EmbeddingService for semantic diversity

    Returns:
        Selected evidence items, diverse and relevant
    """
    if not evidence:
        return []

    if n >= len(evidence):
        return evidence

    # Without embeddings, fall back to a pure relevance ranking.
    if embeddings is None:
        return sorted(
            evidence,
            key=lambda e: e.relevance,
            reverse=True,
        )[:n]

    # Weight on relevance vs. diversity: higher favours query relevance.
    lambda_param = 0.7

    query_emb = await embeddings.embed(query)
    evidence_embs = await embeddings.embed_batch([e.content for e in evidence])

    def cosine(a: list[float], b: list[float]) -> float:
        arr_a, arr_b = np.array(a), np.array(b)
        denominator = float(np.linalg.norm(arr_a) * np.linalg.norm(arr_b))
        if denominator == 0:
            return 0.0
        return float(np.dot(arr_a, arr_b) / denominator)

    # Relevance of each evidence item to the query.
    relevance_scores = [cosine(query_emb, emb) for emb in evidence_embs]

    # Greedy MMR: at each step pick the item that best balances query
    # relevance against similarity to what has already been selected.
    selected_indices: list[int] = []
    remaining = set(range(len(evidence)))

    for _ in range(n):
        best_score = float("-inf")
        best_idx = -1

        for idx in remaining:
            relevance = relevance_scores[idx]

            # Highest similarity to any already-selected item (0 for the first pick).
            if selected_indices:
                max_sim = max(
                    cosine(evidence_embs[idx], evidence_embs[sel]) for sel in selected_indices
                )
            else:
                max_sim = 0.0

            mmr_score = lambda_param * relevance - (1 - lambda_param) * max_sim

            if mmr_score > best_score:
                best_score = mmr_score
                best_idx = idx

        if best_idx >= 0:
            selected_indices.append(best_idx)
            remaining.remove(best_idx)

    return [evidence[i] for i in selected_indices]
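
# Worked example of the MMR trade-off above (numbers are illustrative only):
# with lambda_param = 0.7, a candidate with query relevance 0.9 that is 0.8
# similar to an already-selected item scores 0.7 * 0.9 - 0.3 * 0.8 = 0.39,
# and loses to a 0.7-relevance candidate with only 0.2 similarity
# (0.7 * 0.7 - 0.3 * 0.2 = 0.43), which is what steers the selection toward
# diverse items rather than near-duplicates.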
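
# A minimal calling sketch (hypothetical call site inside an async caller;
# Evidence construction and the concrete EmbeddingService live elsewhere):
#
#     top_evidence = await select_diverse_evidence(evidence, n=5, query=user_query)
#     snippets = [truncate_at_sentence(e.content) for e in top_evidence]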