"""OpenDeepResearch: a smolagents-powered research agent with a Gradio UI.

Wires together web-browsing, document and image-generation tools around a
``CodeAgent`` and exposes it through a responsive (desktop/mobile) Gradio app
with optional file uploads.
"""

import os
import re
import shutil
import datetime
import mimetypes
from typing import Optional, List, Dict, Tuple

from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr

from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
    ArchiveSearchTool,
    FinderTool,
    FindNextTool,
    PageDownTool,
    PageUpTool,
    SimpleTextBrowser,
    VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.frontmatter_tool import FrontmatterGeneratorTool
from scripts.text_cleaner_tool import TextCleanerTool
from smolagents import (
    CodeAgent,
    HfApiModel,
    LiteLLMModel,
    OpenAIServerModel,
    TransformersModel,
    GoogleSearchTool,
    Tool,
)
from smolagents.agent_types import AgentText, AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types

# ------------------------ Configuration and Setup ------------------------

# Modules the CodeAgent is allowed to import inside generated code.
AUTHORIZED_IMPORTS = [
    "requests",
    "zipfile",
    "pandas",
    "numpy",
    "sympy",
    "json",
    "bs4",
    "pubchempy",
    "yaml",
    "xml",
    "yahoo_finance",
    "Bio",
    "sklearn",
    "scipy",
    "pydub",
    "PIL",
    "chess",
    "PyPDF2",
    "pptx",
    "torch",
    "datetime",
    "fractions",
    "csv",
    "cleantext",
    "os",
    "re",
    "collections",
    "math",
    "random",
    "io",
    "urllib.parse",
    "typing",
    "concurrent.futures",
    "time",
    "tempfile",
    "matplotlib",
    "seaborn",
    "lxml",
    "selenium",
    "sqlite3",
    "schedule",
]

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,
    },
    # NOTE: evaluated at import time — may be None if the .env file has not
    # been loaded yet; create_agent() refreshes it after setup_environment().
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    "video/mp4",
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]

ALLOWED_EXTENSIONS = [
    ".pdf",
    ".docx",
    ".txt",
    ".md",
    ".json",
    ".png",
    ".webp",
    ".jpeg",
    ".jpg",
    ".gif",
    ".mp4",
    ".mpeg",
    ".wav",
    ".ogg",
]


def setup_environment():
    """Initialize environment variables and authenticate with Hugging Face Hub."""
    load_dotenv(override=True)
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        login(hf_token)
        print(f"HF_TOKEN (last 10 characters): {hf_token[-10:]}")
    else:
        print("HF_TOKEN not found in environment variables.")


# ------------------------ Model and Tool Management ------------------------


class ModelManager:
    """Manages model loading and initialization."""

    @staticmethod
    def load_model(
        chosen_inference: str, model_id: str, key_manager: Optional[object] = None
    ):
        """Load the specified model with appropriate configuration.

        Args:
            chosen_inference: The type of inference to use (e.g., "hf_api", "openai").
            model_id: The ID of the model to load.
            key_manager: Key manager for API keys (required for OpenAI).

        Returns:
            An instance of the specified model class.

        Raises:
            ValueError: If an invalid inference type is specified or if the
                key manager is missing for OpenAI models.
            Exception: If the model fails to load.
        """
        try:
            if chosen_inference == "hf_api":
                return HfApiModel(model_id=model_id)
            if chosen_inference == "hf_api_provider":
                # Fix: previously model_id was dropped here, so the requested
                # model was silently ignored and the provider default used.
                return HfApiModel(model_id=model_id, provider="together")
            if chosen_inference == "litellm":
                return LiteLLMModel(model_id=model_id)
            if chosen_inference == "openai":
                if not key_manager:
                    raise ValueError("Key manager required for OpenAI model")
                return OpenAIServerModel(
                    model_id=model_id, api_key=key_manager.get_key("openai_api_key")
                )
            if chosen_inference == "transformers":
                return TransformersModel(
                    model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
                    device_map="auto",
                    max_new_tokens=1000,
                )
            raise ValueError(f"Invalid inference type: {chosen_inference}")
        except Exception as e:
            print(f"✗ Couldn't load model: {e}")
            raise


