Leonardo
Update app.py
bb3c756 verified
raw
history blame
23.6 kB
"""Main application for the OpenDeepResearch Gradio interface."""
import mimetypes
import os
import re
import shutil
from typing import Optional
from cleantext import clean
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText, AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# Constants and configurations
AUTHORIZED_IMPORTS = [
"requests", # Web requests (fetching data from the internet)
"zipfile", # Working with ZIP archives
"pandas", # Data manipulation and analysis (DataFrames)
"numpy", # Numerical computing (arrays, linear algebra)
"sympy", # Symbolic mathematics (algebra, calculus)
"json", # JSON data serialization/deserialization
"bs4", # Beautiful Soup for HTML/XML parsing
"pubchempy", # Accessing PubChem chemical database
"xml", # XML processing
"yahoo_finance", # Fetching stock data
"Bio", # Bioinformatics tools (e.g., sequence analysis)
"sklearn", # Scikit-learn for machine learning
"scipy", # Scientific computing (stats, optimization)
"pydub", # Audio manipulation
"PIL", # Pillow for image processing
"chess", # Chess-related functionality
"PyPDF2", # PDF manipulation
"pptx", # PowerPoint file manipulation
"torch", # PyTorch for neural networks
"datetime", # Date and time handling
"fractions", # Rational number arithmetic
"csv", # CSV file reading/writing
"cleantext", # Text cleaning and normalization
"os", # Operating system interaction (file system, etc.) VERY IMPORTANT
"re", # Regular expressions for text processing
"collections", # Useful data structures (e.g., defaultdict, Counter)
"math", # Basic mathematical functions
"random", # Random number generation
"io", # Input/output streams
"urllib.parse", # URL parsing and manipulation (safe URL handling)
"typing", # Support for type hints (improve code clarity)
"concurrent.futures", # For parallel execution
"time", # Measuring time
"tempfile", # Creating temporary files and directories
# Data Visualization (if needed) - Consider security implications carefully
"matplotlib", # Plotting library (basic charts) - Consider security implications
"seaborn", # Statistical data visualization (more advanced) - Consider security implications
# Web Scraping (more specific/controlled) - Consider ethical implications carefully
"lxml", # Faster XML/HTML processing (alternative to bs4)
"selenium", # Automated browser control (for dynamic websites) - HIGH SECURITY RISK
# Database interaction (if needed) - Handle credentials securely!
"sqlite3", # SQLite database access
# "psycopg2", # PostgreSQL adapter if needed
# Task scheduling
"schedule", # Allow the agent to schedule tasks
# Networking
# "socket", # Networking
]
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
BROWSER_CONFIG = {
"viewport_size": 1024 * 5,
"downloads_folder": "downloads_folder",
"request_kwargs": {
"headers": {"User-Agent": USER_AGENT},
"timeout": 300,
},
"serpapi_key": os.getenv("SERPAPI_API_KEY"),
}
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}
ALLOWED_FILE_TYPES = [
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"text/plain",
"text/markdown", # Added Markdown support
"application/json", # Added JSON support
"image/png",
"image/webp",
"image/jpeg", # Added JPEG support
"image/gif", # Added GIF support
"video/mp4",
"audio/mpeg", # Added MP3 support
"audio/wav", # Added WAV support
"audio/ogg", # Added OGG support
]
def setup_environment():
"""Initialize environment variables and authentication."""
load_dotenv(override=True)
hf_token = os.getenv("HF_TOKEN")
if hf_token: # Check if token is actually set
login(hf_token)
print("HF_TOKEN (last 10 characters):", hf_token[-10:])
else:
print("HF_TOKEN not found in environment variables.")
class ModelManager:
"""Manages model loading and initialization."""
@staticmethod
def load_model(chosen_inference: str, model_id: str, key_manager=None):
"""Load the specified model with appropriate configuration."""
try:
if chosen_inference == "hf_api":
return HfApiModel(model_id=model_id)
if chosen_inference == "hf_api_provider":
return HfApiModel(provider="together")
if chosen_inference == "litellm":
return LiteLLMModel(model_id=model_id)
if chosen_inference == "openai":
if not key_manager:
raise ValueError("Key manager required for OpenAI model")
return OpenAIServerModel(
model_id=model_id, api_key=key_manager.get_key("openai_api_key")
)
if chosen_inference == "transformers":
return TransformersModel(
model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
device_map="auto",
max_new_tokens=1000,
)
raise ValueError(f"Invalid inference type: {chosen_inference}")
except Exception as e:
print(f"✗ Couldn't load model: {e}")
raise
class ToolRegistry:
"""Manages tool initialization and organization."""
@staticmethod
def load_web_tools(model, browser, text_limit=20000):
"""Initialize and return web-related tools."""
return [
GoogleSearchTool(provider="serper"),
VisitTool(browser),
PageUpTool(browser),
PageDownTool(browser),
FinderTool(browser),
FindNextTool(browser),
ArchiveSearchTool(browser),
TextInspectorTool(model, text_limit),
]
@staticmethod
def load_image_generation_tools():
"""Initialize and return image generation tools."""
try:
return Tool.from_space(
space_id="xkerser/FLUX.1-dev",
name="image_generator",
description="Generates high-quality AgentImage with text prompt (77 token limit).",
)
except Exception as e:
print(f"✗ Couldn't initialize image generation tool: {e}")
raise
@staticmethod
def load_clean_text_tool():
"""Initialize and return image generation tools."""
try:
return TextCleanerTool
except Exception as e:
print(f"✗ Couldn't initialize clean text tool: {e}")
raise
def create_agent():
"""Creates a fresh agent instance with properly configured tools."""
# Initialize model
model = LiteLLMModel(
custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
model_id="openrouter/deepseek/deepseek-chat-v3-0324:free", # currently serving:
) # DEEPSEEK = openrouter/perplexity/r1-1776 <--- boss model
# Initialize tools
text_limit = 30000
browser = SimpleTextBrowser(**BROWSER_CONFIG)
# Collect all tools in a single list
web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
image_generator = ToolRegistry.load_image_generation_tools()
clean_text = TextCleanerTool() # Instantiate TextCleanerTool
# Combine all tools into a single list (not a tuple)
all_tools = [visualizer] + web_tools + [image_generator] + [clean_text]
# Validate tools before creating agent
for tool in all_tools:
if not isinstance(tool, Tool):
raise ValueError(
"Invalid tool type: "
f"{type(tool)}. All tools must be instances of Tool class."
)
return CodeAgent(
model=model,
tools=all_tools, # Pass a single list containing all tools
max_steps=12,
verbosity_level=2,
additional_authorized_imports=AUTHORIZED_IMPORTS,
planning_interval=4,
)
def stream_to_gradio(
agent,
task: str,
reset_agent_memory: bool = False,
additional_args: Optional[dict] = None,
):
"""Runs an agent with the given task and streams messages as Gradio ChatMessages."""
for step_log in agent.run(
task, stream=True, reset=reset_agent_memory, additional_args=additional_args
):
yield from pull_messages_from_step(step_log)
# Process final answer : Use a more comprehensive media output
final_answer = step_log # Last log is the run's final_answer
final_answer = handle_agent_output_types(final_answer)
if isinstance(final_answer, AgentText):
yield gr.ChatMessage(
role="assistant",
content=f"**Final answer:**\n{final_answer.to_string()}\n",
)
elif isinstance(final_answer, AgentImage):
yield gr.ChatMessage(
role="assistant",
content={"image": final_answer.to_string(), "type": "file"},
) # Send as Gradio-compatible file object:
elif isinstance(final_answer, AgentAudio):
yield gr.ChatMessage(
role="assistant",
content={"audio": final_answer.to_string(), "type": "file"},
) # Send as Gradio-compatible file object
else:
yield gr.ChatMessage(
role="assistant", content=f"**Final answer:** {str(final_answer)}"
)
class GradioUI:
"""A one-line interface to launch your agent in Gradio."""
def __init__(self, file_upload_folder: str | None = None):
"""Initialize the Gradio UI with optional file upload functionality."""
self.file_upload_folder = file_upload_folder
if self.file_upload_folder is not None:
os.makedirs(file_upload_folder, exist_ok=True)
def interact_with_agent(self, prompt, messages, session_state):
"""Main interaction handler with the agent."""
# Get or create session-specific agent
if "agent" not in session_state:
session_state["agent"] = create_agent()
# Adding monitoring
try:
# Log the existence of agent memory
has_memory = hasattr(session_state["agent"], "memory")
print(f"Agent has memory: {has_memory}")
if has_memory:
print(f"Memory type: {type(session_state['agent'].memory)}")
messages.append(gr.ChatMessage(role="user", content=prompt))
yield messages
for msg in stream_to_gradio(
session_state["agent"], task=prompt, reset_agent_memory=False
):
messages.append(msg)
yield messages # Yield messages after each step
yield messages # Yield messages one last time
except Exception as e:
print(f"Error in interaction: {str(e)}")
raise
def upload_file(
self,
file,
file_uploads_log,
):
"""Handle file uploads with proper validation and security."""
if file is None:
return gr.Textbox("No file uploaded", visible=True), file_uploads_log
try:
mime_type, _ = mimetypes.guess_type(file.name)
except Exception as e:
return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log
if mime_type not in ALLOWED_FILE_TYPES:
return gr.Textbox("File type disallowed", visible=True), file_uploads_log
# Sanitize file name
original_name = os.path.basename(file.name)
sanitized_name = re.sub(
r"[^\w\-.]", "_", original_name
) # Replace invalid chars with underscores
# Ensure the extension correlates to the mime type
type_to_ext = {}
for ext, t in mimetypes.types_map.items():
if t not in type_to_ext:
type_to_ext[t] = ext
# Build sanitized filename with proper extension
name_parts = sanitized_name.split(".")[:-1]
extension = type_to_ext.get(mime_type, "")
sanitized_name = "".join(name_parts) + extension
# Limit File Size, and Throw Error
max_file_size_mb = 50 # Define the limit
file_size_mb = os.path.getsize(file.name) / (1024 * 1024) # Size in MB
if file_size_mb > max_file_size_mb:
return (
gr.Textbox(
f"File size exceeds {max_file_size_mb} MB limit.", visible=True
),
file_uploads_log,
)
# Save the uploaded file to the specified folder
file_path = os.path.join(self.file_upload_folder, sanitized_name)
shutil.copy(file.name, file_path)
return gr.Textbox(
f"File uploaded: {file_path}", visible=True
), file_uploads_log + [file_path]
def log_user_message(self, text_input, file_uploads_log):
"""Process user message and handle file references."""
# Clean the user input using the TextCleanerTool
cleaned_message = clean(
text_input,
fix_unicode=True,
to_ascii=True,
lower=True,
no_line_breaks=False,
no_urls=False,
no_emails=False,
no_phone_numbers=False,
no_numbers=False,
no_digits=False,
no_currency_symbols=False,
no_punct=False,
lang="en",
) # Can change default behaviour by instantiating an object of TextCleanerTool class.
message = cleaned_message # Use the cleaned message
if file_uploads_log:
message += f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}" # Added file list
return (
message,
gr.Textbox(
value="",
interactive=False,
placeholder="Processing...", # Changed placeholder.
),
gr.Button(interactive=False),
)
def detect_device(self, request: gr.Request):
"""Detect whether the user is on mobile or desktop device."""
if not request:
return "Unknown device" # Handle case where request is none.
# Method 1: Check sec-ch-ua-mobile header
is_mobile_header = request.headers.get("sec-ch-ua-mobile")
if is_mobile_header:
return "Mobile" if "?1" in is_mobile_header else "Desktop"
# Method 2: Check user-agent string
user_agent = request.headers.get("user-agent", "").lower()
mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
if any(keyword in user_agent for keyword in mobile_keywords):
return "Mobile"
# Method 3: Check platform
platform = request.headers.get("sec-ch-ua-platform", "").lower()
if platform:
if platform in ['"android"', '"ios"']:
return "Mobile"
elif platform in ['"windows"', '"macos"', '"linux"']:
return "Desktop"
# Default case if no clear indicators
return "Desktop"
def launch(self, **kwargs):
"""Launch the Gradio UI with responsive layout."""
with gr.Blocks(theme="ocean", fill_height=True) as demo:
# Different layouts for mobile and computer devices
@gr.render()
def layout(request: gr.Request):
device = self.detect_device(request)
print(f"device - {device}")
# Render layout with sidebar
if device == "Desktop":
return self._create_desktop_layout()
else:
return self._create_mobile_layout()
demo.queue(max_size=20).launch(
debug=True, **kwargs
) # Add queue with reasonable size
def _create_desktop_layout(self):
"""Create the desktop layout with sidebar."""
with gr.Blocks(fill_height=True) as sidebar_demo:
with gr.Sidebar():
gr.Markdown(
"""#OpenDeepResearch - 3theSmolagents!
Model_id: deepseek/deepseek-chat-v3-0324:free"""
)
with gr.Group():
gr.Markdown("**What's on your mind mate?**", container=True)
text_input = gr.Textbox(
lines=3,
label="Your request",
container=False,
placeholder="Enter your prompt here and press Shift+Enter or press the button",
)
launch_research_btn = gr.Button("Run", variant="primary")
# If an upload folder is provided, enable the upload feature
if self.file_upload_folder is not None:
upload_file = gr.File(label="Upload a file")
upload_status = gr.Textbox(
label="Upload Status", interactive=False, visible=False
)
file_uploads_log = gr.State([])
upload_file.change(
self.upload_file,
[upload_file, file_uploads_log],
[upload_status, file_uploads_log],
)
gr.HTML("<br><br><h4><center>Powered by:</center></h4>")
with gr.Row():
gr.HTML(
"""
<div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"
style="width: 32px; height: 32px; object-fit: contain;" alt="logo">
<a target="_blank" href="https://github.com/huggingface/smolagents">
<b>huggingface/smolagents</b>
</a>
</div>
"""
)
# Add session state to store session-specific data
session_state = gr.State({}) # Initialize empty state for each session
stored_messages = gr.State([])
if "file_uploads_log" not in locals():
file_uploads_log = gr.State([])
chatbot = gr.Chatbot(
label="ODR",
type="messages",
avatar_images=(
None,
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
),
resizeable=False,
scale=1,
elem_id="my-chatbot",
)
self._connect_event_handlers(
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
)
return sidebar_demo
def _create_mobile_layout(self):
"""Create the mobile layout (simpler without sidebar)."""
with gr.Blocks(fill_height=True) as simple_demo:
gr.Markdown("""#OpenDeepResearch - free the AI agents!""")
# Add session state to store session-specific data
session_state = gr.State({})
stored_messages = gr.State([])
file_uploads_log = gr.State([])
chatbot = gr.Chatbot(
label="ODR",
type="messages",
avatar_images=(
None,
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
),
resizeable=True,
scale=1,
)
# If an upload folder is provided, enable the upload feature
if self.file_upload_folder is not None:
upload_file = gr.File(label="Upload a file")
upload_status = gr.Textbox(
label="Upload Status", interactive=False, visible=False
)
upload_file.change(
self.upload_file,
[upload_file, file_uploads_log],
[upload_status, file_uploads_log],
)
text_input = gr.Textbox(
lines=1,
label="What's on your mind mate?",
placeholder="Chuck in a question and we'll take care of the rest",
)
launch_research_btn = gr.Button("Run", variant="primary")
self._connect_event_handlers(
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
)
return simple_demo
def _connect_event_handlers(
self,
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
):
"""Connect the event handlers for input elements."""
# Connect text input submit event
text_input.submit(
self.log_user_message,
[text_input, file_uploads_log],
[stored_messages, text_input, launch_research_btn],
).then(
self.interact_with_agent,
[stored_messages, chatbot, session_state],
[chatbot],
).then(
lambda: (
gr.Textbox(
interactive=True,
placeholder="Enter your prompt here and press the button",
),
gr.Button(interactive=True),
),
None,
[text_input, launch_research_btn],
)
# Connect button click event
launch_research_btn.click(
self.log_user_message,
[text_input, file_uploads_log],
[stored_messages, text_input, launch_research_btn],
).then(
self.interact_with_agent,
[stored_messages, chatbot, session_state],
[chatbot],
).then(
lambda: (
gr.Textbox(
interactive=True,
placeholder="Enter your prompt here and press the button",
),
gr.Button(interactive=True),
),
None,
[text_input, launch_research_btn],
)
def main():
"""Main entry point for the application."""
# Initialize environment
setup_environment()
# Ensure downloads folder exists
os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)
# Launch UI
GradioUI(file_upload_folder="uploaded_files").launch()
if __name__ == "__main__":
main()