"""Research manager: plans web searches, runs them with rate limiting, writes a report, and emails it."""
| from agents import Runner, trace, gen_trace_id, Agent, ModelSettings | |
| from search_agent import search_agent | |
| from planner_agent import planner_agent, WebSearchItem, WebSearchPlan | |
| from writer_agent import writer_agent_plain, writer_agent, ReportData, INSTRUCTIONS as WRITER_INSTRUCTIONS | |
| from email_agent import email_agent | |
| import asyncio | |
| import time | |
| from pydantic import BaseModel, Field | |
| from model_config import get_model, get_model_display_name | |
| class ResearchManager: | |
| def __init__(self, model_choice="gemini-2.5-flash (Default)"): | |
| # Track request timestamps for intelligent rate limiting | |
| self.request_times = [] | |
| self.max_rpm = 10 # Default: 10 requests per minute | |
| self.rate_limit_window = 60 # seconds | |
| self.model_choice = model_choice | |
| self.model = get_model(model_choice) | |
| # Adjust rate limits based on model | |
| if "llama" in model_choice.lower(): | |
| self.max_rpm = 30 # Groq has higher limits | |
| elif "pro" in model_choice.lower(): | |
| self.max_rpm = 5 # Experimental models have lower limits | |
| async def wait_for_rate_limit(self): | |
| """Intelligent rate limiting: only sleep as much as needed""" | |
| current_time = time.time() | |
| # Remove requests older than the rate limit window | |
| self.request_times = [t for t in self.request_times if current_time - t < self.rate_limit_window] | |
| # If we've hit the rate limit, wait until the oldest request expires | |
| if len(self.request_times) >= self.max_rpm: | |
| oldest_request = self.request_times[0] | |
| wait_time = self.rate_limit_window - (current_time - oldest_request) + 0.1 # Add 0.1s buffer | |
| if wait_time > 0: | |
| print(f"Rate limit: waiting {wait_time:.1f}s...") | |
| await asyncio.sleep(wait_time) | |
| current_time = time.time() | |
| # Record this request | |
| self.request_times.append(current_time) | |
| def format_attachments_context(self, attachments: list) -> str: | |
| """Format attachments for context injection into agent prompts""" | |
| if not attachments: | |
| return "" | |
| formatted = "\n\n=== ATTACHED DOCUMENTS ===\n\n" | |
| formatted += f"The user has attached {len(attachments)} document(s) for context:\n\n" | |
| for idx, att in enumerate(attachments, 1): | |
| formatted += f"📎 Document {idx}: {att['filename']} ({att['file_type'].upper()} file, {att.get('char_count', 0):,} characters)\n" | |
| formatted += f"Content:\n{att['content']}\n\n" | |
| formatted += "---\n\n" | |
| formatted += "=== END ATTACHED DOCUMENTS ===\n\n" | |
| formatted += "IMPORTANT: Use the attached documents as primary reference material when answering the user's query.\n\n" | |
| return formatted | |
| def format_conversation_history(self, conversation_history: list, attachments: list = None) -> str: | |
| """Format conversation history + attachments for context injection into agent prompts""" | |
| formatted = "" | |
| # Add attachments first (at the top of context) | |
| if attachments: | |
| formatted += self.format_attachments_context(attachments) | |
| # Then add conversation history | |
| if conversation_history: | |
| formatted += "\n=== PREVIOUS CONVERSATION HISTORY ===\n\n" | |
| for idx, turn in enumerate(conversation_history, 1): | |
| if turn.get("type") == "query": | |
| formatted += f"--- Previous Query {idx} ---\n{turn['content']}\n\n" | |
| elif turn.get("type") in ["report", "simple_search"]: | |
| # Truncate long reports to first 2000 chars to save context | |
| content = turn['content'] | |
| if len(content) > 2000: | |
| content = content[:2000] + "\n... [Report truncated for context] ..." | |
| formatted += f"--- Previous {'Report' if turn['type'] == 'report' else 'Answer'} {idx} ---\n{content}\n\n" | |
| formatted += "=== END OF PREVIOUS CONVERSATION ===\n\n" | |
| return formatted | |
| async def run(self, query: str, conversation_history: list = None, attachments: list = None): | |
| """ Run the deep research process, yielding the status updates and the final report""" | |
| if conversation_history is None: | |
| conversation_history = [] | |
| if attachments is None: | |
| attachments = [] | |
| trace_id = gen_trace_id() | |
| model_display = get_model_display_name(self.model_choice) | |
| with trace("Research trace", trace_id=trace_id): | |
| print(f"Using Brave Search API and {model_display}") | |
| if attachments: | |
| print(f"With {len(attachments)} attached document(s)") | |
| yield f"INIT|Using Brave Search API and {model_display} (with {len(attachments)} attachment(s))" | |
| else: | |
| yield f"INIT|Using Brave Search API and {model_display}" | |
| print("Starting research...") | |
| search_plan = await self.plan_searches(query, conversation_history, attachments) | |
| num_searches = len(search_plan.searches) | |
| yield f"PLANNING_COMPLETE|{num_searches}" | |
| # Perform searches with progress updates | |
| results = [] | |
| total_searches = len(search_plan.searches) | |
| for idx, item in enumerate(search_plan.searches, 1): | |
| yield f"SEARCH_PROGRESS|{idx}|{total_searches}" | |
| result = await self.search(item) | |
| if result is not None: | |
| results.append(result) | |
| yield "SEARCH_COMPLETE|All searches finished" | |
| yield "WRITING_START|Starting to write report..." | |
| report = await self.write_report(query, results, conversation_history, attachments) | |
| print(f"DEBUG: Report object created, markdown_report length: {len(report.markdown_report)}") | |
| # Yield the report BEFORE sending email | |
| yield f"REPORT_READY|{report.markdown_report}" | |
| yield "EMAIL_START|Sending email..." | |
| await self.send_email(report) | |
| yield "COMPLETE|Research complete" | |
| async def plan_searches(self, query: str, conversation_history: list = None, attachments: list = None) -> WebSearchPlan: | |
| """ Plan the searches to perform for the query """ | |
| if conversation_history is None: | |
| conversation_history = [] | |
| if attachments is None: | |
| attachments = [] | |
| print("Planning searches...") | |
| await self.wait_for_rate_limit() | |
| # Use selected model for planning | |
| from planner_agent import INSTRUCTIONS as PLANNER_INSTRUCTIONS, WebSearchPlan | |
| dynamic_planner = Agent( | |
| name="PlannerAgent", | |
| instructions=PLANNER_INSTRUCTIONS, | |
| model=self.model, | |
| output_type=WebSearchPlan, | |
| ) | |
| # Format the input with conversation history and attachments if available | |
| context = self.format_conversation_history(conversation_history, attachments) | |
| input_text = f"{context}Current Query: {query}" | |
| result = await Runner.run( | |
| dynamic_planner, | |
| input_text, | |
| ) | |
| print(f"Will perform {len(result.final_output.searches)} searches") | |
| return result.final_output_as(WebSearchPlan) | |
| async def perform_searches(self, search_plan: WebSearchPlan, progress_callback=None): | |
| """ Perform the searches to perform for the query """ | |
| print("Searching...") | |
| num_completed = 0 | |
| results = [] | |
| total_searches = len(search_plan.searches) | |
| # Process searches sequentially with intelligent rate limiting | |
| for idx, item in enumerate(search_plan.searches, 1): | |
| # Notify progress before each search | |
| if progress_callback: | |
| await progress_callback(f"SEARCH_PROGRESS|{idx}|{total_searches}") | |
| result = await self.search(item) | |
| if result is not None: | |
| results.append(result) | |
| num_completed += 1 | |
| print(f"Searching... {num_completed}/{total_searches} completed") | |
| print("Finished searching") | |
| return results | |
| async def search(self, item: WebSearchItem) -> str | None: | |
| """ Perform a search for the query with retry logic for rate limits """ | |
| input = f"Search term: {item.query}\nReason for searching: {item.reason}" | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| await self.wait_for_rate_limit() | |
| result = await Runner.run( | |
| search_agent, | |
| input, | |
| ) | |
| return str(result.final_output) | |
| except Exception as e: | |
| # Check if it's a rate limit error | |
| if "429" in str(e) or "quota" in str(e).lower(): | |
| if attempt < max_retries - 1: | |
| wait_time = 6 * (attempt + 1) # Exponential backoff: 6, 12, 18 seconds | |
| print(f"Rate limit hit, waiting {wait_time}s before retry...") | |
| await asyncio.sleep(wait_time) | |
| else: | |
| print(f"Failed after {max_retries} retries due to rate limits") | |
| return None | |
| else: | |
| print(f"Search failed: {str(e)}") | |
| return None | |
| return None | |
| async def write_report(self, query: str, search_results: list[str], conversation_history: list = None, attachments: list = None) -> ReportData: | |
| """ Write the report for the query with retry logic """ | |
| if conversation_history is None: | |
| conversation_history = [] | |
| if attachments is None: | |
| attachments = [] | |
| print("Thinking about report...") | |
| # Format the input with conversation history and attachments if available | |
| context = self.format_conversation_history(conversation_history, attachments) | |
| input = f"{context}Current Query: {query}\n\nNew Search Results: {search_results}" | |
| # Create writer with selected model | |
| dynamic_writer = Agent( | |
| name="WriterAgentPlain", | |
| instructions=WRITER_INSTRUCTIONS, | |
| model=self.model, | |
| output_type=str, | |
| model_settings=ModelSettings( | |
| max_tokens=32000, | |
| temperature=0.7, | |
| ), | |
| ) | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| # Use intelligent rate limiting instead of fixed delay | |
| await self.wait_for_rate_limit() | |
| # Use plain text agent to avoid structured output truncation | |
| result = await Runner.run( | |
| dynamic_writer, | |
| input, | |
| ) | |
| print("Finished writing report") | |
| # Extract the markdown report from plain text output | |
| markdown_report = str(result.final_output) | |
| # Create a ReportData object manually | |
| report_data = ReportData( | |
| short_summary="Research report generated successfully.", | |
| markdown_report=markdown_report, | |
| follow_up_questions=[] | |
| ) | |
| return report_data | |
| except Exception as e: | |
| if "429" in str(e) or "quota" in str(e).lower(): | |
| if attempt < max_retries - 1: | |
| wait_time = 10 * (attempt + 1) # Wait 10, 20, 30 seconds | |
| print(f"Rate limit hit while writing report, waiting {wait_time}s...") | |
| await asyncio.sleep(wait_time) | |
| else: | |
| raise Exception("Failed to write report after multiple retries due to rate limits") | |
| else: | |
| raise | |
| async def send_email(self, report: ReportData) -> None: | |
| """ Send email with retry logic for rate limits """ | |
| print("Writing email...") | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| await self.wait_for_rate_limit() | |
| result = await Runner.run( | |
| email_agent, | |
| report.markdown_report, | |
| ) | |
| print("Email sent") | |
| return report | |
| except Exception as e: | |
| if "429" in str(e) or "quota" in str(e).lower(): | |
| if attempt < max_retries - 1: | |
| wait_time = 15 * (attempt + 1) # Wait 15, 30, 45 seconds | |
| print(f"Rate limit hit while sending email, waiting {wait_time}s...") | |
| await asyncio.sleep(wait_time) | |
| else: | |
| print("Failed to send email after multiple retries due to rate limits") | |
| return report # Return without sending email | |
| else: | |
| print(f"Email sending failed: {str(e)}") | |
| return report | |
| async def run_simple_search(self, query: str, conversation_history: list = None, attachments: list = None): | |
| """Run a quick follow-up search without full research workflow""" | |
| if conversation_history is None: | |
| conversation_history = [] | |
| if attachments is None: | |
| attachments = [] | |
| print("Running simple search...") | |
| if attachments: | |
| print(f"With {len(attachments)} attached document(s)") | |
| yield f"SIMPLE_SEARCH_START|Starting quick search (with {len(attachments)} attachment(s))..." | |
| else: | |
| yield "SIMPLE_SEARCH_START|Starting quick search..." | |
| # Import simple search agent | |
| from simple_search_agent import simple_search_agent | |
| # Format conversation history and attachments for context | |
| context = self.format_conversation_history(conversation_history, attachments) | |
| input_text = f"{context}Current Question: {query}" | |
| try: | |
| await self.wait_for_rate_limit() | |
| # Run the simple search agent | |
| result = await Runner.run( | |
| simple_search_agent, | |
| input_text, | |
| ) | |
| answer = str(result.final_output) | |
| print("Simple search complete") | |
| yield f"SIMPLE_SEARCH_COMPLETE|{answer}" | |
| except Exception as e: | |
| error_message = f"Simple search failed: {str(e)}" | |
| print(error_message) | |
| yield f"SIMPLE_SEARCH_ERROR|{error_message}" |