"""Main application for the OpenDeepResearch Gradio interface."""

import mimetypes
import os
import re
import shutil
from typing import Optional

from cleantext import clean
from dotenv import load_dotenv
from huggingface_hub import login
import gradio as gr

from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
    ArchiveSearchTool,
    FinderTool,
    FindNextTool,
    PageDownTool,
    PageUpTool,
    SimpleTextBrowser,
    VisitTool,
)
from scripts.visual_qa import visualizer
from scripts.text_cleaner_tool import TextCleanerTool

from smolagents import (
    CodeAgent,
    HfApiModel,
    LiteLLMModel,
    OpenAIServerModel,
    TransformersModel,
    GoogleSearchTool,
    Tool,
)
from smolagents.agent_types import AgentText  # AgentImage, AgentAudio
from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types

# Constants and configurations

# Modules the CodeAgent is allowed to import inside generated code.
AUTHORIZED_IMPORTS = [
    "requests",  # Web requests (fetching data from the internet)
    "zipfile",  # Working with ZIP archives
    "pandas",  # Data manipulation and analysis (DataFrames)
    "numpy",  # Numerical computing (arrays, linear algebra)
    "sympy",  # Symbolic mathematics (algebra, calculus)
    "json",  # JSON data serialization/deserialization
    "bs4",  # Beautiful Soup for HTML/XML parsing
    "pubchempy",  # Accessing PubChem chemical database
    "xml",  # XML processing
    "yahoo_finance",  # Fetching stock data
    "Bio",  # Bioinformatics tools (e.g., sequence analysis)
    "sklearn",  # Scikit-learn for machine learning
    "scipy",  # Scientific computing (stats, optimization)
    "pydub",  # Audio manipulation
    "PIL",  # Pillow for image processing
    "chess",  # Chess-related functionality
    "PyPDF2",  # PDF manipulation
    "pptx",  # PowerPoint file manipulation
    "torch",  # PyTorch for neural networks
    "datetime",  # Date and time handling
    "fractions",  # Rational number arithmetic
    "csv",  # CSV file reading/writing
    "cleantext",  # Text cleaning and normalization
    "os",  # Operating system interaction (file system, etc.) - VERY IMPORTANT
    "re",  # Regular expressions for text processing
    "collections",  # Useful data structures (e.g., defaultdict, Counter)
    "math",  # Basic mathematical functions
    "random",  # Random number generation
    "io",  # Input/output streams
    "urllib.parse",  # URL parsing and manipulation (safe URL handling)
    "typing",  # Support for type hints (improve code clarity)
    "concurrent.futures",  # For parallel execution
    "time",  # Measuring time
    "tempfile",  # Creating temporary files and directories
    # Data Visualization (if needed) - Consider security implications carefully
    "matplotlib",  # Plotting library (basic charts)
    "seaborn",  # Statistical data visualization (more advanced)
    # Web Scraping (more specific/controlled) - Consider ethical implications
    "lxml",  # Faster XML/HTML processing (alternative to bs4)
    "selenium",  # Automated browser control (for dynamic websites)
    # Database interaction (if needed) - Handle credentials securely!
    "sqlite3",  # SQLite database access
    # "psycopg2",  # PostgreSQL adapter if needed
    # Task scheduling
    "schedule",  # Allow the agent to schedule tasks
    # Networking
    # "socket",  # Networking
]

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

# Keyword arguments for SimpleTextBrowser.
# NOTE: "serpapi_key" is read when this module is imported, i.e. before
# setup_environment() has called load_dotenv(); create_agent() re-reads it.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

# MIME types accepted by GradioUI.upload_file.
ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",  # Added Markdown support
    "application/json",  # Added JSON support
    "image/png",
    "image/webp",
    "image/jpeg",  # Added JPEG support
    "image/gif",  # Added GIF support
    "video/mp4",
    "audio/mpeg",  # Added MP3 support
    "audio/wav",  # Added WAV support
    "audio/ogg",  # Added OGG support
]


