Spaces:
Sleeping
Sleeping
| import asyncio | |
| from typing import Dict | |
| from agents import trace, gen_trace_id, Runner | |
| from web_searcher import web_searcher | |
| from research_planner import research_planner, WebSearchItem, WebSearchResultsList | |
| from research_writer import research_writer, ReportFormat | |
| from email_agent import email_agent | |
| import markdown2 | |
| from weasyprint import HTML | |
| import tempfile | |
| import os | |
| class ResearchManager: | |
| async def run(self, query: str): | |
| """Runs the deep research process, yielding the status updates and final report.""" | |
| trace_id = gen_trace_id() | |
| with trace("Research in progress", trace_id=trace_id): | |
| print(f"View trace: https://platform.openai.com/traces/{trace_id}") | |
| yield f"View trace: https://platform.openai.com/traces/{trace_id}" | |
| print("Initializing research.....") | |
| research_plan = await self.plan_search(query) | |
| yield "Search planning completed, preparing for search" | |
| search_results = await self.perform_search(research_plan) | |
| yield "Searches complete, drafting report...." | |
| research_report = await self.draft_research_report(query, search_results) | |
| yield "Research report drafted. Sending email with the report attached." | |
| await self.send_email(research_report) | |
| yield "Email sent successfully! Research Task completed." | |
| pdf_path = self.pdf_report_generator(research_report.markdown_report) | |
| yield{ | |
| "status" : "Email sent successfully.", | |
| "report" : research_report.markdown_report, | |
| "path" : pdf_path | |
| } | |
| async def plan_search(self, query: str) -> WebSearchResultsList: | |
| """Plans search to be perfomed for the query""" | |
| print("Planning searches") | |
| results = await Runner.run(research_planner, f"Query:{query}") | |
| print(f"Will perform {len(results.final_output.web_search_results)} searches") | |
| return results.final_output_as(WebSearchResultsList) | |
| async def perform_search(self, research_plan: WebSearchResultsList)-> list[str]: | |
| """Runs the searches to perform for the query""" | |
| print("Searching....") | |
| num_completed = 0 | |
| tasks = [asyncio.create_task(self.search(item)) for item in research_plan.web_search_results] | |
| results = [] | |
| for task in asyncio.as_completed(tasks): | |
| result = await task | |
| if result is not None: | |
| results.append(result) | |
| num_completed += 1 | |
| print(f"Searches completed: {num_completed}/{len(tasks)} completed.") | |
| print("Search Complete!") | |
| return results | |
| async def search(self, item: WebSearchItem) -> str | None: | |
| """Performs a search for the query""" | |
| input = f"Query : {item.query}, Reason for searching/querying: {item.query}" | |
| try: | |
| result = await Runner.run(web_searcher, input) | |
| except Exception: | |
| return None | |
| async def draft_research_report(self, query:str, search_results: list[str]) -> ReportFormat: | |
| """Drafting a research report for the query""" | |
| print("Preparing research report....") | |
| input = f"Original query: {query}, Summarized searches: {search_results}" | |
| result = await Runner.run(research_writer, input) | |
| print("Draft of research report completed!") | |
| return result.final_output_as(ReportFormat) | |
| async def send_email(self, research_report: ReportFormat) -> None: | |
| print("Sending email with research report attached....") | |
| result = await Runner.run(email_agent, research_report.markdown_report) | |
| print("Email sent sucessfully with research report attached!") | |
| return research_report | |
| def pdf_report_generator(self, markdown_content: str) -> str: | |
| """Generates a PDF from markdown content and returns the path to the PDF file""" | |
| html = markdown2.markdown(markdown_content) | |
| """Defining a temporary file for pdf""" | |
| temporary_directory = tempfile.gettempdir() | |
| pdf_path = os.path.join(temporary_directory, 'research_report.pdf') | |
| HTML(string=html).write_pdf(pdf_path) | |
| return pdf_path | |