|
|
"""Simple Orchestrator - the basic agent loop connecting Search and Judge. |
|
|
|
|
|
This orchestrator uses a simple loop pattern with pydantic-ai for structured |
|
|
LLM outputs. It works with free tier (HuggingFace Inference) or paid APIs |
|
|
(OpenAI, Anthropic). |
|
|
|
|
|
Design Pattern: Template Method - defines the skeleton of the search-judge loop |
|
|
while allowing handlers to implement specific behaviors. |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import asyncio |
|
|
from collections.abc import AsyncGenerator |
|
|
from typing import TYPE_CHECKING, Any, ClassVar |
|
|
|
|
|
import structlog |
|
|
|
|
|
from src.orchestrators.base import JudgeHandlerProtocol, SearchHandlerProtocol |
|
|
from src.utils.config import settings |
|
|
from src.utils.models import ( |
|
|
AgentEvent, |
|
|
Evidence, |
|
|
JudgeAssessment, |
|
|
OrchestratorConfig, |
|
|
SearchResult, |
|
|
) |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from src.services.embeddings import EmbeddingService |
|
|
from src.services.statistical_analyzer import StatisticalAnalyzer |
|
|
|
|
|
logger = structlog.get_logger() |
|
|
|
|
|
|
|
|
class Orchestrator:
    """
    The simple agent orchestrator - runs the Search -> Judge -> Loop cycle.

    This is a generator-based design that yields events for real-time UI updates.
    Uses pydantic-ai for structured LLM outputs without requiring the full
    Microsoft Agent Framework.
    """

    # Code-enforced loop-termination thresholds. Values are floats so they can
    # be compared uniformly against scores, evidence counts, and confidences.
    # NOTE(review): "emergency_iteration" is currently unread anywhere in this
    # class — late iterations are detected via `max_iterations - 2` instead.
    TERMINATION_CRITERIA: ClassVar[dict[str, float]] = {
        # Minimum combined score required for the judge's own "synthesize"
        # verdict to be honored (see _should_synthesize).
        "judge_approved_min_score": 10.0,
        "min_combined_score": 12.0,
        "min_score_with_volume": 10.0,
        "min_evidence_for_volume": 50.0,
        "late_iteration_threshold": 8.0,
        "max_evidence_threshold": 100.0,
        "emergency_iteration": 8.0,
        "min_confidence": 0.5,
        "min_evidence_for_emergency": 30.0,
    }

    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        judge_handler: JudgeHandlerProtocol,
        config: OrchestratorConfig | None = None,
        enable_analysis: bool = False,
        enable_embeddings: bool = True,
    ):
        """
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
            enable_analysis: Whether to perform statistical analysis (if Modal available)
            enable_embeddings: Whether to use semantic search for ranking/dedup
        """
        self.search = search_handler
        self.judge = judge_handler
        self.config = config or OrchestratorConfig()
        # Per-iteration record of queries, evidence counts and assessments;
        # consumed by _generate_partial_synthesis when the loop is cut short.
        self.history: list[dict[str, Any]] = []
        # Analysis is only possible when Modal is configured, regardless of flag.
        self._enable_analysis = enable_analysis and settings.modal_available
        self._enable_embeddings = enable_embeddings

        # Lazily-created services (see _get_analyzer); None until first use.
        self._analyzer: StatisticalAnalyzer | None = None
        self._embeddings: EmbeddingService | None = None

    def _get_analyzer(self) -> StatisticalAnalyzer | None:
        """Lazy initialization of StatisticalAnalyzer.

        Permanently disables the analysis phase if the analyzer cannot be
        loaded, so subsequent iterations skip it cheaply.
        """
        if self._analyzer is None:
            # Local import to avoid a hard dependency at module import time.
            from src.utils.service_loader import get_analyzer_if_available

            self._analyzer = get_analyzer_if_available()
            if self._analyzer is None:
                self._enable_analysis = False
        return self._analyzer

    async def _run_analysis_phase(
        self, query: str, evidence: list[Evidence], iteration: int
    ) -> AsyncGenerator[AgentEvent, None]:
        """Run the optional statistical analysis phase.

        Yields an "analyzing" event, then either an "analysis_complete" event
        with the analyzer's verdict or an "error" event. Yields nothing at all
        when analysis is disabled or the analyzer is unavailable.
        """
        if not self._enable_analysis:
            return

        yield AgentEvent(
            type="analyzing",
            message="Running statistical analysis in Modal sandbox...",
            data={},
            iteration=iteration,
        )

        try:
            analyzer = self._get_analyzer()
            if analyzer is None:
                logger.info("StatisticalAnalyzer not available, skipping analysis phase")
                return

            analysis_result = await analyzer.analyze(
                query=query,
                evidence=evidence,
                hypothesis=None,
            )

            yield AgentEvent(
                type="analysis_complete",
                message=f"Analysis verdict: {analysis_result.verdict}",
                data=analysis_result.model_dump(),
                iteration=iteration,
            )

        except Exception as e:
            # Analysis is best-effort: surface the failure as an event but
            # never abort the surrounding research loop.
            logger.error("Modal analysis failed", error=str(e))
            yield AgentEvent(
                type="error",
                message=f"Modal analysis failed: {e}",
                data={"error": str(e)},
                iteration=iteration,
            )

    def _should_synthesize(
        self,
        assessment: JudgeAssessment,
        iteration: int,
        max_iterations: int,
        evidence_count: int,
    ) -> tuple[bool, str]:
        """
        Code-enforced synthesis decision.

        Checks, in order: the judge's own approval, high scores with drug
        candidates, good scores with high evidence volume, late-iteration
        acceptability, an evidence-count cap, and a last-ditch emergency
        path near the iteration limit.

        Returns (should_synthesize, reason).
        """
        combined_score = (
            assessment.details.mechanism_score + assessment.details.clinical_evidence_score
        )
        has_drug_candidates = len(assessment.details.drug_candidates) > 0
        confidence = assessment.confidence

        # Honor the judge's verdict only when scores back it up.
        if assessment.sufficient and assessment.recommendation == "synthesize":
            if combined_score >= self.TERMINATION_CRITERIA["judge_approved_min_score"]:
                return True, "judge_approved"

        if (
            combined_score >= self.TERMINATION_CRITERIA["min_combined_score"]
            and has_drug_candidates
        ):
            return True, "high_scores_with_candidates"

        if (
            combined_score >= self.TERMINATION_CRITERIA["min_score_with_volume"]
            and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_volume"]
        ):
            return True, "good_scores_high_volume"

        # "Late" means within the last two iterations of the budget.
        is_late_iteration = iteration >= max_iterations - 2
        if (
            is_late_iteration
            and combined_score >= self.TERMINATION_CRITERIA["late_iteration_threshold"]
        ):
            return True, "late_iteration_acceptable"

        if evidence_count >= self.TERMINATION_CRITERIA["max_evidence_threshold"]:
            return True, "max_evidence_reached"

        # Emergency exit: running out of iterations with a decent evidence
        # base and reasonable confidence — better a partial answer than none.
        if (
            is_late_iteration
            and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_emergency"]
            and confidence >= self.TERMINATION_CRITERIA["min_confidence"]
        ):
            return True, "emergency_synthesis"

        return False, "continue_searching"

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """
        Run the agent loop for a query.

        Yields AgentEvent objects for each step, allowing real-time UI updates.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process
        """
        # Local imports keep heavy/optional dependencies off the module path.
        from src.agents.graph.state import Hypothesis
        from src.services.research_memory import ResearchMemory

        logger.info("Starting orchestrator", query=query)

        yield AgentEvent(
            type="started",
            message=f"Starting research for: {query}",
            iteration=0,
        )

        memory = ResearchMemory(query=query)
        all_evidence: list[Evidence] = []
        current_queries = [query]
        iteration = 0

        while iteration < self.config.max_iterations:
            iteration += 1
            logger.info("Iteration", iteration=iteration, queries=current_queries)

            yield AgentEvent(
                type="searching",
                message=f"Searching for: {', '.join(current_queries[:3])}...",
                iteration=iteration,
            )

            try:
                # Fan out at most 3 queries concurrently; exceptions are
                # captured per-task rather than aborting the whole batch.
                search_tasks = [
                    self.search.execute(q, self.config.max_results_per_tool)
                    for q in current_queries[:3]
                ]
                search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

                new_evidence: list[Evidence] = []
                errors: list[str] = []

                # gather() returns exactly one result per task, so the two
                # sequences are always equal-length — strict=True asserts it.
                for q, result in zip(current_queries[:3], search_results, strict=True):
                    if isinstance(result, Exception):
                        errors.append(f"Search for '{q}' failed: {result!s}")
                    elif isinstance(result, SearchResult):
                        new_evidence.extend(result.evidence)
                        errors.extend(result.errors)
                    else:
                        errors.append(f"Unknown result type for '{q}': {type(result)}")

                # Memory deduplicates; it returns the ids of newly-stored docs.
                new_ids = await memory.store_evidence(new_evidence)

                # Assumes memory keys evidence by citation URL — the id set is
                # matched against e.citation.url to pick out the unique items.
                unique_new = [e for e in new_evidence if e.citation.url in new_ids]

                all_evidence.extend(unique_new)

                yield AgentEvent(
                    type="search_complete",
                    message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
                    data={
                        "new_count": len(unique_new),
                        "total_count": len(all_evidence),
                    },
                    iteration=iteration,
                )

                if errors:
                    logger.warning("Search errors", errors=errors)

            except Exception as e:
                logger.error("Search phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Search failed: {e!s}",
                    iteration=iteration,
                )
                # Retry the same queries on the next iteration (budget permitting).
                continue

            yield AgentEvent(
                type="judging",
                message=f"Evaluating evidence (Memory: {len(memory.evidence_ids)} docs)...",
                iteration=iteration,
            )

            try:
                # Judge sees the 30 most relevant docs, not the raw firehose.
                judge_context = await memory.get_relevant_evidence(n=30)

                # Fallback if semantic retrieval yields nothing: use recency.
                if not judge_context and all_evidence:
                    judge_context = all_evidence[-30:]

                assessment = await self.judge.assess(
                    query, judge_context, iteration, self.config.max_iterations
                )

                # Record each drug candidate as a trackable hypothesis.
                for candidate in assessment.details.drug_candidates:
                    h = Hypothesis(
                        id=candidate.replace(" ", "_").lower(),
                        statement=f"{candidate} is a potential candidate for {query}",
                        status="proposed",
                        confidence=assessment.confidence,
                        reasoning=f"{candidate} identified in iteration {iteration}",
                    )
                    memory.add_hypothesis(h)

                yield AgentEvent(
                    type="judge_complete",
                    message=(
                        f"Assessment: {assessment.recommendation} "
                        f"(confidence: {assessment.confidence:.0%})"
                    ),
                    data={
                        "sufficient": assessment.sufficient,
                        "confidence": assessment.confidence,
                        "mechanism_score": assessment.details.mechanism_score,
                        "clinical_score": assessment.details.clinical_evidence_score,
                    },
                    iteration=iteration,
                )

                self.history.append(
                    {
                        "iteration": iteration,
                        "queries": current_queries,
                        "evidence_count": len(all_evidence),
                        "assessment": assessment.model_dump(),
                    }
                )

                should_synth, reason = self._should_synthesize(
                    assessment=assessment,
                    iteration=iteration,
                    max_iterations=self.config.max_iterations,
                    evidence_count=len(all_evidence),
                )

                logger.info(
                    "Synthesis decision",
                    should_synthesize=should_synth,
                    reason=reason,
                    iteration=iteration,
                    combined_score=assessment.details.mechanism_score
                    + assessment.details.clinical_evidence_score,
                    evidence_count=len(all_evidence),
                    confidence=assessment.confidence,
                )

                if should_synth:
                    if reason != "judge_approved":
                        logger.info(f"Code-enforced synthesis triggered: {reason}")

                    # Optional Modal analysis runs just before synthesis.
                    async for event in self._run_analysis_phase(query, all_evidence, iteration):
                        yield event

                    yield AgentEvent(
                        type="synthesizing",
                        message=f"Evidence sufficient ({reason})! Preparing synthesis...",
                        iteration=iteration,
                    )

                    final_response = self._generate_synthesis(query, all_evidence, assessment)

                    yield AgentEvent(
                        type="complete",
                        message=final_response,
                        data={
                            "evidence_count": len(all_evidence),
                            "iterations": iteration,
                            "synthesis_reason": reason,
                            "drug_candidates": assessment.details.drug_candidates,
                            "key_findings": assessment.details.key_findings,
                        },
                        iteration=iteration,
                    )
                    return

                else:
                    # Follow the judge's suggested queries, with a generic
                    # mechanism/clinical split as a fallback.
                    current_queries = assessment.next_search_queries or [
                        f"{query} mechanism of action",
                        f"{query} clinical evidence",
                    ]

                    yield AgentEvent(
                        type="looping",
                        message=(
                            f"Gathering more evidence (scores: {assessment.details.mechanism_score}"
                            f"+{assessment.details.clinical_evidence_score}). "
                            f"Next: {', '.join(current_queries[:2])}..."
                        ),
                        data={"next_queries": current_queries, "reason": reason},
                        iteration=iteration,
                    )

            except Exception as e:
                logger.error("Judge phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Assessment failed: {e!s}",
                    iteration=iteration,
                )
                continue

        # Iteration budget exhausted without a synthesis decision.
        yield AgentEvent(
            type="complete",
            message=self._generate_partial_synthesis(query, all_evidence),
            data={
                "evidence_count": len(all_evidence),
                "iterations": iteration,
                "max_reached": True,
            },
            iteration=iteration,
        )

    @staticmethod
    def _format_citations(evidence: list[Evidence], limit: int = 10) -> str:
        """Format up to `limit` evidence items as a numbered markdown list."""
        return "\n".join(
            f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
            f"({e.citation.source.upper()}, {e.citation.date})"
            for i, e in enumerate(evidence[:limit])
        )

    def _generate_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
        assessment: JudgeAssessment,
    ) -> str:
        """
        Generate the final synthesis response.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown
        """
        drug_list = (
            "\n".join([f"- **{d}**" for d in assessment.details.drug_candidates])
            or "- No specific candidates identified"
        )
        findings_list = (
            "\n".join([f"- {f}" for f in assessment.details.key_findings]) or "- See evidence below"
        )

        citations = self._format_citations(evidence)

        return f"""## Drug Repurposing Analysis