class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit: int = 20000) -> List[Tool]:
        """Initialize and return web-related tools.

        Args:
            model: The language model to use.
            browser: The web browser instance.
            text_limit: The maximum text length for the text inspector tool.

        Returns:
            A list of web-related tools.
        """
        return [
            GoogleSearchTool(provider="serper"),
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_document_tools() -> List[Tool]:
        """Initialize and return document processing tools.

        Returns:
            List of document tools.
        """
        return [FrontmatterGeneratorTool(), TextCleanerTool()]

    @staticmethod
    def load_image_generation_tools() -> Optional[Tool]:
        """Initialize and return image generation tools.

        Returns:
            The image generation tool or None if initialization fails.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description="Generates high-quality AgentImage using the FLUX.1-dev model based on text prompts.",
            )
        except Exception as e:
            # Best-effort: image generation is optional; the agent still works
            # without it, so report and degrade gracefully.
            print(f"✗ Couldn't initialize image generation tool: {e}")
            return None


# ------------------------ Agent Creation and Execution ------------------------


def create_agent() -> CodeAgent:
    """Creates a fresh agent instance with configured tools.

    Returns:
        CodeAgent: Configured agent ready for use.

    Raises:
        ValueError: If tool validation fails.
        RuntimeError: If agent creation fails.
    """
    try:
        # Initialize model
        model = LiteLLMModel(
            custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
            model_id="openrouter/google/gemini-2.0-flash-001",
        )

        # Initialize tools
        text_limit = 30000
        # Fix: BROWSER_CONFIG is built at import time, possibly before
        # load_dotenv() has populated the environment — refresh the key here.
        BROWSER_CONFIG["serpapi_key"] = os.getenv("SERPAPI_API_KEY")
        browser = SimpleTextBrowser(**BROWSER_CONFIG)

        # Create tool instances with proper error handling
        web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
        doc_tools = []  # Initialize as empty list
        image_generator = None  # Initialize as None

        try:
            doc_tools = ToolRegistry.load_document_tools()
        except AssertionError as e:
            print(f"Warning: Error loading document tools: {str(e)}")
            print("Attempting to continue with available tools...")

        image_generator = ToolRegistry.load_image_generation_tools()

        # Combine available tools (filter out None values)
        all_tools = [visualizer] + web_tools + doc_tools
        if image_generator:  # Add only if it's not None
            all_tools.append(image_generator)

        # Log available tools
        print(f"Loaded {len(all_tools)} tools successfully")
        for tool in all_tools:
            print(f"- {tool.name}: {tool.description[:50]}...")

        return CodeAgent(
            model=model,
            tools=all_tools,
            max_steps=12,
            verbosity_level=2,
            additional_authorized_imports=AUTHORIZED_IMPORTS,
            planning_interval=4,
        )
    except Exception as e:
        print(f"Failed to create agent: {e}")
        raise RuntimeError(f"Agent creation failed: {e}")


def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Runs an agent with the given task and streams messages as Gradio ChatMessages."""
    try:
        step_log = None  # Fix: guard against an agent run that yields nothing
        for step_log in agent.run(
            task, stream=True, reset=reset_agent_memory, additional_args=additional_args
        ):
            for message in pull_messages_from_step(step_log):
                yield message

        if step_log is None:
            yield gr.ChatMessage(
                role="assistant", content="The agent produced no output."
            )
            return

        # Process final answer with comprehensive media output
        final_answer = step_log  # Last log is the run's final_answer
        final_answer = handle_agent_output_types(final_answer)

        # Output handling based on type
        if isinstance(final_answer, AgentText):
            yield gr.ChatMessage(
                role="assistant",
                content=f"Final answer:\n{final_answer.to_string()}\n",
            )
        elif isinstance(final_answer, AgentImage):
            yield gr.ChatMessage(
                role="assistant",
                content={"image": final_answer.to_string(), "type": "file"},
            )
        elif isinstance(final_answer, AgentAudio):
            yield gr.ChatMessage(
                role="assistant",
                content={"audio": final_answer.to_string(), "type": "file"},
            )
        else:
            yield gr.ChatMessage(
                role="assistant", content=f"Final answer: {str(final_answer)}"
            )
    except Exception as e:
        error_message = f"Error occurred during processing: {str(e)}\n\nPlease try again with a different query or check your inputs."
        yield gr.ChatMessage(
            role="assistant",
            content=error_message,
        )


# ------------------------ Gradio UI Components ------------------------


class GradioUI:
    """A one-line interface to launch your agent in Gradio."""

    def __init__(self, file_upload_folder: str | None = None):
        """Initialize the Gradio UI with optional file upload functionality."""
        self.file_upload_folder = file_upload_folder
        self.allowed_extensions = ALLOWED_EXTENSIONS  # Use the constant
        if self.file_upload_folder:
            os.makedirs(self.file_upload_folder, exist_ok=True)

    def interact_with_agent(
        self,
        prompt: str,
        messages: List[Dict],
        session_state: Dict,
        uploaded_files: List[str],
    ) -> List[Dict]:
        """Main interaction handler with the agent.

        Lazily creates the per-session agent, annotates the prompt with any
        uploaded files, and streams agent messages back to the chatbot.
        """
        if "agent" not in session_state:
            try:
                session_state["agent"] = create_agent()
                session_state["creation_time"] = datetime.datetime.now()
                session_state["request_count"] = 0
            except Exception as e:
                error_message = f"Error initializing agent: {str(e)}\n\nPlease refresh the page and try again."
                messages.append(
                    gr.ChatMessage(
                        role="assistant",
                        content=error_message,
                    )
                )
                yield messages
                return  # Exit if can't create agent

        session_state["request_count"] += 1
        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages

        file_message = ""
        try:
            if uploaded_files:
                # Group uploaded files by rough media category so the agent
                # gets a readable inventory in its prompt.
                file_info = {}
                for file_path in uploaded_files:
                    ext = os.path.splitext(file_path)[1].lower()
                    if ext in [".jpg", ".jpeg", ".png", ".gif", ".webp"]:
                        category = "images"
                    elif ext in [".mp3", ".wav", ".ogg"]:
                        category = "audio"
                    else:
                        category = "documents"
                    if category not in file_info:
                        file_info[category] = []
                    file_info[category].append(os.path.basename(file_path))

                file_message = "\nYou have been provided with these files:\n"
                for category, files in file_info.items():
                    file_message += f"- {category.capitalize()}: {', '.join(files)}\n"
                prompt_with_files = prompt + file_message
            else:
                prompt_with_files = prompt
        except Exception as e:
            prompt_with_files = prompt
            print(
                f"WARNING: Error processing files: {e}. Continuing without file info."
            )

        try:
            # Reset the agent's memory periodically to bound context growth.
            reset_needed = session_state["request_count"] > 15
            for msg in stream_to_gradio(
                session_state["agent"],
                task=prompt_with_files,
                reset_agent_memory=reset_needed,
            ):
                messages.append(msg)
                yield messages
            if reset_needed:
                session_state["request_count"] = 1
        except Exception as e:
            error_message = f"Error processing your request: {str(e)}\n\nPlease try again with a different query."
            messages.append(
                gr.ChatMessage(
                    role="assistant",
                    content=error_message,
                )
            )
            yield messages

    def log_user_message(self, text_input: str) -> Tuple[str, gr.Textbox, gr.Button]:
        """Stash the user's message and disable the input widgets while processing."""
        return (
            text_input,
            gr.Textbox(value="", interactive=False, placeholder="Processing..."),
            gr.Button(interactive=False),
        )

    def upload_file(self, files: List[str]) -> Tuple[str, List[str]]:
        """Handle file uploads with validation, security, and clear feedback.

        Returns:
            A (status message, list of stored file paths) tuple.
        """
        if not files:
            return "No file uploaded", []
        # Fix: previously os.path.join(None, ...) raised TypeError when no
        # upload folder was configured.
        if not self.file_upload_folder:
            return "❌ File uploads are disabled (no upload folder configured).", []

        uploaded_files = []
        error_message = None
        for file_path in files:
            try:
                file_extension = os.path.splitext(file_path)[1].lower()
                if file_extension not in self.allowed_extensions:
                    error_message = (
                        f"❌ File type '{file_extension}' is not allowed. "
                        f"Supported types: {', '.join(ALLOWED_EXTENSIONS)}"
                    )
                    return error_message, []

                file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
                max_file_size_mb = 50
                if file_size_mb > max_file_size_mb:
                    error_message = f"❌ File size ({file_size_mb:.1f} MB) exceeds {max_file_size_mb} MB limit."
                    return error_message, []

                # Strip anything outside [word chars, '-', '.'] to prevent
                # path traversal / hostile filenames.
                sanitized_name = re.sub(r"[^\w\-.]", "", os.path.basename(file_path))
                if not sanitized_name:
                    # Fix: a fully-stripped name would silently overwrite ''.
                    sanitized_name = "uploaded_file" + file_extension
                dest_path = os.path.join(self.file_upload_folder, sanitized_name)
                shutil.copy(file_path, dest_path)
                uploaded_files.append(dest_path)
                print(f"Uploaded {file_path} to {dest_path}")
            except Exception as e:
                error_message = f"❌ Upload error: {str(e)}"
                return error_message, []

        if error_message:
            return error_message, []
        return (
            f"✓ Files uploaded successfully: {', '.join([os.path.basename(f) for f in uploaded_files])}",
            uploaded_files,
        )

    def detect_device(self, request: gr.Request):
        """Detect whether the user is on mobile or desktop device."""
        if not request:
            return "Unknown device"
        # Client hints are the most reliable signal when present.
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"
        # Fall back to user-agent keyword sniffing.
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            if platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"
        return "Desktop"

    def launch(self, **kwargs):
        """Launch the Gradio UI with responsive layout."""
        with gr.Blocks(theme="ocean", fill_height=True) as demo:

            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                if device == "Desktop":
                    return self._create_desktop_layout()
                return self._create_mobile_layout()

        demo.queue(max_size=20).launch(debug=True, **kwargs)

    def _create_desktop_layout(self):
        """Create the desktop layout with sidebar and enhanced styling."""
        with gr.Blocks(fill_height=True) as sidebar_demo:
            with gr.Sidebar():
                gr.Markdown(
                    """# 🔍 OpenDeepResearch
### Smolagents + Document Tools
"""
                )
                with gr.Group():
                    gr.Markdown("What can I help you with today?", container=True)
                    text_input = gr.Textbox(
                        lines=4,
                        label="Your request",
                        container=False,
                        placeholder="Enter your question or task here...",
                        show_label=False,
                    )
                    with gr.Row():
                        clear_btn = gr.Button("Clear", variant="secondary")
                        launch_research_btn = gr.Button("Run", variant="primary")

                if self.file_upload_folder:
                    with gr.Group():
                        gr.Markdown("📎 Upload Documents")
                        file_upload = gr.File(
                            label="Upload files for analysis",
                            file_types=self.allowed_extensions,
                            file_count="multiple",
                        )
                        upload_status = gr.Textbox(
                            label="Upload Status", interactive=False, visible=False
                        )
                uploaded_files_state = gr.State([])

                # NOTE(review): the original footer markup was garbled in the
                # source; reconstructed as a minimal attribution banner.
                gr.HTML("<div style='text-align:center'>Powered by:</div>")
                with gr.Row():
                    gr.HTML(
                        """<a href="https://github.com/huggingface/smolagents" target="_blank">
                        huggingface/smolagents</a>"""
                    )

            session_state = gr.State({})
            stored_messages = gr.State([])
            chatbot = gr.Chatbot(
                label="OpenDeepResearch Assistant",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                show_copy_button=True,
                scale=1,
                elem_id="my-chatbot",
                height=700,
            )

            # Fix: the original lambda called .get() on the gr.State component
            # object; the state's *value* must be passed in as an input.
            clear_btn.click(
                lambda state: ([], [], {"agent": state.get("agent")}, []),
                [session_state],
                [chatbot, stored_messages, session_state, uploaded_files_state],
            )

            if self.file_upload_folder:
                file_upload.change(
                    self.upload_file,
                    [file_upload],
                    [upload_status, uploaded_files_state],
                )

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                stored_messages,
                chatbot,
                session_state,
                uploaded_files_state,
            )
        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (simpler without sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")
            session_state = gr.State({})
            stored_messages = gr.State([])
            uploaded_files_state = gr.State([])

            # Consistency fix: like the desktop layout, only show the upload
            # widget when an upload folder is configured.
            if self.file_upload_folder:
                file_upload = gr.File(
                    label="Upload files for analysis",
                    file_types=self.allowed_extensions,
                    file_count="multiple",
                )
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                file_upload.change(
                    self.upload_file,
                    [file_upload],
                    [upload_status, uploaded_files_state],
                )

            chatbot = gr.Chatbot(
                label="open-Deep-Research",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                scale=1,
            )

            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                stored_messages,
                chatbot,
                session_state,
                uploaded_files_state,
            )
        return simple_demo

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        stored_messages,
        chatbot,
        session_state,
        uploaded_files_state,
    ):
        """Connect the event handlers for input elements."""

        def _reenable_inputs():
            # Re-enable the prompt box and Run button once a run finishes.
            return (
                gr.Textbox(
                    interactive=True,
                    placeholder="Enter your prompt here and press the button",
                ),
                gr.Button(interactive=True),
            )

        # Pressing Enter in the textbox and clicking Run share one pipeline:
        # log the message, stream the agent, then re-enable the inputs.
        for trigger in (text_input.submit, launch_research_btn.click):
            trigger(
                self.log_user_message,
                [text_input],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                [stored_messages, chatbot, session_state, uploaded_files_state],
                [chatbot],
            ).then(
                _reenable_inputs,
                None,
                [text_input, launch_research_btn],
            )


# ------------------------ Execution ------------------------


def main():
    """Main entry point for the application."""
    setup_environment()
    os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)
    GradioUI(file_upload_folder="uploaded_files").launch()


if __name__ == "__main__":
    main()