def setup_environment():
    """Initialize environment variables and authentication.

    Loads variables from a ``.env`` file (overriding existing values) and, if
    ``HF_TOKEN`` is set, logs in to the Hugging Face Hub with it.
    """
    load_dotenv(override=True)
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:  # Check if token is actually set
        login(hf_token)
        # Never echo any part of the secret into logs.
        print("HF_TOKEN found; logged in to the Hugging Face Hub.")
    else:
        print("HF_TOKEN not found in environment variables.")


class ModelManager:
    """Manages model loading and initialization."""

    @staticmethod
    def load_model(chosen_inference: str, model_id: str, key_manager=None):
        """Load the specified model with appropriate configuration.

        Args:
            chosen_inference: One of "hf_api", "hf_api_provider", "litellm",
                "openai" or "transformers".
            model_id: Model identifier. Ignored by the "hf_api_provider" and
                "transformers" branches, which use fixed settings.
            key_manager: Object exposing ``get_key(name)``; required for
                "openai".

        Returns:
            The instantiated model object.

        Raises:
            ValueError: If the inference type is unknown, or "openai" is
                requested without a key manager.
        """
        try:
            if chosen_inference == "hf_api":
                return HfApiModel(model_id=model_id)
            if chosen_inference == "hf_api_provider":
                return HfApiModel(provider="together")
            if chosen_inference == "litellm":
                return LiteLLMModel(model_id=model_id)
            if chosen_inference == "openai":
                if not key_manager:
                    raise ValueError("Key manager required for OpenAI model")
                return OpenAIServerModel(
                    model_id=model_id,
                    api_key=key_manager.get_key("openai_api_key"),
                )
            if chosen_inference == "transformers":
                return TransformersModel(
                    model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
                    device_map="auto",
                    max_new_tokens=1000,
                )
            raise ValueError(f"Invalid inference type: {chosen_inference}")
        except (ValueError, RuntimeError) as e:  # More specific exceptions
            print(f"Model loading failed: {e}")
            raise