### Question
{query}

### Drug Candidates
{drug_list}

### Key Findings
{findings_list}

### Assessment
- **Mechanism Score**: {assessment.details.mechanism_score}/10
- **Clinical Evidence Score**: {assessment.details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning
{assessment.reasoning}

### Citations ({len(evidence)} sources)
{citations}

---
*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""

    def _generate_partial_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
    ) -> str:
        """
        Generate a REAL synthesis when max iterations reached.

        Even when forced to stop, we should provide:
        - Drug candidates (if any were found)
        - Key findings
        - Assessment scores
        - Actionable citations

        This is still better than a citation dump.
        """
        # Pull whatever the last judge assessment recorded; tolerate an empty
        # history (e.g. every judge call failed) by defaulting everything.
        last_assessment = self.history[-1]["assessment"] if self.history else {}
        details = last_assessment.get("details", {})

        drug_candidates = details.get("drug_candidates", [])
        key_findings = details.get("key_findings", [])
        mechanism_score = details.get("mechanism_score", 0)
        clinical_score = details.get("clinical_evidence_score", 0)
        reasoning = last_assessment.get("reasoning", "Analysis incomplete due to iteration limit.")

        if drug_candidates:
            drug_list = "\n".join([f"- **{d}**" for d in drug_candidates[:5]])
        else:
            drug_list = (
                "- *No specific drug candidates identified in evidence*\n"
                "- *Try a more specific query or add an API key for better analysis*"
            )

        if key_findings:
            findings_list = "\n".join([f"- {f}" for f in key_findings[:5]])
        else:
            findings_list = (
                "- *Key findings require further analysis*\n"
                "- *See citations below for relevant sources*"
            )

        citations = self._format_citations(evidence)

        combined_score = mechanism_score + clinical_score
        mech_strength = (
            "Strong" if mechanism_score >= 7 else "Moderate" if mechanism_score >= 4 else "Limited"
        )
        clin_strength = (
            "Strong" if clinical_score >= 7 else "Moderate" if clinical_score >= 4 else "Limited"
        )
        comb_strength = "Sufficient" if combined_score >= 12 else "Partial"

        return f"""## Drug Repurposing Analysis

### Research Question
{query}

### Status
Analysis based on {len(evidence)} sources across {len(self.history)} iterations.
Maximum iterations reached - results may be incomplete.

### Drug Candidates Identified
{drug_list}

### Key Findings
{findings_list}

### Evidence Quality Scores
| Criterion | Score | Interpretation |
|-----------|-------|----------------|
| Mechanism | {mechanism_score}/10 | {mech_strength} mechanistic evidence |
| Clinical | {clinical_score}/10 | {clin_strength} clinical support |
| Combined | {combined_score}/20 | {comb_strength} for synthesis |

### Analysis Summary
{reasoning}

### Top Citations ({len(evidence)} sources total)
{citations}

---
*For more complete analysis:*
- *Add an OpenAI or Anthropic API key for enhanced LLM analysis*
- *Try a more specific query (e.g., include drug names)*
- *Use Advanced mode for multi-agent research*
"""
|
|
|