Spaces:
Runtime error
Runtime error
File size: 7,405 Bytes
# Import libraries
from agents import Runner, trace, gen_trace_id
from serach_agent import search_agent
from writer_agent import writer_agent, ReportData
from email_agent import email_agent
from planner_agent import planner_agent, WebSearchItem, WebSearchPlan
import asyncio
from typing import Optional, List, Dict
# Define the ResearchManager class
class ResearchManager:
    """Drive the plan -> search -> write -> (optional) email research flow.

    The public entry point is :meth:`run_pipeline`, an async generator that
    yields human-readable progress strings and, as its last item on success,
    the markdown text of the finished report.

    Attributes:
        stats: Dict of running counters. ``"total_searches"`` is incremented
            once for every accepted pipeline run and once for every web
            search that finishes (successfully or not).
    """

    def __init__(self):
        self.stats = {
            "total_searches": 0
        }

    async def run_pipeline(
        self,
        query: str,
        questions: List[str],
        answers: List[str],
        recipient_email: str,
        send_email: bool = False,
    ):
        """Validate the request, then stream progress of the full pipeline.

        Args:
            query: The research question.
            questions: Clarifying questions put to the user (``None`` is
                treated as no questions).
            answers: The user's answers, positionally matching ``questions``
                (``None`` is treated as no answers).
            recipient_email: Address the report should be mailed to.
            send_email: Whether to mail the report after writing it.

        Yields:
            Progress messages; on failure a single message starting with
            the cross-mark emoji; on success the report markdown last.
        """
        # Validate the input
        is_valid, error_message = self.validate_input(query, questions, answers)
        if not is_valid:
            yield f"❌ Input validation failed: {error_message}"
            return

        # Email validation: a blank / whitespace-only address is as good as none
        if send_email and not (recipient_email or "").strip():
            yield "❌ Email sending requested but no recipient email provided."
            return

        # validate_input accepted None as "no clarifications"; downstream
        # code zips these, so hand it real lists.
        questions = list(questions or [])
        answers = list(answers or [])

        # NOTE(review): this counts the pipeline run itself under
        # "total_searches"; perform_searches adds the individual searches.
        self.stats["total_searches"] += 1

        # Execute the research pipeline; any failure becomes a final message
        try:
            async for step in self.execute_pipeline_research(
                query, questions, answers, recipient_email, send_email
            ):
                yield step
        except Exception as e:
            yield f"❌ Research pipeline failed: {str(e)}"
            return

    async def execute_pipeline_research(
        self,
        query: str,
        questions: List[str],
        answers: List[str],
        recipient_email: str,
        send_email: bool = False,
    ):
        """Run the agent steps inside a trace and yield the trace URL first.

        Yields:
            The trace URL, followed by everything :meth:`run_agents_step`
            yields.
        """
        # Setup tracing
        trace_id = gen_trace_id()
        with trace("Research Pipeline", trace_id=trace_id):
            yield f"Trace: https://platform.openai.com/traces/trace?trace_id={trace_id}"
            async for step in self.run_agents_step(
                query, questions, answers, recipient_email, send_email
            ):
                yield step

    async def run_agents_step(
        self,
        query: str,
        questions: List[str],
        answers: List[str],
        recipient_email: str,
        send_email: bool = False,
    ):
        """Execute planning, searching, writing and optional emailing in order.

        Yields:
            One progress message before each step, an email status message,
            and finally ``report.markdown_report``.

        Raises:
            Exception: Propagated from the planning, writing or email step.
        """
        # Step 1: Planning
        yield "Planning searches based on clarifications..."
        search_plan = await self.plan_searches(query, questions, answers)

        # Step 2: Searching
        yield f"Starting {len(search_plan.searches)} searches..."
        search_results = await self.perform_searches(search_plan)

        # Step 3: Writing Report
        yield "Analyzing search results and writing report..."
        report = await self.write_report(query, search_results)

        # Step 4: Sending Email (optional)
        if send_email and recipient_email:
            yield f"Sending report to {recipient_email}..."
            await self.send_report_email(report, recipient_email)
            yield f"Report sent to {recipient_email}."
        else:
            yield "Email sending skipped."

        # Return final report
        yield report.markdown_report

    def validate_input(
        self, query: str, questions: List[str], answers: List[str]
    ) -> tuple[bool, str]:
        """Check the user-supplied query and clarification pairs.

        ``None`` for ``questions`` or ``answers`` is treated as an empty
        list, and a non-string or blank item is reported as empty rather
        than raising.

        Returns:
            ``(True, "")`` when the input is usable, otherwise
            ``(False, error_message)``.
        """
        if not isinstance(query, str) or not query.strip():
            return False, "Query cannot be empty"

        questions = questions or []
        answers = answers or []
        if len(questions) != len(answers):
            return False, f"Mismatch: {len(questions)} questions but {len(answers)} answers"

        # Check for empty items (positions are reported 1-based)
        for position, (q, a) in enumerate(zip(questions, answers), start=1):
            if not isinstance(q, str) or not q.strip():
                return False, f"Question {position} is empty"
            if not isinstance(a, str) or not a.strip():
                return False, f"Answer {position} is empty"
        return True, ""

    async def plan_searches(self, query: str, questions: List[str], answers: List[str]):
        """Ask the planner agent for a search plan.

        Returns:
            The planner run's ``final_output``; it is required to have a
            non-empty ``searches`` attribute.

        Raises:
            Exception: Wrapping any failure of the planner run, including an
                empty plan; the original error is chained as ``__cause__``.
        """
        # Build structured prompt for the planner_agent
        clarifying_context = "\n".join(
            f"Q: {q}\nA: {a}" for q, a in zip(questions, answers)
        )
        final_prompt = f"Query: {query}\n\nClarifications:\n{clarifying_context}"
        try:
            result = await Runner.run(planner_agent, final_prompt)
            search_plan = result.final_output
            # An empty plan would make every later step pointless
            if not search_plan.searches:
                raise ValueError("Planner agent returned no searches")
        except Exception as e:
            raise Exception(f"Search Planner failed: {str(e)}") from e
        print(f"Planned Searches: {len(search_plan.searches)} searches")
        return search_plan

    async def perform_searches(self, search_plan: "WebSearchPlan") -> List[str]:
        """Run every planned search concurrently and collect the results.

        Searches that failed (``search_web`` returned ``None``) are dropped,
        so the returned list may be shorter than the plan. Results are in
        completion order, not plan order.
        """
        num_searches = len(search_plan.searches)
        # Create tasks for concurrent execution
        tasks = [
            asyncio.create_task(self.search_web(item))
            for item in search_plan.searches
        ]
        results = []
        completed = 0
        # Gather results as they complete
        for task in asyncio.as_completed(tasks):
            result = await task
            if result is not None:
                results.append(result)
            completed += 1
            print(f"Searching... {completed}/{num_searches} completed")
            # NOTE(review): counted per finished search, failed ones included;
            # the original layout was ambiguous here - confirm intent.
            self.stats["total_searches"] += 1
        print("Finished all searches.")
        return results

    async def search_web(self, item: "WebSearchItem") -> Optional[str]:
        """Run one search through the search agent.

        Returns:
            The agent's ``final_output`` converted to ``str``, or ``None``
            when the run raised (the error is printed, not propagated, so
            one failed search does not abort the others).
        """
        input_text = f"Search: {item.query}\nReason: {item.reason}"
        try:
            result = await Runner.run(search_agent, input_text)
            return str(result.final_output)
        except Exception as e:
            print(f"Search failed for '{item.query}': {str(e)}")
            return None

    async def write_report(self, query: str, search_results: List[str]) -> "ReportData":
        """Have the writer agent synthesize a report from the search results.

        Raises:
            Exception: Wrapping any failure of the writer run, including a
                report whose ``markdown_report`` or ``short_summary`` is
                empty; the original error is chained as ``__cause__``.
        """
        # Define input message for the writer agent
        input_text = (
            f"Original query: {query}\n\nSearch Results:\n"
            + "\n---\n".join(search_results)
        )
        try:
            result = await Runner.run(writer_agent, input_text)
            report = result.final_output
            # Validate the result
            if not report.markdown_report or not report.short_summary:
                raise ValueError("Writer agent returned incomplete report")
            return report
        except Exception as e:
            raise Exception(f"Report Writing failed: {str(e)}") from e

    async def send_report_email(self, report: "ReportData", recipient_email: str) -> None:
        """Ask the email agent to send the report to ``recipient_email``.

        Raises:
            Exception: Wrapping any failure of the email agent run; the
                original error is chained as ``__cause__``.
        """
        # NOTE(review): the prompt labels the body as HTML while passing the
        # markdown report - presumably the email agent converts it; confirm.
        input_text = (
            "\n"
            "Send the following research report as an email:\n"
            f"To: {recipient_email}\n"
            "Body (HTML):\n"
            f"{report.markdown_report}\n"
        )
        try:
            await Runner.run(email_agent, input_text)
            print(f"✅ Email sent to {recipient_email}")
        except Exception as e:
            raise Exception(f"Email sending failed: {str(e)}") from e
|