File size: 3,551 Bytes
5e4cf1b
 
 
688ba8d
5e4cf1b
 
688ba8d
5e4cf1b
 
 
 
 
 
688ba8d
 
 
 
 
 
 
 
 
 
 
 
 
5e4cf1b
 
 
 
 
 
 
 
 
 
688ba8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""Scout Manager β€” async orchestrator that runs the full pipeline."""

import asyncio
import traceback
from typing import AsyncGenerator

from agents import Runner, trace

from src.scout_agents.planner_agent import planner_agent, SearchPlan
from src.scout_agents.search_agent import search_agent, SearchResult
from src.scout_agents.synthesis_agent import synthesis_agent


async def _run_single_search(search_term: str, reasoning: str) -> SearchResult | None:
    """Execute one search agent for a single sub-topic.

    Returns the agent's structured output, or None when the run raises —
    callers treat a None as a failed-but-tolerated search.
    """
    prompt = f"Search for papers on: {search_term}\nContext: {reasoning}"
    try:
        run = await Runner.run(search_agent, input=prompt, max_turns=25)
    except Exception as e:
        # Best-effort: log the failure and let the pipeline continue with
        # whatever other searches succeed.
        print(f"[ERROR] Search failed for '{search_term}': {e}")
        traceback.print_exc()
        return None
    return run.final_output


async def run_pipeline(query: str) -> AsyncGenerator[str, None]:
    """Run the full literature scout pipeline, yielding status updates.

    Stages:
    1. Planner — decompose the query into sub-topics
    2. Search  — run one search agent per sub-topic, in parallel
    3. Synthesize — produce a structured research brief

    Args:
        query: The user's free-text research question.

    Yields:
        Markdown-formatted status strings, followed by the final brief.
        Returns early (after a failure message) if every search fails.
    """
    with trace("Literature Scout Pipeline"):

        # --- Stage 1: Planning ---
        yield "🔍 **Stage 1/3 — Planning search strategy...**"

        plan_result = await Runner.run(planner_agent, input=query)
        search_plan: SearchPlan = plan_result.final_output

        plan_summary = "\n".join(
            f"  {i}. {item.search_term}" for i, item in enumerate(search_plan.items, 1)
        )
        yield f"📋 **Search plan ready:**\n{plan_summary}"

        # --- Stage 2: Parallel search ---
        # The planner decides how many sub-topics there are; report the real
        # count instead of a hard-coded "5".
        n_searches = len(search_plan.items)
        yield (
            f"🔎 **Stage 2/3 — Searching PubMed & ArXiv "
            f"({n_searches} parallel searches)...**"
        )

        tasks = [
            asyncio.create_task(_run_single_search(item.search_term, item.reasoning))
            for item in search_plan.items
        ]

        # Stream completions as they arrive rather than waiting for all.
        all_results: list[SearchResult] = []
        for coro in asyncio.as_completed(tasks):
            result = await coro
            if result is not None:
                all_results.append(result)
                yield f"  ✅ Completed search: *{result.search_term}* — found {len(result.papers)} papers"
            else:
                yield "  ⚠️ One search failed — continuing with remaining results"

        total_papers = sum(len(r.papers) for r in all_results)
        yield f"📚 **All searches complete — {total_papers} papers found total**"

        if not all_results:
            yield "❌ **All searches failed. Please check API keys and try again.**"
            return

        # --- Stage 3: Synthesis ---
        yield "✍️ **Stage 3/3 — Synthesizing research brief...**"

        # Build a single markdown context document for the synthesis agent:
        # the original query, then each sub-topic with its papers.
        context_parts = [f"# Original Research Query\n{query}\n"]
        for sr in all_results:
            context_parts.append(f"\n## Sub-topic: {sr.search_term}")
            for p in sr.papers:
                context_parts.append(
                    f"- **{p.title}** ({p.source}, relevance: {p.relevance_score}/10)\n"
                    f"  Abstract: {p.abstract[:300]}...\n"
                    f"  DOI: {p.doi} | URL: {p.url}"
                )

        synthesis_input = "\n".join(context_parts)
        synthesis_result = await Runner.run(synthesis_agent, input=synthesis_input)
        brief: str = synthesis_result.final_output

        yield "---\n"
        yield brief