# app.py — OpenDeepResearch Gradio agent application (upstream commit bfce6cb)
import os
import re
import shutil
import datetime
import mimetypes
from typing import Optional, List, Dict, Tuple
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.frontmatter_tool import FrontmatterGeneratorTool
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText, AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# ------------------------ Configuration and Setup ------------------------

# Modules the CodeAgent's generated Python code is allowed to import at runtime.
AUTHORIZED_IMPORTS = [
    "requests",
    "zipfile",
    "pandas",
    "numpy",
    "sympy",
    "json",
    "bs4",
    "pubchempy",
    "yaml",
    "xml",
    "yahoo_finance",
    "Bio",
    "sklearn",
    "scipy",
    "pydub",
    "PIL",
    "chess",
    "PyPDF2",
    "pptx",
    "torch",
    "datetime",
    "fractions",
    "csv",
    "cleantext",
    "os",
    "re",
    "collections",
    "math",
    "random",
    "io",
    "urllib.parse",
    "typing",
    "concurrent.futures",
    "time",
    "tempfile",
    "matplotlib",
    "seaborn",
    "lxml",
    "selenium",
    "sqlite3",
    "schedule",
]

# Desktop Chrome/Edge user agent sent with every browser request.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

# Keyword arguments for SimpleTextBrowser (see create_agent()).
# NOTE(review): os.getenv("SERPAPI_API_KEY") is evaluated at import time,
# BEFORE setup_environment() calls load_dotenv(); a key that only exists in
# the .env file will not be picked up here — confirm whether that is intended.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,  # characters of page text per "viewport" page
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,  # seconds
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

# Maps smolagents message roles onto roles the LiteLLM backend accepts.
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

# MIME types accepted for upload (informational counterpart of ALLOWED_EXTENSIONS).
ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    "video/mp4",
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]

# File extensions accepted by GradioUI.upload_file().
# ".mp3" added for consistency: ALLOWED_FILE_TYPES includes "audio/mpeg" and
# interact_with_agent() already categorizes ".mp3" uploads as audio.
ALLOWED_EXTENSIONS = [
    ".pdf",
    ".docx",
    ".txt",
    ".md",
    ".json",
    ".png",
    ".webp",
    ".jpeg",
    ".jpg",
    ".gif",
    ".mp4",
    ".mp3",
    ".mpeg",
    ".wav",
    ".ogg",
]
def setup_environment():
    """Load .env configuration and authenticate with the Hugging Face Hub.

    Reads HF_TOKEN from the environment (after load_dotenv) and, when present,
    logs in and prints the token's last 10 characters for confirmation.
    """
    load_dotenv(override=True)
    token = os.getenv("HF_TOKEN")
    if not token:
        # No token configured — proceed unauthenticated.
        print("HF_TOKEN not found in environment variables.")
        return
    login(token)
    print(f"HF_TOKEN (last 10 characters): {token[-10:]}")
# ------------------------ Model and Tool Management ------------------------
class ModelManager:
    """Manages model loading and initialization."""

    @staticmethod
    def load_model(
        chosen_inference: str, model_id: str, key_manager: Optional[object] = None
    ):
        """Load the specified model with appropriate configuration.

        Args:
            chosen_inference: The type of inference to use (e.g., "hf_api", "openai").
            model_id: The ID of the model to load.
            key_manager: Key manager for API keys (required for OpenAI).

        Returns:
            An instance of the specified model class.

        Raises:
            ValueError: If an invalid inference type is specified or if the key
                manager is missing for OpenAI models.
            Exception: If the model fails to load.
        """

        def _make_openai():
            # OpenAI needs an API key sourced from the key manager.
            if not key_manager:
                raise ValueError("Key manager required for OpenAI model")
            return OpenAIServerModel(
                model_id=model_id, api_key=key_manager.get_key("openai_api_key")
            )

        # Lazy factories: only the selected backend is ever constructed.
        factories = {
            "hf_api": lambda: HfApiModel(model_id=model_id),
            "hf_api_provider": lambda: HfApiModel(provider="together"),
            "litellm": lambda: LiteLLMModel(model_id=model_id),
            "openai": _make_openai,
            "transformers": lambda: TransformersModel(
                model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
                device_map="auto",
                max_new_tokens=1000,
            ),
        }

        try:
            factory = factories.get(chosen_inference)
            if factory is None:
                raise ValueError(f"Invalid inference type: {chosen_inference}")
            return factory()
        except Exception as e:
            # Log the failure, then surface it to the caller unchanged.
            print(f"✗ Couldn't load model: {e}")
            raise
