# deep_research/research_manager.py
from agents import Runner, trace, gen_trace_id, Agent, ModelSettings
from search_agent import search_agent
from planner_agent import planner_agent, WebSearchItem, WebSearchPlan
from writer_agent import writer_agent_plain, writer_agent, ReportData, INSTRUCTIONS as WRITER_INSTRUCTIONS
from email_agent import email_agent
import asyncio
import time
from pydantic import BaseModel, Field
from model_config import get_model, get_model_display_name
class ResearchManager:
    """Orchestrate the deep-research workflow.

    Pipeline: plan web searches for a query, execute them, write a markdown
    report, and email it — streaming progress updates as ``PHASE|detail``
    strings from the async-generator entry points (`run`, `run_simple_search`).
    A sliding-window rate limiter throttles every model/API call, with the
    per-minute budget adjusted to the selected model.
    """

    def __init__(self, model_choice="gemini-2.5-flash (Default)"):
        """Configure the manager for the chosen model.

        Args:
            model_choice: Display name of the model to use; resolved to a
                concrete model via `get_model`.
        """
        # Sliding window of request timestamps for intelligent rate limiting.
        self.request_times = []
        self.max_rpm = 10  # Default: 10 requests per minute
        self.rate_limit_window = 60  # seconds
        self.model_choice = model_choice
        self.model = get_model(model_choice)

        # Adjust rate limits based on model family (matched by substring of
        # the display name).
        if "llama" in model_choice.lower():
            self.max_rpm = 30  # Groq has higher limits
        elif "pro" in model_choice.lower():
            self.max_rpm = 5  # Experimental models have lower limits

    async def wait_for_rate_limit(self):
        """Intelligent rate limiting: only sleep as much as needed.

        Prunes timestamps older than the window; if the window is full,
        sleeps just until the oldest request expires, then records the
        current request.
        """
        current_time = time.time()
        # Drop requests that have aged out of the rate-limit window.
        self.request_times = [t for t in self.request_times if current_time - t < self.rate_limit_window]

        # If we've hit the rate limit, wait until the oldest request expires.
        if len(self.request_times) >= self.max_rpm:
            oldest_request = self.request_times[0]
            wait_time = self.rate_limit_window - (current_time - oldest_request) + 0.1  # Add 0.1s buffer
            if wait_time > 0:
                print(f"Rate limit: waiting {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)
                current_time = time.time()
                # Fix: re-prune after sleeping so entries that expired while
                # we waited no longer count against the window.
                self.request_times = [t for t in self.request_times if current_time - t < self.rate_limit_window]

        # Record this request.
        self.request_times.append(current_time)

    def format_attachments_context(self, attachments: list) -> str:
        """Format attachments for context injection into agent prompts.

        Args:
            attachments: List of dicts with ``filename``, ``file_type``,
                ``content`` and optional ``char_count`` keys.

        Returns:
            A delimited text section describing every attachment, or "" when
            there are none.
        """
        if not attachments:
            return ""

        formatted = "\n\n=== ATTACHED DOCUMENTS ===\n\n"
        formatted += f"The user has attached {len(attachments)} document(s) for context:\n\n"
        for idx, att in enumerate(attachments, 1):
            formatted += f"📎 Document {idx}: {att['filename']} ({att['file_type'].upper()} file, {att.get('char_count', 0):,} characters)\n"
            formatted += f"Content:\n{att['content']}\n\n"
            formatted += "---\n\n"
        formatted += "=== END ATTACHED DOCUMENTS ===\n\n"
        formatted += "IMPORTANT: Use the attached documents as primary reference material when answering the user's query.\n\n"
        return formatted

    def format_conversation_history(self, conversation_history: list, attachments: list = None) -> str:
        """Format conversation history + attachments for context injection into agent prompts.

        Attachments are placed first, followed by prior turns. Long previous
        reports/answers are truncated to 2000 characters to save context.
        """
        formatted = ""

        # Add attachments first (at the top of context).
        if attachments:
            formatted += self.format_attachments_context(attachments)

        # Then add conversation history.
        if conversation_history:
            formatted += "\n=== PREVIOUS CONVERSATION HISTORY ===\n\n"
            for idx, turn in enumerate(conversation_history, 1):
                if turn.get("type") == "query":
                    formatted += f"--- Previous Query {idx} ---\n{turn['content']}\n\n"
                elif turn.get("type") in ["report", "simple_search"]:
                    # Truncate long reports to first 2000 chars to save context.
                    content = turn['content']
                    if len(content) > 2000:
                        content = content[:2000] + "\n... [Report truncated for context] ..."
                    formatted += f"--- Previous {'Report' if turn['type'] == 'report' else 'Answer'} {idx} ---\n{content}\n\n"
            formatted += "=== END OF PREVIOUS CONVERSATION ===\n\n"
        return formatted

    async def run(self, query: str, conversation_history: list = None, attachments: list = None):
        """Run the deep research process, yielding status updates and the final report.

        Yields ``PHASE|detail`` strings: INIT, PLANNING_COMPLETE,
        SEARCH_PROGRESS, SEARCH_COMPLETE, WRITING_START, REPORT_READY,
        EMAIL_START, COMPLETE.
        """
        if conversation_history is None:
            conversation_history = []
        if attachments is None:
            attachments = []

        trace_id = gen_trace_id()
        model_display = get_model_display_name(self.model_choice)
        with trace("Research trace", trace_id=trace_id):
            print(f"Using Brave Search API and {model_display}")
            if attachments:
                print(f"With {len(attachments)} attached document(s)")
                yield f"INIT|Using Brave Search API and {model_display} (with {len(attachments)} attachment(s))"
            else:
                yield f"INIT|Using Brave Search API and {model_display}"
            print("Starting research...")

            search_plan = await self.plan_searches(query, conversation_history, attachments)
            num_searches = len(search_plan.searches)
            yield f"PLANNING_COMPLETE|{num_searches}"

            # Perform searches sequentially, emitting progress before each one;
            # failed searches (None results) are simply skipped.
            results = []
            total_searches = len(search_plan.searches)
            for idx, item in enumerate(search_plan.searches, 1):
                yield f"SEARCH_PROGRESS|{idx}|{total_searches}"
                result = await self.search(item)
                if result is not None:
                    results.append(result)
            yield "SEARCH_COMPLETE|All searches finished"

            yield "WRITING_START|Starting to write report..."
            report = await self.write_report(query, results, conversation_history, attachments)
            print(f"DEBUG: Report object created, markdown_report length: {len(report.markdown_report)}")

            # Yield the report BEFORE sending email so the caller can show it
            # even if email delivery is slow or fails.
            yield f"REPORT_READY|{report.markdown_report}"

            yield "EMAIL_START|Sending email..."
            await self.send_email(report)
            yield "COMPLETE|Research complete"

    async def plan_searches(self, query: str, conversation_history: list = None, attachments: list = None) -> WebSearchPlan:
        """Plan the searches to perform for the query.

        Builds a planner agent on the selected model and returns its
        structured `WebSearchPlan` output.
        """
        if conversation_history is None:
            conversation_history = []
        if attachments is None:
            attachments = []

        print("Planning searches...")
        await self.wait_for_rate_limit()

        # Use the selected model for planning (imported locally to avoid a
        # hard dependency at module import time).
        from planner_agent import INSTRUCTIONS as PLANNER_INSTRUCTIONS, WebSearchPlan
        dynamic_planner = Agent(
            name="PlannerAgent",
            instructions=PLANNER_INSTRUCTIONS,
            model=self.model,
            output_type=WebSearchPlan,
        )

        # Prepend conversation history and attachments (if any) to the query.
        context = self.format_conversation_history(conversation_history, attachments)
        input_text = f"{context}Current Query: {query}"

        result = await Runner.run(
            dynamic_planner,
            input_text,
        )
        print(f"Will perform {len(result.final_output.searches)} searches")
        return result.final_output_as(WebSearchPlan)

    async def perform_searches(self, search_plan: WebSearchPlan, progress_callback=None):
        """Perform the planned searches sequentially with rate limiting.

        Args:
            search_plan: Plan whose ``searches`` items are executed in order.
            progress_callback: Optional async callable invoked with a
                ``SEARCH_PROGRESS|i|total`` string before each search.

        Returns:
            List of successful search result strings (failures are skipped).
        """
        print("Searching...")
        num_completed = 0
        results = []
        total_searches = len(search_plan.searches)

        # Process searches sequentially with intelligent rate limiting.
        for idx, item in enumerate(search_plan.searches, 1):
            # Notify progress before each search.
            if progress_callback:
                await progress_callback(f"SEARCH_PROGRESS|{idx}|{total_searches}")

            result = await self.search(item)
            if result is not None:
                results.append(result)
            num_completed += 1
            print(f"Searching... {num_completed}/{total_searches} completed")
        print("Finished searching")
        return results

    async def search(self, item: WebSearchItem) -> str | None:
        """Perform a single search with retry logic for rate limits.

        Returns the search summary string, or None if the search failed or
        rate-limit retries were exhausted.
        """
        # Renamed from `input` to avoid shadowing the builtin.
        search_input = f"Search term: {item.query}\nReason for searching: {item.reason}"
        max_retries = 3

        for attempt in range(max_retries):
            try:
                await self.wait_for_rate_limit()
                result = await Runner.run(
                    search_agent,
                    search_input,
                )
                return str(result.final_output)
            except Exception as e:
                # Heuristic rate-limit detection: HTTP 429 or quota messages.
                if "429" in str(e) or "quota" in str(e).lower():
                    if attempt < max_retries - 1:
                        wait_time = 6 * (attempt + 1)  # Linear backoff: 6, 12, 18 seconds
                        print(f"Rate limit hit, waiting {wait_time}s before retry...")
                        await asyncio.sleep(wait_time)
                    else:
                        print(f"Failed after {max_retries} retries due to rate limits")
                        return None
                else:
                    # Non-rate-limit errors are not retried.
                    print(f"Search failed: {str(e)}")
                    return None
        return None

    async def write_report(self, query: str, search_results: list[str], conversation_history: list = None, attachments: list = None) -> ReportData:
        """Write the report for the query with retry logic.

        Uses a plain-text writer agent (structured output can truncate long
        reports) and wraps the result in a `ReportData` manually.

        Raises:
            Exception: if rate-limit retries are exhausted, or on any
                non-rate-limit error from the writer run.
        """
        if conversation_history is None:
            conversation_history = []
        if attachments is None:
            attachments = []

        print("Thinking about report...")
        # Prepend conversation history and attachments (if any).
        context = self.format_conversation_history(conversation_history, attachments)
        # Renamed from `input` to avoid shadowing the builtin.
        writer_input = f"{context}Current Query: {query}\n\nNew Search Results: {search_results}"

        # Create a writer on the selected model with a plain-string output
        # type to avoid structured-output truncation.
        dynamic_writer = Agent(
            name="WriterAgentPlain",
            instructions=WRITER_INSTRUCTIONS,
            model=self.model,
            output_type=str,
            model_settings=ModelSettings(
                max_tokens=32000,
                temperature=0.7,
            ),
        )

        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Use intelligent rate limiting instead of a fixed delay.
                await self.wait_for_rate_limit()
                result = await Runner.run(
                    dynamic_writer,
                    writer_input,
                )
                print("Finished writing report")

                # Extract the markdown report from the plain text output and
                # build the ReportData object manually.
                markdown_report = str(result.final_output)
                report_data = ReportData(
                    short_summary="Research report generated successfully.",
                    markdown_report=markdown_report,
                    follow_up_questions=[]
                )
                return report_data
            except Exception as e:
                if "429" in str(e) or "quota" in str(e).lower():
                    if attempt < max_retries - 1:
                        wait_time = 10 * (attempt + 1)  # Wait 10, 20, 30 seconds
                        print(f"Rate limit hit while writing report, waiting {wait_time}s...")
                        await asyncio.sleep(wait_time)
                    else:
                        raise Exception("Failed to write report after multiple retries due to rate limits")
                else:
                    raise

    async def send_email(self, report: ReportData) -> ReportData:
        """Send the report by email with retry logic for rate limits.

        Best-effort: on exhausted retries or any other failure the report is
        still returned so the caller keeps the result.

        Returns:
            The same ``report`` that was passed in. (Fix: the original
            annotation said ``None`` while every path returned the report.)
        """
        print("Writing email...")
        max_retries = 3

        for attempt in range(max_retries):
            try:
                await self.wait_for_rate_limit()
                # The run result is not needed; only success/failure matters.
                await Runner.run(
                    email_agent,
                    report.markdown_report,
                )
                print("Email sent")
                return report
            except Exception as e:
                if "429" in str(e) or "quota" in str(e).lower():
                    if attempt < max_retries - 1:
                        wait_time = 15 * (attempt + 1)  # Wait 15, 30, 45 seconds
                        print(f"Rate limit hit while sending email, waiting {wait_time}s...")
                        await asyncio.sleep(wait_time)
                    else:
                        print("Failed to send email after multiple retries due to rate limits")
                        return report  # Return without sending email
                else:
                    print(f"Email sending failed: {str(e)}")
                    return report

    async def run_simple_search(self, query: str, conversation_history: list = None, attachments: list = None):
        """Run a quick follow-up search without the full research workflow.

        Yields ``SIMPLE_SEARCH_START`` then either ``SIMPLE_SEARCH_COMPLETE``
        with the answer or ``SIMPLE_SEARCH_ERROR`` with the failure message.
        """
        if conversation_history is None:
            conversation_history = []
        if attachments is None:
            attachments = []

        print("Running simple search...")
        if attachments:
            print(f"With {len(attachments)} attached document(s)")
            yield f"SIMPLE_SEARCH_START|Starting quick search (with {len(attachments)} attachment(s))..."
        else:
            yield "SIMPLE_SEARCH_START|Starting quick search..."

        # Import the simple search agent locally to keep module import light.
        from simple_search_agent import simple_search_agent

        # Prepend conversation history and attachments (if any).
        context = self.format_conversation_history(conversation_history, attachments)
        input_text = f"{context}Current Question: {query}"

        try:
            await self.wait_for_rate_limit()
            # Run the simple search agent.
            result = await Runner.run(
                simple_search_agent,
                input_text,
            )
            answer = str(result.final_output)
            print("Simple search complete")
            yield f"SIMPLE_SEARCH_COMPLETE|{answer}"
        except Exception as e:
            error_message = f"Simple search failed: {str(e)}"
            print(error_message)
            yield f"SIMPLE_SEARCH_ERROR|{error_message}"