|
|
"""Text processing utilities for evidence handling.""" |
|
|
|
|
|
from typing import TYPE_CHECKING |
|
|
|
|
|
import numpy as np |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from src.services.embedding_protocol import EmbeddingServiceProtocol |
|
|
from src.utils.models import Evidence |
|
|
|
|
|
|
|
|
def truncate_at_sentence(text: str, max_chars: int = 300) -> str:
    """Truncate text at a sentence boundary, preserving meaning.

    Args:
        text: The text to truncate
        max_chars: Maximum characters (default 300)

    Returns:
        Text truncated at the last complete sentence within the limit,
        falling back to the last sentence-ending punctuation, then the
        last word boundary (with a trailing ellipsis), then a hard cut.
    """
    if len(text) <= max_chars:
        return text

    truncated = text[:max_chars]

    # Find the LATEST sentence boundary of ANY kind within the window.
    # Checking separator types one at a time and returning on the first
    # hit would cut at an earlier ". " even when a later "! " or "? "
    # boundary exists, so take the max across all of them instead.
    boundary = max(
        truncated.rfind(sep) for sep in (". ", "! ", "? ", ".\n", "!\n", "?\n")
    )
    # Only accept a boundary in the second half of the window so we do
    # not throw away most of the allowed text.
    if boundary > max_chars // 2:
        return text[: boundary + 1].strip()

    # Fall back to any sentence-ending punctuation, even without a
    # following space/newline (e.g. the cut landed right after it).
    last_punct = max(truncated.rfind(ch) for ch in ".!?")
    if last_punct > max_chars // 2:
        return text[: last_punct + 1].strip()

    # Fall back to a word boundary and mark the truncation.
    last_space = truncated.rfind(" ")
    if last_space > 0:
        return text[:last_space].strip() + "..."

    # No usable boundary at all (one long token): hard cut.
    return truncated + "..."
|
|
|
|
|
|
|
|
async def select_diverse_evidence(
    evidence: list["Evidence"],
    n: int,
    query: str,
    embeddings: "EmbeddingServiceProtocol | None" = None,
) -> list["Evidence"]:
    """Select the n most diverse and relevant evidence items.

    Runs Maximal Marginal Relevance (MMR) when an embedding service is
    supplied; otherwise falls back to sorting by each item's stored
    relevance.

    Args:
        evidence: All available evidence
        n: Number of items to select
        query: Original query for relevance scoring
        embeddings: Optional EmbeddingService for semantic diversity

    Returns:
        Selected evidence items, diverse and relevant
    """
    if not evidence:
        return []

    # Nothing to prune — hand back everything as-is.
    if n >= len(evidence):
        return evidence

    # Without embeddings, semantic diversity is unavailable: rank purely
    # by the relevance each item already carries.
    if embeddings is None:
        ranked = sorted(evidence, key=lambda item: item.relevance, reverse=True)
        return ranked[:n]

    # MMR trade-off: 0.7 weight on relevance, 0.3 on diversity.
    balance = 0.7

    query_vec = await embeddings.embed(query)
    doc_vecs = await embeddings.embed_batch([item.content for item in evidence])

    def _cosine(u: list[float], v: list[float]) -> float:
        # Cosine similarity, with a guard for zero-norm vectors.
        u_arr, v_arr = np.array(u), np.array(v)
        norm_product = float(np.linalg.norm(u_arr) * np.linalg.norm(v_arr))
        if norm_product == 0:
            return 0.0
        return float(np.dot(u_arr, v_arr) / norm_product)

    # Relevance of every candidate to the query, computed once up front.
    query_sims = [_cosine(query_vec, vec) for vec in doc_vecs]

    chosen: list[int] = []
    candidates = set(range(len(evidence)))

    def _mmr(idx: int) -> float:
        # Redundancy = similarity to the closest already-chosen item
        # (zero while nothing has been chosen yet).
        redundancy = max(
            (_cosine(doc_vecs[idx], doc_vecs[sel]) for sel in chosen),
            default=0,
        )
        return balance * query_sims[idx] - (1 - balance) * redundancy

    # Greedily pick the candidate with the best MMR score each round;
    # ties resolve to the first-encountered candidate, as before.
    for _ in range(n):
        pick = max(candidates, key=_mmr)
        chosen.append(pick)
        candidates.remove(pick)

    return [evidence[i] for i in chosen]
|
|
|