"""Gap identification for incomplete answers."""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Any

# Regex tails appended to terms that end in a word character.
_WHOLE_WORD_TAIL = r"(?!\w)"            # "how" but not "however"
_PLURAL_TAIL = r"(?:e?s)?(?!\w)"        # "step" and "steps", not "stepladder"
_ADVERB_TAIL = r"(?:ly)?(?!\w)"         # "current" and "currently"
_PREFIX_TAIL = ""                       # "percent" and "percentage"


def _compile_terms(terms, tail=_WHOLE_WORD_TAIL):
    """Compile ``terms`` into one case-insensitive alternation.

    Each term must start at a word boundary, so "how" does not match inside
    "show" and "many" does not match inside "Germany". ``tail`` controls what
    may follow a term. Boundary assertions are only added next to word
    characters so that symbol terms such as "%" still match in "40%".

    Args:
        terms: Non-empty iterable of literal terms or phrases.
        tail: Regex fragment appended after terms ending in a word character.

    Returns:
        Compiled regular expression matching any of the terms.
    """
    alternatives = []
    for term in terms:
        head = r"(?<!\w)" if term[:1].isalnum() else ""
        end = tail if term[-1:].isalnum() else ""
        alternatives.append(f"{head}{re.escape(term)}{end}")
    return re.compile("|".join(alternatives), re.IGNORECASE)


_SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}
_UNKNOWN_SEVERITY_RANK = 3

# (question word, compiled query forms, compiled answer indicators), checked
# in this order so the resulting gaps keep a stable order.
_QUESTION_RULES = tuple(
    (
        word,
        _compile_terms(forms),
        _compile_terms(indicators, tail=_PLURAL_TAIL),
    )
    for word, forms, indicators in (
        ("why", ("why",), ("reason", "because", "since", "due to")),
        ("how", ("how",), ("method", "process", "step", "by", "through")),
        ("when", ("when",), ("date", "time", "year", "month", "day")),
        ("where", ("where",), ("location", "place", "in", "at")),
        (
            "who",
            ("who", "whom", "whose"),
            ("person", "people", "company", "organization"),
        ),
        ("what", ("what",), ("definition", "is", "are", "means")),
    )
)

# Checked in order; the first term found is the one named in the gap.
_STATISTICAL_RULES = tuple(
    (term, _compile_terms((term,), tail=_PREFIX_TAIL))
    for term in (
        "percent",
        "%",
        "million",
        "billion",
        "number of",
        "majority",
        "most",
        "few",
        "many",
        "study shows",
    )
)

# (phrase, severity) pairs matched as plain lowercase substrings.
_UNCERTAINTY_PHRASES = (
    ("i'm not sure", "high"),
    ("unclear", "medium"),
    ("might be", "low"),
    ("could be", "low"),
    ("possibly", "low"),
    ("it appears", "low"),
    ("seems to be", "low"),
    ("no clear answer", "high"),
    ("insufficient information", "high"),
)

_TEMPORAL_QUERY_RE = _compile_terms(
    (
        "current",
        "latest",
        "now",
        "nowadays",
        "today",
        "recent",
        "this year",
        "2024",
        "2025",
        "updated",
    ),
    tail=_ADVERB_TAIL,
)

# "May" and "March" only count when capitalised: in lowercase they are far
# more often the modal verb / the verb than a month, and treating "it may
# vary" as a date would hide genuinely undated answers.
_DATE_RE = re.compile(
    r"\b(?:(?:19|20)\d{2}"
    r"|(?i:january|february|april|june|july|august"
    r"|september|october|november|december)"
    r"|May|March)\b"
)


@dataclass
class InformationGap:
    """An identified information gap.

    Attributes:
        description: Human-readable explanation of the gap.
        gap_type: One of "missing_fact", "unclear", "unverified", "outdated".
        severity: One of "low", "medium", "high".
        suggested_search: Follow-up search query, or None if there is none.
    """

    description: str
    gap_type: str  # "missing_fact", "unclear", "unverified", "outdated"
    severity: str  # "low", "medium", "high"
    suggested_search: str | None = None


