# Spaces: Runtime error · File size: 9,481 Bytes · 45d075b
# Import libraries
import gradio as gr
import logging
import time
from datetime import datetime
from collections import defaultdict
from typing import Optional, List, Tuple
from dotenv import load_dotenv
from clarifier_agent import clarifier_agent
from research_manager import ResearchManager
from agents import Runner
# Load environment variables.
# NOTE(review): this runs after the project imports above (clarifier_agent,
# research_manager, agents); anything those modules read from the environment
# at import time would not see values supplied here -- confirm this is intended.
# override=True presumably lets the .env file win over variables that are
# already set (python-dotenv semantics -- verify against its documentation).
load_dotenv(override=True)
# Setup logging: INFO and above go to the root logger configured here; every
# function in this module logs through the module-level `logger` below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create RateLimiter class to manage user session within the deep research app
class RateLimiter:
    """In-memory, per-user request throttle.

    Two independent caps are enforced for every user id:

    * at most ``requests_per_minute`` accepted requests whose timestamps are
      less than 60 seconds old, and
    * at most ``daily_limit`` accepted requests per calendar day.

    All bookkeeping is kept in dictionaries on the instance.
    """

    def __init__(self, requests_per_minute: int = 2, daily_limit: int = 4):
        """Remember both limits and start with empty per-user bookkeeping."""
        self.daily_limit = daily_limit
        self.requests_per_minute = requests_per_minute
        # user_id -> {"date": "YYYY-MM-DD", "count": accepted requests that day}
        self.daily_counts = defaultdict(lambda: {"date": "", "count": 0})
        # user_id -> time.time() stamps of accepted requests
        self.request_time = defaultdict(list)

    def get_today(self) -> str:
        """Return the current local date formatted as ``YYYY-MM-DD``."""
        return format(datetime.now(), "%Y-%m-%d")

    def cleanup_old_requests(self, user_id: str) -> None:
        """Drop every stored timestamp of *user_id* that is 60 s old or older."""
        now = time.time()
        still_recent = []
        for stamp in self.request_time[user_id]:
            if now - stamp < 60:
                still_recent.append(stamp)
        self.request_time[user_id] = still_recent

    def check_limits(self, user_id: str) -> Tuple[bool, str]:
        """Decide whether *user_id* may make a request now, and record it if so.

        Returns:
            ``(True, "OK")`` when the request is accepted (it is then counted
            against both limits), otherwise ``(False, <reason>)`` and nothing
            is counted.
        """
        # Per-minute window first: expire stale stamps, then count the rest.
        self.cleanup_old_requests(user_id)
        window = self.request_time[user_id]
        if len(window) >= self.requests_per_minute:
            return False, f"Rate limit exceeded: Max {self.requests_per_minute} requests per minute."

        # Daily cap: the counter restarts whenever the stored date is not today.
        today = self.get_today()
        record = self.daily_counts[user_id]
        if record["date"] != today:
            record.update(date=today, count=0)
        if record["count"] >= self.daily_limit:
            return False, f"Daily limit exceeded: Max {self.daily_limit} requests per day."

        # Accepted: charge this request against both limits.
        window.append(time.time())
        record["count"] += 1
        return True, "OK"
# Create global rate limiter.
# A single module-level instance is consulted by both
# generate_clarification_questions and run_deep_research_pipeline, so every
# accepted call to either handler counts against the same per-user quota.
# daily_limit=2 here overrides the class default of 4.
rate_limiter = RateLimiter(requests_per_minute=2, daily_limit=2)
# Define a function to get user_id
def get_user_id(request: Optional[gr.Request] = None) -> str:
    """Derive a best-effort identifier for the caller of a Gradio event.

    Lookup order:

    1. first non-empty entry of the ``X-Forwarded-For`` header,
    2. ``request.client.host`` (the peer address Gradio exposes),
    3. ``request.host`` (kept for backward compatibility with the previous
       implementation),
    4. the literal ``"unknown_user"``.

    Args:
        request: The Gradio request for the current event, or ``None`` when
            the function is invoked outside of a request.

    Returns:
        ``"anonymous"`` when *request* is ``None``; otherwise the identifier
        found by the lookup above. Never raises: any failure while inspecting
        the request is logged and ``"unknown_user"`` is returned.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted proxy overwrites it, so a caller can choose their own identifier
    (and thus their own rate-limit bucket). Confirm the deployment sits
    behind a proxy that sets this header.
    """
    if request is None:
        return "anonymous"
    try:
        forwarded_for = request.headers.get("X-Forwarded-For")
        if forwarded_for:
            # The header is a comma-separated hop list; skip empty entries so
            # a malformed value such as ", 1.2.3.4" does not yield "".
            for hop in forwarded_for.split(","):
                hop = hop.strip()
                if hop:
                    return hop

        # Peer address of the connection; previously only `request.host` was
        # consulted, which sent every non-proxied caller to "unknown_user".
        client = getattr(request, "client", None)
        client_host = getattr(client, "host", None)
        if client_host:
            return client_host

        legacy_host = getattr(request, "host", None)
        if legacy_host:
            return legacy_host
    except Exception as e:
        # Deliberately broad: identification is best-effort and must not
        # break the request being served.
        logger.error(f"Error getting user ID: {str(e)}")
    return "unknown_user"
