Spaces:
Runtime error
Runtime error
| # Import libraries | |
| from agents import Runner, trace, gen_trace_id | |
| from serach_agent import search_agent | |
| from writer_agent import writer_agent, ReportData | |
| from email_agent import email_agent | |
| from planner_agent import planner_agent, WebSearchItem, WebSearchPlan | |
| import asyncio | |
| from typing import Optional, List, Dict | |
| # Define the ResearchManager class | |
class ResearchManager:
    """Coordinate the research pipeline: plan, search, write and optionally email.

    The public entry point is :meth:`run_pipeline`, an async generator that
    yields human-readable progress strings and, as its last item on success,
    the markdown text of the finished report.
    """

    def __init__(self):
        # Simple running counter exposed to callers.
        # NOTE(review): this value is bumped once per pipeline run in
        # run_pipeline and once per completed web search in perform_searches,
        # so it mixes two units -- confirm which one is intended.
        self.stats = {
            "total_searches": 0
        }

    # Method to run the pipeline
    async def run_pipeline(self, query: str, questions: List[str], answers: List[str], recipient_email: str, send_email: bool = False):
        """Validate the inputs, then stream progress messages for one research run.

        Args:
            query: The research question.
            questions: Clarifying questions; must pair one-to-one with ``answers``.
            answers: Answers to ``questions``.
            recipient_email: Address to send the report to (used only when
                ``send_email`` is true).
            send_email: Whether the report should be emailed after writing.

        Yields:
            Progress strings. Validation problems and pipeline failures are
            yielded as messages prefixed with "❌" instead of being raised.
        """
        # Validate the input
        is_valid, error_message = self.validate_input(query, questions, answers)
        if not is_valid:
            yield f"❌ Input validation failed: {error_message}"
            return

        # validate_input accepts None for "no clarifications"; normalise so the
        # later zip() over both lists cannot fail.
        questions = list(questions or [])
        answers = list(answers or [])

        # Email validation: None and whitespace-only addresses count as missing.
        recipient_email = (recipient_email or "").strip()
        if send_email and not recipient_email:
            yield "❌ Email sending requested but no recipient email provided."
            return

        self.stats["total_searches"] += 1

        # Execute the research pipeline; any failure becomes a final message so
        # consumers of this generator never see an exception.
        try:
            async for step in self.execute_pipeline_research(query, questions, answers, recipient_email, send_email):
                yield step
        except Exception as e:
            yield f"❌ Research pipeline failed: {str(e)}"
            return

    # Method to execute the research
    async def execute_pipeline_research(self, query: str, questions: List[str], answers: List[str], recipient_email: str, send_email: bool = False):
        """Run the agent steps inside a trace and yield the trace URL first.

        Yields:
            The trace URL line, followed by everything yielded by
            :meth:`run_agents_step`.
        """
        # Setup tracing
        trace_id = gen_trace_id()
        with trace("Research Pipeline", trace_id=trace_id):
            yield f"Trace: https://platform.openai.com/traces/trace?trace_id={trace_id}"
            async for step in self.run_agents_step(query, questions, answers, recipient_email, send_email):
                yield step

    # Method to run each agent in the research pipeline
    async def run_agents_step(self, query: str, questions: List[str], answers: List[str], recipient_email: str, send_email: bool = False):
        """Execute planning, searching, writing and optional emailing in order.

        Yields:
            A progress string before each step and, last, the report's
            ``markdown_report`` text.
        """
        # Step 1: Planning
        yield "Planning searches based on clarifications..."
        search_plan = await self.plan_searches(query, questions, answers)

        # Step 2: Searching
        yield f"Starting {len(search_plan.searches)} searches..."
        search_results = await self.perform_searches(search_plan)

        # Step 3: Writing Report
        yield "Analyzing search results and writing report..."
        report = await self.write_report(query, search_results)

        # Step 4: Sending Email (optional)
        if send_email and recipient_email:
            yield f"Sending report to {recipient_email}..."
            await self.send_report_email(report, recipient_email)
            yield f"Report sent to {recipient_email}."
        else:
            yield "Email sending skipped."

        # Return final report
        yield report.markdown_report

    # Method to validate the input
    def validate_input(self, query: str, questions: List[str], answers: List[str]) -> tuple[bool, str]:
        """Check the query and the question/answer pairs.

        ``None`` is tolerated everywhere a string or list is expected: a
        ``None`` list is treated as empty and a ``None`` item as an empty
        string, so bad input produces an error message rather than an
        ``AttributeError``/``TypeError``.

        Returns:
            ``(True, "")`` when the input is usable, otherwise
            ``(False, error_message)``.
        """
        if not query or not query.strip():
            return False, "Query cannot be empty"

        questions = questions or []
        answers = answers or []
        if len(questions) != len(answers):
            return False, f"Mismatch: {len(questions)} questions but {len(answers)} answers"

        # Check for empty items, using the same guard as for the query above.
        for position, (question, answer) in enumerate(zip(questions, answers), start=1):
            if not question or not question.strip():
                return False, f"Question {position} is empty"
            if not answer or not answer.strip():
                return False, f"Answer {position} is empty"

        return True, ""

    # Method to plan the searches
    async def plan_searches(self, query: str, questions: List[str], answers: List[str]):
        """Ask the planner agent for a search plan.

        Returns:
            The planner run's ``final_output`` (an object with a non-empty
            ``searches`` collection).

        Raises:
            RuntimeError: If the planner run fails or returns no searches; the
                original error is attached as ``__cause__``.
        """
        # Build structured prompt for the planner_agent
        clarifying_context = "\n".join(f"Q: {q}\nA: {a}" for q, a in zip(questions, answers))
        final_prompt = f"Query: {query}\n\nClarifications:\n{clarifying_context}"

        try:
            result = await Runner.run(planner_agent, final_prompt)
            search_plan = result.final_output
            # Validate the result of search plan
            if not search_plan.searches:
                raise ValueError("Planner agent returned no searches")
            print(f"Planned Searches: {len(search_plan.searches)} searches")
            return search_plan
        except Exception as e:
            raise RuntimeError(f"Search Planner failed: {str(e)}") from e

    # Method to perform all searches concurrently
    async def perform_searches(self, search_plan: "WebSearchPlan") -> List[str]:
        """Run every planned search concurrently and collect the successful results.

        Failed searches (``search_web`` returning ``None``) are dropped, so the
        returned list may be shorter than the plan. Results are in completion
        order, not plan order.
        """
        num_searches = len(search_plan.searches)
        # Create tasks for concurrent execution
        tasks = [asyncio.create_task(self.search_web(item)) for item in search_plan.searches]
        results = []

        try:
            # Gather results as they complete
            for completed, finished in enumerate(asyncio.as_completed(tasks), start=1):
                result = await finished
                if result is not None:
                    results.append(result)
                print(f"Searching... {completed}/{num_searches} completed")
                self.stats["total_searches"] += 1
        finally:
            # On cancellation or an unexpected error, do not leave searches
            # running in the background. After a normal run every task is
            # already done and this is a no-op.
            for task in tasks:
                if not task.done():
                    task.cancel()

        print("Finished all searches.")
        return results

    # Method to search the web for a single search item
    async def search_web(self, item: "WebSearchItem") -> Optional[str]:
        """Run the search agent for one item; return its output as text or ``None``.

        Best-effort by design: any exception is logged via ``print`` and turned
        into ``None`` so one failed search does not abort the others.
        """
        input_text = f"Search: {item.query}\nReason: {item.reason}"
        try:
            result = await Runner.run(search_agent, input_text)
            result = result.final_output
            return str(result)
        except Exception as e:
            print(f"Search failed for '{item.query}': {str(e)}")
            return None

    # Method to synthesize the report
    async def write_report(self, query: str, search_results: List[str]) -> "ReportData":
        """Ask the writer agent to turn the search results into a report.

        Raises:
            RuntimeError: If the writer run fails or the report lacks
                ``markdown_report`` or ``short_summary``; the original error is
                attached as ``__cause__``.
        """
        # Define input message for the writer agent
        input_text = f"Original query: {query}\n\nSearch Results:\n" + "\n---\n".join(search_results)

        try:
            result = await Runner.run(writer_agent, input_text)
            report = result.final_output
            # Validate the result
            if not report.markdown_report or not report.short_summary:
                raise ValueError("Writer agent returned incomplete report")
            return report
        except Exception as e:
            raise RuntimeError(f"Report Writing failed: {str(e)}") from e

    # Method to send the report via email
    async def send_report_email(self, report: "ReportData", recipient_email: str) -> None:
        """Hand the report to the email agent for delivery to ``recipient_email``.

        Raises:
            RuntimeError: If the email agent run fails; the original error is
                attached as ``__cause__``.
        """
        # Built line by line so the prompt text does not depend on how this
        # method happens to be indented.
        input_text = (
            "\n"
            "Send the following research report as an email:\n"
            f"To: {recipient_email}\n"
            "Body (HTML):\n"
            f"{report.markdown_report}\n"
        )

        try:
            await Runner.run(email_agent, input_text)
            print(f"✅ Email sent to {recipient_email}")
        except Exception as e:
            raise RuntimeError(f"Email sending failed: {str(e)}") from e