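# Provenance (Hugging Face file-view header captured with the source):
#   author: Leonardo
#   commit: 25af899 (verified) - "Update app.py"
#   page links: raw / history / blame
#   size: 34.6 kB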
"""Main application for the OpenDeepResearch Gradio interface."""
import sys
import mimetypes
import traceback
from dataclasses import dataclass
import os
import re
import shutil
import time
from typing import Optional, Dict, Any
from datetime import datetime
from cleantext import clean
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText # AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# Constants and configurations (module-level, UPPER_CASE).
#
# AUTHORIZED_IMPORTS lists the modules that agent-generated code may import;
# create_agent() passes it to CodeAgent as `additional_authorized_imports`.
# NOTE(review): several entries ("os", "requests", "selenium", "sqlite3",
# "tempfile", "io") presumably give generated code file-system and network
# access - confirm this is acceptable for the deployment environment.
AUTHORIZED_IMPORTS = [
    "requests",  # Web requests (fetching data from the internet)
    "zipfile",  # Working with ZIP archives
    "pandas",  # Data manipulation and analysis (DataFrames)
    "numpy",  # Numerical computing (arrays, linear algebra)
    "sympy",  # Symbolic mathematics (algebra, calculus)
    "json",  # JSON data serialization/deserialization
    "bs4",  # Beautiful Soup for HTML/XML parsing
    "pubchempy",  # Accessing PubChem chemical database
    "xml",  # XML processing
    "yahoo_finance",  # Fetching stock data
    "Bio",  # Bioinformatics tools (e.g., sequence analysis)
    "sklearn",  # Scikit-learn for machine learning
    "scipy",  # Scientific computing (stats, optimization)
    "pydub",  # Audio manipulation
    "PIL",  # Pillow for image processing
    "chess",  # Chess-related functionality
    "PyPDF2",  # PDF manipulation
    "pptx",  # PowerPoint file manipulation
    "torch",  # PyTorch for neural networks
    "datetime",  # Date and time handling
    "fractions",  # Rational number arithmetic
    "csv",  # CSV file reading/writing
    "cleantext",  # Text cleaning and normalization
    "os",  # Operating system interaction (file system, etc.) - high impact
    "re",  # Regular expressions for text processing
    "collections",  # Useful data structures (e.g., defaultdict, Counter)
    "math",  # Basic mathematical functions
    "random",  # Random number generation
    "io",  # Input/output streams
    "urllib.parse",  # URL parsing and manipulation
    "typing",  # Support for type hints
    "concurrent.futures",  # For parallel execution
    "time",  # Measuring time
    "tempfile",  # Creating temporary files and directories
    # Data visualization - consider security implications carefully
    "matplotlib",  # Plotting library (basic charts)
    "seaborn",  # Statistical data visualization (more advanced)
    # Web scraping - consider ethical implications
    "lxml",  # Faster XML/HTML processing (alternative to bs4)
    "selenium",  # Automated browser control (for dynamic websites)
    # Database interaction - handle credentials securely
    "sqlite3",  # SQLite database access
    # Task scheduling
    "schedule",  # Allow the agent to schedule tasks
]
# User-Agent header value sent with the text browser's HTTP requests
# (see BROWSER_CONFIG["request_kwargs"]["headers"] below).
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)
# Keyword arguments unpacked into SimpleTextBrowser(**BROWSER_CONFIG) by
# create_agent(); main() also creates the "downloads_folder" directory.
# NOTE(review): "serpapi_key" is read when this module is imported, i.e.
# before setup_environment() calls load_dotenv(); a key that exists only in
# .env is presumably not picked up here - confirm.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,  # presumably characters per page - TODO confirm
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,  # presumably seconds - TODO confirm
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}
# Role mapping handed to LiteLLMModel(custom_role_conversions=...) in
# create_agent().
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}
# MIME types accepted by GradioUI.handle_file_upload; anything else (including
# files whose type cannot be guessed from the name) is rejected.
ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    "video/mp4",
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]
# Maximum number of chat messages kept before older ones are dropped
# (applied in GradioUI.interact_with_agent).
MAX_CHAT_HISTORY = 100
# Maximum uploaded file size in MB (checked in GradioUI.handle_file_upload)
MAX_FILE_SIZE_MB = 50
# Age in days after which uploaded files are deleted; default for
# cleanup_old_files()
FILE_RETENTION_DAYS = 7
def setup_environment():
    """
    Initialize environment variables and authentication.

    Loads variables from a ``.env`` file (values from the file override ones
    already present in the process environment) and logs in to the Hugging
    Face Hub using ``HF_TOKEN``.

    Returns:
        bool: True if a token was found and the login succeeded, False
        otherwise (missing token, or ``login`` raised ``ValueError`` /
        ``ConnectionError``).
    """
    load_dotenv(override=True)
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        print("HF_TOKEN not found in environment variables.")
        return False
    try:
        login(hf_token)
    except (ValueError, ConnectionError) as e:
        print(f"Failed to login with HF token: {e}")
        return False
    # Deliberately do not echo any part of the token: stdout usually ends up
    # in shared logs, and even a suffix of a secret narrows a brute-force
    # search and identifies which credential is in use.
    print("HF_TOKEN found; Hugging Face login succeeded.")
    return True
class ModelManager:
    """Factory for the language-model backends the application can use."""

    @staticmethod
    def load_model(chosen_inference: str, model_id: str, key_manager=None):
        """
        Build the model object for the requested inference backend.

        Args:
            chosen_inference: Backend name; one of "hf_api",
                "hf_api_provider", "litellm", "openai" or "transformers".
            model_id: ID of the model to load. Used by the "hf_api",
                "litellm" and "openai" backends.
            key_manager: Object exposing ``get_key(name)``; required for the
                "openai" backend, ignored otherwise.

        Returns:
            Model instance for the chosen backend.

        Raises:
            ValueError: If the backend name is unknown, or "openai" is
                requested without a key manager. Errors raised by the model
                constructors themselves propagate unchanged.
        """

        def _build_openai():
            # The API key is looked up lazily so the other backends never
            # touch the key manager.
            if not key_manager:
                raise ValueError("Key manager required for OpenAI model")
            return OpenAIServerModel(
                model_id=model_id, api_key=key_manager.get_key("openai_api_key")
            )

        # NOTE(review): "hf_api_provider" and "transformers" do not use the
        # caller's model_id - confirm that is intentional.
        factories = (
            ("hf_api", lambda: HfApiModel(model_id=model_id)),
            ("hf_api_provider", lambda: HfApiModel(provider="together")),
            ("litellm", lambda: LiteLLMModel(model_id=model_id)),
            ("openai", _build_openai),
            (
                "transformers",
                lambda: TransformersModel(
                    model_id="huggingfacetb/smollm2-1.7b-instruct",
                    device_map="auto",
                    max_new_tokens=1000,
                ),
            ),
        )
        for backend, build in factories:
            if chosen_inference == backend:
                return build()
        raise ValueError(f"Invalid inference type: {chosen_inference}")
# Registry of static factory methods; the class holds no state and is never
# instantiated.
class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit=20000):
        """
        Initialize and return web-related tools.

        Args:
            model: LLM model handed to the text inspector tool
            browser: Browser instance shared by all navigation tools
            text_limit: Maximum text length passed to the text inspector

        Returns:
            List of web tools: search, page navigation, in-page find,
            archive search and text inspection.
        """
        return [
            GoogleSearchTool(provider="serper"),
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_image_generation_tools():
        """
        Initialize and return the image generation tool.

        The tool is created from the "xkerser/flux.1-dev" Space.

        Returns:
            Image generation tool

        Raises:
            RuntimeError: If tool initialization fails; the original error is
                attached as ``__cause__``.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/flux.1-dev",
                name="image_generator",
                description=(
                    "Generates high-quality AgentImage. "
                    "With text prompt (77 token limit)."
                ),
            )
        except (ConnectionError, ValueError, RuntimeError) as e:
            print(f" Couldn't initialize image generation tool: {e}")
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(
                f"Image generation tool initialization failed: {e}"
            ) from e

    @staticmethod
    def load_clean_text_tool():
        """
        Initialize and return the text cleaning tool.

        Returns:
            Text cleaning tool

        Raises:
            RuntimeError: If tool initialization fails; the original error is
                attached as ``__cause__``.
        """
        try:
            return TextCleanerTool()
        except (ValueError, RuntimeError) as e:
            print(f" Couldn't initialize clean text tool: {e}")
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(f"Clean text tool initialization failed: {e}") from e
def create_agent():
    """
    Create a fresh agent instance with properly configured tools.

    The agent uses a LiteLLM model, a text browser, the web tools, the image
    generator, the text cleaner and the ``visualizer`` tool.

    Returns:
        CodeAgent: Configured agent ready for use

    Raises:
        RuntimeError: If a tool fails validation or any step of the setup
            raises ``ValueError``/``RuntimeError``; the original error is
            attached as ``__cause__``.
    """
    try:
        # Initialize model
        model = LiteLLMModel(
            custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
            model_id="openrouter/deepseek/deepseek-chat-v3-0324:free",
        )

        # BROWSER_CONFIG is built at import time, which is before
        # setup_environment() has loaded the .env file. Re-read the key here
        # so a value that only exists in .env is honoured; fall back to the
        # import-time value otherwise.
        browser_config = dict(BROWSER_CONFIG)
        browser_config["serpapi_key"] = os.getenv(
            "SERPAPI_API_KEY"
        ) or BROWSER_CONFIG.get("serpapi_key")

        text_limit = 30000
        browser = SimpleTextBrowser(**browser_config)

        # Every tool comes from the registry so initialization failures are
        # reported the same way.
        web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
        image_generator = ToolRegistry.load_image_generation_tools()
        clean_text = ToolRegistry.load_clean_text_tool()
        all_tools = [visualizer, *web_tools, image_generator, clean_text]

        # Validate tools before creating the agent
        for tool in all_tools:
            if not isinstance(tool, Tool):
                raise ValueError(
                    f"Invalid tool type: {type(tool)}. "
                    f"All tools must be instances of Tool class."
                )

        return CodeAgent(
            model=model,
            tools=all_tools,
            max_steps=12,
            verbosity_level=2,
            additional_authorized_imports=AUTHORIZED_IMPORTS,
            planning_interval=4,
        )
    except (ValueError, RuntimeError) as e:
        print(f"Failed to create agent: {e}")
        raise RuntimeError(f"Agent creation failed: {e}") from e
# Define standalone functions outside of classes
def process_message_content(content_lower: str) -> Dict[str, bool]:
    """
    Classify a message by the marker phrases it contains.

    Args:
        content_lower: Message content, already lower-cased by the caller

    Returns:
        Dictionary mapping "is_document_analysis", "is_search" and
        "is_error" to whether the matching phrase occurs in the content.
    """
    markers = (
        ("is_document_analysis", "document analysis"),
        ("is_search", "search"),
        ("is_error", "error"),
    )
    return {flag: phrase in content_lower for flag, phrase in markers}
def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[Dict] = None,
):
    """
    Stream agent responses as Gradio chat messages with status indicators.

    Messages are yielded as soon as the agent produces each step, so the UI
    updates while the agent is still working.

    Args:
        agent: The agent instance to use
        task: The task to perform
        reset_agent_memory: Whether to reset agent memory before running
        additional_args: Optional additional arguments forwarded to the agent

    Yields:
        Gradio ChatMessage objects: a processing indicator first, then the
        messages of every step, then the final answer (or an error message
        if the run fails).
    """
    processing_text = "⏳ Processing your request..."
    try:
        # Initial processing indicator
        yield gr.ChatMessage(role="assistant", content=processing_text)

        first_message_seen = False
        got_step = False
        last_step = None

        # Iterate the run lazily: materializing it with list() would hold
        # back every message until the agent has completely finished.
        for step_log in agent.run(
            task,
            stream=True,
            reset=reset_agent_memory,
            additional_args=additional_args,
        ):
            got_step = True
            last_step = step_log
            for message in pull_messages_from_step(step_log):
                is_first = not first_message_seen
                first_message_seen = True
                content = getattr(message, "content", None)
                # Only plain-text content is decorated; other payloads are
                # passed through untouched instead of failing on
                # str-only methods.
                if isinstance(content, str):
                    if is_first:
                        content = content.replace(processing_text, "")
                        message.content = content
                    if content:
                        message_types = process_message_content(content.lower())
                        if message_types["is_document_analysis"]:
                            message.content = f"📄 **Document Analysis:** {content}"
                        elif message_types["is_search"]:
                            message.content = f"🔍 **Search:** {content}"
                yield message

        # If no steps were returned, handle it gracefully
        if not got_step:
            yield gr.ChatMessage(
                role="assistant", content="⚠️ No response from agent. Please try again."
            )
            return

        # The last streamed step carries the final answer
        final_answer = handle_agent_output_types(last_step)
        if isinstance(final_answer, AgentText):
            yield gr.ChatMessage(
                role="assistant",
                content=f"✅ **Final Answer:**\n{final_answer.to_string()}",
            )
        else:
            yield gr.ChatMessage(
                role="assistant",
                content=f"✅ **Final Answer:** {str(final_answer)}",
            )
    except (ValueError, RuntimeError) as e:
        yield gr.ChatMessage(
            role="assistant",
            content=(
                f"❌ **Error:** {str(e)}\n" f"Please try again with a different query."
            ),
        )
    except Exception as e:  # Fallback for truly unexpected errors
        print(f"Unexpected error in stream_to_gradio: {e}")
        traceback.print_exc()
        yield gr.ChatMessage(
            role="assistant",
            content=(
                "❌ **Unexpected Error:** An unknown error occurred.\n"
                "Please try again or contact support if the issue persists."
            ),
        )
# Module-level helper; GradioUI calls it once at start-up for the upload
# folder.
def cleanup_old_files(directory: str, days: int = FILE_RETENTION_DAYS):
    """
    Remove regular files older than the specified number of days.

    Only direct children of ``directory`` are considered; sub-directories are
    left alone. Cleanup is best effort: a file that cannot be inspected or
    deleted is reported and skipped, and a path that is not a directory is
    ignored.

    Args:
        directory: Directory to clean up
        days: Number of days to keep files (age is based on modification
            time)
    """
    if not os.path.isdir(directory):
        return
    cutoff_time = time.time() - (days * 24 * 60 * 60)
    try:
        filenames = os.listdir(directory)
    except OSError as e:
        print(f"Failed to list {directory}: {str(e)}")
        return
    for filename in filenames:
        file_path = os.path.join(directory, filename)
        if not os.path.isfile(file_path):
            continue
        try:
            # getmtime is inside the try as well: the file may disappear
            # between the listing and this call.
            if os.path.getmtime(file_path) >= cutoff_time:
                continue
            os.remove(file_path)
            print(f"Deleted old file: {file_path}")
        except OSError as e:  # PermissionError is a subclass of OSError
            print(f"Failed to delete {file_path}: {str(e)}")
@dataclass
class UIComponents:
    """Container for UI components to reduce main class attribute count.

    Every field defaults to None and is assigned by GradioUI.launch() when
    the layout is built.
    """

    text_input: Any = None  # request textbox
    submit_btn: Any = None  # "Run" button
    stop_btn: Any = None  # "Stop" button
    clear_btn: Any = None  # "Clear" button
    status: Any = None  # hidden status textbox
    chatbot: Any = None  # chat display
    # Upload button; only assigned when an upload folder is configured, so it
    # can still be None after launch() has built the layout.
    file_uploader: Any = None
    upload_status: Any = None  # upload status textbox; same caveat as above
class GradioUI:
    """Gradio user interface for the OpenDeepResearch application.

    Builds a two-column layout (controls on the left, chat on the right),
    keeps one agent per browser session in ``gr.State`` and streams the
    agent's messages into the chatbot.
    """

    # Placeholder shown in the request box while it is idle.
    _INPUT_PLACEHOLDER = "Enter your research question or task"
    # smolagents mascot, used as the assistant avatar and in the footer.
    _MASCOT_URL = (
        "https://huggingface.co/datasets/huggingface/documentation-images/"
        "resolve/main/smolagents/mascot_smol.png"
    )

    def __init__(self, file_upload_folder=None, max_queue_size=50):
        """Initialize the Gradio UI.

        Args:
            file_upload_folder: Directory uploaded files are copied to. When
                None, the upload widget is not shown.
            max_queue_size: Maximum queue size passed to ``demo.queue``.
        """
        # Basic configuration
        self.file_upload_folder = file_upload_folder
        self.max_queue_size = max_queue_size
        self.max_chat_history = MAX_CHAT_HISTORY
        self.max_file_size_mb = MAX_FILE_SIZE_MB
        # Initialize UI components container
        self.components = UIComponents()
        # Job handle for cancellation
        self.job = None
        # Create the upload directory and purge expired uploads
        if self.file_upload_folder is not None:
            os.makedirs(file_upload_folder, exist_ok=True)
            cleanup_old_files(file_upload_folder)

    def interact_with_agent(self, prompt, messages, session_state):
        """
        Main interaction handler with the agent.

        Args:
            prompt: User input prompt
            messages: Current message history
            session_state: Session state dictionary; the session's agent is
                stored under the "agent" key

        Yields:
            Updated message history
        """
        # log_user_message hands over "" for a blank submission and the event
        # chain still reaches this step; do not start an agent run for it.
        if not prompt or (isinstance(prompt, str) and not prompt.strip()):
            yield messages
            return

        # Get or create session-specific agent
        if "agent" not in session_state:
            try:
                session_state["agent"] = create_agent()
            except RuntimeError as e:
                messages.append(
                    gr.ChatMessage(
                        role="assistant", content=f"Failed to create agent: {str(e)}"
                    )
                )
                yield messages
                return

        try:
            agent = session_state["agent"]
            # Log the existence of agent memory
            has_memory = hasattr(agent, "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory and hasattr(agent.memory, "steps"):
                print(f"Memory steps: {len(agent.memory.steps)}")

            # Keep only the latest messages once the history grows too long
            if len(messages) > self.max_chat_history:
                messages = messages[-self.max_chat_history :]

            # Add user message
            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            # Process with agent and stream responses
            for msg in stream_to_gradio(agent, task=prompt, reset_agent_memory=False):
                messages.append(msg)
                yield messages
        except ValueError as e:
            print(f"Value error in interaction: {str(e)}")
            messages.append(
                gr.ChatMessage(role="assistant", content=f"Input error: {str(e)}")
            )
            yield messages
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            traceback.print_exc()
            messages.append(
                gr.ChatMessage(role="assistant", content=f"Error occurred: {str(e)}")
            )
            yield messages

    @staticmethod
    def _resolve_upload_path(files):
        """Return the path of the uploaded file, or None if nothing usable was sent.

        Accepts a single upload or a list of uploads (the first one is used);
        an upload may be a path string or an object exposing the path as
        ``.name``.
        """
        if not files:
            return None
        upload = files[0] if isinstance(files, (list, tuple)) else files
        candidate = getattr(upload, "name", upload)
        try:
            return os.fspath(candidate) or None
        except TypeError:
            return None

    @staticmethod
    def _upload_result(text, file_uploads_log):
        """Build the (status textbox, upload log) pair returned to Gradio."""
        return (gr.Textbox(value=text, visible=True), file_uploads_log)

    def handle_file_upload(self, files, file_uploads_log):
        """
        Handle file uploads with validation of existence, size and type.

        Args:
            files: The upload delivered by the upload button (a single file
                or a list of files; only the first is processed)
            file_uploads_log: List of paths of previously uploaded files

        Returns:
            Tuple of (status textbox, updated file_uploads_log)
        """
        source_path = self._resolve_upload_path(files)
        if source_path is None:
            return self._upload_result("No file uploaded", file_uploads_log)
        if self.file_upload_folder is None:
            return self._upload_result("File uploads are disabled", file_uploads_log)

        try:
            # Validate file exists
            if not os.path.exists(source_path):
                return self._upload_result("File not found", file_uploads_log)

            # Check file size
            file_size_mb = os.path.getsize(source_path) / (1024 * 1024)
            if file_size_mb > self.max_file_size_mb:
                return self._upload_result(
                    f"File size exceeds {self.max_file_size_mb} MB limit.",
                    file_uploads_log,
                )

            # Validate mime type (guessed from the file name only)
            mime_type, _ = mimetypes.guess_type(source_path)
            if mime_type not in ALLOWED_FILE_TYPES:
                return self._upload_result("File type disallowed", file_uploads_log)

            # Sanitize the name: anything but word characters, "-" and "."
            # becomes an underscore
            original_name = os.path.basename(source_path)
            sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)
            # Add timestamp to keep names unique
            timestamp = datetime.now().strftime("%y%m%d_%H%M%S")
            stem, extension = os.path.splitext(sanitized_name)
            target_path = os.path.join(
                self.file_upload_folder, f"{stem}_{timestamp}{extension}"
            )

            # Save the uploaded file to the upload folder
            shutil.copy(source_path, target_path)
            return self._upload_result(
                f"File uploaded: {original_name}", file_uploads_log + [target_path]
            )
        except FileNotFoundError as e:
            return self._upload_result(f"File not found: {str(e)}", file_uploads_log)
        except PermissionError as e:
            return self._upload_result(f"Permission denied: {str(e)}", file_uploads_log)
        except (IOError, OSError) as e:
            return self._upload_result(
                f"I/O error during upload: {str(e)}", file_uploads_log
            )
        except Exception as e:
            # For truly unexpected errors, log with more detail
            print(f"Unexpected upload error: {e}")
            traceback.print_exc()
            return self._upload_result(
                f"Error processing upload: {str(e)}", file_uploads_log
            )

    def log_user_message(self, text_input, file_uploads_log):
        """
        Process user message and append references to uploaded files.

        Args:
            text_input: User's text input
            file_uploads_log: List of uploaded file paths

        Returns:
            Tuple of (processed message, text input update, submit button
            update). For blank input the message is "" and both components
            stay interactive; otherwise both are locked while the request is
            processed.
        """
        if not text_input or not text_input.strip():
            return (
                "",
                gr.Textbox(value="", interactive=True),
                gr.Button(interactive=True),
            )

        # Only clean when one of the trigger characters is present
        message = text_input
        if any(char in text_input for char in "€¥£-"):
            message = clean(
                text_input,
                fix_unicode=True,
                to_ascii=True,
                lower=False,  # Keep original case
                no_line_breaks=False,
                no_urls=False,
                no_emails=False,
                no_phone_numbers=False,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=False,
                no_punct=False,
                lang="en",
            )

        # Add file references if any
        if file_uploads_log:
            files_info = "\n".join(
                f"- {os.path.basename(f)}" for f in file_uploads_log
            )
            message += f"\nYou have been provided with these files:\n{files_info}"

        return (
            message,
            gr.Textbox(
                value="",
                interactive=False,
                placeholder="Processing your request...",
            ),
            gr.Button(interactive=False),
        )

    def clear_chat(self):
        """
        Clear the chat history and reset UI elements.

        Returns:
            Tuple of (empty chat history, empty stored messages, interactive
            text input, interactive button, hidden empty status)
        """
        return (
            [],  # Empty chat history
            [],  # Empty stored messages
            gr.Textbox(value="", interactive=True),
            gr.Button(interactive=True),
            gr.Textbox(value="", visible=False),  # Clear status
        )

    @staticmethod
    def _unlock_inputs():
        """Return updates that re-enable the request box and the Run button."""
        return (
            # Restore the idle placeholder that log_user_message replaced
            # with its "Processing your request..." text.
            gr.Textbox(interactive=True, placeholder=GradioUI._INPUT_PLACEHOLDER),
            gr.Button(interactive=True),
        )

    def _wire_run_chain(self, trigger, stored_messages, file_uploads_log, session_state):
        """Attach the log -> run agent -> unlock chain to ``trigger``.

        ``trigger`` is an event-listener method such as ``textbox.submit`` or
        ``button.click``. Returns the events that the Stop button cancels.
        """
        ui = self.components
        logged = trigger(
            self.log_user_message,
            [ui.text_input, file_uploads_log],
            [stored_messages, ui.text_input, ui.submit_btn],
        )
        running = logged.then(
            self.interact_with_agent,
            [stored_messages, ui.chatbot, session_state],
            [ui.chatbot],
        )
        unlocked = running.then(
            self._unlock_inputs,
            None,
            [ui.text_input, ui.submit_btn],
        )
        return [running, unlocked]

    def launch(self, share=False, **kwargs):
        """
        Build the layout, wire the event handlers and launch the Gradio UI.

        Args:
            share: Whether to create a public link
            **kwargs: Additional keyword arguments for ``launch``. ``debug``
                and ``ssl_verify`` given here take precedence over the
                defaults chosen below.
        """
        ui = self.components
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            with gr.Row():
                # Sidebar (narrow column)
                with gr.Column(scale=1, min_width=100):
                    gr.Markdown(
                        "# OpenDeepResearch\n"
                        "AI-powered research assistant using SmoLAgents\n"
                        "Model: deepseek/deepseek-chat-v3-0324:free"
                    )
                    with gr.Group():
                        gr.Markdown("**Research Query**", container=True)
                        ui.text_input = gr.Textbox(
                            lines=3,
                            label="Your request",
                            placeholder=self._INPUT_PLACEHOLDER,
                            container=False,
                        )
                        with gr.Row():
                            ui.submit_btn = gr.Button("Run", variant="primary")
                            ui.stop_btn = gr.Button("Stop", variant="stop")
                            ui.clear_btn = gr.Button("Clear", variant="secondary")

                    # File upload in collapsible section
                    if self.file_upload_folder is not None:
                        with gr.Accordion("Upload Files", open=False):
                            ui.file_uploader = gr.UploadButton(
                                "Upload a file",
                                file_count="single",
                                file_types=["pdf", "docx", "txt", "md", "json"],
                            )
                            ui.upload_status = gr.Textbox(
                                label="Upload status", interactive=False, visible=False
                            )

                    # Tool information
                    with gr.Accordion("Available Tools", open=False):
                        gr.Markdown(
                            "\n"
                            "- **Web Search**: Find information online\n"
                            "- **Document Analysis**: Analyze uploaded documents\n"
                            "- **Text Cleaning**: Format and clean text\n"
                            "- **Image Generation**: Create images from descriptions\n"
                        )

                    gr.HTML("<br><h5>Powered by:</h5>")
                    with gr.Row():
                        gr.HTML(
                            "\n"
                            '<div style="display: flex; align-items: center; gap: 8px;\n'
                            'font-family: system-ui, -apple-system, sans-serif;">\n'
                            f'<img src="{self._MASCOT_URL}"\n'
                            'style="width: 32px; height: 32px; object-fit: contain;"\n'
                            'alt="logo">\n'
                            '<a target="_blank" href="https://github.com/huggingface/smolagents">\n'
                            "<b>huggingface/smolagents</b>\n"
                            "</a>\n"
                            "</div>\n"
                        )

                # Main chat area (wide column)
                with gr.Column(scale=3, min_width=500):
                    # Per-session data
                    session_state = gr.State({})
                    stored_messages = gr.State([])
                    file_uploads_log = gr.State([])

                    # Chat interface
                    ui.chatbot = gr.Chatbot(
                        label="Research Assistant",
                        type="messages",
                        avatar_images=(None, self._MASCOT_URL),
                        height=600,
                        elem_id="research-chatbot",
                    )

                    # Status indicator
                    ui.status = gr.Textbox(
                        "", label="Status", interactive=False, visible=False
                    )

            # File upload handler. The dataclass fields always exist, so the
            # test must be on their value: they stay None when uploads are
            # disabled.
            if ui.file_uploader is not None and ui.upload_status is not None:
                ui.file_uploader.upload(
                    self.handle_file_upload,
                    [ui.file_uploader, file_uploads_log],
                    [ui.upload_status, file_uploads_log],
                )

            # Enter in the textbox and the Run button share the same chain
            cancellable = []
            for trigger in (ui.text_input.submit, ui.submit_btn.click):
                cancellable.extend(
                    self._wire_run_chain(
                        trigger, stored_messages, file_uploads_log, session_state
                    )
                )

            # Stop cancels the agent step itself (not only the trailing
            # unlock step) and re-enables the inputs here, because a
            # cancelled chain may never reach its own unlock step.
            ui.stop_btn.click(
                self._unlock_inputs,
                None,
                [ui.text_input, ui.submit_btn],
                cancels=cancellable,
            )

            # Clear button
            ui.clear_btn.click(
                self.clear_chat,
                None,
                [
                    ui.chatbot,
                    stored_messages,
                    ui.text_input,
                    ui.submit_btn,
                    ui.status,
                ],
            )

        # Defaults that callers may override through **kwargs; merging them
        # into one dict avoids a "multiple values for keyword" TypeError.
        launch_kwargs = dict(kwargs)
        launch_kwargs.setdefault("debug", True)
        # NOTE(review): ssl_verify=False presumably skips certificate
        # validation (self-signed certificates); it is chosen when a
        # "local_port" keyword is supplied - confirm that keyword is intended.
        launch_kwargs.setdefault("ssl_verify", not kwargs.get("local_port"))

        demo.queue(max_size=self.max_queue_size).launch(share=share, **launch_kwargs)
def main():
    """
    Run the application: set up the environment, create folders, start the UI.

    Returns:
        int: Exit code (0 for success, 1 for failure)
    """
    try:
        # Authentication / environment must be ready before anything else
        if not setup_environment():
            print("Failed to set up environment properly.")
            return 1

        # Working directories: browser downloads and user uploads
        uploads_folder = "uploaded_files"
        required_folders = (
            f"./{BROWSER_CONFIG['downloads_folder']}",
            uploads_folder,
        )
        for folder in required_folders:
            os.makedirs(folder, exist_ok=True)

        # Build and launch the interface
        print("Starting OpenDeepResearch Gradio interface...")
        GradioUI(file_upload_folder=uploads_folder).launch()
    except KeyError as e:
        print(f"Configuration error: Missing key {e}")
        traceback.print_exc()
        return 1
    except Exception as e:
        print(f"Application failed to start: {e}")
        traceback.print_exc()
        return 1
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())