# Define a function to generate clarifying questions from clarifier_agent
def _fit_question_slots(items, slots: int = 3) -> List[str]:
    """Return *items* truncated, or padded with "", to exactly *slots* entries."""
    fitted = list(items)[:slots]
    fitted.extend([""] * (slots - len(fitted)))
    return fitted


async def generate_clarification_questions(query: str, request: gr.Request = None) -> List[str]:
    """Ask the clarifier agent for follow-up questions about *query*.

    The UI binds this handler to three question textboxes, so the result is
    always a list of exactly three strings: real questions on success, or a
    status message in the first slot followed by empty strings otherwise.
    (Previously the rate-limit path returned a bare ``str`` and the error
    paths returned one-element lists, which does not match three outputs.)

    Args:
        query: The research question typed by the user.
        request: Gradio request for the current event; used only to identify
            the caller for rate limiting.

    Returns:
        A three-element list of strings for the question textboxes.
    """
    # Input validation
    if not query or not query.strip():
        return _fit_question_slots(["Please enter a research query first."])

    # Rate limiting (an accepted call is counted against the caller's quota)
    user_id = get_user_id(request)
    allowed, message = rate_limiter.check_limits(user_id)
    if not allowed:
        logger.info(f"Rate limit exceeded for user {user_id}: {message}")
        return _fit_question_slots([message])

    try:
        result = await Runner.run(clarifier_agent, input=query.strip())
        questions = result.final_output.questions
        # Validate the results
        if not questions:
            return _fit_question_slots(["Could not generate questions. Please try again."])
        logger.info(f"Generated {len(questions)} questions for user {user_id}")
        return _fit_question_slots(questions)
    except Exception as e:
        # Top-level UI boundary: report a generic message instead of crashing
        # the event handler.
        logger.error(f"Error generating questions for user {user_id}: {str(e)}")
        return _fit_question_slots(["Error generating questions. Please try again."])
# Define a function to run the full research pipeline
async def run_deep_research_pipeline(query: str, q1: str, q2: str, q3: str,
                                     a1: str, a2: str, a3: str,
                                     send_email: bool, recipient_email: str,
                                     request: gr.Request = None):
    """Stream the research pipeline's progress for *query*.

    This is an async generator: each yielded value is a string (or whatever
    ``ResearchManager.run_pipeline`` yields) to be shown in the results area.

    Args:
        query: The research question.
        q1, q2, q3: Clarifying questions shown to the user.
        a1, a2, a3: The user's answers to the matching questions.
        send_email: Whether the report should be e-mailed.
        recipient_email: Destination address; required when *send_email* is
            true.
        request: Gradio request for the current event; used only to identify
            the caller for rate limiting.

    Yields:
        A single error message when validation or rate limiting fails;
        otherwise every step produced by the research manager, followed by an
        error message if the pipeline raises.
    """
    # Input validation (None-safe; the stripped query is what gets researched,
    # matching how the clarifier handler passes it on).
    query = (query or "").strip()
    if not query:
        yield "β Please enter a research query first."
        return

    # Validate email: a whitespace-only address counts as missing.
    recipient_email = (recipient_email or "").strip()
    if send_email and not recipient_email:
        yield "β Please enter a recipient email to send the report."
        return

    # Rate limiting
    user_id = get_user_id(request)
    allowed, message = rate_limiter.check_limits(user_id)
    if not allowed:
        yield f"β {message}"
        return

    # Collect questions and answers, tolerating None from empty components.
    questions = [(q or "").strip() for q in (q1, q2, q3)]
    answers = [(a or "").strip() for a in (a1, a2, a3)]
    # Keep only pairs where both the question and its answer are non-empty.
    valid_pairs = [(q, a) for q, a in zip(questions, answers) if q and a]
    valid_questions = [q for q, _ in valid_pairs]
    valid_answers = [a for _, a in valid_pairs]

    try:
        # Constructed inside the try so a failing constructor is reported to
        # the user like any other pipeline error.
        research_manager = ResearchManager()
        logger.info(f"Starting research for user {user_id} with {len(valid_questions)} question-answer pairs")
        # Pass the filtered pairs: the unfiltered lists were forwarded before,
        # so unanswered questions (and blank slots) reached the pipeline.
        async for step in research_manager.run_pipeline(
            query,
            valid_questions,
            valid_answers,
            send_email,
            recipient_email,
        ):
            yield step
    except Exception as e:
        # Top-level UI boundary: surface the failure in the results area.
        logger.error(f"Error during research for user {user_id}: {str(e)}")
        yield f"β Error during research: {str(e)}"