class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit: int = 20000) -> List[Tool]:
        """Initialize and return web-related tools.

        Args:
            model: The language model to use.
            browser: The web browser instance.
            text_limit: The maximum text length for the text inspector tool.

        Returns:
            A list of web-related tools (search, navigation, inspection).
        """
        # All browser-bound tools share the same SimpleTextBrowser instance.
        navigation_tools = [
            tool_cls(browser)
            for tool_cls in (
                VisitTool,
                PageUpTool,
                PageDownTool,
                FinderTool,
                FindNextTool,
                ArchiveSearchTool,
            )
        ]
        return [
            GoogleSearchTool(provider="serper"),
            *navigation_tools,
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_document_tools() -> List[Tool]:
        """Initialize and return document processing tools.

        Returns:
            List of document tools (frontmatter generation, text cleaning).
        """
        tools: List[Tool] = [FrontmatterGeneratorTool(), TextCleanerTool()]
        return tools

    @staticmethod
    def load_image_generation_tools() -> Optional[Tool]:
        """Initialize and return image generation tools.

        Returns:
            The image generation tool, or None if initialization fails
            (e.g. the remote Space is unreachable).
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description="Generates high-quality AgentImage using the FLUX.1-dev model based on text prompts.",
            )
        except Exception as e:
            # Best-effort: the agent still works without image generation.
            print(f"✗ Couldn't initialize image generation tool: {e}")
            return None
# ------------------------ Agent Creation and Execution ------------------------
def create_agent() -> CodeAgent:
    """Creates a fresh agent instance with configured tools.

    Builds a LiteLLM-backed Gemini model, a text browser, and the full tool
    set (visualizer + web tools + document tools + optional image generator),
    then wraps them in a CodeAgent.

    Returns:
        CodeAgent: Configured agent ready for use.

    Raises:
        RuntimeError: If agent creation fails (chained to the original error).
    """
    try:
        # Initialize model
        model = LiteLLMModel(
            custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
            model_id="openrouter/google/gemini-2.0-flash-001",
        )

        # Initialize tools
        text_limit = 30000
        browser = SimpleTextBrowser(**BROWSER_CONFIG)

        # Create tool instances with proper error handling
        web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
        doc_tools = []  # Initialize as empty list
        image_generator = None  # Initialize as None
        try:
            doc_tools = ToolRegistry.load_document_tools()
        except AssertionError as e:
            # smolagents tool validation raises AssertionError; the agent can
            # still run without the document tools.
            print(f"Warning: Error loading document tools: {str(e)}")
            print("Attempting to continue with available tools...")

        image_generator = ToolRegistry.load_image_generation_tools()

        # Combine available tools (filter out None values)
        all_tools = [visualizer] + web_tools + doc_tools
        if image_generator:  # Add only if it's not None
            all_tools.append(image_generator)

        # Log available tools; guard against a missing/None description so a
        # cosmetic print can never abort agent creation.
        print(f"Loaded {len(all_tools)} tools successfully")
        for tool in all_tools:
            print(f"- {tool.name}: {str(tool.description or '')[:50]}...")

        return CodeAgent(
            model=model,
            tools=all_tools,
            max_steps=12,
            verbosity_level=2,
            additional_authorized_imports=AUTHORIZED_IMPORTS,
            planning_interval=4,
        )
    except Exception as e:
        print(f"Failed to create agent: {e}")
        # Chain the cause so the original traceback is preserved for debugging.
        raise RuntimeError(f"Agent creation failed: {e}") from e
def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Runs an agent with the given task and streams messages as Gradio ChatMessages.

    Yields one gr.ChatMessage per step message while the agent runs, then a
    final message formatted according to the final answer's media type
    (text / image / audio / fallback str). Any exception is converted into a
    single assistant-facing error message rather than propagating.
    """
    try:
        for step_log in agent.run(
            task, stream=True, reset=reset_agent_memory, additional_args=additional_args
        ):
            for message in pull_messages_from_step(step_log):
                yield message

        # Process final answer with comprehensive media output
        # NOTE(review): if agent.run() yields nothing, step_log is unbound here
        # and the NameError is swallowed by the except below — confirm the
        # stream always yields at least one step.
        final_answer = step_log  # Last log is the run's final_answer
        final_answer = handle_agent_output_types(final_answer)

        # Output handling based on type
        if isinstance(final_answer, AgentText):
            yield gr.ChatMessage(
                role="assistant",
                content=f"Final answer:\n{final_answer.to_string()}\n",
            )
        elif isinstance(final_answer, AgentImage):
            # to_string() returns a file path; Gradio renders it as media.
            yield gr.ChatMessage(
                role="assistant",
                content={"image": final_answer.to_string(), "type": "file"},
            )
        elif isinstance(final_answer, AgentAudio):
            yield gr.ChatMessage(
                role="assistant",
                content={"audio": final_answer.to_string(), "type": "file"},
            )
        else:
            yield gr.ChatMessage(
                role="assistant", content=f"Final answer: {str(final_answer)}"
            )
    except Exception as e:
        # Surface the failure to the chat instead of crashing the UI stream.
        error_message = f"Error occurred during processing: {str(e)}\n\nPlease try again with a different query or check your inputs."
        yield gr.ChatMessage(
            role="assistant",
            content=error_message,
        )
# ------------------------ Gradio UI Components ------------------------
class GradioUI:
    """A one-line interface to launch your agent in Gradio.

    Renders a desktop (sidebar) or mobile layout depending on the detected
    device, manages per-session agent state, and optionally accepts file
    uploads into `file_upload_folder`.
    """

    def __init__(self, file_upload_folder: str | None = None):
        """Initialize the Gradio UI with optional file upload functionality.

        Args:
            file_upload_folder: Directory to store uploads; created if missing.
                When None, the upload widgets are not wired up.
        """
        self.file_upload_folder = file_upload_folder
        self.allowed_extensions = ALLOWED_EXTENSIONS  # Use the constant
        if self.file_upload_folder:
            os.makedirs(self.file_upload_folder, exist_ok=True)

    def interact_with_agent(
        self,
        prompt: str,
        messages: List[Dict],
        session_state: Dict,
        uploaded_files: List[str],
    ) -> List[Dict]:
        """Main interaction handler with the agent.

        Generator: yields the growing `messages` list after each chat update so
        Gradio can stream the conversation. Lazily creates the agent on first
        use, appends an uploaded-files summary to the prompt, and resets agent
        memory after 15 requests in a session.
        """
        # Lazily create one agent per browser session.
        if "agent" not in session_state:
            try:
                session_state["agent"] = create_agent()
                session_state["creation_time"] = datetime.datetime.now()
                session_state["request_count"] = 0
            except Exception as e:
                error_message = f"Error initializing agent: {str(e)}\n\nPlease refresh the page and try again."
                messages.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=error_message,
                    )
                )
                yield messages
                return  # Exit if can't create agent

        session_state["request_count"] += 1
        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages

        # Summarize uploads (grouped by media category) and append to prompt.
        file_message = ""
        try:
            if uploaded_files:
                file_info = {}
                for file_path in uploaded_files:
                    ext = os.path.splitext(file_path)[1].lower()
                    if ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]:
                        category = "images"
                    elif ext in [".mp3", ".wav", ".ogg"]:
                        category = "audio"
                    else:
                        category = "documents"
                    if category not in file_info:
                        file_info[category] = []
                    file_info[category].append(os.path.basename(file_path))
                file_message = "\nYou have been provided with these files:\n"
                for category, files in file_info.items():
                    file_message += f"- {category.capitalize()}: {', '.join(files)}\n"
                prompt_with_files = prompt + file_message
            else:
                prompt_with_files = prompt
        except Exception as e:
            # Best-effort: fall back to the bare prompt if file handling fails.
            prompt_with_files = prompt
            print(
                f"WARNING: Error processing files: {e}. Continuing without file info."
            )

        try:
            # Reset agent memory every 16th request to bound context growth.
            reset_needed = session_state["request_count"] > 15
            for msg in stream_to_gradio(
                session_state["agent"],
                task=prompt_with_files,
                reset_agent_memory=reset_needed,
            ):
                messages.append(msg)
                yield messages
            if reset_needed:
                session_state["request_count"] = 1
        except Exception as e:
            error_message = f"Error processing your request: {str(e)}\n\nPlease try again with a different query."
            messages.append(
                gr.ChatMessage(
                    role="assistant",
                    content=error_message,
                )
            )
            yield messages

    def log_user_message(self, text_input: str) -> Tuple[str, gr.Textbox, gr.Button]:
        """Stash the submitted text and disable the input controls.

        Returns the text (stored into `stored_messages` state), a cleared and
        disabled textbox, and a disabled Run button while the agent works.
        """
        return (
            text_input,
            gr.Textbox(value="", interactive=False, placeholder="Processing..."),
            gr.Button(interactive=False),
        )

    def upload_file(self, files: List[str]) -> Tuple[str, List[str]]:
        """Handle file uploads with validation, security, and clear feedback.

        Validates extension and size (50 MB cap), sanitizes the filename, and
        copies each file into `self.file_upload_folder`. Returns a status
        string plus the list of stored paths; any failure aborts the whole
        batch and returns an empty list.
        """
        if not files:
            return "No file uploaded", []
        uploaded_files = []
        error_message = None
        for file_path in files:
            try:
                file_extension = os.path.splitext(file_path)[1].lower()
                if file_extension not in self.allowed_extensions:
                    error_message = (
                        f"❌ File type '{file_extension}' is not allowed. "
                        f"Supported types: {', '.join(ALLOWED_EXTENSIONS)}"
                    )
                    return error_message, []
                file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
                max_file_size_mb = 50
                if file_size_mb > max_file_size_mb:
                    error_message = f"❌ File size ({file_size_mb:.1f} MB) exceeds {max_file_size_mb} MB limit."
                    return error_message, []
                # Strip anything but word chars, dash, and dot to block path tricks.
                sanitized_name = re.sub(r"[^\w\-.]", "", os.path.basename(file_path))
                dest_path = os.path.join(self.file_upload_folder, sanitized_name)
                shutil.copy(file_path, dest_path)
                uploaded_files.append(dest_path)
                print(f"Uploaded {file_path} to {dest_path}")
            except Exception as e:
                error_message = f"❌ Upload error: {str(e)}"
                return error_message, []
        # Defensive: every error path above already returned, so this is a no-op.
        if error_message:
            return error_message, []
        return (
            f"✓ Files uploaded successfully: {', '.join([os.path.basename(f) for f in uploaded_files])}",
            uploaded_files,
        )

    def detect_device(self, request: gr.Request):
        """Detect whether the user is on mobile or desktop device.

        Checks, in order: the sec-ch-ua-mobile client hint, user-agent
        keywords, then sec-ch-ua-platform; defaults to "Desktop".
        """
        if not request:
            return "Unknown device"
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            # Client hint value is "?1" for mobile, "?0" for desktop.
            return "Mobile" if "?1" in is_mobile_header else "Desktop"
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"
        # Platform hint values arrive wrapped in double quotes.
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            if platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"
        return "Desktop"

    def launch(self, **kwargs):
        """Launch the Gradio UI with responsive layout.

        Uses @gr.render so the layout is chosen per-request from the detected
        device; kwargs are forwarded to Blocks.launch().
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:

            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                if device == "Desktop":
                    return self._create_desktop_layout()
                return self._create_mobile_layout()

        demo.queue(max_size=20).launch(debug=True, **kwargs)

    def _create_desktop_layout(self):
        """Create the desktop layout with sidebar and enhanced styling."""
        with gr.Blocks(fill_height=True) as sidebar_demo:
            with gr.Sidebar():
                gr.Markdown(
                    """# 🔍 OpenDeepResearch
                    ### Smolagents + Document Tools
                    """
                )
                with gr.Group():
                    gr.Markdown("What can I help you with today?", container=True)
                    text_input = gr.Textbox(
                        lines=4,
                        label="Your request",
                        container=False,
                        placeholder="Enter your question or task here...",
                        show_label=False,
                    )
                    with gr.Row():
                        clear_btn = gr.Button("Clear", variant="secondary")
                        launch_research_btn = gr.Button("Run", variant="primary")
                # NOTE(review): uploaded_files_state is only created inside this
                # branch, but it is referenced unconditionally below (clear_btn,
                # _connect_event_handlers) — a NameError if file_upload_folder
                # is falsy. main() always passes a folder, so it never triggers
                # today; confirm before reusing this class without uploads.
                if self.file_upload_folder:
                    with gr.Group():
                        gr.Markdown("📎 Upload Documents")
                        file_upload = gr.File(
                            label="Upload files for analysis",
                            file_types=self.allowed_extensions,
                            file_count="multiple",
                        )
                        upload_status = gr.Textbox(
                            label="Upload Status", interactive=False, visible=False
                        )
                        uploaded_files_state = gr.State([])
                gr.HTML("<br><hr><h4><center>Powered by:</center></h4>")
                with gr.Row():
                    gr.HTML(
                        """
                        <div style="display: flex; align-items: center; justify-content: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">
                        <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"
                        style="width: 32px; height: 32px; object-fit: contain;" alt="logo">
                        <a target="_blank" href="https://github.com/huggingface/smolagents">
                        <b>huggingface/smolagents</b>
                        </a>
                        </div>
                        """
                    )
            session_state = gr.State({})
            stored_messages = gr.State([])
            chatbot = gr.Chatbot(
                label="OpenDeepResearch Assistant",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                # NOTE(review): Gradio spells this parameter "resizable" in
                # current releases — verify against the pinned gradio version.
                resizeable=True,
                show_copy_button=True,
                scale=1,
                elem_id="my-chatbot",
                height=700,
            )
            # NOTE(review): session_state here is the gr.State component, not
            # its dict value — gr.State has no .get(), so this lambda likely
            # raises when Clear is clicked. It probably needs session_state as
            # an input so the lambda receives the dict; confirm and fix.
            clear_btn.click(
                lambda: ([], [], {"agent": session_state.get("agent")}, []),
                None,
                [chatbot, stored_messages, session_state, uploaded_files_state],
            )
            if self.file_upload_folder:
                file_upload.change(
                    self.upload_file,
                    [file_upload],
                    [upload_status, uploaded_files_state],
                )
            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                stored_messages,
                chatbot,
                session_state,
                uploaded_files_state,
            )
        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (simpler without sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_upload = gr.File(
                label="Upload files for analysis",
                file_types=self.allowed_extensions,
                file_count="multiple",
            )
            uploaded_files_state = gr.State([])
            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                # NOTE(review): see spelling note on the desktop chatbot.
                resizeable=True,
                scale=1,
            )
            # Upload handling is only wired when a destination folder exists.
            if self.file_upload_folder:
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                file_upload.change(
                    self.upload_file,
                    [file_upload],
                    [upload_status, uploaded_files_state],
                )
            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")
            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                stored_messages,
                chatbot,
                session_state,
                uploaded_files_state,
            )
        return simple_demo

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        stored_messages,
        chatbot,
        session_state,
        uploaded_files_state,
    ):
        """Connect the event handlers for input elements.

        Both Enter-submit and the Run button follow the same 3-step chain:
        1) log_user_message stores the prompt and disables the controls,
        2) interact_with_agent streams the conversation into the chatbot,
        3) a lambda re-enables the textbox and button.
        """
        text_input.submit(
            self.log_user_message,
            [text_input],
            [stored_messages, text_input, launch_research_btn],
        ).then(
            # stored_messages holds the prompt string captured in step 1.
            self.interact_with_agent,
            [stored_messages, chatbot, session_state, uploaded_files_state],
            [chatbot],
        ).then(
            lambda: (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            ),
            None,
            [text_input, launch_research_btn],
        )
        launch_research_btn.click(
            self.log_user_message,
            [text_input],
            [stored_messages, text_input, launch_research_btn],
        ).then(
            self.interact_with_agent,
            [stored_messages, chatbot, session_state, uploaded_files_state],
            [chatbot],
        ).then(
            lambda: (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            ),
            None,
            [text_input, launch_research_btn],
        )
# ------------------------ Execution ------------------------
def main():
    """Entry point: configure the environment, ensure the downloads
    directory exists, then start the Gradio UI with uploads enabled."""
    setup_environment()
    downloads_dir = f"./{BROWSER_CONFIG['downloads_folder']}"
    os.makedirs(downloads_dir, exist_ok=True)
    ui = GradioUI(file_upload_folder="uploaded_files")
    ui.launch()


if __name__ == "__main__":
    main()