class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit=20000):
        """Initialize and return web-related tools.

        Args:
            model: Model handed to the TextInspectorTool.
            browser: Browser instance shared by the navigation tools.
            text_limit: Limit passed to the TextInspectorTool.

        Returns:
            A list with the search, navigation and text inspection tools.
        """
        return [
            GoogleSearchTool(provider="serper"),
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_image_generation_tools():
        """Initialize and return the image generation tool.

        The failure is logged and re-raised if the tool cannot be created.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description=(
                    "Generates high-quality AgentImage. "
                    "with text prompt (77 token limit)."
                ),
            )
        except Exception as e:
            print(f"✗ Couldn't initialize image generation tool: {e}")
            raise

    @staticmethod
    def load_clean_text_tool():
        """Initialize and return the text cleaning tool.

        The failure is logged and re-raised if the tool cannot be created.
        """
        try:
            return TextCleanerTool()
        except Exception as e:
            print(f"✗ Couldn't initialize clean text tool: {e}")
            raise


def create_agent():
    """Create a fresh agent instance with properly configured tools.

    Returns:
        A CodeAgent wired with the visualizer, web, image generation and text
        cleaning tools.

    Raises:
        ValueError: If any collected tool is not a ``Tool`` instance.
    """
    # Initialize model
    model = LiteLLMModel(
        custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
        # Currently serving:
        model_id="openrouter/anthropic/claude-3.7-sonnet",
    )  # DEEPSEEK = openrouter/perplexity/r1-1776 <--- boss model

    # Initialize tools
    text_limit = 20000
    # BROWSER_CONFIG captured SERPAPI_API_KEY at import time, before
    # load_dotenv() ran; re-read it so a key defined only in .env is honoured.
    browser_config = {
        **BROWSER_CONFIG,
        "serpapi_key": os.getenv("SERPAPI_API_KEY"),
    }
    browser = SimpleTextBrowser(**browser_config)

    # Collect all tools in a single list
    web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
    image_generator = ToolRegistry.load_image_generation_tools()
    clean_text = ToolRegistry.load_clean_text_tool()

    # Combine all tools into a single list
    all_tools = [visualizer] + web_tools + [image_generator, clean_text]

    # Validate tools before creating agent
    for tool in all_tools:
        if not isinstance(tool, Tool):
            raise ValueError(
                f"Invalid tool type: {type(tool)}. "
                f"All tools must be instances of Tool class."
            )

    return CodeAgent(
        model=model,
        tools=all_tools,  # Pass a single list containing all tools
        max_steps=10,
        verbosity_level=1,
        additional_authorized_imports=AUTHORIZED_IMPORTS,
        planning_interval=4,
    )


def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Stream agent responses as chat messages with status indicators.

    Args:
        agent: Agent whose ``run`` method is called in streaming mode.
        task: Prompt forwarded to the agent.
        reset_agent_memory: Forwarded to ``agent.run`` as ``reset``.
        additional_args: Forwarded to ``agent.run``.

    Yields:
        ``gr.ChatMessage`` objects: a processing indicator, the messages
        pulled from every step, then the final answer. Any exception is
        reported as an error message rather than propagated.
    """
    try:
        # Initial processing indicator
        yield gr.ChatMessage(role="assistant", content="⏳ Processing your request...")

        first_message_yielded = False
        step_log = None  # stays None if the run produces no steps

        for step_log in agent.run(
            task, stream=True, reset=reset_agent_memory, additional_args=additional_args
        ):
            # pull_messages_from_step is a generator yielding messages
            for message in pull_messages_from_step(step_log):
                content = getattr(message, "content", None)
                # Only plain-text content can be searched and decorated;
                # anything else is passed through untouched.
                if isinstance(content, str):
                    if not first_message_yielded:
                        content = content.replace("⏳ Processing your request...", "")
                    content_lower = content.lower()
                    if "document analysis" in content_lower:
                        content = f"📄 **Document Analysis:** {content}"
                    elif "search" in content_lower:
                        content = f"🔍 **Search:** {content}"
                    message.content = content
                first_message_yielded = True
                yield message

        if step_log is None:
            yield gr.ChatMessage(
                role="assistant",
                content=(
                    "❌ **Error:** The agent produced no output.\n\n"
                    "Please try again with a different query."
                ),
            )
            return

        # Final answer with enhanced formatting
        final_answer = handle_agent_output_types(step_log)
        if isinstance(final_answer, AgentText):
            yield gr.ChatMessage(
                role="assistant",
                content=f"✅ **Final Answer:**\n\n{final_answer.to_string()}",
            )
        else:
            yield gr.ChatMessage(
                role="assistant", content=f"✅ **Final Answer:** {str(final_answer)}"
            )
    except Exception as e:
        yield gr.ChatMessage(
            role="assistant",
            content=(
                f"❌ **Error:** {str(e)}\n\n"
                f"Please try again with a different query."
            ),
        )


class GradioUI:
    """Gradio front end with device-dependent layouts and file upload."""

    def __init__(self, file_upload_folder=None, max_queue_size=50):
        """Store configuration and create the upload folder if one is given.

        Args:
            file_upload_folder: Directory for uploaded files; uploads are
                disabled when ``None``.
            max_queue_size: Stored on the instance.
        """
        # Initialize all attributes here
        self.file_upload_folder = file_upload_folder
        self.max_queue_size = max_queue_size
        self.text_input = None
        self.submit_btn = None
        self.stop_btn = None
        self.clear_btn = None
        self.status = None
        self.chatbot = None
        self.session_state = None
        self.job = None

        if self.file_upload_folder is not None:
            os.makedirs(file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages, session_state):
        """Run the session's agent on ``prompt`` and stream the chat history.

        Args:
            prompt: User message to send to the agent.
            messages: Chat history list; mutated in place.
            session_state: Per-session dict; an agent is created and stored
                under "agent" on first use.

        Yields:
            The updated ``messages`` list after each new message.
        """
        # Get or create session-specific agent
        if "agent" not in session_state:
            session_state["agent"] = create_agent()

        # Adding monitoring
        try:
            # Log the existence of agent memory
            has_memory = hasattr(session_state["agent"], "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory:
                print(f"Memory type: {type(session_state['agent'].memory)}")

            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            for msg in stream_to_gradio(
                session_state["agent"], task=prompt, reset_agent_memory=False
            ):
                messages.append(msg)
                yield messages  # Yield messages after each step

            yield messages  # Yield messages one last time
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            raise

    def upload_file(
        self,
        file,
        file_uploads_log,
    ):
        """Validate an uploaded file and copy it into the upload folder.

        Args:
            file: Uploaded file object exposing ``name`` (a path), or None.
            file_uploads_log: List of previously stored file paths.

        Returns:
            A ``(status_textbox, updated_log)`` tuple. The log is returned
            unchanged when the upload is rejected.
        """
        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in ALLOWED_FILE_TYPES:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Sanitize file name: replace invalid chars with underscores
        original_name = os.path.basename(file.name)
        sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)

        # Ensure the extension correlates to the mime type (first extension
        # registered for each type wins)
        type_to_ext = {}
        for ext, t in mimetypes.types_map.items():
            type_to_ext.setdefault(t, ext)

        # Build sanitized filename with proper extension
        name_parts = sanitized_name.split(".")[:-1]
        extension = type_to_ext.get(mime_type, "")
        sanitized_name = "".join(name_parts) + extension

        # Limit file size
        max_file_size_mb = 50  # Define the limit
        file_size_mb = os.path.getsize(file.name) / (1024 * 1024)  # Size in MB
        if file_size_mb > max_file_size_mb:
            return (
                gr.Textbox(
                    f"File size exceeds {max_file_size_mb} MB limit.", visible=True
                ),
                file_uploads_log,
            )

        # Save the uploaded file to the specified folder
        file_path = os.path.join(self.file_upload_folder, sanitized_name)
        shutil.copy(file.name, file_path)

        return gr.Textbox(
            f"File uploaded: {file_path}", visible=True
        ), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        """Clean the user message, append file references and lock the inputs.

        Args:
            text_input: Raw text typed by the user.
            file_uploads_log: List of uploaded file paths.

        Returns:
            A ``(message, textbox_update, button_update)`` tuple; the textbox
            is cleared and both controls are made non-interactive.
        """
        cleaned_message = clean(
            text_input,
            fix_unicode=True,
            to_ascii=True,
            lower=True,
            no_line_breaks=False,
            no_urls=False,
            no_emails=False,
            no_phone_numbers=False,
            no_numbers=False,
            no_digits=False,
            no_currency_symbols=False,
            no_punct=False,
            lang="en",
        )  # Can change default behaviour with TextCleanerTool

        message = cleaned_message  # Use the cleaned message
        if file_uploads_log:  # Added file list to message
            message += (
                f"\nYou have been provided with these files, which might be "
                f"helpful or not: {file_uploads_log}"
            )

        return (
            message,
            gr.Textbox(
                value="",
                interactive=False,
                placeholder="Processing...",  # Changed placeholder.
            ),
            gr.Button(interactive=False),
        )

    def detect_device(self, request: gr.Request):
        """Detect whether the user is on a mobile or desktop device.

        Returns:
            "Mobile", "Desktop", or "Unknown device" when ``request`` is
            falsy.
        """
        if not request:
            return "Unknown device"  # Handle case where request is none.

        # Method 1: Check sec-ch-ua-mobile header
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"

        # Method 2: Check user-agent string
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"

        # Method 3: Check platform
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            return "Desktop"

        # Default case if no clear indicators
        return "Desktop"

    def launch(self, **kwargs):
        """Launch the Gradio UI with responsive layout.

        Args:
            **kwargs: Forwarded to ``launch`` on the queued Blocks app.
        """
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # Different layouts for mobile and computer devices
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                # Render layout with sidebar
                if device == "Desktop":
                    return self._create_desktop_layout()
                else:
                    return self._create_mobile_layout()

        # NOTE(review): self.max_queue_size is not used here; the queue size
        # is fixed at 20. Confirm which value is intended.
        demo.queue(max_size=20).launch(
            debug=True, **kwargs
        )  # Add queue with reasonable size

    def _create_desktop_layout(self):
        """Create the desktop layout with sidebar.

        NOTE(review): the nesting of the ``with`` contexts was reconstructed
        from a source whose indentation was lost - confirm against the
        original layout.
        """
        with gr.Blocks(fill_height=True) as sidebar_demo:
            # Created up front so it exists whether or not uploads are enabled.
            file_uploads_log = gr.State([])

            with gr.Sidebar():
                gr.Markdown(
                    "#OpenDeepResearch - 3theSmolagents! \n"
                    "Model_id: anthropic/claude-3.7-sonnet"
                )
                with gr.Group():
                    gr.Markdown("**What's on your mind mate?**", container=True)
                    text_input = gr.Textbox(
                        lines=3,
                        label="Your request",
                        container=False,
                        placeholder=(
                            "Enter your prompt here and press Shift+Enter or "
                            "press the button"
                        ),
                    )
                    launch_research_btn = gr.Button("Run", variant="primary")

                # If an upload folder is provided, enable the upload feature
                if self.file_upload_folder is not None:
                    upload_file = gr.File(label="Upload a file")
                    upload_status = gr.Textbox(
                        label="Upload Status", interactive=False, visible=False
                    )
                    upload_file.change(
                        self.upload_file,
                        [upload_file, file_uploads_log],
                        [upload_status, file_uploads_log],
                    )

                # NOTE(review): the HTML markup of the two snippets below
                # appears to have been stripped from the source; only the
                # visible text is preserved. Restore the original tags.
                gr.HTML("\n\nPowered by:\n\n")
                with gr.Row():
                    gr.HTML("\nlogo huggingface/smolagents\n")

            # Add session state to store session-specific data
            # Initialize empty state for each session
            session_state = gr.State({})
            stored_messages = gr.State([])

            chatbot = gr.Chatbot(
                label="ODR",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=False,
                scale=1,
                elem_id="my-chatbot",
            )

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (simpler without sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")

            # Add session state to store session-specific data
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])

            chatbot = gr.Chatbot(
                label="ODR",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                scale=1,
            )

            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )

            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

        return simple_demo

    def _create_common_ui_elements(self):
        """Create common UI elements with control buttons.

        Stores the created components on ``self``. Not called elsewhere in
        this file.

        NOTE(review): the nesting of the group/row/status components was
        reconstructed from a source whose indentation was lost - confirm.
        """
        with gr.Group():
            self.text_input = gr.Textbox(
                lines=3,
                label="Your request",
                placeholder="Enter your question about the documents...",
                elem_classes=["prompt-box"],
            )
            with gr.Row():
                self.submit_btn = gr.Button("Run", variant="primary")
                self.stop_btn = gr.Button("Stop Generation", variant="stop")
                self.clear_btn = gr.Button("Clear Chat", variant="secondary")

        # Status indicator for document processing
        self.status = gr.Textbox(
            "", label="Status", interactive=False, visible=True
        )

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Wire the textbox submit and button click to the agent pipeline.

        Both triggers run the same three-step chain: log the user message
        (which locks the inputs), stream the agent interaction into the
        chatbot, then unlock the inputs again.
        """
        # No cancellable job is tracked yet; this would need to be assigned
        # to an actual event to support stopping generation.
        self.job = None

        # log_user_message swaps the placeholder for "Processing..."; remember
        # the layout's own placeholder so it can be put back afterwards.
        idle_placeholder = getattr(text_input, "placeholder", None)

        def unlock_inputs():
            # Re-enable the controls disabled by log_user_message.
            return (
                gr.Textbox(interactive=True, placeholder=idle_placeholder),
                gr.Button(interactive=True),
            )

        # Text input submission and button click share the same chain.
        for trigger in (text_input.submit, launch_research_btn.click):
            trigger(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                [stored_messages, chatbot, session_state],
                [chatbot],
            ).then(
                unlock_inputs,
                None,
                [text_input, launch_research_btn],
            )


def main():
    """Main entry point for the application."""
    # Initialize environment
    setup_environment()

    # Ensure downloads folder exists
    os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)

    # Launch UI
    GradioUI(file_upload_folder="uploaded_files").launch()


if __name__ == "__main__":
    main()