# Define a function for gradio ui
def create_ui() -> gr.Blocks:
    """Build the Gradio interface for the research assistant.

    Layout, top to bottom: header, query box with a "generate questions"
    button, three read-only question boxes each followed by an answer box,
    e-mail options, the "start research" button, and a Markdown results area.

    Event wiring:
        * the generate button calls ``generate_clarification_questions`` with
          the query and writes to the three question boxes;
        * the e-mail checkbox shows or hides the recipient box;
        * the research button calls ``run_deep_research_pipeline`` with the
          query, questions, answers and e-mail settings, and writes to the
          results area.

    Returns:
        The assembled ``gr.Blocks`` object, not yet launched.

    NOTE(review): the "π", "β" and "π§" characters in the labels below look
    like mis-decoded emoji/arrows; they are kept verbatim -- confirm against
    the original file.
    """

    def _toggle_email_field(checked):
        # Show the recipient box only while the checkbox is ticked.
        return gr.update(visible=checked)

    with gr.Blocks(
        theme=gr.themes.Default(primary_hue="blue"),
        title="Deep Research Assistant"
    ) as interface:
        # Header
        gr.Markdown("# π Deep Research Agent")
        gr.Markdown("**Step 1:** Enter query β **Step 2:** Answer questions β **Step 3:** Get research report")

        # Input section
        with gr.Group():
            query_input = gr.Textbox(
                label="What would you like to research?",  # typo "reserach" fixed
                placeholder="Enter your research question here...",
                lines=2
            )
            generate_btn = gr.Button(
                "Generate Clarifying Questions",
                variant="primary",
                size="lg"
            )

        # Question section: question boxes are filled by the generate handler
        # and are not editable; only the answers are typed by the user.
        with gr.Group():
            gr.Markdown("### π Clarifying Questions")
            question_1 = gr.Textbox(label="Question 1", interactive=False)
            answer_1 = gr.Textbox(label="Your Answer 1", placeholder="Enter your answer...")
            question_2 = gr.Textbox(label="Question 2", interactive=False)
            answer_2 = gr.Textbox(label="Your Answer 2", placeholder="Enter your answer...")
            question_3 = gr.Textbox(label="Question 3", interactive=False)
            answer_3 = gr.Textbox(label="Your Answer 3", placeholder="Enter your answer...")

        # Email options: the recipient box starts hidden and is revealed by
        # the checkbox handler wired below.
        with gr.Group():
            gr.Markdown("### π§ Email Options")
            send_email_checkbox = gr.Checkbox(label="Send report via email")
            email_input = gr.Textbox(
                label="Recipient Email",
                placeholder="recipient@example.com",
                visible=False
            )

        # Action button
        research_btn = gr.Button(
            "π Start Research",
            variant="secondary",
            size="lg"
        )

        # Results
        with gr.Group():
            gr.Markdown("### π Results")
            results_output = gr.Markdown(
                value="Results will appear here...",
                height=400
            )

        # Event handlers
        generate_btn.click(
            fn=generate_clarification_questions,
            inputs=[query_input],
            outputs=[question_1, question_2, question_3]
        )
        send_email_checkbox.change(
            fn=_toggle_email_field,
            inputs=[send_email_checkbox],
            outputs=[email_input]
        )
        # Input order must match run_deep_research_pipeline's positional
        # parameters: query, q1-q3, a1-a3, send_email, recipient_email.
        research_btn.click(
            fn=run_deep_research_pipeline,
            inputs=[
                query_input,
                question_1, question_2, question_3,
                answer_1, answer_2, answer_3,
                send_email_checkbox, email_input
            ],
            outputs=[results_output]
        )
    return interface
def main():
    """Build the interface and serve it from this machine."""
    logger.info("Starting Deep Research Agent...")

    # Launch settings, gathered in one place.
    launch_options = {
        "server_name": "127.0.0.1",  # local access only
        "server_port": 7860,         # standard Gradio port
        "inbrowser": True,           # open a browser tab automatically
        "share": False,              # no public share link
        "show_error": True,          # show detailed errors in the UI
        "quiet": False,              # keep startup logs
    }
    create_ui().launch(**launch_options)


if __name__ == "__main__":
    main()
|