class GapIdentifier:
    """Identifies gaps in responses that need additional research."""

    def __init__(self):
        """Initialize the gap identifier (it holds no state)."""

    def identify_gaps(
        self,
        query: str,
        answer: str,
        sources: list[dict[str, str]] | None = None,
    ) -> list[InformationGap]:
        """Identify information gaps in an answer.

        Runs every heuristic check and concatenates the results in a fixed
        order: question coverage, unsourced claims, uncertainty, temporal.

        Args:
            query: Original user query
            answer: Generated answer
            sources: List of sources used

        Returns:
            List of identified gaps
        """
        return [
            *self._check_question_coverage(query, answer),
            *self._check_unsourced_claims(answer, sources),
            *self._check_uncertainty(answer),
            *self._check_temporal_issues(query, answer),
        ]

    def get_refinement_suggestions(
        self,
        gaps: list[InformationGap],
    ) -> list[str]:
        """Get search suggestions to fill gaps.

        Args:
            gaps: List of identified gaps

        Returns:
            Distinct suggested search queries, in first-seen order
        """
        # dict.fromkeys de-duplicates while keeping insertion order; a set
        # would yield a different order from run to run.
        return list(
            dict.fromkeys(
                gap.suggested_search for gap in gaps if gap.suggested_search
            )
        )

    def prioritize_gaps(
        self,
        gaps: list[InformationGap],
    ) -> list[InformationGap]:
        """Prioritize gaps by severity.

        Args:
            gaps: List of gaps to prioritize

        Returns:
            New sorted list of gaps (highest severity first). Gaps with an
            unrecognised severity sort last; ties keep their input order.
        """
        return sorted(
            gaps,
            key=lambda gap: _SEVERITY_ORDER.get(
                gap.severity, _UNKNOWN_SEVERITY_RANK
            ),
        )

    def _check_question_coverage(
        self,
        query: str,
        answer: str,
    ) -> list[InformationGap]:
        """Check if question elements are addressed.

        For each question word that appears in the query as a whole word, the
        answer must contain at least one of the associated indicator words
        (singular or plural), otherwise a medium-severity gap is reported.

        Args:
            query: User query
            answer: Generated answer

        Returns:
            List of gaps for unaddressed question elements
        """
        return [
            InformationGap(
                description=(
                    f"Question asks '{question_word}' "
                    "but answer may not fully address it"
                ),
                gap_type="missing_fact",
                severity="medium",
                suggested_search=f"{query} {question_word}",
            )
            for question_word, asked, addressed in _QUESTION_RULES
            if asked.search(query) and not addressed.search(answer)
        ]

    def _check_unsourced_claims(
        self,
        answer: str,
        sources: list[dict[str, str]] | None,
    ) -> list[InformationGap]:
        """Check for claims without source support.

        Args:
            answer: Generated answer
            sources: List of sources; each may carry a "snippet" string

        Returns:
            A single high-severity gap when there are no sources at all, a
            single medium-severity gap when the answer contains a statistical
            term but neither a "[" citation marker nor a verbatim source
            snippet, otherwise an empty list
        """
        if not sources:
            return [
                InformationGap(
                    description="No sources provided to support claims",
                    gap_type="unverified",
                    severity="high",
                    suggested_search=None,
                )
            ]

        # Simplified citation check: a bracket anywhere counts as a marker.
        if "[" in answer:
            return []

        # Empty or missing snippets are skipped: "" is a substring of every
        # string and would otherwise make any answer look supported.
        snippets = (source.get("snippet") for source in sources)
        if any(snippet and snippet in answer for snippet in snippets):
            return []

        for term, matcher in _STATISTICAL_RULES:
            if matcher.search(answer):
                # One gap is enough; report the first term in list order.
                return [
                    InformationGap(
                        description=(
                            f"Statistical claim ({term}) may need verification"
                        ),
                        gap_type="unverified",
                        severity="medium",
                        suggested_search=None,
                    )
                ]
        return []

    def _check_uncertainty(self, answer: str) -> list[InformationGap]:
        """Check for uncertainty language.

        Args:
            answer: Generated answer

        Returns:
            One gap per hedging phrase found (case-insensitive), in the order
            the phrases are defined
        """
        lowered = answer.lower()
        return [
            InformationGap(
                description=f"Answer contains uncertainty: '{phrase}'",
                gap_type="unclear",
                severity=severity,
                suggested_search=None,
            )
            for phrase, severity in _UNCERTAINTY_PHRASES
            if phrase in lowered
        ]

    def _check_temporal_issues(
        self,
        query: str,
        answer: str,
    ) -> list[InformationGap]:
        """Check for time-sensitive information issues.

        A query is time-sensitive when it contains a temporal indicator as a
        whole word (so "know" does not count as "now"). Such a query expects
        the answer to mention a year (19xx/20xx) or a month name.

        Args:
            query: User query
            answer: Generated answer

        Returns:
            A single high-severity "outdated" gap when a time-sensitive query
            is answered without any date, otherwise an empty list
        """
        if not _TEMPORAL_QUERY_RE.search(query):
            return []
        if _DATE_RE.search(answer):
            return []
        return [
            InformationGap(
                description=(
                    "Query asks for current information "
                    "but answer may be outdated"
                ),
                gap_type="outdated",
                severity="high",
                suggested_search=f"{query} latest",
            )
        ]