Leonardo
Update app.py
c7daeea verified
raw
history blame
20.8 kB
"""Main application for the OpenDeepResearch Gradio interface."""
import mimetypes
import os
import re
import shutil
from typing import Optional
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText, AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# Constants and configurations

# Extra module names handed to CodeAgent as `additional_authorized_imports`.
# NOTE(review): "clean-text" contains a hyphen, so it can never equal a Python
# module name used in an `import` statement; presumably the importable name is
# different (e.g. `cleantext`) — confirm before relying on this entry.
AUTHORIZED_IMPORTS = [
    "requests", "zipfile", "pandas", "numpy", "sympy", "json",
    "bs4", "pubchempy", "xml", "yahoo_finance", "Bio", "sklearn",
    "scipy", "pydub", "PIL", "chess", "PyPDF2", "pptx",
    "torch", "datetime", "fractions", "csv", "clean-text",
]

# Browser identity sent with every request made by the text browser.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"

# Keyword arguments unpacked into SimpleTextBrowser(**BROWSER_CONFIG).
# NOTE(review): "serpapi_key" is read from the environment when this module is
# imported, i.e. before setup_environment() calls load_dotenv(); a key that
# only lives in a .env file is therefore not visible here.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

# Role renaming passed to the LiteLLM model as `custom_role_conversions`.
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

# MIME types accepted by GradioUI.upload_file; anything else is rejected.
ALLOWED_FILE_TYPES = [
    # Documents and text
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    # Images
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    # Video
    "video/mp4",
    # Audio
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]
def setup_environment():
    """Load environment variables and log in to the Hugging Face Hub.

    Values from a `.env` file override variables already present in the
    process environment (`override=True`). When `HF_TOKEN` is set, it is
    passed to `huggingface_hub.login`; otherwise a notice is printed and no
    login is attempted.
    """
    load_dotenv(override=True)
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:  # Missing or empty token: nothing to authenticate with
        print("HF_TOKEN not found in environment variables.")
        return

    login(hf_token)
    # Echo only a short suffix so the token can be recognised without putting
    # a large part of the secret into stdout / hosted logs.
    print(f"HF_TOKEN loaded (ends with ...{hf_token[-4:]})")
