"""Simple Orchestrator - the basic agent loop connecting Search and Judge.

This orchestrator uses a simple loop pattern with pydantic-ai for structured
LLM outputs. It works with the free tier (HuggingFace Inference) or paid APIs
(OpenAI, Anthropic).

Design Pattern: Template Method - defines the skeleton of the search-judge loop
while allowing handlers to implement specific behaviors.
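
Example (illustrative sketch, run from within an async context - my_search and
my_judge stand in for any objects satisfying SearchHandlerProtocol and
JudgeHandlerProtocol; how those handlers are built depends on the deployment):

    orchestrator = Orchestrator(search_handler=my_search, judge_handler=my_judge)
    async for event in orchestrator.run("metformin repurposing in glioblastoma"):
        print(event.type, event.message)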
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any, ClassVar
import structlog
from src.config.domain import ResearchDomain, get_domain_config
from src.orchestrators.base import JudgeHandlerProtocol, SearchHandlerProtocol
from src.prompts.synthesis import format_synthesis_prompt, get_synthesis_system_prompt
from src.utils.config import settings
from src.utils.exceptions import JudgeError, ModalError, SearchError
from src.utils.models import (
AgentEvent,
Evidence,
JudgeAssessment,
OrchestratorConfig,
SearchResult,
)
if TYPE_CHECKING:
from src.services.embeddings import EmbeddingService
from src.services.statistical_analyzer import StatisticalAnalyzer
logger = structlog.get_logger()
class Orchestrator:
"""
    The simple agent orchestrator - runs the Search -> Judge -> Loop cycle.

    This is a generator-based design that yields events for real-time UI updates.
    Uses pydantic-ai for structured LLM outputs without requiring the full
    Microsoft Agent Framework.
    """
# Termination thresholds (code-enforced, not LLM-decided)
TERMINATION_CRITERIA: ClassVar[dict[str, float]] = {
"min_combined_score": 12.0, # mechanism + clinical >= 12
"min_score_with_volume": 10.0, # >= 10 if 50+ sources
"min_evidence_for_volume": 50.0, # Priority 3: evidence count threshold
"late_iteration_threshold": 8.0, # >= 8 in iterations 8+
"max_evidence_threshold": 100.0, # Force synthesis with 100+ sources
"emergency_iteration": 8.0, # Last 2 iterations = emergency mode
"min_confidence": 0.5, # Minimum confidence for emergency synthesis
"min_evidence_for_emergency": 30.0, # Priority 6: min evidence for emergency
}
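
    # Worked example (illustrative numbers, not from a real run): with
    # mechanism_score=7 and clinical_evidence_score=6 the combined score is 13,
    # which clears min_combined_score (12), so _should_synthesize() returns True
    # at Priority 2 as soon as at least one drug candidate has been identified.
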
def __init__(
self,
search_handler: SearchHandlerProtocol,
judge_handler: JudgeHandlerProtocol,
config: OrchestratorConfig | None = None,
enable_analysis: bool = False,
enable_embeddings: bool = True,
domain: ResearchDomain | str | None = None,
):
"""
        Initialize the orchestrator.

        Args:
            search_handler: Handler for executing searches
            judge_handler: Handler for assessing evidence
            config: Optional configuration (uses defaults if not provided)
            enable_analysis: Whether to perform statistical analysis (if Modal is available)
            enable_embeddings: Whether to use semantic search for ranking/deduplication
            domain: Research domain used to customize prompts and report formatting
        """
self.search = search_handler
self.judge = judge_handler
self.config = config or OrchestratorConfig()
self.history: list[dict[str, Any]] = []
self._enable_analysis = enable_analysis and settings.modal_available
self._enable_embeddings = enable_embeddings
self.domain = domain
self.domain_config = get_domain_config(domain)
# Lazy-load services (typed for IDE support)
self._analyzer: StatisticalAnalyzer | None = None
self._embeddings: EmbeddingService | None = None
def _get_analyzer(self) -> StatisticalAnalyzer | None:
"""Lazy initialization of StatisticalAnalyzer."""
if self._analyzer is None:
from src.utils.service_loader import get_analyzer_if_available
self._analyzer = get_analyzer_if_available()
if self._analyzer is None:
self._enable_analysis = False
return self._analyzer
async def _run_analysis_phase(
self, query: str, evidence: list[Evidence], iteration: int
) -> AsyncGenerator[AgentEvent, None]:
"""Run the optional analysis phase."""
if not self._enable_analysis:
return
yield AgentEvent(
type="analyzing",
message="Running statistical analysis in Modal sandbox...",
data={},
iteration=iteration,
)
try:
analyzer = self._get_analyzer()
if analyzer is None:
logger.info("StatisticalAnalyzer not available, skipping analysis phase")
return
# Run Modal analysis (no agent_framework needed!)
analysis_result = await analyzer.analyze(
query=query,
evidence=evidence,
hypothesis=None, # Could add hypothesis generation later
)
yield AgentEvent(
type="analysis_complete",
message=f"Analysis verdict: {analysis_result.verdict}",
data=analysis_result.model_dump(),
iteration=iteration,
)
except ModalError as e:
logger.error("Modal analysis failed", error=str(e), exc_type="ModalError")
yield AgentEvent(
type="error",
message=f"Modal analysis failed: {e}",
data={"error": str(e), "recoverable": True},
iteration=iteration,
)
except Exception as e:
# Unexpected error - log with full context for debugging
logger.error(
"Modal analysis failed unexpectedly",
error=str(e),
exc_type=type(e).__name__,
)
yield AgentEvent(
type="error",
message=f"Modal analysis failed: {e}",
data={"error": str(e), "recoverable": True},
iteration=iteration,
)
def _should_synthesize(
self,
assessment: JudgeAssessment,
iteration: int,
max_iterations: int,
evidence_count: int,
) -> tuple[bool, str]:
"""
Code-enforced synthesis decision.
Returns (should_synthesize, reason).
"""
combined_score = (
assessment.details.mechanism_score + assessment.details.clinical_evidence_score
)
has_drug_candidates = len(assessment.details.drug_candidates) > 0
confidence = assessment.confidence
# Priority 1: LLM explicitly says sufficient with good scores
if assessment.sufficient and assessment.recommendation == "synthesize":
if combined_score >= 10:
return True, "judge_approved"
# Priority 2: High scores with drug candidates
if (
combined_score >= self.TERMINATION_CRITERIA["min_combined_score"]
and has_drug_candidates
):
return True, "high_scores_with_candidates"
# Priority 3: Good scores with high evidence volume
if (
combined_score >= self.TERMINATION_CRITERIA["min_score_with_volume"]
and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_volume"]
):
return True, "good_scores_high_volume"
# Priority 4: Late iteration with acceptable scores (diminishing returns)
is_late_iteration = iteration >= max_iterations - 2
if (
is_late_iteration
and combined_score >= self.TERMINATION_CRITERIA["late_iteration_threshold"]
):
return True, "late_iteration_acceptable"
# Priority 5: Very high evidence count (enough to synthesize something)
if evidence_count >= self.TERMINATION_CRITERIA["max_evidence_threshold"]:
return True, "max_evidence_reached"
# Priority 6: Emergency synthesis (avoid garbage output)
if (
is_late_iteration
and evidence_count >= self.TERMINATION_CRITERIA["min_evidence_for_emergency"]
and confidence >= self.TERMINATION_CRITERIA["min_confidence"]
):
return True, "emergency_synthesis"
return False, "continue_searching"
async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]: # noqa: PLR0915
"""
        Run the agent loop for a query.

        Yields AgentEvent objects for each step, allowing real-time UI updates.

        Args:
            query: The user's research question

        Yields:
            AgentEvent objects for each step of the process
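
        Event types emitted (in rough order of appearance): "started",
        "searching", "search_complete", "judging", "judge_complete",
        "analyzing", "analysis_complete", "looping", "synthesizing",
        "complete", and "error".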
"""
        # Imported inside run() to avoid potential circular dependencies
from src.agents.graph.state import Hypothesis
from src.services.research_memory import ResearchMemory
logger.info("Starting orchestrator", query=query)
yield AgentEvent(
type="started",
message=f"Starting research for: {query}",
iteration=0,
)
        # Initialize shared memory.
        # 'all_evidence' is kept for local tracking/reporting; ResearchMemory handles
        # semantic deduplication and relevant-evidence retrieval for the judge.
memory = ResearchMemory(query=query)
all_evidence: list[Evidence] = []
current_queries = [query]
iteration = 0
while iteration < self.config.max_iterations:
iteration += 1
logger.info("Iteration", iteration=iteration, queries=current_queries)
# === SEARCH PHASE ===
yield AgentEvent(
type="searching",
message=f"Searching for: {', '.join(current_queries[:3])}...",
iteration=iteration,
)
try:
# Execute searches for all current queries
search_tasks = [
self.search.execute(q, self.config.max_results_per_tool)
for q in current_queries[:3] # Limit to 3 queries per iteration
]
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
# Collect evidence from successful searches
new_evidence: list[Evidence] = []
errors: list[str] = []
for q, result in zip(current_queries[:3], search_results, strict=False):
if isinstance(result, Exception):
errors.append(f"Search for '{q}' failed: {result!s}")
elif isinstance(result, SearchResult):
new_evidence.extend(result.evidence)
errors.extend(result.errors)
else:
# Should not happen with return_exceptions=True but safe fallback
errors.append(f"Unknown result type for '{q}': {type(result)}")
                # === MEMORY INTEGRATION: Store and Deduplicate ===
                # ResearchMemory handles semantic deduplication and persistence;
                # store_evidence() returns the IDs of the evidence that was actually new.
new_ids = await memory.store_evidence(new_evidence)
# Filter new_evidence to only keep what was actually new (based on IDs)
# Note: This assumes IDs are URLs, which match Citation.url
unique_new = [e for e in new_evidence if e.citation.url in new_ids]
all_evidence.extend(unique_new)
yield AgentEvent(
type="search_complete",
message=f"Found {len(unique_new)} new sources ({len(all_evidence)} total)",
data={
"new_count": len(unique_new),
"total_count": len(all_evidence),
},
iteration=iteration,
)
if errors:
logger.warning("Search errors", errors=errors)
except SearchError as e:
logger.error("Search phase failed", error=str(e), exc_type="SearchError")
yield AgentEvent(
type="error",
message=f"Search failed: {e!s}",
data={"recoverable": True, "error_type": "search"},
iteration=iteration,
)
continue
except Exception as e:
# Unexpected error - log full context for debugging
logger.error(
"Search phase failed unexpectedly",
error=str(e),
exc_type=type(e).__name__,
)
yield AgentEvent(
type="error",
message=f"Search failed: {e!s}",
data={"recoverable": True, "error_type": "unexpected"},
iteration=iteration,
)
continue
# === JUDGE PHASE ===
yield AgentEvent(
type="judging",
message=f"Evaluating evidence (Memory: {len(memory.evidence_ids)} docs)...",
iteration=iteration,
)
try:
# Retrieve RELEVANT evidence from memory for the judge
# This keeps the context window manageable and focused
judge_context = await memory.get_relevant_evidence(n=30)
# Fallback if memory is empty (shouldn't happen if search worked)
if not judge_context and all_evidence:
judge_context = all_evidence[-30:]
assessment = await self.judge.assess(
query, judge_context, iteration, self.config.max_iterations
)
# === MEMORY INTEGRATION: Track Hypotheses ===
# Convert loose strings to structured Hypotheses
for candidate in assessment.details.drug_candidates:
h = Hypothesis(
id=candidate.replace(" ", "_").lower(),
statement=f"{candidate} is a potential candidate for {query}",
status="proposed",
confidence=assessment.confidence,
                        reasoning=f"{candidate} identified in iteration {iteration}",
)
memory.add_hypothesis(h)
yield AgentEvent(
type="judge_complete",
message=(
f"Assessment: {assessment.recommendation} "
f"(confidence: {assessment.confidence:.0%})"
),
data={
"sufficient": assessment.sufficient,
"confidence": assessment.confidence,
"mechanism_score": assessment.details.mechanism_score,
"clinical_score": assessment.details.clinical_evidence_score,
},
iteration=iteration,
)
# Record this iteration in history
self.history.append(
{
"iteration": iteration,
"queries": current_queries,
"evidence_count": len(all_evidence),
"assessment": assessment.model_dump(),
}
)
# === DECISION PHASE (Code-Enforced) ===
should_synth, reason = self._should_synthesize(
assessment=assessment,
iteration=iteration,
max_iterations=self.config.max_iterations,
evidence_count=len(all_evidence),
)
logger.info(
"Synthesis decision",
should_synthesize=should_synth,
reason=reason,
iteration=iteration,
combined_score=assessment.details.mechanism_score
+ assessment.details.clinical_evidence_score,
evidence_count=len(all_evidence),
confidence=assessment.confidence,
)
if should_synth:
# Log synthesis trigger reason for debugging
if reason != "judge_approved":
logger.info(f"Code-enforced synthesis triggered: {reason}")
# Optional Analysis Phase
async for event in self._run_analysis_phase(query, all_evidence, iteration):
yield event
yield AgentEvent(
type="synthesizing",
message=f"Evidence sufficient ({reason})! Preparing synthesis...",
iteration=iteration,
)
# Generate final response using LLM narrative synthesis
# Use all gathered evidence for the final report
final_response = await self._generate_synthesis(query, all_evidence, assessment)
yield AgentEvent(
type="complete",
message=final_response,
data={
"evidence_count": len(all_evidence),
"iterations": iteration,
"synthesis_reason": reason,
"drug_candidates": assessment.details.drug_candidates,
"key_findings": assessment.details.key_findings,
},
iteration=iteration,
)
return
else:
# Need more evidence - prepare next queries
current_queries = assessment.next_search_queries or [
f"{query} mechanism of action",
f"{query} clinical evidence",
]
yield AgentEvent(
type="looping",
message=(
f"Gathering more evidence (scores: {assessment.details.mechanism_score}"
f"+{assessment.details.clinical_evidence_score}). "
f"Next: {', '.join(current_queries[:2])}..."
),
data={"next_queries": current_queries, "reason": reason},
iteration=iteration,
)
except JudgeError as e:
logger.error("Judge phase failed", error=str(e), exc_type="JudgeError")
yield AgentEvent(
type="error",
message=f"Assessment failed: {e!s}",
data={"recoverable": True, "error_type": "judge"},
iteration=iteration,
)
continue
except Exception as e:
# Unexpected error - log full context for debugging
logger.error(
"Judge phase failed unexpectedly",
error=str(e),
exc_type=type(e).__name__,
)
yield AgentEvent(
type="error",
message=f"Assessment failed: {e!s}",
data={"recoverable": True, "error_type": "unexpected"},
iteration=iteration,
)
continue
# Max iterations reached
yield AgentEvent(
type="complete",
message=self._generate_partial_synthesis(query, all_evidence),
data={
"evidence_count": len(all_evidence),
"iterations": iteration,
"max_reached": True,
},
iteration=iteration,
)
async def _generate_synthesis(
self,
query: str,
evidence: list[Evidence],
assessment: JudgeAssessment,
) -> str:
"""
        Generate the final synthesis response using an LLM.

        This method calls an LLM to generate a narrative research report,
        following the Microsoft Agent Framework pattern of using LLM synthesis
        instead of string templating.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Narrative synthesis as markdown
        """
# Build evidence summary for LLM context (limit to avoid token overflow)
evidence_lines = []
for e in evidence[:20]:
authors = ", ".join(e.citation.authors[:2]) if e.citation.authors else "Unknown"
content_preview = e.content[:200].replace("\n", " ")
evidence_lines.append(
f"- {e.citation.title} ({authors}, {e.citation.date}): {content_preview}..."
)
evidence_summary = "\n".join(evidence_lines)
# Format synthesis prompt with assessment data
user_prompt = format_synthesis_prompt(
query=query,
evidence_summary=evidence_summary,
drug_candidates=assessment.details.drug_candidates,
key_findings=assessment.details.key_findings,
mechanism_score=assessment.details.mechanism_score,
clinical_score=assessment.details.clinical_evidence_score,
confidence=assessment.confidence,
)
# Get domain-specific system prompt
system_prompt = get_synthesis_system_prompt(self.domain)
try:
# Check if judge has its own synthesize method (Free Tier uses HF Inference)
# This ensures Free Tier uses consistent free inference for BOTH judge AND synthesis
if hasattr(self.judge, "synthesize"):
logger.info("Using judge's free-tier synthesis method")
narrative = await self.judge.synthesize(system_prompt, user_prompt)
if narrative:
logger.info("Free-tier synthesis completed", chars=len(narrative))
else:
# Free tier synthesis failed, use template
raise RuntimeError("Free tier HF synthesis returned no content")
else:
# Paid tier: use PydanticAI with get_model()
from pydantic_ai import Agent
from src.agent_factory.judges import get_model
# Create synthesis agent with retries (matching Judge agent pattern)
# Without retries, transient errors immediately trigger fallback
agent: Agent[None, str] = Agent(
model=get_model(),
output_type=str,
system_prompt=system_prompt,
retries=3, # Match Judge agent - retry on transient errors
)
result = await agent.run(user_prompt)
narrative = result.output
logger.info("LLM narrative synthesis completed", chars=len(narrative))
except Exception as e:
# Fallback to template synthesis if LLM fails
# Log error details for debugging
logger.error(
"LLM synthesis failed, using template fallback",
error=str(e),
exc_type=type(e).__name__,
evidence_count=len(evidence),
exc_info=True, # Capture stack trace for debugging
)
# Surface the error to user (MS Agent Framework pattern)
# Don't silently fall back - let user know synthesis degraded
error_note = (
f"\n\n> ⚠️ **Note**: AI narrative synthesis unavailable. "
f"Showing structured summary.\n"
f"> _Error: {type(e).__name__}_\n"
)
template = self._generate_template_synthesis(query, evidence, assessment)
return f"{error_note}\n{template}"
# Add full citation list footer
citations = "\n".join(
f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
f"({e.citation.source.upper()}, {e.citation.date})"
for i, e in enumerate(evidence[:15])
)
        return f"""{narrative}

---

### Full Citation List ({len(evidence)} sources)

{citations}

*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""
def _generate_template_synthesis(
self,
query: str,
evidence: list[Evidence],
assessment: JudgeAssessment,
) -> str:
"""
        Generate a fallback template synthesis (no LLM).

        Used when LLM synthesis fails or is unavailable.

        Args:
            query: The original question
            evidence: All collected evidence
            assessment: The final assessment

        Returns:
            Formatted synthesis as markdown (bullet-point style)
        """
drug_list = (
"\n".join([f"- **{d}**" for d in assessment.details.drug_candidates])
or "- No specific candidates identified"
)
findings_list = (
"\n".join([f"- {f}" for f in assessment.details.key_findings]) or "- See evidence below"
)
citations = "\n".join(
[
f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
f"({e.citation.source.upper()}, {e.citation.date})"
for i, e in enumerate(evidence[:10])
]
)
        return f"""{self.domain_config.report_title}

### Question

{query}

### Drug Candidates

{drug_list}

### Key Findings

{findings_list}

### Assessment

- **Mechanism Score**: {assessment.details.mechanism_score}/10
- **Clinical Evidence Score**: {assessment.details.clinical_evidence_score}/10
- **Confidence**: {assessment.confidence:.0%}

### Reasoning

{assessment.reasoning}

### Citations ({len(evidence)} sources)

{citations}

---

*Analysis based on {len(evidence)} sources across {len(self.history)} iterations.*
"""
def _generate_partial_synthesis(
self,
query: str,
evidence: list[Evidence],
) -> str:
"""
        Generate a REAL synthesis when the maximum number of iterations is reached.

        Even when forced to stop, we should provide:

        - Drug candidates (if any were found)
        - Key findings
        - Assessment scores
        - Actionable citations

        This is still better than a citation dump.
        """
# Extract data from last assessment if available
last_assessment = self.history[-1]["assessment"] if self.history else {}
details = last_assessment.get("details", {})
drug_candidates = details.get("drug_candidates", [])
key_findings = details.get("key_findings", [])
mechanism_score = details.get("mechanism_score", 0)
clinical_score = details.get("clinical_evidence_score", 0)
reasoning = last_assessment.get("reasoning", "Analysis incomplete due to iteration limit.")
# Format drug candidates
if drug_candidates:
drug_list = "\n".join([f"- **{d}**" for d in drug_candidates[:5]])
else:
drug_list = (
"- *No specific drug candidates identified in evidence*\n"
"- *Try a more specific query or add an API key for better analysis*"
)
# Format key findings
if key_findings:
findings_list = "\n".join([f"- {f}" for f in key_findings[:5]])
else:
findings_list = (
"- *Key findings require further analysis*\n"
"- *See citations below for relevant sources*"
)
# Format citations (top 10)
citations = "\n".join(
[
f"{i + 1}. [{e.citation.title}]({e.citation.url}) "
f"({e.citation.source.upper()}, {e.citation.date})"
for i, e in enumerate(evidence[:10])
]
)
combined_score = mechanism_score + clinical_score
mech_strength = (
"Strong" if mechanism_score >= 7 else "Moderate" if mechanism_score >= 4 else "Limited"
)
clin_strength = (
"Strong" if clinical_score >= 7 else "Moderate" if clinical_score >= 4 else "Limited"
)
comb_strength = "Sufficient" if combined_score >= 12 else "Partial"
        return f"""{self.domain_config.report_title}

### Research Question

{query}

### Status

Analysis based on {len(evidence)} sources across {len(self.history)} iterations.
Maximum iterations reached - results may be incomplete.

### Drug Candidates Identified

{drug_list}

### Key Findings

{findings_list}

### Evidence Quality Scores

| Criterion | Score | Interpretation |
|-----------|-------|----------------|
| Mechanism | {mechanism_score}/10 | {mech_strength} mechanistic evidence |
| Clinical | {clinical_score}/10 | {clin_strength} clinical support |
| Combined | {combined_score}/20 | {comb_strength} for synthesis |

### Analysis Summary

{reasoning}

### Top Citations ({len(evidence)} sources total)

{citations}

---

*For more complete analysis:*

- *Add an OpenAI or Anthropic API key for enhanced LLM analysis*
- *Try a more specific query (e.g., include drug names)*
- *Use Advanced mode for multi-agent research*
"""