import asyncio
import json
from collections.abc import AsyncIterator

from agents import Runner

from search_agent import search_agent
from planner_agent import planner_agent, WebSearchItem, WebSearchPlan
from writer_agent import writer_agent, ReportData
from email_agent import email_agent

_FENCE = "```"


def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence.

    Only a fence that wraps the whole payload is removed (an opening
    fence with an optional language tag, and a closing fence). Fences that
    appear inside the payload, e.g. code samples embedded in a JSON string
    value, are left untouched so the content is not corrupted.
    """
    cleaned = text.strip()
    if cleaned.startswith(_FENCE):
        cleaned = cleaned[len(_FENCE):]
        tag, newline, remainder = cleaned.partition("\n")
        if newline and tag.strip().isalpha():
            # The opening fence sits on its own line with a language tag.
            cleaned = remainder
        else:
            # Fence and payload share a line, e.g. '```json{...}```'.
            cleaned = cleaned.removeprefix("json")
    cleaned = cleaned.rstrip()
    if cleaned.endswith(_FENCE):
        cleaned = cleaned[: -len(_FENCE)]
    return cleaned.strip()


class ResearchManager:
    """Coordinate the plan -> search -> write -> email research pipeline."""

    async def run(self, query: str) -> AsyncIterator[str]:
        """Run the deep research process for *query*.

        Yields human-readable status updates while the pipeline progresses
        and, as the last item, the Markdown text of the finished report.
        Any failure before the report exists is reported as a single
        ``"Error during research: ..."`` message instead of being raised.
        """
        # NOTE: no tracing context is opened here; the former `with trace`
        # block was removed because it caused a 401 API key error.
        print("Starting research...")
        yield "Starting research process..."

        try:
            # Plan searches
            search_plan = await self.plan_searches(query)
            yield f"Searches planned ({len(search_plan.searches)} queries), starting to search..."

            # Perform searches
            search_results = await self.perform_searches(search_plan)
            yield f"Searches complete ({len(search_results)} results found), writing report..."

            # Write report
            report = await self.write_report(query, search_results)
            yield "Report written, sending email..."

            # Send email. A delivery failure must not discard the report
            # that has already been written, so it is handled separately.
            try:
                await self.send_email(report)
            except Exception as exc:
                print(f"Failed to send email: {exc}")
                yield f"Email could not be sent ({exc}), returning the report anyway..."
            else:
                yield "Email sent, research complete!"

            # Final output
            yield report.markdown_report

        except Exception as e:
            yield f"Error during research: {str(e)}"

    async def plan_searches(self, query: str) -> WebSearchPlan:
        """Ask the planner agent which searches to perform for *query*.

        The planner's final output is treated as a JSON string (optionally
        wrapped in a Markdown code fence) and parsed manually. If it cannot
        be parsed or does not fit ``WebSearchPlan``, an empty plan is
        returned so the pipeline keeps running.
        """
        print("Planning searches...")
        result = await Runner.run(
            planner_agent,
            f"Query: {query}",
        )

        try:
            data = json.loads(_strip_code_fence(str(result.final_output)))
            plan = WebSearchPlan(**data)
        except Exception as e:
            # Deliberate best-effort: fall back to an empty plan rather
            # than crash on malformed planner output.
            print(f"Failed to parse plan: {e}")
            return WebSearchPlan(searches=[])

        print(f"Will perform {len(plan.searches)} searches")
        return plan

    async def perform_searches(self, search_plan: WebSearchPlan) -> list[str]:
        """Run every planned search concurrently and collect the results.

        Searches that failed (``search`` returned ``None``) are skipped, so
        the returned list may be shorter than the plan. Results are in
        completion order, not plan order.
        """
        print("Searching...")
        tasks = [asyncio.create_task(self.search(item)) for item in search_plan.searches]
        results: list[str] = []
        for num_completed, finished in enumerate(asyncio.as_completed(tasks), start=1):
            result = await finished
            if result is not None:
                results.append(result)
            print(f"Searching... {num_completed}/{len(tasks)} completed")
        print("Finished searching")
        return results

    async def search(self, item: WebSearchItem) -> str | None:
        """Run the search agent for one planned item.

        Returns the agent's final output as a string, or ``None`` if the
        agent run raised; the failure is printed rather than propagated so
        one bad search does not abort the others.
        """
        prompt = f"Search term: {item.query}\nReason for searching: {item.reason}"
        try:
            result = await Runner.run(
                search_agent,
                prompt,
            )
        except Exception as exc:
            print(f"Search failed for {item.query!r}: {exc}")
            return None
        return str(result.final_output)

    async def write_report(self, query: str, search_results: list[str]) -> ReportData:
        """Ask the writer agent to produce the report for *query*.

        The writer's final output is treated as a JSON string (optionally
        wrapped in a Markdown code fence) and loaded into ``ReportData``.
        Parsing or construction errors propagate to the caller.
        """
        print("Thinking about report...")
        prompt = f"Original query: {query}\nSummarized search results: {search_results}"
        result = await Runner.run(
            writer_agent,
            prompt,
        )

        data = json.loads(_strip_code_fence(str(result.final_output)))
        return ReportData(**data)

    async def send_email(self, report: ReportData) -> ReportData:
        """Hand the report's Markdown to the email agent and return *report*."""
        print("Writing email...")
        await Runner.run(
            email_agent,
            report.markdown_report,
        )
        print("Email sent")
        return report