class ModelManager:
"""Manages model loading and initialization."""
@staticmethod
def load_model(chosen_inference: str, model_id: str, key_manager=None):
"""Load the specified model with appropriate configuration."""
try:
if chosen_inference == "hf_api":
return HfApiModel(model_id=model_id)
if chosen_inference == "hf_api_provider":
return HfApiModel(provider="together")
if chosen_inference == "litellm":
return LiteLLMModel(model_id=model_id)
if chosen_inference == "openai":
if not key_manager:
raise ValueError("Key manager required for OpenAI model")
return OpenAIServerModel(
model_id=model_id, api_key=key_manager.get_key("openai_api_key")
)
if chosen_inference == "transformers":
return TransformersModel(
model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
device_map="auto",
max_new_tokens=1000,
)
raise ValueError(f"Invalid inference type: {chosen_inference}")
except Exception as e:
print(f"✗ Couldn't load model: {e}")
raise
class ToolRegistry:
    """Factory helpers that build the tools handed to the agent."""

    @staticmethod
    def load_web_tools(model, browser, text_limit=20000):
        """Return the list of web search / browsing tools.

        Args:
            model: Model passed to the text inspector tool.
            browser: Browser instance shared by all page-navigation tools.
            text_limit: Limit forwarded to `TextInspectorTool`.
        """
        return [
            GoogleSearchTool(provider="serper"),
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_image_generation_tools():
        """Return the image generation tool built from a Hugging Face Space.

        Raises:
            Exception: Whatever `Tool.from_space` raises; the error is
                printed and then re-raised.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description="Generates high-quality AgentImage with text prompt (77 token limit).",
            )
        except Exception as e:
            print(f"✗ Couldn't initialize image generation tool: {e}")
            raise

    @staticmethod
    def load_clean_text_tool():
        """Instantiate and return the text cleaning tool.

        The tool is returned as an instance (not the class) so the result can
        be placed directly in an agent's tool list, like the other loaders.

        Raises:
            Exception: Whatever the `TextCleanerTool` constructor raises; the
                error is printed and then re-raised.
        """
        try:
            return TextCleanerTool()
        except Exception as e:
            print(f"✗ Couldn't initialize clean text tool: {e}")
            raise
def create_agent():
    """Create a fresh CodeAgent with the full tool set.

    Builds the LiteLLM model, a text browser, the web tools, the image
    generator and the text cleaner, validates that every entry is a `Tool`
    instance, and returns the configured agent.

    Raises:
        ValueError: If any collected tool is not an instance of `Tool`.
    """
    # Model currently served through OpenRouter. The author's note names
    # "openrouter/perplexity/r1-1776" as an alternative ("boss model").
    model = LiteLLMModel(
        custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
        model_id="openrouter/deepseek/deepseek-chat-v3-0324:free",
    )

    text_limit = 30000

    # BROWSER_CONFIG captured SERPAPI_API_KEY when the module was imported,
    # which is before setup_environment() loads the .env file. Re-read it here
    # so a key defined only in .env still reaches the browser; fall back to
    # the import-time value otherwise.
    browser_config = dict(BROWSER_CONFIG)
    browser_config["serpapi_key"] = os.getenv("SERPAPI_API_KEY") or BROWSER_CONFIG.get(
        "serpapi_key"
    )
    browser = SimpleTextBrowser(**browser_config)

    web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
    image_generator = ToolRegistry.load_image_generation_tools()
    clean_text = TextCleanerTool()

    # One flat list: the agent expects a list of Tool instances.
    all_tools = [visualizer, *web_tools, image_generator, clean_text]

    # Fail early with a clear message instead of deep inside the agent.
    for tool in all_tools:
        if not isinstance(tool, Tool):
            raise ValueError(
                "Invalid tool type: "
                f"{type(tool)}. All tools must be instances of Tool class."
            )

    return CodeAgent(
        model=model,
        tools=all_tools,
        max_steps=12,
        verbosity_level=2,
        additional_authorized_imports=AUTHORIZED_IMPORTS,
        planning_interval=4,
    )
def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Run an agent on `task` and stream the result as Gradio ChatMessages.

    Args:
        agent: Agent whose `run(..., stream=True)` yields step logs.
        task: The prompt given to the agent.
        reset_agent_memory: Forwarded to `agent.run` as `reset`.
        additional_args: Forwarded unchanged to `agent.run`.

    Yields:
        The messages extracted from every step, followed by one message
        holding the final answer (text, image file or audio file). If the run
        yields no steps at all, nothing is yielded.
    """
    no_step = object()  # Sentinel: distinguishes "no steps" from a None step
    step_log = no_step
    for step_log in agent.run(
        task, stream=True, reset=reset_agent_memory, additional_args=additional_args
    ):
        yield from pull_messages_from_step(step_log)

    if step_log is no_step:
        # The run produced nothing, so there is no final answer to report.
        return

    # The last item yielded by the run is its final answer.
    final_answer = handle_agent_output_types(step_log)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(
            role="assistant",
            content=f"**Final answer:**\n{final_answer.to_string()}\n",
        )
    elif isinstance(final_answer, AgentImage):
        # Gradio file messages are dicts keyed by "path" (plus optional
        # "mime_type"); this mirrors smolagents' own Gradio helper.
        yield gr.ChatMessage(
            role="assistant",
            content={"path": final_answer.to_string(), "mime_type": "image/png"},
        )
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(
            role="assistant",
            content={"path": final_answer.to_string(), "mime_type": "audio/wav"},
        )
    else:
        yield gr.ChatMessage(
            role="assistant", content=f"**Final answer:** {str(final_answer)}"
        )
class GradioUI:
    """A one-line interface to launch your agent in Gradio."""

    def __init__(self, file_upload_folder: str | None = None):
        """Store the upload folder and create it when one is given.

        Args:
            file_upload_folder: Directory that receives uploaded files, or
                None to disable the upload widgets.
        """
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            os.makedirs(file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages, session_state):
        """Run the session's agent on `prompt` and stream the chat history.

        A generator: the `messages` list is yielded after the user message is
        appended and again after every message produced by the agent. The
        agent is created on first use and kept in `session_state["agent"]`.
        Errors are printed and re-raised.
        """
        if "agent" not in session_state:
            session_state["agent"] = create_agent()

        try:
            # Lightweight monitoring of the agent's memory attribute.
            has_memory = hasattr(session_state["agent"], "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory:
                print(f"Memory type: {type(session_state['agent'].memory)}")

            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            for msg in stream_to_gradio(
                session_state["agent"], task=prompt, reset_agent_memory=False
            ):
                messages.append(msg)
                yield messages
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            raise

    @staticmethod
    def _sanitize_upload_name(source_path, mime_type):
        """Return a safe file name for `source_path` with a MIME-based extension."""
        original_name = os.path.basename(source_path)
        # Anything other than word characters, "-" and "." becomes "_".
        sanitized = re.sub(r"[^\w\-.]", "_", original_name)
        # Drop the user-supplied extension (and interior dots) ...
        stem = "".join(sanitized.split(".")[:-1])
        # ... and append the extension registered for the detected MIME type.
        extension = mimetypes.guess_extension(mime_type) or ""
        return stem + extension

    def upload_file(
        self,
        file,
        file_uploads_log,
    ):
        """Validate an uploaded file and copy it into the upload folder.

        Args:
            file: Uploaded file object exposing a `name` path, or None.
            file_uploads_log: List of paths uploaded so far.

        Returns:
            A `(status_textbox, file_uploads_log)` pair. On success the log
            is a new list with the stored path appended; on any rejection the
            log is returned unchanged.
        """
        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log
        if self.file_upload_folder is None:
            # Without a target folder the copy below cannot work.
            return gr.Textbox("File uploads are disabled", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in ALLOWED_FILE_TYPES:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Enforce the size limit before anything is written.
        max_file_size_mb = 50
        file_size_mb = os.path.getsize(file.name) / (1024 * 1024)
        if file_size_mb > max_file_size_mb:
            return (
                gr.Textbox(
                    f"File size exceeds {max_file_size_mb} MB limit.", visible=True
                ),
                file_uploads_log,
            )

        sanitized_name = self._sanitize_upload_name(file.name, mime_type)
        file_path = os.path.join(self.file_upload_folder, sanitized_name)
        shutil.copy(file.name, file_path)
        return gr.Textbox(
            f"File uploaded: {file_path}", visible=True
        ), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        """Build the agent prompt and lock the input widgets.

        Returns:
            A tuple of the prompt (with the uploaded-file list appended when
            there is one), a cleared non-interactive Textbox, and a
            non-interactive Button.
        """
        message = text_input
        if file_uploads_log:
            message += f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"
        return (
            message,
            gr.Textbox(
                value="",
                interactive=False,
                placeholder="Processing...",
            ),
            gr.Button(interactive=False),
        )

    def detect_device(self, request: gr.Request):
        """Classify the client as "Mobile" or "Desktop" from request headers.

        Returns "Unknown device" when no request is available. Checks, in
        order: the `sec-ch-ua-mobile` hint, mobile keywords in the user agent,
        then the `sec-ch-ua-platform` hint. Defaults to "Desktop".
        """
        if not request:
            return "Unknown device"
        headers = request.headers

        # 1) Explicit client hint.
        is_mobile_header = headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"

        # 2) Keywords in the user-agent string.
        user_agent = headers.get("user-agent", "").lower()
        mobile_keywords = ("android", "iphone", "ipad", "mobile", "phone")
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"

        # 3) Platform hint; accept the value with or without its quotes.
        platform = headers.get("sec-ch-ua-platform", "").lower().strip().strip('"')
        if platform in ("android", "ios"):
            return "Mobile"

        # Known desktop platforms and anything unrecognised.
        return "Desktop"

    def launch(self, **kwargs):
        """Build the responsive UI and start the Gradio server.

        Args:
            **kwargs: Forwarded to `Blocks.launch`. `debug` defaults to True
                but may be overridden by the caller.
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # The layout is chosen per request from the detected device.
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                if device == "Desktop":
                    return self._create_desktop_layout()
                else:
                    return self._create_mobile_layout()

        # A default rather than a hard-coded keyword, so launch(debug=False)
        # no longer fails with a duplicate-keyword TypeError.
        kwargs.setdefault("debug", True)
        demo.queue(max_size=20).launch(**kwargs)

    def _create_desktop_layout(self):
        """Create the desktop layout: controls in a sidebar, chat beside it."""
        with gr.Blocks(fill_height=True) as sidebar_demo:
            # Session-specific state.
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])

            with gr.Sidebar():
                gr.Markdown(
                    "#OpenDeepResearch - 3theSmolagents!\n"
                    "Model_id: deepseek/deepseek-chat-v3-0324:free"
                )
                with gr.Group():
                    gr.Markdown("**What's on your mind mate?**", container=True)
                    text_input = gr.Textbox(
                        lines=3,
                        label="Your request",
                        container=False,
                        placeholder="Enter your prompt here and press Shift+Enter or press the button",
                    )
                    launch_research_btn = gr.Button("Run", variant="primary")

                # The upload widgets exist only when an upload folder is set.
                if self.file_upload_folder is not None:
                    upload_file = gr.File(label="Upload a file")
                    upload_status = gr.Textbox(
                        label="Upload Status", interactive=False, visible=False
                    )
                    upload_file.change(
                        self.upload_file,
                        [upload_file, file_uploads_log],
                        [upload_status, file_uploads_log],
                    )

                gr.HTML("<br><br><h4><center>Powered by:</center></h4>")
                with gr.Row():
                    gr.HTML(
                        """
                        <div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;">
                        <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"
                        style="width: 32px; height: 32px; object-fit: contain;" alt="logo">
                        <a target="_blank" href="https://github.com/huggingface/smolagents">
                        <b>huggingface/smolagents</b>
                        </a>
                        </div>
                        """
                    )

            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=False,
                scale=1,
                elem_id="my-chatbot",
            )

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )
        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (single column, no sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")

            # Session-specific state.
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])

            chatbot = gr.Chatbot(
                label="ODR",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                scale=1,
            )

            # The upload widgets exist only when an upload folder is set.
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )

            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )
        return simple_demo

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Wire the textbox submit and button click to the agent pipeline.

        Both triggers run the same three-step chain: record the prompt and
        lock the inputs, stream the agent's messages into the chatbot, then
        unlock the inputs again.
        """

        def unlock_inputs():
            """Return updates that make the textbox and button usable again."""
            return (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            )

        # Same chain for both ways of sending a prompt.
        for trigger in (text_input.submit, launch_research_btn.click):
            trigger(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                [stored_messages, chatbot, session_state],
                [chatbot],
            ).then(
                unlock_inputs,
                None,
                [text_input, launch_research_btn],
            )
def main():
    """Prepare the environment and folders, then start the web UI."""
    # Environment variables and Hugging Face login come first.
    setup_environment()

    # The text browser saves downloads here; create the folder up front.
    downloads_dir = f"./{BROWSER_CONFIG['downloads_folder']}"
    os.makedirs(downloads_dir, exist_ok=True)

    # Build the UI with uploads enabled and serve it.
    ui = GradioUI(file_upload_folder="uploaded_files")
    ui.launch()


if __name__ == "__main__":
    main()