"""Judge handler for evidence assessment using PydanticAI.""" from typing import Any import structlog from pydantic_ai import Agent from pydantic_ai.models.anthropic import AnthropicModel from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.anthropic import AnthropicProvider from pydantic_ai.providers.openai import OpenAIProvider from src.prompts.judge import ( SYSTEM_PROMPT, format_empty_evidence_prompt, format_user_prompt, ) from src.utils.config import settings from src.utils.models import AssessmentDetails, Evidence, JudgeAssessment logger = structlog.get_logger() def get_model() -> Any: """Get the LLM model based on configuration. Explicitly passes API keys from settings to avoid requiring users to export environment variables manually. """ llm_provider = settings.llm_provider if llm_provider == "anthropic": provider = AnthropicProvider(api_key=settings.anthropic_api_key) return AnthropicModel(settings.anthropic_model, provider=provider) if llm_provider != "openai": logger.warning("Unknown LLM provider, defaulting to OpenAI", provider=llm_provider) openai_provider = OpenAIProvider(api_key=settings.openai_api_key) return OpenAIModel(settings.openai_model, provider=openai_provider) class JudgeHandler: """ Handles evidence assessment using an LLM with structured output. Uses PydanticAI to ensure responses match the JudgeAssessment schema. """ def __init__(self, model: Any = None) -> None: """ Initialize the JudgeHandler. Args: model: Optional PydanticAI model. If None, uses config default. """ self.model = model or get_model() self.agent = Agent( model=self.model, output_type=JudgeAssessment, system_prompt=SYSTEM_PROMPT, retries=3, ) async def assess( self, question: str, evidence: list[Evidence], ) -> JudgeAssessment: """ Assess evidence and determine if it's sufficient. Args: question: The user's research question evidence: List of Evidence objects from search Returns: JudgeAssessment with evaluation results Raises: JudgeError: If assessment fails after retries """ logger.info( "Starting evidence assessment", question=question[:100], evidence_count=len(evidence), ) # Format the prompt based on whether we have evidence if evidence: user_prompt = format_user_prompt(question, evidence) else: user_prompt = format_empty_evidence_prompt(question) try: # Run the agent with structured output result = await self.agent.run(user_prompt) assessment = result.output logger.info( "Assessment complete", sufficient=assessment.sufficient, recommendation=assessment.recommendation, confidence=assessment.confidence, ) return assessment except Exception as e: logger.error("Assessment failed", error=str(e)) # Return a safe default assessment on failure return self._create_fallback_assessment(question, str(e)) def _create_fallback_assessment( self, question: str, error: str, ) -> JudgeAssessment: """ Create a fallback assessment when LLM fails. Args: question: The original question error: The error message Returns: Safe fallback JudgeAssessment """ return JudgeAssessment( details=AssessmentDetails( mechanism_score=0, mechanism_reasoning="Assessment failed due to LLM error", clinical_evidence_score=0, clinical_reasoning="Assessment failed due to LLM error", drug_candidates=[], key_findings=[], ), sufficient=False, confidence=0.0, recommendation="continue", next_search_queries=[ f"{question} mechanism", f"{question} clinical trials", f"{question} drug candidates", ], reasoning=f"Assessment failed: {error}. 
Recommend retrying with refined queries.", ) class MockJudgeHandler: """ Mock JudgeHandler for demo mode without LLM calls. Extracts meaningful information from real search results to provide a useful demo experience without requiring API keys. """ def __init__(self, mock_response: JudgeAssessment | None = None) -> None: """ Initialize with optional mock response. Args: mock_response: The assessment to return. If None, extracts from evidence. """ self.mock_response = mock_response self.call_count = 0 self.last_question: str | None = None self.last_evidence: list[Evidence] | None = None def _extract_key_findings(self, evidence: list[Evidence], max_findings: int = 5) -> list[str]: """Extract key findings from evidence titles.""" findings = [] for e in evidence[:max_findings]: # Use first 150 chars of title as a finding title = e.citation.title if len(title) > 150: title = title[:147] + "..." findings.append(title) return findings if findings else ["No specific findings extracted (demo mode)"] def _extract_drug_candidates(self, question: str, evidence: list[Evidence]) -> list[str]: """Extract potential drug names from question and evidence.""" # Common drug-related keywords to look for candidates = set() # Extract from question (simple heuristic) question_words = question.lower().split() for word in question_words: # Skip common words, keep potential drug names if len(word) > 3 and word not in { "what", "which", "could", "drugs", "drug", "medications", "medicine", "treat", "treatment", "help", "best", "effective", "repurposed", "repurposing", "disease", "condition", "therapy", }: # Capitalize as potential drug name candidates.add(word.capitalize()) # Extract from evidence titles (look for capitalized terms) for e in evidence[:10]: words = e.citation.title.split() for word in words: # Look for capitalized words that might be drug names cleaned = word.strip(".,;:()[]") if ( len(cleaned) > 3 and cleaned[0].isupper() and cleaned.lower() not in {"the", "and", "for", "with", "from"} ): candidates.add(cleaned) # Return top candidates or placeholder candidate_list = list(candidates)[:5] return candidate_list if candidate_list else ["See evidence below for potential candidates"] async def assess( self, question: str, evidence: list[Evidence], ) -> JudgeAssessment: """Return assessment based on actual evidence (demo mode).""" self.call_count += 1 self.last_question = question self.last_evidence = evidence if self.mock_response: return self.mock_response min_evidence = 3 evidence_count = len(evidence) # Extract meaningful data from actual evidence drug_candidates = self._extract_drug_candidates(question, evidence) key_findings = self._extract_key_findings(evidence) # Calculate scores based on evidence quantity mechanism_score = min(10, evidence_count * 2) if evidence_count > 0 else 0 clinical_score = min(10, evidence_count) if evidence_count > 0 else 0 return JudgeAssessment( details=AssessmentDetails( mechanism_score=mechanism_score, mechanism_reasoning=( f"Demo mode: Found {evidence_count} sources. " "Configure LLM API key for detailed mechanism analysis." ), clinical_evidence_score=clinical_score, clinical_reasoning=( f"Demo mode: {evidence_count} sources retrieved from PubMed, " "ClinicalTrials.gov, and bioRxiv. Full analysis requires LLM API key." 
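
# Illustrative usage sketch for JudgeHandler (comment only, not executed).
# It assumes a provider API key is configured in settings, since assess()
# makes a real LLM call:
#
#     handler = JudgeHandler()
#     assessment = await handler.assess(question, evidence)
#     if assessment.sufficient and assessment.recommendation == "synthesize":
#         ...  # proceed to synthesis
#     else:
#         ...  # feed assessment.next_search_queries back into search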


class MockJudgeHandler:
    """
    Mock JudgeHandler for demo mode without LLM calls.

    Extracts meaningful information from real search results to provide
    a useful demo experience without requiring API keys.
    """

    def __init__(self, mock_response: JudgeAssessment | None = None) -> None:
        """
        Initialize with optional mock response.

        Args:
            mock_response: The assessment to return. If None, extracts from evidence.
        """
        self.mock_response = mock_response
        self.call_count = 0
        self.last_question: str | None = None
        self.last_evidence: list[Evidence] | None = None

    def _extract_key_findings(self, evidence: list[Evidence], max_findings: int = 5) -> list[str]:
        """Extract key findings from evidence titles."""
        findings = []
        for e in evidence[:max_findings]:
            # Use first 150 chars of title as a finding
            title = e.citation.title
            if len(title) > 150:
                title = title[:147] + "..."
            findings.append(title)
        return findings if findings else ["No specific findings extracted (demo mode)"]

    def _extract_drug_candidates(self, question: str, evidence: list[Evidence]) -> list[str]:
        """Extract potential drug names from question and evidence."""
        # Collect candidate names; common filler words are filtered out below
        candidates = set()

        # Extract from question (simple heuristic)
        question_words = question.lower().split()
        for word in question_words:
            # Skip common words, keep potential drug names
            if len(word) > 3 and word not in {
                "what", "which", "could", "drugs", "drug", "medications",
                "medicine", "treat", "treatment", "help", "best", "effective",
                "repurposed", "repurposing", "disease", "condition", "therapy",
            }:
                # Capitalize as potential drug name
                candidates.add(word.capitalize())

        # Extract from evidence titles (look for capitalized terms)
        for e in evidence[:10]:
            words = e.citation.title.split()
            for word in words:
                # Look for capitalized words that might be drug names
                cleaned = word.strip(".,;:()[]")
                if (
                    len(cleaned) > 3
                    and cleaned[0].isupper()
                    and cleaned.lower() not in {"the", "and", "for", "with", "from"}
                ):
                    candidates.add(cleaned)

        # Return up to five candidates (set order is arbitrary) or a placeholder
        candidate_list = list(candidates)[:5]
        return candidate_list if candidate_list else ["See evidence below for potential candidates"]

    async def assess(
        self,
        question: str,
        evidence: list[Evidence],
    ) -> JudgeAssessment:
        """Return assessment based on actual evidence (demo mode)."""
        self.call_count += 1
        self.last_question = question
        self.last_evidence = evidence

        if self.mock_response:
            return self.mock_response

        min_evidence = 3
        evidence_count = len(evidence)

        # Extract meaningful data from actual evidence
        drug_candidates = self._extract_drug_candidates(question, evidence)
        key_findings = self._extract_key_findings(evidence)

        # Calculate scores based on evidence quantity
        mechanism_score = min(10, evidence_count * 2) if evidence_count > 0 else 0
        clinical_score = min(10, evidence_count) if evidence_count > 0 else 0

        return JudgeAssessment(
            details=AssessmentDetails(
                mechanism_score=mechanism_score,
                mechanism_reasoning=(
                    f"Demo mode: Found {evidence_count} sources. "
                    "Configure LLM API key for detailed mechanism analysis."
                ),
                clinical_evidence_score=clinical_score,
                clinical_reasoning=(
                    f"Demo mode: {evidence_count} sources retrieved from PubMed, "
                    "ClinicalTrials.gov, and bioRxiv. Full analysis requires LLM API key."
                ),
                drug_candidates=drug_candidates,
                key_findings=key_findings,
            ),
            sufficient=evidence_count >= min_evidence,
            confidence=min(0.5, evidence_count * 0.1) if evidence_count > 0 else 0.0,
            recommendation="synthesize" if evidence_count >= min_evidence else "continue",
            next_search_queries=(
                [f"{question} mechanism", f"{question} clinical trials"]
                if evidence_count < min_evidence
                else []
            ),
            reasoning=(
                f"Demo mode assessment based on {evidence_count} real search results. "
                "For AI-powered analysis with drug candidate identification and "
                "evidence synthesis, configure OPENAI_API_KEY or ANTHROPIC_API_KEY."
            ),
        )
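

if __name__ == "__main__":
    # Minimal runnable sketch using MockJudgeHandler, which needs no API key.
    # Passing an empty evidence list avoids assuming Evidence's constructor
    # fields; the mock then returns a low-confidence "continue" assessment
    # with suggested follow-up queries. The question below is an example input.
    import asyncio

    async def _demo() -> None:
        handler = MockJudgeHandler()
        assessment = await handler.assess(
            "Could metformin be repurposed for Alzheimer's disease?", []
        )
        # Assumes JudgeAssessment is a Pydantic v2 model (model_dump_json).
        print(assessment.model_dump_json(indent=2))

    asyncio.run(_demo())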