bioinformatics-scout / src /scout_manager.py
Mituvinci
Add tracing, error handling, and requirements.txt for HF Spaces
688ba8d
"""Scout Manager β€” async orchestrator that runs the full pipeline."""
import asyncio
import traceback
from typing import AsyncGenerator
from agents import Runner, trace
from src.scout_agents.planner_agent import planner_agent, SearchPlan
from src.scout_agents.search_agent import search_agent, SearchResult
from src.scout_agents.synthesis_agent import synthesis_agent
async def _run_single_search(search_term: str, reasoning: str) -> SearchResult | None:
"""Run a single search agent for one sub-topic. Returns None on failure."""
try:
prompt = (
f"Search for papers on: {search_term}\n"
f"Context: {reasoning}"
)
result = await Runner.run(search_agent, input=prompt, max_turns=25)
return result.final_output
except Exception as e:
print(f"[ERROR] Search failed for '{search_term}': {e}")
traceback.print_exc()
return None
async def run_pipeline(query: str) -> AsyncGenerator[str, None]:
"""Run the full literature scout pipeline, yielding status updates.
Stages:
1. Planner β€” decompose query into 5 sub-topics
2. Search β€” run 5 search agents in parallel
3. Synthesize β€” produce a structured research brief
"""
with trace("Literature Scout Pipeline"):
# --- Stage 1: Planning ---
yield "πŸ” **Stage 1/3 β€” Planning search strategy...**"
plan_result = await Runner.run(planner_agent, input=query)
search_plan: SearchPlan = plan_result.final_output
plan_summary = "\n".join(
f" {i}. {item.search_term}" for i, item in enumerate(search_plan.items, 1)
)
yield f"πŸ“‹ **Search plan ready:**\n{plan_summary}"
# --- Stage 2: Parallel search ---
yield "πŸ”Ž **Stage 2/3 β€” Searching PubMed & ArXiv (5 parallel searches)...**"
tasks = [
asyncio.create_task(_run_single_search(item.search_term, item.reasoning))
for item in search_plan.items
]
all_results: list[SearchResult] = []
for coro in asyncio.as_completed(tasks):
result = await coro
if result is not None:
all_results.append(result)
yield f" βœ… Completed search: *{result.search_term}* β€” found {len(result.papers)} papers"
else:
yield " ⚠️ One search failed β€” continuing with remaining results"
total_papers = sum(len(r.papers) for r in all_results)
yield f"πŸ“š **All searches complete β€” {total_papers} papers found total**"
if not all_results:
yield "❌ **All searches failed. Please check API keys and try again.**"
return
# --- Stage 3: Synthesis ---
yield "✍️ **Stage 3/3 β€” Synthesizing research brief...**"
# Build context for synthesis agent
context_parts = [f"# Original Research Query\n{query}\n"]
for sr in all_results:
context_parts.append(f"\n## Sub-topic: {sr.search_term}")
for p in sr.papers:
context_parts.append(
f"- **{p.title}** ({p.source}, relevance: {p.relevance_score}/10)\n"
f" Abstract: {p.abstract[:300]}...\n"
f" DOI: {p.doi} | URL: {p.url}"
)
synthesis_input = "\n".join(context_parts)
synthesis_result = await Runner.run(synthesis_agent, input=synthesis_input)
brief: str = synthesis_result.final_output
yield "---\n"
yield brief