| """Simple Orchestrator - the basic agent loop connecting Search and Judge. |
| |
| This orchestrator uses a simple loop pattern with pydantic-ai for structured |
| LLM outputs. It works with free tier (HuggingFace Inference) or paid APIs |
| (OpenAI, Anthropic). |
| |
| Design Pattern: Template Method - defines the skeleton of the search-judge loop |
| while allowing handlers to implement specific behaviors. |
| """ |

from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any, ClassVar

import structlog

from src.orchestrators.base import JudgeHandlerProtocol, SearchHandlerProtocol
from src.utils.config import settings
from src.utils.models import (
    AgentEvent,
    Evidence,
    JudgeAssessment,
    OrchestratorConfig,
    SearchResult,
)

if TYPE_CHECKING:
    from src.services.embeddings import EmbeddingService
    from src.services.statistical_analyzer import StatisticalAnalyzer

logger = structlog.get_logger()

class Orchestrator:
    """
    The simple agent orchestrator - runs the Search -> Judge -> Loop cycle.

    This is a generator-based design that yields events for real-time UI updates.
    Uses pydantic-ai for structured LLM outputs without requiring the full
    Microsoft Agent Framework.
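
    Example:
        A minimal usage sketch (``my_search`` and ``my_judge`` stand in for
        any implementations of the handler protocols in src.orchestrators.base):

            orchestrator = Orchestrator(search_handler=my_search, judge_handler=my_judge)
            async for event in orchestrator.run("Can metformin be repurposed for Alzheimer's?"):
                print(f"[{event.type}] {event.message}")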
| """ |

    # Hard termination thresholds used by _should_synthesize(). Score values
    # are on the judge's 0-10 scales (combined 0-20); evidence values are
    # item counts.
    TERMINATION_CRITERIA: ClassVar[dict[str, float]] = {
        "min_combined_score": 12.0,  # combined score for synthesis (with drug candidates)
        "min_score_with_volume": 10.0,  # lower score bar when evidence volume is high
        "min_evidence_for_volume": 50.0,  # evidence count backing the lower bar
        "late_iteration_threshold": 8.0,  # acceptable combined score near the iteration cap
        "max_evidence_threshold": 100.0,  # evidence count that forces synthesis
        "emergency_iteration": 8.0,  # emergency-cutoff iteration (not read by _should_synthesize)
        "min_confidence": 0.5,  # judge-confidence floor for emergency synthesis
        "min_evidence_for_emergency": 30.0,  # evidence floor for emergency synthesis
    }
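
    # The thresholds can be tuned without touching the decision logic, e.g. a
    # stricter variant (illustrative subclass, not part of this module):
    #
    #     class StrictOrchestrator(Orchestrator):
    #         TERMINATION_CRITERIA: ClassVar[dict[str, float]] = {
    #             **Orchestrator.TERMINATION_CRITERIA,
    #             "min_combined_score": 14.0,
    #         }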

    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        judge_handler: JudgeHandlerProtocol,
        config: OrchestratorConfig | None = None,
        enable_analysis: bool = False,
        enable_embeddings: bool = True,
    ):
        """
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
            enable_analysis: Whether to perform statistical analysis (if Modal available)
            enable_embeddings: Whether to use semantic search for ranking/dedup
        """
        self.search = search_handler
        self.judge = judge_handler
        self.config = config or OrchestratorConfig()
        self.history: list[dict[str, Any]] = []
        self._enable_analysis = enable_analysis and settings.modal_available
        self._enable_embeddings = enable_embeddings

        # Services are created lazily on first use.
        self._analyzer: StatisticalAnalyzer | None = None
        self._embeddings: EmbeddingService | None = None

    def _get_analyzer(self) -> StatisticalAnalyzer | None:
        """Lazy initialization of StatisticalAnalyzer."""
        if self._analyzer is None:
            from src.utils.service_loader import get_analyzer_if_available

            self._analyzer = get_analyzer_if_available()
            if self._analyzer is None:
                # Analyzer unavailable (e.g. Modal not configured): disable the
                # analysis phase so later iterations skip it cheaply.
                self._enable_analysis = False
        return self._analyzer

    async def _run_analysis_phase(
        self, query: str, evidence: list[Evidence], iteration: int
    ) -> AsyncGenerator[AgentEvent, None]:
        """Run the optional analysis phase."""
        if not self._enable_analysis:
            return

        yield AgentEvent(
            type="analyzing",
            message="Running statistical analysis in Modal sandbox...",
            data={},
            iteration=iteration,
        )

        try:
            analyzer = self._get_analyzer()
            if analyzer is None:
                logger.info("StatisticalAnalyzer not available, skipping analysis phase")
                return

            analysis_result = await analyzer.analyze(
                query=query,
                evidence=evidence,
                hypothesis=None,
            )

            yield AgentEvent(
                type="analysis_complete",
                message=f"Analysis verdict: {analysis_result.verdict}",
                data=analysis_result.model_dump(),
                iteration=iteration,
            )

        except Exception as e:
            logger.error("Modal analysis failed", error=str(e))
            yield AgentEvent(
                type="error",
                message=f"Modal analysis failed: {e}",
                data={"error": str(e)},
                iteration=iteration,
            )

    def _should_synthesize(
        self,
        assessment: JudgeAssessment,
        iteration: int,
        max_iterations: int,
        evidence_count: int,
    ) -> tuple[bool, str]:
        """
        Code-enforced synthesis decision.

        Applies hard termination rules on top of the judge's recommendation so
        synthesis cannot be deferred indefinitely by an over-cautious judge.
        Returns (should_synthesize, reason).
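
        Example (hypothetical inputs):
            combined score 13 with drug candidates          -> (True, "high_scores_with_candidates")
            combined score 6, iteration 2 of 10, 20 items   -> (False, "continue_searching")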
| """ |
        combined_score = (
            assessment.details.mechanism_score + assessment.details.clinical_evidence_score
        )
        has_drug_candidates = len(assessment.details.drug_candidates) > 0
        confidence = assessment.confidence

        # Rule 1: the judge approved and the scores back it up (>= 10 combined).
        if assessment.sufficient and assessment.recommendation == "synthesize":
            if combined_score >= 10:
                return True, "judge_approved"

        # Rule 2: high combined score plus named drug candidates.
        if (
            combined_score >= self.TERMINATION_CRITERIA["min_combined_score"]
            and has_drug_candidates
        ):
            return True, "high_scores_with_candidates"

        # Rule 3: good score backed by a large evidence volume.
        if (
            combined_score >= self.TERMINATION_CRITERIA["min_score_with_volume"]
            and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_volume"]
        ):
            return True, "good_scores_high_volume"

        # Rule 4: near the iteration cap, accept a merely decent score.
        is_late_iteration = iteration >= max_iterations - 2
        if (
            is_late_iteration
            and combined_score >= self.TERMINATION_CRITERIA["late_iteration_threshold"]
        ):
            return True, "late_iteration_acceptable"

        # Rule 5: enough evidence that more searching adds little.
        if evidence_count >= self.TERMINATION_CRITERIA["max_evidence_threshold"]:
            return True, "max_evidence_reached"

        # Rule 6: late iteration with reasonable evidence and confidence.
        if (
            is_late_iteration
            and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_emergency"]
            and confidence >= self.TERMINATION_CRITERIA["min_confidence"]
        ):
            return True, "emergency_synthesis"

        return False, "continue_searching"

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """
        Run the agent loop for a query.

        Yields AgentEvent objects for each step, allowing real-time UI updates.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process
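
        Event flow (illustrative): "started" once, then per iteration
        searching -> search_complete -> judging -> judge_complete, ending with
        either "looping" (more evidence needed) or synthesizing -> complete.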
| """ |
        # Imported here (not at module top) so these heavier services load
        # only when a run actually starts.
        from src.agents.graph.state import Hypothesis
        from src.services.research_memory import ResearchMemory

        logger.info("Starting orchestrator", query=query)

        yield AgentEvent(
            type="started",
            message=f"Starting research for: {query}",
            iteration=0,
        )

        # Loop state: memory handles persistence/dedup, all_evidence mirrors
        # the unique items for synthesis.
        memory = ResearchMemory(query=query)
        all_evidence: list[Evidence] = []
        current_queries = [query]
        iteration = 0

        while iteration < self.config.max_iterations:
            iteration += 1
            logger.info("Iteration", iteration=iteration, queries=current_queries)

            # --- Search phase ---
            yield AgentEvent(
                type="searching",
                message=f"Searching for: {', '.join(current_queries[:3])}...",
                iteration=iteration,
            )

            try:
                # Run up to three searches concurrently; per-task exceptions
                # are captured instead of aborting the whole batch.
                search_tasks = [
                    self.search.execute(q, self.config.max_results_per_tool)
                    for q in current_queries[:3]
                ]
                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

                new_evidence: list[Evidence] = []
                errors: list[str] = []

                for q, result in zip(current_queries[:3], search_results, strict=False):
                    if isinstance(result, Exception):
                        errors.append(f"Search for '{q}' failed: {result!s}")
                    elif isinstance(result, SearchResult):
                        new_evidence.extend(result.evidence)
                        errors.extend(result.errors)
                    else:
                        # Defensive: gather should only yield the two cases above.
                        errors.append(f"Unknown result type for '{q}': {type(result)}")

                # Persist to memory. store_evidence returns the IDs (citation
                # URLs) of documents not seen before, giving cross-iteration
                # deduplication.
                new_ids = await memory.store_evidence(new_evidence)
                unique_new = [e for e in new_evidence if e.citation.url in new_ids]
                all_evidence.extend(unique_new)

                yield AgentEvent(
                    type="search_complete",
                    message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
                    data={
                        "new_count": len(unique_new),
                        "total_count": len(all_evidence),
                    },
                    iteration=iteration,
                )

                if errors:
                    logger.warning("Search errors", errors=errors)

            except Exception as e:
                logger.error("Search phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Search failed: {e!s}",
                    iteration=iteration,
                )
                continue

            # --- Judge phase ---
            yield AgentEvent(
                type="judging",
                message=f"Evaluating evidence (Memory: {len(memory.evidence_ids)} docs)...",
                iteration=iteration,
            )

            try:
                # Judge against the most relevant evidence in memory rather
                # than the full pile.
                judge_context = await memory.get_relevant_evidence(n=30)

                # Fallback: if retrieval returns nothing, use the most recent items.
                if not judge_context and all_evidence:
                    judge_context = all_evidence[-30:]

                assessment = await self.judge.assess(
                    query, judge_context, iteration, self.config.max_iterations
                )

                # Record each drug candidate as a proposed hypothesis.
                for candidate in assessment.details.drug_candidates:
                    h = Hypothesis(
                        id=candidate.replace(" ", "_").lower(),
                        statement=f"{candidate} is a potential candidate for {query}",
                        status="proposed",
                        confidence=assessment.confidence,
                        reasoning=f"{candidate} identified in iteration {iteration}",
                    )
                    memory.add_hypothesis(h)

                yield AgentEvent(
                    type="judge_complete",
                    message=(
                        f"Assessment: {assessment.recommendation} "
                        f"(confidence: {assessment.confidence:.0%})"
                    ),
                    data={
                        "sufficient": assessment.sufficient,
                        "confidence": assessment.confidence,
                        "mechanism_score": assessment.details.mechanism_score,
                        "clinical_score": assessment.details.clinical_evidence_score,
                    },
                    iteration=iteration,
                )

                # Track iteration history for synthesis and debugging.
                self.history.append(
                    {
                        "iteration": iteration,
                        "queries": current_queries,
                        "evidence_count": len(all_evidence),
                        "assessment": assessment.model_dump(),
                    }
                )

                # Code-enforced termination check.
                should_synth, reason = self._should_synthesize(
                    assessment=assessment,
                    iteration=iteration,
                    max_iterations=self.config.max_iterations,
                    evidence_count=len(all_evidence),
                )

                logger.info(
                    "Synthesis decision",
                    should_synthesize=should_synth,
                    reason=reason,
                    iteration=iteration,
                    combined_score=assessment.details.mechanism_score
                    + assessment.details.clinical_evidence_score,
                    evidence_count=len(all_evidence),
                    confidence=assessment.confidence,
                )

                if should_synth:
                    if reason != "judge_approved":
                        logger.info(f"Code-enforced synthesis triggered: {reason}")

                    # Optional statistical analysis before synthesis.
                    async for event in self._run_analysis_phase(query, all_evidence, iteration):
                        yield event

                    yield AgentEvent(
                        type="synthesizing",
                        message=f"Evidence sufficient ({reason})! Preparing synthesis...",
                        iteration=iteration,
                    )

                    final_response = self._generate_synthesis(query, all_evidence, assessment)

                    yield AgentEvent(
                        type="complete",
                        message=final_response,
                        data={
                            "evidence_count": len(all_evidence),
                            "iterations": iteration,
                            "synthesis_reason": reason,
                            "drug_candidates": assessment.details.drug_candidates,
                            "key_findings": assessment.details.key_findings,
                        },
                        iteration=iteration,
                    )
                    return

                else:
                    # Not enough evidence yet: use the judge's suggested queries,
                    # falling back to generic expansions of the original query.
                    current_queries = assessment.next_search_queries or [
                        f"{query} mechanism of action",
                        f"{query} clinical evidence",
                    ]

                    yield AgentEvent(
                        type="looping",
                        message=(
                            f"Gathering more evidence (scores: {assessment.details.mechanism_score}"
                            f"+{assessment.details.clinical_evidence_score}). "
                            f"Next: {', '.join(current_queries[:2])}..."
                        ),
                        data={"next_queries": current_queries, "reason": reason},
                        iteration=iteration,
                    )

            except Exception as e:
                logger.error("Judge phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Assessment failed: {e!s}",
                    iteration=iteration,
                )
                continue

        # Max iterations reached without synthesis: emit a partial result.
        yield AgentEvent(
            type="complete",
            message=self._generate_partial_synthesis(query, all_evidence),
            data={
                "evidence_count": len(all_evidence),
                "iterations": iteration,
                "max_reached": True,
            },
            iteration=iteration,
        )

    def _generate_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
        assessment: JudgeAssessment,
    ) -> str:
        """
        Generate the final synthesis response.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown
        """
        drug_list = (
            "\n".join([f"- **{d}**" for d in assessment.details.drug_candidates])
            or "- No specific candidates identified"
        )
        findings_list = (
            "\n".join([f"- {f}" for f in assessment.details.key_findings]) or "- See evidence below"
        )

        # Show at most the first ten citations.
        citations = "\n".join(
            [
                f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
                f"({e.citation.source.upper()}, {e.citation.date})"
                for i, e in enumerate(evidence[:10])
            ]
        )

        return f"""## Drug Repurposing Analysis

### Question
{query}

### Drug Candidates
{drug_list}

### Key Findings
{findings_list}

### Assessment
- **Mechanism Score**: {assessment.details.mechanism_score}/10
- **Clinical Evidence Score**: {assessment.details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning
{assessment.reasoning}

### Citations (showing up to 10 of {len(evidence)} sources)
{citations}

---
*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""

    def _generate_partial_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
    ) -> str:
        """
        Generate a genuine synthesis when max iterations are reached.

        Even when forced to stop, we still provide:
        - Drug candidates (if any were found)
        - Key findings
        - Assessment scores
        - Actionable citations

        This is better than a bare citation dump.
        """
        # Use the most recent assessment from history, if any.
        last_assessment = self.history[-1]["assessment"] if self.history else {}
        details = last_assessment.get("details", {})

        drug_candidates = details.get("drug_candidates", [])
        key_findings = details.get("key_findings", [])
        mechanism_score = details.get("mechanism_score", 0)
        clinical_score = details.get("clinical_evidence_score", 0)
        reasoning = last_assessment.get("reasoning", "Analysis incomplete due to iteration limit.")

        if drug_candidates:
            drug_list = "\n".join([f"- **{d}**" for d in drug_candidates[:5]])
        else:
            drug_list = (
                "- *No specific drug candidates identified in evidence*\n"
                "- *Try a more specific query or add an API key for better analysis*"
            )

        if key_findings:
            findings_list = "\n".join([f"- {f}" for f in key_findings[:5]])
        else:
            findings_list = (
                "- *Key findings require further analysis*\n"
                "- *See citations below for relevant sources*"
            )

        # Show at most the first ten citations.
        citations = "\n".join(
            [
                f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
                f"({e.citation.source.upper()}, {e.citation.date})"
                for i, e in enumerate(evidence[:10])
            ]
        )

        # Map numeric scores to qualitative labels for the summary table.
        combined_score = mechanism_score + clinical_score
        mech_strength = (
            "Strong" if mechanism_score >= 7 else "Moderate" if mechanism_score >= 4 else "Limited"
        )
        clin_strength = (
            "Strong" if clinical_score >= 7 else "Moderate" if clinical_score >= 4 else "Limited"
        )
        comb_strength = "Sufficient" if combined_score >= 12 else "Partial"

        return f"""## Drug Repurposing Analysis

### Research Question
{query}

### Status
Analysis based on {len(evidence)} sources across {len(self.history)} iterations.
Maximum iterations reached - results may be incomplete.

### Drug Candidates Identified
{drug_list}

### Key Findings
{findings_list}

### Evidence Quality Scores
| Criterion | Score | Interpretation |
|-----------|-------|----------------|
| Mechanism | {mechanism_score}/10 | {mech_strength} mechanistic evidence |
| Clinical | {clinical_score}/10 | {clin_strength} clinical support |
| Combined | {combined_score}/20 | {comb_strength} for synthesis |

### Analysis Summary
{reasoning}

### Top Citations ({len(evidence)} sources total)
{citations}

---
*For more complete analysis:*
- *Add an OpenAI or Anthropic API key for enhanced LLM analysis*
- *Try a more specific query (e.g., include drug names)*
- *Use Advanced mode for multi-agent research*
"""