Leonardo
Update app.py
57d4e61 verified
raw
history blame
25.5 kB
"""Main application for the OpenDeepResearch Gradio interface."""
import mimetypes
import os
import re
import shutil
from typing import Optional
from cleantext import clean
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
ArchiveSearchTool,
FinderTool,
FindNextTool,
PageDownTool,
PageUpTool,
SimpleTextBrowser,
VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
CodeAgent,
HfApiModel,
LiteLLMModel,
OpenAIServerModel,
TransformersModel,
GoogleSearchTool,
Tool,
)
from smolagents.agent_types import AgentText # AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types
# Constants and configurations
AUTHORIZED_IMPORTS = [
"requests", # Web requests (fetching data from the internet)
"zipfile", # Working with ZIP archives
"pandas", # Data manipulation and analysis (DataFrames)
"numpy", # Numerical computing (arrays, linear algebra)
"sympy", # Symbolic mathematics (algebra, calculus)
"json", # JSON data serialization/deserialization
"bs4", # Beautiful Soup for HTML/XML parsing
"pubchempy", # Accessing PubChem chemical database
"xml", # XML processing
"yahoo_finance", # Fetching stock data
"Bio", # Bioinformatics tools (e.g., sequence analysis)
"sklearn", # Scikit-learn for machine learning
"scipy", # Scientific computing (stats, optimization)
"pydub", # Audio manipulation
"PIL", # Pillow for image processing
"chess", # Chess-related functionality
"PyPDF2", # PDF manipulation
"pptx", # PowerPoint file manipulation
"torch", # PyTorch for neural networks
"datetime", # Date and time handling
"fractions", # Rational number arithmetic
"csv", # CSV file reading/writing
"cleantext", # Text cleaning and normalization
"os", # Operating system interaction (file system, etc.) VERY IMPORTANT
"re", # Regular expressions for text processing
"collections", # Useful data structures (e.g., defaultdict, Counter)
"math", # Basic mathematical functions
"random", # Random number generation
"io", # Input/output streams
"urllib.parse", # URL parsing and manipulation (safe URL handling)
"typing", # Support for type hints (improve code clarity)
"concurrent.futures", # For parallel execution
"time", # Measuring time
"tempfile", # Creating temporary files and directories
# Data Visualization (if needed) - Consider security implications carefully
"matplotlib", # Plotting library (basic charts)
"seaborn", # Statistical data visualization (more advanced)
# Web Scraping (more specific/controlled) - Consider ethical implications
"lxml", # Faster XML/HTML processing (alternative to bs4)
"selenium", # Automated browser control (for dynamic websites)
# Database interaction (if needed) - Handle credentials securely!
"sqlite3", # SQLite database access
# "psycopg2", # PostgreSQL adapter if needed
# Task scheduling
"schedule", # Allow the agent to schedule tasks
# Networking
# "socket", # Networking
]
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)
BROWSER_CONFIG = {
"viewport_size": 1024 * 5,
"downloads_folder": "downloads_folder",
"request_kwargs": {
"headers": {"User-Agent": USER_AGENT},
"timeout": 300,
},
"serpapi_key": os.getenv("SERPAPI_API_KEY"),
}
CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}
ALLOWED_FILE_TYPES = [
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"text/plain",
"text/markdown", # Added Markdown support
"application/json", # Added JSON support
"image/png",
"image/webp",
"image/jpeg", # Added JPEG support
"image/gif", # Added GIF support
"video/mp4",
"audio/mpeg", # Added MP3 support
"audio/wav", # Added WAV support
"audio/ogg", # Added OGG support
]
def setup_environment():
"""Initialize environment variables and authentication."""
load_dotenv(override=True)
hf_token = os.getenv("HF_TOKEN")
if hf_token: # Check if token is actually set
login(hf_token)
print("HF_TOKEN (last 10 characters):", hf_token[-10:])
else:
print("HF_TOKEN not found in environment variables.")
class ModelManager:
"""Manages model loading and initialization."""
@staticmethod
def load_model(chosen_inference: str, model_id: str, key_manager=None):
"""Load the specified model with appropriate configuration."""
try:
if chosen_inference == "hf_api":
return HfApiModel(model_id=model_id)
if chosen_inference == "hf_api_provider":
return HfApiModel(provider="together")
if chosen_inference == "litellm":
return LiteLLMModel(model_id=model_id)
if chosen_inference == "openai":
if not key_manager:
raise ValueError("Key manager required for OpenAI model")
return OpenAIServerModel(
model_id=model_id, api_key=key_manager.get_key("openai_api_key")
)
if chosen_inference == "transformers":
return TransformersModel(
model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
device_map="auto",
max_new_tokens=1000,
)
raise ValueError(f"Invalid inference type: {chosen_inference}")
except (ValueError, RuntimeError) as e: # More specific exceptions
print(f"Model loading failed: {e}")
raise
class ToolRegistry:
"""Manages tool initialization and organization."""
@staticmethod
def load_web_tools(model, browser, text_limit=20000):
"""Initialize and return web-related tools."""
return [
GoogleSearchTool(provider="serper"),
VisitTool(browser),
PageUpTool(browser),
PageDownTool(browser),
FinderTool(browser),
FindNextTool(browser),
ArchiveSearchTool(browser),
TextInspectorTool(model, text_limit),
]
@staticmethod
def load_image_generation_tools():
"""Initialize and return image generation tools."""
try:
return Tool.from_space(
space_id="xkerser/FLUX.1-dev",
name="image_generator",
description=(
"Generates high-quality AgentImage. "
"with text prompt (77 token limit)."
),
)
except Exception as e:
print(f"✗ Couldn't initialize image generation tool: {e}")
raise
@staticmethod
def load_clean_text_tool():
"""Initialize and return text cleaning tool."""
try:
return TextCleanerTool()
except Exception as e:
print(f"✗ Couldn't initialize clean text tool: {e}")
raise
def create_agent():
"""Creates a fresh agent instance with properly configured tools."""
# Initialize model
model = LiteLLMModel(
custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
# Currently serving:
model_id="openrouter/anthropic/claude-3.7-sonnet",
) # DEEPSEEK = openrouter/perplexity/r1-1776 <--- boss model
# Initialize tools
text_limit = 20000
browser = SimpleTextBrowser(**BROWSER_CONFIG)
# Collect all tools in a single list
web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
image_generator = ToolRegistry.load_image_generation_tools()
clean_text = TextCleanerTool() # Instantiate TextCleanerTool
# Combine all tools into a single list
all_tools = [visualizer] + web_tools + [image_generator, clean_text]
# Validate tools before creating agent
for tool in all_tools:
if not isinstance(tool, Tool):
raise ValueError(
f"Invalid tool type: {type(tool)}. "
f"All tools must be instances of Tool class."
)
return CodeAgent(
model=model,
tools=all_tools, # Pass a single list containing all tools
max_steps=10,
verbosity_level=1,
additional_authorized_imports=AUTHORIZED_IMPORTS,
planning_interval=4,
)
def stream_to_gradio(
agent,
task: str,
reset_agent_memory: bool = False,
additional_args: Optional[dict] = None,
):
"""Streams agent responses with improved status indicators."""
try:
# Initial processing indicator
yield gr.ChatMessage(role="assistant", content="⏳ Processing your request...")
# Track what we've yielded to replace the processing indicator
first_message_yielded = False
for step_log in agent.run(
task, stream=True, reset=reset_agent_memory, additional_args=additional_args
):
# pull_messages_from_step is a generator function that yields messages
# We need to iterate through each yielded message
for message in pull_messages_from_step(step_log):
if not first_message_yielded:
# Replace the initial "Processing" message
first_message_yielded = True
message.content = message.content.replace(
"⏳ Processing your request...", ""
)
# Check message content for document analysis or search references
content_lower = (
message.content.lower() if hasattr(message, "content") else ""
)
if "document analysis" in content_lower:
message.content = f"📄 **Document Analysis:** {message.content}"
elif "search" in content_lower:
message.content = f"🔍 **Search:** {message.content}"
yield message
# Final answer with enhanced formatting
final_answer = handle_agent_output_types(step_log)
if isinstance(final_answer, AgentText):
yield gr.ChatMessage(
role="assistant",
content=f"✅ **Final Answer:**\n\n{final_answer.to_string()}",
)
else:
yield gr.ChatMessage(
role="assistant", content=f"✅ **Final Answer:** {str(final_answer)}"
)
except Exception as e:
yield gr.ChatMessage(
role="assistant",
content=(
f"❌ **Error:** {str(e)}\n\n"
f"Please try again with a different query."
),
)
class GradioUI:
def __init__(self, file_upload_folder=None, max_queue_size=50):
# Initialize all attributes here
self.file_upload_folder = file_upload_folder
self.max_queue_size = max_queue_size
self.text_input = None
self.submit_btn = None
self.stop_btn = None
self.clear_btn = None
self.status = None
self.chatbot = None
self.session_state = None
self.job = None
if self.file_upload_folder is not None:
os.makedirs(file_upload_folder, exist_ok=True)
def interact_with_agent(self, prompt, messages, session_state):
"""Main interaction handler with the agent."""
# Get or create session-specific agent
if "agent" not in session_state:
session_state["agent"] = create_agent()
# Adding monitoring
try:
# Log the existence of agent memory
has_memory = hasattr(session_state["agent"], "memory")
print(f"Agent has memory: {has_memory}")
if has_memory:
print(f"Memory type: {type(session_state['agent'].memory)}")
messages.append(gr.ChatMessage(role="user", content=prompt))
yield messages
for msg in stream_to_gradio(
session_state["agent"], task=prompt, reset_agent_memory=False
):
messages.append(msg)
yield messages # Yield messages after each step
yield messages # Yield messages one last time
except Exception as e:
print(f"Error in interaction: {str(e)}")
raise
def upload_file(
self,
file,
file_uploads_log,
):
"""Handle file uploads with proper validation and security."""
if file is None:
return gr.Textbox("No file uploaded", visible=True), file_uploads_log
try:
mime_type, _ = mimetypes.guess_type(file.name)
except Exception as e:
return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log
if mime_type not in ALLOWED_FILE_TYPES:
return gr.Textbox("File type disallowed", visible=True), file_uploads_log
# Sanitize file name
original_name = os.path.basename(file.name)
# Replace invalid chars with underscores
sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)
# Ensure the extension correlates to the mime type
type_to_ext = {}
for ext, t in mimetypes.types_map.items():
if t not in type_to_ext:
type_to_ext[t] = ext
# Build sanitized filename with proper extension
name_parts = sanitized_name.split(".")[:-1]
extension = type_to_ext.get(mime_type, "")
sanitized_name = "".join(name_parts) + extension
# Limit File Size, and Throw Error
max_file_size_mb = 50 # Define the limit
file_size_mb = os.path.getsize(file.name) / (1024 * 1024) # Size in MB
if file_size_mb > max_file_size_mb:
return (
gr.Textbox(
f"File size exceeds {max_file_size_mb} MB limit.", visible=True
),
file_uploads_log,
)
# Save the uploaded file to the specified folder
file_path = os.path.join(self.file_upload_folder, sanitized_name)
shutil.copy(file.name, file_path)
return gr.Textbox(
f"File uploaded: {file_path}", visible=True
), file_uploads_log + [file_path]
def log_user_message(self, text_input, file_uploads_log):
"""Process user message and handle file references."""
cleaned_message = clean(
text_input,
fix_unicode=True,
to_ascii=True,
lower=True,
no_line_breaks=False,
no_urls=False,
no_emails=False,
no_phone_numbers=False,
no_numbers=False,
no_digits=False,
no_currency_symbols=False,
no_punct=False,
lang="en",
) # Can change default behaviour with TextCleanerTool
message = cleaned_message # Use the cleaned message
if file_uploads_log:
# Added file list to message
message += (
f"\nYou have been provided with these files, which might be "
f"helpful or not: {file_uploads_log}"
)
return (
message,
gr.Textbox(
value="",
interactive=False,
placeholder="Processing...", # Changed placeholder.
),
gr.Button(interactive=False),
)
def detect_device(self, request: gr.Request):
"""Detect whether the user is on mobile or desktop device."""
if not request:
return "Unknown device" # Handle case where request is none.
# Method 1: Check sec-ch-ua-mobile header
is_mobile_header = request.headers.get("sec-ch-ua-mobile")
if is_mobile_header:
return "Mobile" if "?1" in is_mobile_header else "Desktop"
# Method 2: Check user-agent string
user_agent = request.headers.get("user-agent", "").lower()
mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
if any(keyword in user_agent for keyword in mobile_keywords):
return "Mobile"
# Method 3: Check platform
platform = request.headers.get("sec-ch-ua-platform", "").lower()
if platform:
if platform in ['"android"', '"ios"']:
return "Mobile"
return "Desktop"
# Default case if no clear indicators
return "Desktop"
def launch(self, **kwargs):
"""Launch the Gradio UI with responsive layout."""
with gr.Blocks(theme="ocean", fill_height=True) as demo:
# Different layouts for mobile and computer devices
@gr.render()
def layout(request: gr.Request):
device = self.detect_device(request)
print(f"device - {device}")
# Render layout with sidebar
if device == "Desktop":
return self._create_desktop_layout()
else:
return self._create_mobile_layout()
demo.queue(max_size=20).launch(
debug=True, **kwargs
) # Add queue with reasonable size
def _create_desktop_layout(self):
"""Create the desktop layout with sidebar."""
with gr.Blocks(fill_height=True) as sidebar_demo:
with gr.Sidebar():
gr.Markdown(
"""#OpenDeepResearch - 3theSmolagents!
Model_id: anthropic/claude-3.7-sonnet"""
)
with gr.Group():
gr.Markdown("**What's on your mind mate?**", container=True)
text_input = gr.Textbox(
lines=3,
label="Your request",
container=False,
placeholder=(
"Enter your prompt here and press Shift+Enter or "
"press the button"
),
)
launch_research_btn = gr.Button("Run", variant="primary")
# If an upload folder is provided, enable the upload feature
if self.file_upload_folder is not None:
upload_file = gr.File(label="Upload a file")
upload_status = gr.Textbox(
label="Upload Status", interactive=False, visible=False
)
file_uploads_log = gr.State([])
upload_file.change(
self.upload_file,
[upload_file, file_uploads_log],
[upload_status, file_uploads_log],
)
gr.HTML("<br><br><h4><center>Powered by:</center></h4>")
with gr.Row():
gr.HTML(
"""
<div style="display: flex; align-items: center; gap: 8px;
font-family: system-ui, -apple-system, sans-serif;">
<img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png"
style="width: 32px; height: 32px; object-fit: contain;"
alt="logo">
<a target="_blank" href="https://github.com/huggingface/smolagents">
<b>huggingface/smolagents</b>
</a>
</div>
"""
)
# Add session state to store session-specific data
# Initialize empty state for each session
session_state = gr.State({})
stored_messages = gr.State([])
if "file_uploads_log" not in locals():
file_uploads_log = gr.State([])
chatbot = gr.Chatbot(
label="ODR",
type="messages",
avatar_images=(
None,
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
),
resizeable=False,
scale=1,
elem_id="my-chatbot",
)
self._connect_event_handlers(
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
)
return sidebar_demo
def _create_mobile_layout(self):
"""Create the mobile layout (simpler without sidebar)."""
with gr.Blocks(fill_height=True) as simple_demo:
gr.Markdown("""#OpenDeepResearch - free the AI agents!""")
# Add session state to store session-specific data
session_state = gr.State({})
stored_messages = gr.State([])
file_uploads_log = gr.State([])
chatbot = gr.Chatbot(
label="ODR",
type="messages",
avatar_images=(
None,
"https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
),
resizeable=True,
scale=1,
)
# If an upload folder is provided, enable the upload feature
if self.file_upload_folder is not None:
upload_file = gr.File(label="Upload a file")
upload_status = gr.Textbox(
label="Upload Status", interactive=False, visible=False
)
upload_file.change(
self.upload_file,
[upload_file, file_uploads_log],
[upload_status, file_uploads_log],
)
text_input = gr.Textbox(
lines=1,
label="What's on your mind mate?",
placeholder="Chuck in a question and we'll take care of the rest",
)
launch_research_btn = gr.Button("Run", variant="primary")
self._connect_event_handlers(
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
)
return simple_demo
def _create_common_ui_elements(self):
"""Create common UI elements with control buttons."""
with gr.Group():
self.text_input = gr.Textbox(
lines=3,
label="Your request",
placeholder="Enter your question about the documents...",
elem_classes=["prompt-box"],
)
with gr.Row():
self.submit_btn = gr.Button("Run", variant="primary")
self.stop_btn = gr.Button("Stop Generation", variant="stop")
self.clear_btn = gr.Button("Clear Chat", variant="secondary")
# Status indicator for document processing
self.status = gr.Textbox(
"", label="Status", interactive=False, visible=True
)
def _connect_event_handlers(
self,
text_input,
launch_research_btn,
file_uploads_log,
stored_messages,
chatbot,
session_state,
):
"""Connect event handlers with appropriate parameters."""
# Define the job handler for stopping generation
self.job = None
def start_processing(prompt, chat_history):
# We'll use the passed components directly rather than self.status
return prompt, chat_history
def stop_generation():
if self.job:
self.job.cancel()
def clear_chat():
return [], gr.Textbox(interactive=True), gr.Button(interactive=True), ""
# Connect text input submission
text_input.submit(
self.log_user_message,
[text_input, file_uploads_log],
[stored_messages, text_input, launch_research_btn],
).then(
self.interact_with_agent,
[stored_messages, chatbot, session_state],
[chatbot],
)
# Connect button click
launch_research_btn.click(
self.log_user_message,
[text_input, file_uploads_log],
[stored_messages, text_input, launch_research_btn],
).then(
self.interact_with_agent,
[stored_messages, chatbot, session_state],
[chatbot],
)
# Store the job for cancellation if needed
self.job = None # This would need to be assigned to an actual event
def main():
"""Main entry point for the application."""
# Initialize environment
setup_environment()
# Ensure downloads folder exists
os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)
# Launch UI
GradioUI(file_upload_folder="uploaded_files").launch()
if __name__ == "__main__":
main()