# Provenance: Hugging Face Space file "app.py" (commit fbec7e7, ~28.2 kB).
# The web-viewer residue ("raw", "history blame", size) was removed so the
# file parses as Python.
"""
OpenDeepResearch Web Interface Application
This module provides a Gradio-based web interface for interacting with AI agents
using the smolagents framework. It integrates document processing tools,
web searching, and image generation capabilities.
"""
import mimetypes
import os
import re
import shutil
import datetime
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.frontmatter_tool import FrontmatterGeneratorTool
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText, AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# ------------------------ Configuration and Setup ------------------------
# Module-level constants. NOTE: these are evaluated at import time, i.e.
# BEFORE setup_environment()/load_dotenv() runs in main() — see BROWSER_CONFIG.

# Import names the CodeAgent's sandboxed Python executor is allowed to use.
# NOTE(review): authorizing "os", "selenium", and "schedule" gives generated
# code broad system access — confirm this is acceptable for the deployment.
AUTHORIZED_IMPORTS = [
    "requests",  # Web requests (fetching data from the internet)
    "zipfile",  # Working with ZIP archives
    "pandas",  # Data manipulation and analysis (DataFrames)
    "numpy",  # Numerical computing (arrays, linear algebra)
    "sympy",  # Symbolic mathematics (algebra, calculus)
    "json",  # JSON data serialization/deserialization
    "bs4",  # Beautiful Soup for HTML/XML parsing
    "pubchempy",  # Accessing PubChem chemical database
    "yaml",  # YAML parsing/serialization
    "xml",  # XML processing
    "yahoo_finance",  # Fetching stock data
    "Bio",  # Bioinformatics tools (e.g., sequence analysis)
    "sklearn",  # Scikit-learn for machine learning
    "scipy",  # Scientific computing (stats, optimization)
    "pydub",  # Audio manipulation
    "PIL",  # Pillow for image processing
    "chess",  # Chess-related functionality
    "PyPDF2",  # PDF manipulation
    "pptx",  # PowerPoint file manipulation
    "torch",  # PyTorch for neural networks
    "datetime",  # Date and time handling
    "fractions",  # Rational number arithmetic
    "csv",  # CSV file reading/writing
    "cleantext",  # Text cleaning and normalization
    "os",  # Operating system interaction (file system, etc.) VERY IMPORTANT
    "re",  # Regular expressions for text processing
    "collections",  # Useful data structures (e.g., defaultdict, Counter)
    "math",  # Basic mathematical functions
    "random",  # Random number generation
    "io",  # Input/output streams
    "urllib.parse",  # URL parsing and manipulation (safe URL handling)
    "typing",  # Support for type hints (improve code clarity)
    "concurrent.futures",  # For parallel execution
    "time",  # Measuring time
    "tempfile",  # Creating temporary files and directories
    # Data Visualization (if needed) - Consider security implications carefully
    "matplotlib",  # Plotting library (basic charts)
    "seaborn",  # Statistical data visualization (more advanced)
    # Web Scraping (more specific/controlled) - Consider ethical implications
    "lxml",  # Faster XML/HTML processing (alternative to bs4)
    "selenium",  # Automated browser control (for dynamic websites)
    # Database interaction (if needed) - Handle credentials securely!
    "sqlite3",  # SQLite database access
    # Task scheduling
    "schedule",  # Allow the agent to schedule tasks
]

# Desktop-Chrome user agent sent with every browser request (some sites block
# default Python user agents).
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

# Keyword arguments for SimpleTextBrowser (see create_agent).
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,  # characters of page text per "viewport"
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,  # seconds
    },
    # NOTE(review): evaluated at import time, before load_dotenv() runs in
    # main() — a key defined only in .env will be None here. Verify the key is
    # exported in the process environment, or move this lookup after setup.
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

# Maps smolagents message roles onto roles LiteLLM/OpenAI-style APIs accept.
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

# MIME types accepted by GradioUI.upload_file (checked via mimetypes.guess_type).
ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    "video/mp4",
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]
def setup_environment():
    """Initialize environment variables and Hugging Face authentication.

    Loads variables from a local ``.env`` file (overriding any already-set
    values), then logs in to the Hugging Face Hub when ``HF_TOKEN`` is present.
    Prints a diagnostic either way; only the token's last 10 characters are
    ever echoed.
    """
    load_dotenv(override=True)
    # Read the token once instead of three separate os.getenv() calls.
    token = os.getenv("HF_TOKEN")
    if token:  # Check if token is actually set (None or empty both skip login)
        login(token)
        print("HF_TOKEN (last 10 characters):", token[-10:])
    else:
        print("HF_TOKEN not found in environment variables.")
