VibecoderMcSwaggins's picture
feat(SPEC-08): Integrate shared memory layer + CodeRabbit fixes (#74)
e0c585c unverified
raw
history blame
22.3 kB
"""Simple Orchestrator - the basic agent loop connecting Search and Judge.
This orchestrator uses a simple loop pattern with pydantic-ai for structured
LLM outputs. It works with free tier (HuggingFace Inference) or paid APIs
(OpenAI, Anthropic).
Design Pattern: Template Method - defines the skeleton of the search-judge loop
while allowing handlers to implement specific behaviors.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any, ClassVar
import structlog
from src.orchestrators.base import JudgeHandlerProtocol, SearchHandlerProtocol
from src.utils.config import settings
from src.utils.models import (
AgentEvent,
Evidence,
JudgeAssessment,
OrchestratorConfig,
SearchResult,
)
if TYPE_CHECKING:
from src.services.embeddings import EmbeddingService
from src.services.statistical_analyzer import StatisticalAnalyzer
# Module-level structlog logger; the Orchestrator logs structured key=value
# events through it (e.g. logger.info("Iteration", iteration=..., queries=...)).
logger = structlog.get_logger()
class Orchestrator:
    """
    The simple agent orchestrator - runs the Search -> Judge -> Loop cycle.

    This is a generator-based design that yields events for real-time UI updates.
    Uses pydantic-ai for structured LLM outputs without requiring the full
    Microsoft Agent Framework.
    """

    # Termination thresholds (code-enforced, not LLM-decided).
    TERMINATION_CRITERIA: ClassVar[dict[str, float]] = {
        "min_combined_score": 12.0,  # Priority 2: mechanism + clinical >= 12 with candidates
        "min_score_with_volume": 10.0,  # Priority 3: combined >= 10 with enough sources
        "min_evidence_for_volume": 50.0,  # Priority 3: evidence count threshold
        "late_iteration_threshold": 8.0,  # Priority 4: combined >= 8 in the late window
        "max_evidence_threshold": 100.0,  # Priority 5: force synthesis with 100+ sources
        "emergency_iteration": 8.0,  # Not read by _should_synthesize (late = max_iter - 2)
        "min_confidence": 0.5,  # Priority 6: minimum confidence for emergency synthesis
        "min_evidence_for_emergency": 30.0,  # Priority 6: min evidence for emergency
        "judge_approved_min_score": 10.0,  # Priority 1: score floor when the judge approves
    }

    # Only the first N planned queries are searched in each iteration.
    MAX_QUERIES_PER_ITERATION: ClassVar[int] = 3
    # Number of evidence items handed to the judge per assessment.
    JUDGE_CONTEXT_SIZE: ClassVar[int] = 30

    def __init__(
        self,
        search_handler: SearchHandlerProtocol,
        judge_handler: JudgeHandlerProtocol,
        config: OrchestratorConfig | None = None,
        enable_analysis: bool = False,
        enable_embeddings: bool = True,
    ) -> None:
        """
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
            enable_analysis: Whether to perform statistical analysis; only honored
                when ``settings.modal_available`` is truthy
            enable_embeddings: Whether to use semantic search for ranking/dedup
        """
        self.search = search_handler
        self.judge = judge_handler
        self.config = config if config is not None else OrchestratorConfig()
        # Per-iteration records of the current run; reset at the start of run().
        self.history: list[dict[str, Any]] = []
        self._enable_analysis = enable_analysis and settings.modal_available
        # NOTE(review): not read by any method in this class; kept for API compatibility.
        self._enable_embeddings = enable_embeddings
        # Lazy-loaded services (typed for IDE support).
        self._analyzer: StatisticalAnalyzer | None = None
        self._embeddings: EmbeddingService | None = None

    def _get_analyzer(self) -> StatisticalAnalyzer | None:
        """
        Return the StatisticalAnalyzer, loading it on first use.

        If the loader yields no analyzer, analysis is disabled for the rest of this
        orchestrator's lifetime so later calls to the analysis phase short-circuit.
        """
        if self._analyzer is None:
            # Local import, presumably to keep optional service deps off module import.
            from src.utils.service_loader import get_analyzer_if_available

            self._analyzer = get_analyzer_if_available()
            if self._analyzer is None:
                self._enable_analysis = False
        return self._analyzer

    async def _run_analysis_phase(
        self, query: str, evidence: list[Evidence], iteration: int
    ) -> AsyncGenerator[AgentEvent, None]:
        """
        Run the optional statistical analysis phase.

        Yields nothing when analysis is disabled or no analyzer can be loaded.
        Otherwise yields an "analyzing" event followed by either an
        "analysis_complete" event or an "error" event (failures are not raised).
        """
        if not self._enable_analysis:
            return
        try:
            # Resolve the analyzer first so we never announce an analysis that
            # cannot actually run.
            analyzer = self._get_analyzer()
            if analyzer is None:
                logger.info("StatisticalAnalyzer not available, skipping analysis phase")
                return
            yield AgentEvent(
                type="analyzing",
                message="Running statistical analysis in Modal sandbox...",
                data={},
                iteration=iteration,
            )
            # Run Modal analysis (no agent_framework needed!)
            analysis_result = await analyzer.analyze(
                query=query,
                evidence=evidence,
                hypothesis=None,  # Could add hypothesis generation later
            )
            yield AgentEvent(
                type="analysis_complete",
                message=f"Analysis verdict: {analysis_result.verdict}",
                data=analysis_result.model_dump(),
                iteration=iteration,
            )
        except Exception as e:
            logger.error("Modal analysis failed", error=str(e))
            yield AgentEvent(
                type="error",
                message=f"Modal analysis failed: {e}",
                data={"error": str(e)},
                iteration=iteration,
            )

    def _should_synthesize(
        self,
        assessment: JudgeAssessment,
        iteration: int,
        max_iterations: int,
        evidence_count: int,
    ) -> tuple[bool, str]:
        """
        Code-enforced synthesis decision.

        Rules are evaluated in priority order; the first one that matches wins.

        Returns:
            (should_synthesize, reason) where ``reason`` names the rule that fired,
            or is "continue_searching" when none did.
        """
        criteria = self.TERMINATION_CRITERIA
        details = assessment.details
        combined_score = details.mechanism_score + details.clinical_evidence_score
        has_drug_candidates = bool(details.drug_candidates)
        confidence = assessment.confidence
        # The final iterations (max_iterations - 2 onward) accept weaker evidence
        # rather than ending the run without any synthesis.
        is_late_iteration = iteration >= max_iterations - 2

        # Priority 1: LLM explicitly says sufficient with good scores.
        # .get() keeps subclasses that override TERMINATION_CRITERIA working.
        if (
            assessment.sufficient
            and assessment.recommendation == "synthesize"
            and combined_score >= criteria.get("judge_approved_min_score", 10.0)
        ):
            return True, "judge_approved"

        # Priority 2: High scores with drug candidates
        if combined_score >= criteria["min_combined_score"] and has_drug_candidates:
            return True, "high_scores_with_candidates"

        # Priority 3: Good scores with high evidence volume
        if (
            combined_score >= criteria["min_score_with_volume"]
            and evidence_count >= criteria["min_evidence_for_volume"]
        ):
            return True, "good_scores_high_volume"

        # Priority 4: Late iteration with acceptable scores (diminishing returns)
        if is_late_iteration and combined_score >= criteria["late_iteration_threshold"]:
            return True, "late_iteration_acceptable"

        # Priority 5: Very high evidence count (enough to synthesize something)
        if evidence_count >= criteria["max_evidence_threshold"]:
            return True, "max_evidence_reached"

        # Priority 6: Emergency synthesis (avoid garbage output)
        if (
            is_late_iteration
            and evidence_count >= criteria["min_evidence_for_emergency"]
            and confidence >= criteria["min_confidence"]
        ):
            return True, "emergency_synthesis"

        return False, "continue_searching"

    async def _execute_searches(self, queries: list[str]) -> tuple[list[Evidence], list[str]]:
        """
        Run all ``queries`` concurrently through the search handler.

        A failing query is recorded as an error string instead of aborting the batch.

        Returns:
            (evidence, errors): evidence from every successful search in query order,
            and human-readable error messages (including handler-reported errors).
        """
        results = await asyncio.gather(
            *(self.search.execute(q, self.config.max_results_per_tool) for q in queries),
            return_exceptions=True,
        )
        evidence: list[Evidence] = []
        errors: list[str] = []
        # gather() returns exactly one result per awaitable, in order.
        for q, result in zip(queries, results, strict=True):
            if isinstance(result, BaseException):
                errors.append(f"Search for '{q}' failed: {result!s}")
            elif isinstance(result, SearchResult):
                evidence.extend(result.evidence)
                errors.extend(result.errors)
            else:
                # Defensive: handlers are expected to return a SearchResult.
                errors.append(f"Unknown result type for '{q}': {type(result)}")
        return evidence, errors

    @staticmethod
    def _filter_new_evidence(
        evidence: list[Evidence], new_ids: list[str] | None
    ) -> list[Evidence]:
        """
        Keep evidence whose citation URL is in ``new_ids``, at most once per URL.

        Concurrent queries frequently return the same paper. Without the per-URL
        guard every copy of a newly stored URL would survive, inflating the
        evidence counts used by the termination thresholds. Order is preserved
        and the first occurrence wins.
        """
        pending = set(new_ids or ())
        unique: list[Evidence] = []
        for item in evidence:
            url = item.citation.url
            if url in pending:
                pending.discard(url)  # later duplicates of this URL are dropped
                unique.append(item)
        return unique

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:  # noqa: PLR0915
        """
        Run the agent loop for a query.

        Each iteration searches, stores new evidence in shared memory, asks the
        judge for an assessment, then either synthesizes or plans new queries.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process. A normal run ends
            with a "complete" event, either from synthesis or after the
            iteration limit is reached.
        """
        # Import here to avoid circular deps if any
        from src.agents.graph.state import Hypothesis
        from src.services.research_memory import ResearchMemory

        # Reset per-run state so a reused orchestrator never reports (or partially
        # synthesizes from) a previous query's assessments.
        self.history = []

        logger.info("Starting orchestrator", query=query)
        yield AgentEvent(
            type="started",
            message=f"Starting research for: {query}",
            iteration=0,
        )

        # Shared memory drives dedup and judge context; all_evidence is kept
        # locally for counts, thresholds and the final report.
        memory = ResearchMemory(query=query)
        all_evidence: list[Evidence] = []
        current_queries = [query]
        iteration = 0

        while iteration < self.config.max_iterations:
            iteration += 1
            logger.info("Iteration", iteration=iteration, queries=current_queries)
            batch = current_queries[: self.MAX_QUERIES_PER_ITERATION]

            # === SEARCH PHASE ===
            yield AgentEvent(
                type="searching",
                message=f"Searching for: {', '.join(batch)}...",
                iteration=iteration,
            )
            try:
                new_evidence, errors = await self._execute_searches(batch)

                # === MEMORY INTEGRATION: Store and Deduplicate ===
                # ResearchMemory deduplicates/persists and returns the IDs it actually
                # stored. Note: this assumes those IDs are URLs matching Citation.url.
                new_ids = await memory.store_evidence(new_evidence)
                unique_new = self._filter_new_evidence(new_evidence, new_ids)
                all_evidence.extend(unique_new)

                yield AgentEvent(
                    type="search_complete",
                    message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
                    data={
                        "new_count": len(unique_new),
                        "total_count": len(all_evidence),
                    },
                    iteration=iteration,
                )
                if errors:
                    logger.warning("Search errors", errors=errors)
            except Exception as e:
                logger.error("Search phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Search failed: {e!s}",
                    iteration=iteration,
                )
                continue

            # === JUDGE PHASE ===
            yield AgentEvent(
                type="judging",
                message=f"Evaluating evidence (Memory: {len(memory.evidence_ids)} docs)...",
                iteration=iteration,
            )
            try:
                # Retrieve RELEVANT evidence from memory to keep the judge's
                # context window bounded and focused.
                judge_context = await memory.get_relevant_evidence(n=self.JUDGE_CONTEXT_SIZE)
                # Fallback if memory returned nothing but we do hold local evidence.
                if not judge_context and all_evidence:
                    judge_context = all_evidence[-self.JUDGE_CONTEXT_SIZE :]

                assessment = await self.judge.assess(
                    query, judge_context, iteration, self.config.max_iterations
                )

                # === MEMORY INTEGRATION: Track Hypotheses ===
                # Convert loose candidate strings into structured Hypotheses.
                for candidate in assessment.details.drug_candidates:
                    memory.add_hypothesis(
                        Hypothesis(
                            id=candidate.replace(" ", "_").lower(),
                            statement=f"{candidate} is a potential candidate for {query}",
                            status="proposed",
                            confidence=assessment.confidence,
                            reasoning=f"Identified in iteration {iteration}",
                        )
                    )

                yield AgentEvent(
                    type="judge_complete",
                    message=(
                        f"Assessment: {assessment.recommendation} "
                        f"(confidence: {assessment.confidence:.0%})"
                    ),
                    data={
                        "sufficient": assessment.sufficient,
                        "confidence": assessment.confidence,
                        "mechanism_score": assessment.details.mechanism_score,
                        "clinical_score": assessment.details.clinical_evidence_score,
                    },
                    iteration=iteration,
                )

                # Record this iteration in history
                self.history.append(
                    {
                        "iteration": iteration,
                        "queries": current_queries,
                        "evidence_count": len(all_evidence),
                        "assessment": assessment.model_dump(),
                    }
                )

                # === DECISION PHASE (Code-Enforced) ===
                should_synth, reason = self._should_synthesize(
                    assessment=assessment,
                    iteration=iteration,
                    max_iterations=self.config.max_iterations,
                    evidence_count=len(all_evidence),
                )
                logger.info(
                    "Synthesis decision",
                    should_synthesize=should_synth,
                    reason=reason,
                    iteration=iteration,
                    combined_score=assessment.details.mechanism_score
                    + assessment.details.clinical_evidence_score,
                    evidence_count=len(all_evidence),
                    confidence=assessment.confidence,
                )

                if should_synth:
                    if reason != "judge_approved":
                        logger.info("Code-enforced synthesis triggered", reason=reason)

                    # Optional Analysis Phase
                    async for event in self._run_analysis_phase(query, all_evidence, iteration):
                        yield event

                    yield AgentEvent(
                        type="synthesizing",
                        message=f"Evidence sufficient ({reason})! Preparing synthesis...",
                        iteration=iteration,
                    )
                    # Use all gathered evidence for the final report.
                    final_response = self._generate_synthesis(query, all_evidence, assessment)
                    yield AgentEvent(
                        type="complete",
                        message=final_response,
                        data={
                            "evidence_count": len(all_evidence),
                            "iterations": iteration,
                            "synthesis_reason": reason,
                            "drug_candidates": assessment.details.drug_candidates,
                            "key_findings": assessment.details.key_findings,
                        },
                        iteration=iteration,
                    )
                    return

                # Need more evidence - prepare next queries.
                current_queries = assessment.next_search_queries or [
                    f"{query} mechanism of action",
                    f"{query} clinical evidence",
                ]
                yield AgentEvent(
                    type="looping",
                    message=(
                        f"Gathering more evidence (scores: {assessment.details.mechanism_score}"
                        f"+{assessment.details.clinical_evidence_score}). "
                        f"Next: {', '.join(current_queries[:2])}..."
                    ),
                    data={"next_queries": current_queries, "reason": reason},
                    iteration=iteration,
                )
            except Exception as e:
                logger.error("Judge phase failed", error=str(e))
                yield AgentEvent(
                    type="error",
                    message=f"Assessment failed: {e!s}",
                    iteration=iteration,
                )

        # Max iterations reached
        yield AgentEvent(
            type="complete",
            message=self._generate_partial_synthesis(query, all_evidence),
            data={
                "evidence_count": len(all_evidence),
                "iterations": iteration,
                "max_reached": True,
            },
            iteration=iteration,
        )

    @staticmethod
    def _format_citations(evidence: list[Evidence], limit: int = 10) -> str:
        """Render the first ``limit`` evidence items as a numbered markdown link list."""
        return "\n".join(
            f"{i}. [{e.citation.title}]({e.citation.url}) "
            f"({e.citation.source.upper()}, {e.citation.date})"
            for i, e in enumerate(evidence[:limit], start=1)
        )

    @staticmethod
    def _strength_label(score: float) -> str:
        """Map a 0-10 score to "Strong" (>= 7), "Moderate" (>= 4) or "Limited"."""
        if score >= 7:
            return "Strong"
        if score >= 4:
            return "Moderate"
        return "Limited"

    def _generate_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
        assessment: JudgeAssessment,
    ) -> str:
        """
        Generate the final synthesis response.

        Args:
            query: The original question
            evidence: All collected evidence (only the first 10 are cited)
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown
        """
        details = assessment.details
        drug_list = (
            "\n".join(f"- **{d}**" for d in details.drug_candidates)
            or "- No specific candidates identified"
        )
        findings_list = (
            "\n".join(f"- {f}" for f in details.key_findings) or "- See evidence below"
        )
        citations = self._format_citations(evidence)

        return f"""## Drug Repurposing Analysis

### Question
{query}

### Drug Candidates
{drug_list}

### Key Findings
{findings_list}

### Assessment
- **Mechanism Score**: {details.mechanism_score}/10
- **Clinical Evidence Score**: {details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning
{assessment.reasoning}

### Citations ({len(evidence)} sources)
{citations}

---
*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""

    def _generate_partial_synthesis(
        self,
        query: str,
        evidence: list[Evidence],
    ) -> str:
        """
        Generate a REAL synthesis when max iterations are reached.

        Even when forced to stop, we still provide drug candidates (if any),
        key findings, assessment scores and actionable citations, all taken from
        the most recent entry in ``self.history``. This is still better than a
        citation dump.
        """
        # Most recent assessment, stored as JudgeAssessment.model_dump() output.
        last_assessment = self.history[-1]["assessment"] if self.history else {}
        details = last_assessment.get("details", {})
        drug_candidates = details.get("drug_candidates", [])
        key_findings = details.get("key_findings", [])
        mechanism_score = details.get("mechanism_score", 0)
        clinical_score = details.get("clinical_evidence_score", 0)
        reasoning = last_assessment.get("reasoning", "Analysis incomplete due to iteration limit.")

        if drug_candidates:
            drug_list = "\n".join(f"- **{d}**" for d in drug_candidates[:5])
        else:
            drug_list = (
                "- *No specific drug candidates identified in evidence*\n"
                "- *Try a more specific query or add an API key for better analysis*"
            )

        if key_findings:
            findings_list = "\n".join(f"- {f}" for f in key_findings[:5])
        else:
            findings_list = (
                "- *Key findings require further analysis*\n"
                "- *See citations below for relevant sources*"
            )

        citations = self._format_citations(evidence)

        combined_score = mechanism_score + clinical_score
        mech_strength = self._strength_label(mechanism_score)
        clin_strength = self._strength_label(clinical_score)
        # Same bar as the Priority 2 synthesis rule, so the label agrees with it.
        comb_strength = (
            "Sufficient"
            if combined_score >= self.TERMINATION_CRITERIA["min_combined_score"]
            else "Partial"
        )

        return f"""## Drug Repurposing Analysis

### Research Question
{query}

### Status
Analysis based on {len(evidence)} sources across {len(self.history)} iterations.
Maximum iterations reached - results may be incomplete.

### Drug Candidates Identified
{drug_list}

### Key Findings
{findings_list}

### Evidence Quality Scores
| Criterion | Score | Interpretation |
|-----------|-------|----------------|
| Mechanism | {mechanism_score}/10 | {mech_strength} mechanistic evidence |
| Clinical | {clinical_score}/10 | {clin_strength} clinical support |
| Combined | {combined_score}/20 | {comb_strength} for synthesis |

### Analysis Summary
{reasoning}

### Top Citations ({len(evidence)} sources total)
{citations}

---
*For more complete analysis:*
- *Add an OpenAI or Anthropic API key for enhanced LLM analysis*
- *Try a more specific query (e.g., include drug names)*
- *Use Advanced mode for multi-agent research*
"""