"""Main application for the OpenDeepResearch Gradio interface.""" import sys import mimetypes import traceback from dataclasses import dataclass import os import re import shutil import time from typing import Optional, Dict, Any from datetime import datetime from cleantext import clean from dotenv import load_dotenv from huggingface_hub import login import gradio as gr from scripts.text_inspector_tool import TextInspectorTool from scripts.text_web_browser import ( ArchiveSearchTool, FinderTool, FindNextTool, PageDownTool, PageUpTool, SimpleTextBrowser, VisitTool, ) from scripts.visual_qa import visualizer from scripts.text_cleaner_tool import TextCleanerTool from smolagents import ( CodeAgent, HfApiModel, LiteLLMModel, OpenAIServerModel, TransformersModel, GoogleSearchTool, Tool, ) from smolagents.agent_types import AgentText # AgentImage, AgentAudio from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types # Constants and configurations - Converted to UPPER_CASE AUTHORIZED_IMPORTS = [ "requests", # Web requests (fetching data from the internet) "zipfile", # Working with ZIP archives "pandas", # Data manipulation and analysis (DataFrames) "numpy", # Numerical computing (arrays, linear algebra) "sympy", # Symbolic mathematics (algebra, calculus) "json", # JSON data serialization/deserialization "bs4", # Beautiful Soup for HTML/XML parsing "pubchempy", # Accessing PubChem chemical database "xml", # XML processing "yahoo_finance", # Fetching stock data "Bio", # Bioinformatics tools (e.g., sequence analysis) "sklearn", # Scikit-learn for machine learning "scipy", # Scientific computing (stats, optimization) "pydub", # Audio manipulation "PIL", # Pillow for image processing "chess", # Chess-related functionality "PyPDF2", # PDF manipulation "pptx", # PowerPoint file manipulation "torch", # PyTorch for neural networks "datetime", # Date and time handling "fractions", # Rational number arithmetic "csv", # CSV file reading/writing "cleantext", # Text cleaning and normalization "os", # Operating system interaction (file system, etc.) VERY IMPORTANT "re", # Regular expressions for text processing "collections", # Useful data structures (e.g., defaultdict, Counter) "math", # Basic mathematical functions "random", # Random number generation "io", # Input/output streams "urllib.parse", # URL parsing and manipulation (safe URL handling) "typing", # Support for type hints (improve code clarity) "concurrent.futures", # For parallel execution "time", # Measuring time "tempfile", # Creating temporary files and directories # Data Visualization (if needed) - Consider security implications carefully "matplotlib", # Plotting library (basic charts) "seaborn", # Statistical data visualization (more advanced) # Web Scraping (more specific/controlled) - Consider ethical implications "lxml", # Faster XML/HTML processing (alternative to bs4) "selenium", # Automated browser control (for dynamic websites) # Database interaction (if needed) - Handle credentials securely! "sqlite3", # SQLite database access # Task scheduling "schedule", # Allow the agent to schedule tasks ] USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0" ) BROWSER_CONFIG = { "viewport_size": 1024 * 5, "downloads_folder": "downloads_folder", "request_kwargs": { "headers": {"User-Agent": USER_AGENT}, "timeout": 300, }, "serpapi_key": os.getenv("SERPAPI_API_KEY"), } CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"} ALLOWED_FILE_TYPES = [ "application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "text/plain", "text/markdown", "application/json", "image/png", "image/webp", "image/jpeg", "image/gif", "video/mp4", "audio/mpeg", "audio/wav", "audio/ogg", ] # Maximum chat history length to prevent memory issues MAX_CHAT_HISTORY = 100 # Maximum uploaded file size in MB MAX_FILE_SIZE_MB = 50 # File cleanup schedule (in days) FILE_RETENTION_DAYS = 7 def setup_environment(): """ Initialize environment variables and authentication. Returns: bool: True if setup was successful, False otherwise """ load_dotenv(override=True) hf_token = os.getenv("HF_TOKEN") if hf_token: # check if token is actually set try: login(hf_token) print("HF_TOKEN (last 10 characters):", hf_token[-10:]) return True except (ValueError, ConnectionError) as e: # More specific exceptions print(f"Failed to login with HF token: {e}") return False else: print("HF_TOKEN not found in environment variables.") return False class ModelManager: """Manages model loading and initialization.""" @staticmethod def load_model(chosen_inference: str, model_id: str, key_manager=None): """ Load the specified model with appropriate configuration. Args: chosen_inference: Type of inference to use model_id: ID of the model to load key_manager: Optional key manager for API keys Returns: Model instance Raises: ValueError: If inference type is invalid or required parameters missing RuntimeError: If model loading fails """ if chosen_inference == "hf_api": return HfApiModel(model_id=model_id) if chosen_inference == "hf_api_provider": return HfApiModel(provider="together") if chosen_inference == "litellm": return LiteLLMModel(model_id=model_id) if chosen_inference == "openai": if not key_manager: raise ValueError("Key manager required for OpenAI model") return OpenAIServerModel( model_id=model_id, api_key=key_manager.get_key("openai_api_key") ) if chosen_inference == "transformers": return TransformersModel( model_id="huggingfacetb/smollm2-1.7b-instruct", device_map="auto", max_new_tokens=1000, ) raise ValueError(f"Invalid inference type: {chosen_inference}") # This class only has one public method, but that's acceptable for a registry class # whose purpose is to provide factory methods class ToolRegistry: """Manages tool initialization and organization.""" @staticmethod def load_web_tools(model, browser, text_limit=20000): """ Initialize and return web-related tools. Args: model: LLM model for text inspector browser: Browser instance for web tools text_limit: Maximum text length for processing Returns: List of web tools """ return [ GoogleSearchTool(provider="serper"), VisitTool(browser), PageUpTool(browser), PageDownTool(browser), FinderTool(browser), FindNextTool(browser), ArchiveSearchTool(browser), TextInspectorTool(model, text_limit), ] @staticmethod def load_image_generation_tools(): """ Initialize and return image generation tools. Returns: Image generation tool Raises: RuntimeError: If tool initialization fails """ try: return Tool.from_space( space_id="xkerser/flux.1-dev", name="image_generator", description=( "Generates high-quality AgentImage. " "With text prompt (77 token limit)." ), ) except ( ConnectionError, ValueError, RuntimeError, ) as e: # More specific exceptions print(f" Couldn't initialize image generation tool: {e}") raise RuntimeError(f"Image generation tool initialization failed: {e}") @staticmethod def load_clean_text_tool(): """ Initialize and return text cleaning tool. Returns: Text cleaning tool Raises: RuntimeError: If tool initialization fails """ try: return TextCleanerTool() except (ValueError, RuntimeError) as e: # More specific exceptions print(f" Couldn't initialize clean text tool: {e}") raise RuntimeError(f"Clean text tool initialization failed: {e}") def create_agent(): """ Creates a fresh agent instance with properly configured tools. Returns: CodeAgent: Configured agent ready for use Raises: ValueError: If tool validation fails RuntimeError: If agent creation fails """ try: # Initialize model model = LiteLLMModel( custom_role_conversions=CUSTOM_ROLE_CONVERSIONS, model_id="openrouter/deepseek/deepseek-chat-v3-0324:free", ) # Initialize tools text_limit = 30000 browser = SimpleTextBrowser(**BROWSER_CONFIG) # Collect all tools in a single list web_tools = ToolRegistry.load_web_tools(model, browser, text_limit) image_generator = ToolRegistry.load_image_generation_tools() clean_text = TextCleanerTool() # Combine all tools into a single list all_tools = [visualizer] + web_tools + [image_generator, clean_text] # Validate tools before creating agent for tool in all_tools: if not isinstance(tool, Tool): raise ValueError( f"Invalid tool type: {type(tool)}. " f"All tools must be instances of Tool class." ) return CodeAgent( model=model, tools=all_tools, max_steps=12, verbosity_level=2, additional_authorized_imports=AUTHORIZED_IMPORTS, planning_interval=4, ) except (ValueError, RuntimeError) as e: print(f"Failed to create agent: {e}") raise RuntimeError(f"Agent creation failed: {e}") # Define standalone functions outside of classes def process_message_content(content_lower: str) -> Dict[str, bool]: """ Process message content to determine message type. Args: content_lower: Lowercase message content Returns: Dictionary with message type flags """ return { "is_document_analysis": "document analysis" in content_lower, "is_search": "search" in content_lower, "is_error": "error" in content_lower, } def stream_to_gradio( agent, task: str, reset_agent_memory: bool = False, additional_args: Optional[Dict] = None, ): """ Streams agent responses with improved status indicators. Args: agent: The agent instance to use task: The task to perform reset_agent_memory: Whether to reset agent memory additional_args: Optional additional arguments Yields: Gradio ChatMessage objects """ try: # Initial processing indicator yield gr.ChatMessage(role="assistant", content="⏳ Processing your request...") # Track what we've yielded to replace the processing indicator first_message_yielded = False # Store the step_log outside the loop to avoid the undefined-loop-variable issue steps = list( agent.run( task, stream=True, reset=reset_agent_memory, additional_args=additional_args, ) ) # If no steps were returned, handle it gracefully if not steps: yield gr.ChatMessage( role="assistant", content="⚠️ No response from agent. Please try again." ) return # Process each step for step_log in steps: # pull_messages_from_step is a generator function that yields messages for message in pull_messages_from_step(step_log): if not first_message_yielded: # Replace the initial "processing" message first_message_yielded = True message.content = message.content.replace( "⏳ Processing your request...", "" ) # Check message content for document analysis or search references if hasattr(message, "content") and message.content: content_lower = message.content.lower() message_types = process_message_content(content_lower) if message_types["is_document_analysis"]: message.content = f"📄 **Document Analysis:** {message.content}" elif message_types["is_search"]: message.content = f"🔍 **Search:** {message.content}" yield message # Final answer with enhanced formatting if steps: # Make sure we have at least one step before accessing final_answer = handle_agent_output_types(steps[-1]) # Use the last step if isinstance(final_answer, AgentText): yield gr.ChatMessage( role="assistant", content=f"✅ **Final Answer:**\n{final_answer.to_string()}", ) else: yield gr.ChatMessage( role="assistant", content=f"✅ **Final Answer:** {str(final_answer)}", ) except (ValueError, RuntimeError) as e: # More specific error handling yield gr.ChatMessage( role="assistant", content=( f"❌ **Error:** {str(e)}\n" f"Please try again with a different query." ), ) except Exception as e: # Fallback for truly unexpected errors print(f"Unexpected error in stream_to_gradio: {e}") traceback.print_exc() yield gr.ChatMessage( role="assistant", content=( "❌ **Unexpected Error:** An unknown error occurred.\n" "Please try again or contact support if the issue persists." ), ) # This is a helper method that can be called statically def cleanup_old_files(directory: str, days: int = FILE_RETENTION_DAYS): """ Removes files older than the specified number of days. Args: directory: Directory to clean up days: Number of days to keep files """ if not os.path.exists(directory): return cutoff_time = time.time() - (days * 24 * 60 * 60) for filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path): file_mod_time = os.path.getmtime(file_path) if file_mod_time < cutoff_time: try: os.remove(file_path) print(f"Deleted old file: {file_path}") except (PermissionError, OSError) as e: print(f"Failed to delete {file_path}: {str(e)}") @dataclass class UIComponents: """Container for UI components to reduce main class attribute count.""" text_input: Any = None submit_btn: Any = None stop_btn: Any = None clear_btn: Any = None status: Any = None chatbot: Any = None file_uploader: Any = None # renamed from upload_file to avoid conflict upload_status: Any = None class GradioUI: """Gradio user interface for the OpenDeepResearch application.""" def __init__(self, file_upload_folder=None, max_queue_size=50): """Initialize the Gradio UI.""" # Basic configuration self.file_upload_folder = file_upload_folder self.max_queue_size = max_queue_size self.max_chat_history = MAX_CHAT_HISTORY self.max_file_size_mb = MAX_FILE_SIZE_MB # Initialize UI components container self.components = UIComponents() # Job handle for cancellation self.job = None # Create upload directory if specified if self.file_upload_folder is not None: # Simplified if expression os.makedirs(file_upload_folder, exist_ok=True) # Clean up old files if file_upload_folder: cleanup_old_files(file_upload_folder) def interact_with_agent(self, prompt, messages, session_state): """ Main interaction handler with the agent. Args: prompt: User input prompt messages: Current message history session_state: Session state dictionary Yields: Updated message history """ # Get or create session-specific agent if "agent" not in session_state: try: session_state["agent"] = create_agent() except RuntimeError as e: messages.append( gr.ChatMessage( role="assistant", content=f"Failed to create agent: {str(e)}" ) ) yield messages return try: # Log the existence of agent memory has_memory = hasattr(session_state["agent"], "memory") print(f"Agent has memory: {has_memory}") if has_memory and hasattr(session_state["agent"].memory, "steps"): print(f"Memory steps: {len(session_state['agent'].memory.steps)}") # Truncate messages if they exceed the maximum if len(messages) > self.max_chat_history: # Keep only the latest messages messages = messages[-self.max_chat_history :] # Add user message messages.append(gr.ChatMessage(role="user", content=prompt)) yield messages # Process with agent and stream responses for msg in stream_to_gradio( session_state["agent"], task=prompt, reset_agent_memory=False ): messages.append(msg) yield messages except ValueError as e: print(f"Value error in interaction: {str(e)}") messages.append( gr.ChatMessage(role="assistant", content=f"Input error: {str(e)}") ) yield messages except Exception as e: print(f"Error in interaction: {str(e)}") traceback.print_exc() messages.append( gr.ChatMessage(role="assistant", content=f"Error occurred: {str(e)}") ) yield messages def handle_file_upload(self, files, file_uploads_log): """ Handle file uploads with proper validation and security. Args: files: Files to upload file_uploads_log: List of uploaded files Returns: Tuple of (status textbox, updated file_uploads_log, updated upload button visibility) """ if not files: return ( gr.Textbox(value="No file uploaded", visible=True), file_uploads_log, ) try: # Process the file (files[0] since we're using file_count="single") file = files[0] # Validate file exists if not os.path.exists(file.name): return ( gr.Textbox(value="File not found", visible=True), file_uploads_log, ) # Check file size file_size_mb = os.path.getsize(file.name) / (1024 * 1024) if file_size_mb > self.max_file_size_mb: return ( gr.Textbox( value=f"File size exceeds {self.max_file_size_mb} MB limit.", visible=True, ), file_uploads_log, ) # Validate mime type mime_type, _ = mimetypes.guess_type(file.name) if mime_type not in ALLOWED_FILE_TYPES: return ( gr.Textbox(value="File type disallowed", visible=True), file_uploads_log, ) # Sanitize file name original_name = os.path.basename(file.name) # Replace invalid chars with underscores sanitized_name = re.sub(r"[^\w\-.]", "_", original_name) # Add timestamp to ensure uniqueness timestamp = datetime.now().strftime( "%y%m%d_%H%M%S" ) # Correct format string name_parts = os.path.splitext(sanitized_name) sanitized_name = f"{name_parts[0]}_{timestamp}{name_parts[1]}" # Save the uploaded file to the specified folder file_path = os.path.join(self.file_upload_folder, sanitized_name) shutil.copy(file.name, file_path) return ( gr.Textbox(value=f"File uploaded: {original_name}", visible=True), file_uploads_log + [file_path], ) except FileNotFoundError as e: return ( gr.Textbox(value=f"File not found: {str(e)}", visible=True), file_uploads_log, ) except PermissionError as e: return ( gr.Textbox(value=f"Permission denied: {str(e)}", visible=True), file_uploads_log, ) except (IOError, OSError) as e: return ( gr.Textbox(value=f"I/O error during upload: {str(e)}", visible=True), file_uploads_log, ) except Exception as e: # For truly unexpected errors, log with more detail print(f"Unexpected upload error: {e}") traceback.print_exc() return ( gr.Textbox(value=f"Error processing upload: {str(e)}", visible=True), file_uploads_log, ) def log_user_message(self, text_input, file_uploads_log): """ Process user message and handle file references. Args: text_input: User's text input file_uploads_log: List of uploaded files Returns: Tuple of (processed message, updated text input, submit button) """ if not text_input.strip(): return ( "", gr.Textbox(value="", interactive=True), gr.Button(interactive=True), ) # Only clean if necessary (avoid unnecessary processing) message = text_input if any(char in text_input for char in "€¥£-"): message = clean( text_input, fix_unicode=True, to_ascii=True, lower=False, # Keep original case no_line_breaks=False, no_urls=False, no_emails=False, no_phone_numbers=False, no_numbers=False, no_digits=False, no_currency_symbols=False, no_punct=False, lang="en", ) # Add file references if any if file_uploads_log: files_info = "\n".join( [f"- {os.path.basename(f)}" for f in file_uploads_log] ) message += f"\nYou have been provided with these files:\n{files_info}" return ( message, gr.Textbox( value="", interactive=False, placeholder="Processing your request...", ), gr.Button(interactive=False), ) def clear_chat(self): """ Clear the chat history and reset UI elements. Returns: Tuple of (empty chat history, interactive text input, interactive button, empty status) """ return ( [], # Empty chat history [], # Empty stored messages gr.Textbox(value="", interactive=True), gr.Button(interactive=True), gr.Textbox(value="", visible=False), # Clear status ) def launch(self, share=False, **kwargs): """ Launch the Gradio UI with responsive layout. Args: share: Whether to create a public link **kwargs: Additional keyword arguments for launch """ with gr.Blocks(theme="ocean", fill_height=True) as demo: # Use Gradio's built-in responsive layout with gr.Row(): # Sidebar (smaller on mobile) with gr.Column(scale=1, min_width=100): gr.Markdown( """# OpenDeepResearch AI-powered research assistant using SmoLAgents Model: deepseek/deepseek-chat-v3-0324:free""" ) with gr.Group(): gr.Markdown("**Research Query**", container=True) self.components.text_input = gr.Textbox( lines=3, label="Your request", placeholder="Enter your research question or task", container=False, ) with gr.Row(): self.components.submit_btn = gr.Button( "Run", variant="primary" ) self.components.stop_btn = gr.Button("Stop", variant="stop") self.components.clear_btn = gr.Button( "Clear", variant="secondary" ) # File upload in collapsible section if self.file_upload_folder is not None: with gr.Accordion("Upload Files", open=False): self.components.file_uploader = gr.UploadButton( "Upload a file", file_count="single", file_types=["pdf", "docx", "txt", "md", "json"], ) self.components.upload_status = gr.Textbox( label="Upload status", interactive=False, visible=False ) # Tool information with gr.Accordion("Available Tools", open=False): gr.Markdown( """ - **Web Search**: Find information online - **Document Analysis**: Analyze uploaded documents - **Text Cleaning**: Format and clean text - **Image Generation**: Create images from descriptions """ ) gr.HTML("
Powered by:
") with gr.Row(): gr.HTML( """
logo huggingface/smolagents
""" ) # Main chat area (larger) with gr.Column(scale=3, min_width=500): # Add session state to store session-specific data session_state = gr.State({}) stored_messages = gr.State([]) file_uploads_log = gr.State([]) # Chat interface self.components.chatbot = gr.Chatbot( label="Research Assistant", type="messages", avatar_images=( None, "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", ), height=600, elem_id="research-chatbot", ) # Status indicator self.components.status = gr.Textbox( "", label="Status", interactive=False, visible=False ) # Connect event handlers with appropriate cancellation # File upload handler - Updated for UploadButton if hasattr(self.components, "file_uploader") and hasattr( self.components, "upload_status" ): self.components.file_uploader.upload( self.handle_file_upload, [self.components.file_uploader, file_uploads_log], [self.components.upload_status, file_uploads_log], ) # Text input handler with cancellation submit_event = ( self.components.text_input.submit( self.log_user_message, [self.components.text_input, file_uploads_log], [ stored_messages, self.components.text_input, self.components.submit_btn, ], ) .then( self.interact_with_agent, [stored_messages, self.components.chatbot, session_state], [self.components.chatbot], ) .then( lambda: ( gr.Textbox(interactive=True), gr.Button(interactive=True), ), None, [self.components.text_input, self.components.submit_btn], ) ) # Button click handler with same flow click_event = ( self.components.submit_btn.click( self.log_user_message, [self.components.text_input, file_uploads_log], [ stored_messages, self.components.text_input, self.components.submit_btn, ], ) .then( self.interact_with_agent, [stored_messages, self.components.chatbot, session_state], [self.components.chatbot], ) .then( lambda: ( gr.Textbox(interactive=True), gr.Button(interactive=True), ), None, [self.components.text_input, self.components.submit_btn], ) ) # Stop button cancels ongoing operations self.components.stop_btn.click( None, None, None, cancels=[submit_event, click_event] ) # Clear button self.components.clear_btn.click( self.clear_chat, None, [ self.components.chatbot, stored_messages, self.components.text_input, self.components.submit_btn, self.components.status, ], ) # Launch with fixed queue settings (avoiding the problematic parameter) demo.queue( max_size=self.max_queue_size, ).launch( share=share, debug=True, # Enable HTTPS in production ssl_verify=False if kwargs.get("local_port") else True, **kwargs, ) def main(): """ Main entry point for the application. Returns: int: Exit code (0 for success, 1 for failure) """ try: # Initialize environment if not setup_environment(): print("Failed to set up environment properly.") return 1 # Ensure downloads folder exists downloads_folder = BROWSER_CONFIG["downloads_folder"] os.makedirs(f"./{downloads_folder}", exist_ok=True) # Create uploads folder uploads_folder = "uploaded_files" os.makedirs(uploads_folder, exist_ok=True) # Launch UI print("Starting OpenDeepResearch Gradio interface...") gradio_ui = GradioUI(file_upload_folder=uploads_folder) gradio_ui.launch() return 0 except KeyError as e: print(f"Configuration error: Missing key {e}") traceback.print_exc() return 1 except Exception as e: print(f"Application failed to start: {e}") traceback.print_exc() return 1 if __name__ == "__main__": EXIT_CODE = main() # UPPER_CASE for constants sys.exit(EXIT_CODE) # Use sys.exit instead of exit