# ------------------------ Model and Tool Management ------------------------
class ModelManager:
"""Manages model loading and initialization."""
@staticmethod
def load_model(chosen_inference: str, model_id: str, key_manager=None):
"""Load the specified model with appropriate configuration."""
try:
if chosen_inference == "hf_api":
return HfApiModel(model_id=model_id)
if chosen_inference == "hf_api_provider":
return HfApiModel(provider="together")
if chosen_inference == "litellm":
return LiteLLMModel(model_id=model_id)
if chosen_inference == "openai":
if not key_manager:
raise ValueError("Key manager required for OpenAI model")
return OpenAIServerModel(
model_id=model_id, api_key=key_manager.get_key("openai_api_key")
)
if chosen_inference == "transformers":
return TransformersModel(
model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
device_map="auto",
max_new_tokens=1000,
)
raise ValueError(f"Invalid inference type: {chosen_inference}")
except Exception as e:
print(f"✗ Couldn't load model: {e}")
raise
class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit=20000):
        """Build the web-research toolset.

        Args:
            model: Model passed to the text inspector.
            browser: Shared SimpleTextBrowser all navigation tools operate on.
            text_limit: Character cap for the text inspector.

        Returns:
            List of tools: search, then page navigation, then text inspection.
        """
        navigation_tools = [
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
        ]
        return [
            GoogleSearchTool(provider="serper"),
            *navigation_tools,
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_document_tools():
        """
        Initialize and return document processing, i.e. sanitisation and indexing, tools.

        Returns:
            List of document tools
        """
        frontmatter = FrontmatterGeneratorTool()
        cleaner = TextCleanerTool()
        return [frontmatter, cleaner]

    @staticmethod
    def load_image_generation_tools():
        """Build the image-generation tool backed by a hosted FLUX.1-dev Space.

        Raises:
            Exception: Re-raised after logging if the Space is unreachable.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description=(
                    "Generates high-quality AgentImage using the FLUX.1-dev model based on text prompts."
                ),
            )
        except Exception as e:
            print(f"✗ Couldn't initialize image generation tool: {e}")
            raise
# ------------------------ Agent Creation and Execution ------------------------
def create_agent():
    """
    Creates a fresh agent instance with properly configured tools.

    Returns:
        CodeAgent: Configured agent ready for use

    Raises:
        RuntimeError: If agent creation fails (wraps the original exception)
    """
    try:
        # Model used by the agent and by the text-inspection tool.
        model = LiteLLMModel(
            custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
            model_id="openrouter/google/gemini-2.0-flash-001",
        )
        # Shared browser instance backing all web-navigation tools.
        text_limit = 30000
        browser = SimpleTextBrowser(**BROWSER_CONFIG)
        web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
        # Document tools are optional: degrade gracefully instead of aborting.
        # FIX: previously only AssertionError was caught here, so any other
        # loader failure crashed agent creation despite the stated intent to
        # "continue with available tools".
        try:
            doc_tools = ToolRegistry.load_document_tools()
        except Exception as e:
            print(f"Warning: Error loading document tools: {str(e)}")
            print("Attempting to continue with available tools...")
            doc_tools = []
        # Image generation is also optional (remote Space may be down).
        try:
            image_generator = ToolRegistry.load_image_generation_tools()
        except Exception as e:
            print(f"Warning: Image generation tools unavailable: {str(e)}")
            image_generator = None
        # Combine available tools (filter out None values defensively).
        all_tools = [
            tool
            for tool in (
                [visualizer]
                + web_tools
                + doc_tools
                + ([image_generator] if image_generator else [])
            )
            if tool is not None
        ]
        # Log available tools so startup problems are easy to diagnose.
        print(f"Loaded {len(all_tools)} tools successfully")
        for tool in all_tools:
            print(f"- {tool.name}: {tool.description[:50]}...")
        return CodeAgent(
            model=model,
            tools=all_tools,
            max_steps=12,
            verbosity_level=2,
            additional_authorized_imports=AUTHORIZED_IMPORTS,
            planning_interval=4,
        )
    except Exception as e:
        print(f"Failed to create agent: {e}")
        raise RuntimeError(f"Agent creation failed: {e}") from e
def stream_to_gradio(agent, task, reset_agent_memory=False, additional_args=None):
    """Runs an agent with the given task and streams messages as Gradio ChatMessages."""
    try:
        # Stream intermediate step messages as the agent works.
        for step_log in agent.run(
            task, stream=True, reset=reset_agent_memory, additional_args=additional_args
        ):
            yield from pull_messages_from_step(step_log)

        # Guard clause: nothing in memory means no final answer was produced.
        if not agent.memory.steps:
            yield gr.ChatMessage(
                role="assistant",
                content="No final answer was generated. Please try again.",
            )
            return

        # The last memory step carries the final answer; normalize its type
        # and render text, image, and audio outputs appropriately.
        final_answer = handle_agent_output_types(agent.memory.steps[-1])
        if isinstance(final_answer, AgentText):
            content = f"**Final answer:**\n{final_answer.to_string()}\n"
        elif isinstance(final_answer, AgentImage):
            content = {"image": final_answer.to_string(), "type": "file"}
        elif isinstance(final_answer, AgentAudio):
            content = {"audio": final_answer.to_string(), "type": "file"}
        else:
            content = f"**Final answer:** {str(final_answer)}"
        yield gr.ChatMessage(role="assistant", content=content)
    except Exception as e:
        yield gr.ChatMessage(
            role="assistant",
            content=f"**Error occurred during processing**: {str(e)}\n\nPlease try again with a different query or check your inputs.",
        )
# ------------------------ Gradio UI Components ------------------------
class GradioUI:
    """A one-line interface to launch your agent in Gradio.

    Builds either a desktop (sidebar) or mobile (single-column) layout based
    on request headers, manages per-session agents, and optionally accepts
    file uploads into ``file_upload_folder``.
    """

    def __init__(self, file_upload_folder: str | None = None):
        """Initialize the Gradio UI with optional file upload functionality.

        Args:
            file_upload_folder: Directory to store uploads; uploads are
                disabled when None.
        """
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            if not os.path.exists(file_upload_folder):
                # NOTE(review): os.mkdir creates only one path level; if a
                # nested path is ever passed, os.makedirs(..., exist_ok=True)
                # would be needed.
                os.mkdir(file_upload_folder)

    def interact_with_agent(self, prompt, messages, session_state):
        """Main interaction handler with the agent.

        Generator: yields the growing ``messages`` list after each update so
        Gradio streams the conversation. A fresh agent is created lazily per
        session and its memory reset after 15 requests.
        """
        # Get or create session-specific agent with cache persistence
        if "agent" not in session_state:
            try:
                session_state["agent"] = create_agent()
                session_state["creation_time"] = datetime.datetime.now()
                session_state["request_count"] = 0
            except Exception as e:
                messages.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=f"**Error initializing agent**: {str(e)}\n\nPlease refresh the page and try again.",
                    )
                )
                yield messages
                return
        session_state["request_count"] += 1
        # Add user message
        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages
        try:
            # Check if agent should be reset (e.g., if too many requests)
            reset_needed = session_state["request_count"] > 15
            for msg in stream_to_gradio(
                session_state["agent"], task=prompt, reset_agent_memory=reset_needed
            ):
                messages.append(msg)
                yield messages
            # If we reset the agent memory, update the request count
            if reset_needed:
                session_state["request_count"] = 1
        except Exception as e:
            messages.append(
                gr.ChatMessage(
                    role="assistant",
                    content=f"**Error processing your request**: {str(e)}\n\nPlease try again with a different query.",
                )
            )
            yield messages

    def upload_file(self, file, file_uploads_log):
        """Handle file uploads with validation, security, and clear feedback.

        Validates size (<= 50 MB) and MIME type (extension-based guess),
        sanitizes the filename, and copies the file into the upload folder.

        Returns:
            (status Textbox, updated file_uploads_log) tuple.
        """
        # NOTE(review): the desktop layout sets file_count="multiple", which
        # makes Gradio pass a LIST here, but this code assumes a single file
        # object (file.name) — confirm and reconcile.
        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log
        try:
            # Get file size and check limit before processing
            file_size_mb = os.path.getsize(file.name) / (1024 * 1024)  # Size in MB
            max_file_size_mb = 50  # Define the limit
            if file_size_mb > max_file_size_mb:
                return (
                    gr.Textbox(
                        f"❌ File size ({file_size_mb:.1f} MB) exceeds {max_file_size_mb} MB limit.",
                        visible=True,
                    ),
                    file_uploads_log,
                )
            # Check MIME type (guessed from the extension only — not content)
            mime_type, _ = mimetypes.guess_type(file.name)
            if mime_type not in ALLOWED_FILE_TYPES:
                allowed_extensions = [
                    t.rsplit("/", maxsplit=1)[-1] for t in ALLOWED_FILE_TYPES
                ]
                return (
                    gr.Textbox(
                        f"❌ File type '{mime_type or 'unknown'}' is not allowed. Supported types: {', '.join(allowed_extensions)}",
                        visible=True,
                    ),
                    file_uploads_log,
                )
            # Sanitize file name with better pattern (keep word chars, '-', '.')
            original_name = os.path.basename(file.name)
            sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)
            # Save the uploaded file
            file_path = os.path.join(self.file_upload_folder, sanitized_name)
            shutil.copy(file.name, file_path)
            return gr.Textbox(
                f"✓ File uploaded successfully: {os.path.basename(file_path)} ({file_size_mb:.1f} MB)",
                visible=True,
            ), file_uploads_log + [file_path]
        except Exception as e:
            return (
                gr.Textbox(f"❌ Upload error: {str(e)}", visible=True),
                file_uploads_log,
            )

    def log_user_message(self, text_input, file_uploads_log):
        """Process user message and handle file references with proper agent types.

        Appends a summary of uploaded files (grouped by images/audio/documents)
        to the prompt, then disables the input controls while processing.

        Returns:
            (augmented message, disabled Textbox, disabled Button) tuple.
        """
        message = text_input
        if len(file_uploads_log) > 0:
            # Group files by type for better agent processing
            file_info = {}
            for file_path in file_uploads_log:
                ext = os.path.splitext(file_path)[1].lower()
                if ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]:
                    category = "images"
                elif ext in [".mp3", ".wav", ".ogg"]:
                    category = "audio"
                else:
                    category = "documents"
                if category not in file_info:
                    file_info[category] = []
                file_info[category].append(os.path.basename(file_path))
            # Format file information for the agent
            file_message = "\nYou have been provided with these files:\n"
            for category, files in file_info.items():
                file_message += f"- {category.capitalize()}: {', '.join(files)}\n"
            message += file_message
            message += "\nUse inspect_file_as_text for documents, visualizer for images, and the appropriate tools for audio files."
        return (
            message,
            gr.Textbox(value="", interactive=False, placeholder="Processing..."),
            gr.Button(interactive=False),
        )

    def detect_device(self, request: gr.Request):
        """Detect whether the user is on mobile or desktop device.

        Tries, in order: the sec-ch-ua-mobile client hint, user-agent keyword
        matching, and the sec-ch-ua-platform hint; defaults to "Desktop".
        """
        if not request:
            return "Unknown device"  # Handle case where request is none.
        # Method 1: Check sec-ch-ua-mobile header ("?1" means mobile)
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"
        # Method 2: Check user-agent string
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"
        # Method 3: Check platform (values arrive quoted, e.g. '"android"')
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            if platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"
        # Default case if no clear indicators
        return "Desktop"

    def launch(self, **kwargs):
        """Launch the Gradio UI with responsive layout.

        Uses @gr.render so the layout is chosen per-request from the detected
        device, then serves with a bounded queue.
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # Different layouts for mobile and computer devices
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                # Render layout with sidebar
                if device == "Desktop":
                    return self._create_desktop_layout()
                return self._create_mobile_layout()

        demo.queue(max_size=20).launch(
            debug=True, **kwargs
        )  # Add queue with reasonable size

    def _create_desktop_layout(self):
        """Create the desktop layout with sidebar and enhanced styling."""
        with gr.Blocks(fill_height=True) as sidebar_demo:
            with gr.Sidebar():
                gr.Markdown(
                    """#
                    ### Smolagents + Document Tools
                    """
                )
                with gr.Group():
                    gr.Markdown("**What can I help you with today?**", container=True)
                    text_input = gr.Textbox(
                        lines=4,
                        label="Your request",
                        container=False,
                        placeholder="Enter your question or task here...",
                        show_label=False,
                    )
                    with gr.Row():
                        clear_btn = gr.Button("Clear", variant="secondary")
                        launch_research_btn = gr.Button("Run", variant="primary")
                # File upload section with better labeling
                if self.file_upload_folder is not None:
                    with gr.Group():
                        gr.Markdown("** Upload Documents**")
                        upload_file = gr.File(
                            label="Upload files for analysis",
                            file_types=[
                                "pdf",
                                "docx",
                                "txt",
                                "md",
                                "csv",
                                "xlsx",
                                "jpg",
                                "png",
                            ],
                            file_count="multiple",
                        )
                        upload_status = gr.Textbox(
                            label="Upload Status", interactive=False, visible=False
                        )
                        file_uploads_log = gr.State([])
                        # Show uploaded files list
                        uploaded_files_display = gr.Markdown("No files uploaded yet")
                        upload_file.change(
                            self.upload_file,
                            [upload_file, file_uploads_log],
                            [upload_status, file_uploads_log],
                        ).then(
                            lambda files: (
                                "**Uploaded Files:**\n"
                                + "\n".join([f"- {os.path.basename(f)}" for f in files])
                                if files
                                else "No files uploaded yet"
                            ),
                            [file_uploads_log],
                            [uploaded_files_display],
                        )
                gr.HTML("<br><hr><h4><center>Powered by:</center></h4>")
                with gr.Row():
                    gr.HTML(
                        """
                        <div style="display: flex; align-items: center; justify-content: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">
                        <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"
                        style="width: 32px; height: 32px; object-fit: contain;" alt="logo">
                        <a target="_blank" href="https://github.com/huggingface/smolagents">
                        <b>huggingface/smolagents</b>
                        </a>
                        </div>
                        """
                    )
            # Main chat area with improved styling
            session_state = gr.State({})
            stored_messages = gr.State([])
            # Fallback: define file_uploads_log if the upload branch above was
            # skipped (no upload folder configured).
            if "file_uploads_log" not in locals():
                file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="OpenDeepResearch Assistant",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                show_copy_button=True,
                scale=1,
                elem_id="my-chatbot",
                height=700,
            )
            # Connect clear button
            # NOTE(review): `session_state` here is the gr.State COMPONENT,
            # not its dict value — `.get("agent")` on it likely fails at
            # runtime; the lambda probably needs session_state as an input.
            clear_btn.click(
                lambda: ([], [], {"agent": session_state.get("agent")}),
                None,
                [chatbot, stored_messages, session_state],
            )
            # Connect event handlers
            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )
        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (simpler without sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")
            # Add session state to store session-specific data
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                scale=1,
            )
            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )
            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")
            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )
        return simple_demo

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Connect the event handlers for input elements.

        Both Enter-submit and the Run button trigger the same three-step
        chain: log the message (and disable inputs), stream the agent's
        answer into the chatbot, then re-enable the inputs.
        """
        # Connect text input submit event
        text_input.submit(
            self.log_user_message,
            [text_input, file_uploads_log],
            [stored_messages, text_input, launch_research_btn],
        ).then(
            self.interact_with_agent,
            [stored_messages, chatbot, session_state],
            [chatbot],
        ).then(
            lambda: (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            ),
            None,
            [text_input, launch_research_btn],
        )
        # Connect button click event
        launch_research_btn.click(
            self.log_user_message,
            [text_input, file_uploads_log],
            [stored_messages, text_input, launch_research_btn],
        ).then(
            self.interact_with_agent,
            [stored_messages, chatbot, session_state],
            [chatbot],
        ).then(
            lambda: (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            ),
            None,
            [text_input, launch_research_btn],
        )
# ------------------------ Execution ------------------------
def main():
    """Main entry point for the application."""
    # Load .env and authenticate with the HF Hub before anything else.
    setup_environment()
    # The text browser writes downloads here; create it up front.
    downloads_dir = f"./{BROWSER_CONFIG['downloads_folder']}"
    os.makedirs(downloads_dir, exist_ok=True)
    # Build and serve the UI with uploads enabled.
    ui = GradioUI(file_upload_folder="uploaded_files")
    ui.launch()


if __name__ == "__main__":
    main()