#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The Footscray Coding Collective. All rights reserved.
"""Open Deep Research app: a smolagents CodeAgent behind a responsive Gradio UI."""

import mimetypes
import os
import re
import shutil
from typing import Optional

import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import login

from scripts.flux_lora_tool import FluxLoRATool
from scripts.text_cleaner_tool import TextCleanerTool
from scripts.text_inspector_tool import TextInspectorTool
from scripts.text_web_browser import (
    ArchiveSearchTool,
    FinderTool,
    FindNextTool,
    PageDownTool,
    PageUpTool,
    SimpleTextBrowser,
    VisitTool,
)
from scripts.visual_qa import visualizer
from smolagents import (
    CodeAgent,
    GoogleSearchTool,
    HfApiModel,
    LiteLLMModel,
    OpenAIServerModel,
    Tool,
    TransformersModel,
)
from smolagents.agent_types import AgentAudio, AgentImage, AgentText
from smolagents.gradio_ui import handle_agent_output_types, pull_messages_from_step

# ------------------------ Configuration and Setup ------------------------

# Imports the sandboxed CodeAgent is allowed to use in generated code.
AUTHORIZED_IMPORTS = [
    "requests",  # Web requests (fetching data from the internet)
    "zipfile",  # Working with ZIP archives
    "pandas",  # Data manipulation and analysis (DataFrames)
    "numpy",  # Numerical computing (arrays, linear algebra)
    "sympy",  # Symbolic mathematics (algebra, calculus)
    "json",  # JSON data serialization/deserialization
    "bs4",  # Beautiful Soup for HTML/XML parsing
    "pubchempy",  # Accessing PubChem chemical database
    "yaml",
    "xml",  # XML processing
    "yahoo_finance",  # Fetching stock data
    "Bio",  # Bioinformatics tools (e.g., sequence analysis)
    "sklearn",  # Scikit-learn for machine learning
    "scipy",  # Scientific computing (stats, optimization)
    "pydub",  # Audio manipulation
    "PIL",  # Pillow for image processing
    "chess",  # Chess-related functionality
    "PyPDF2",  # PDF manipulation
    "pptx",  # PowerPoint file manipulation
    "torch",  # PyTorch for neural networks
    "datetime",  # Date and time handling
    "fractions",  # Rational number arithmetic
    "csv",  # CSV file reading/writing
    "cleantext",  # Text cleaning and normalization
    "os",  # Operating system interaction (file system, etc.) VERY IMPORTANT
    "re",  # Regular expressions for text processing
    "collections",  # Useful data structures (e.g., defaultdict, Counter)
    "math",  # Basic mathematical functions
    "random",  # Random number generation
    "io",  # Input/output streams
    "urllib.parse",  # URL parsing and manipulation (safe URL handling)
    "typing",  # Support for type hints (improve code clarity)
    "concurrent.futures",  # For parallel execution
    "time",  # Measuring time
    "tempfile",  # Creating temporary files and directories
    # Data Visualization (if needed) - Consider security implications carefully
    "matplotlib",  # Plotting library (basic charts)
    "seaborn",  # Statistical data visualization (more advanced)
    # Web Scraping (more specific/controlled) - Consider ethical implications
    "lxml",  # Faster XML/HTML processing (alternative to bs4)
    "selenium",  # Automated browser control (for dynamic websites)
    # Database interaction (if needed) - Handle credentials securely!
    "sqlite3",  # SQLite database access
    # Task scheduling
    "schedule",  # Allow the agent to schedule tasks
    "uuid",
    "base64",
]

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
)

# NOTE: "serpapi_key" is re-read in setup_environment() after load_dotenv(),
# because at import time the .env file has not been loaded yet.
BROWSER_CONFIG = {
    "viewport_size": 1024 * 5,
    "downloads_folder": "downloads_folder",
    "request_kwargs": {
        "headers": {"User-Agent": USER_AGENT},
        "timeout": 300,
    },
    "serpapi_key": os.getenv("SERPAPI_API_KEY"),
}

CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"}

# MIME types accepted by the upload widget; anything else (including an
# unguessable type, i.e. None) is rejected.
ALLOWED_FILE_TYPES = [
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "text/plain",
    "text/markdown",
    "application/json",
    "image/png",
    "image/webp",
    "image/jpeg",
    "image/gif",
    "video/mp4",
    "audio/mpeg",
    "audio/wav",
    "audio/ogg",
]


def setup_environment():
    """Load .env variables, refresh env-dependent config, and log in to HF.

    Must run before create_agent(): BROWSER_CONFIG is built at import time,
    before the .env file is read, so the SerpAPI key is refreshed here.
    """
    load_dotenv(override=True)
    # Fix: pick up a SERPAPI_API_KEY that is defined only in .env.
    BROWSER_CONFIG["serpapi_key"] = os.getenv("SERPAPI_API_KEY")
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        login(hf_token)
        # Security: never echo any part of the token itself.
        print("HF_TOKEN found; logged in to the Hugging Face Hub.")
    else:
        print("HF_TOKEN not found in environment variables.")


# ------------------------ Model and Tool Management ------------------------


class ModelManager:
    """Manages model loading and initialization."""

    @staticmethod
    def load_model(chosen_inference: str, model_id: str, key_manager=None):
        """Load the specified model with appropriate configuration.

        Args:
            chosen_inference: One of "hf_api", "hf_api_provider", "litellm",
                "openai", "transformers".
            model_id: Model identifier. Ignored by "hf_api_provider" and
                "transformers", which use fixed configurations.
            key_manager: Object exposing get_key(); required for "openai".

        Returns:
            A configured smolagents model instance.

        Raises:
            ValueError: Unknown inference type, or missing key manager.
        """
        try:
            if chosen_inference == "hf_api":
                return HfApiModel(model_id=model_id)
            elif chosen_inference == "hf_api_provider":
                # Provider-routed inference; the provider picks the model.
                return HfApiModel(provider="together")
            elif chosen_inference == "litellm":
                return LiteLLMModel(model_id=model_id)
            elif chosen_inference == "openai":
                if not key_manager:
                    raise ValueError("Key manager required for OpenAI model")
                return OpenAIServerModel(
                    model_id=model_id,
                    api_key=key_manager.get_key("openai_api_key"),
                )
            elif chosen_inference == "transformers":
                # Fixed small local model; model_id argument is intentionally
                # not used here.
                return TransformersModel(
                    model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
                    device_map="auto",
                    max_new_tokens=1000,
                )
            else:
                raise ValueError(f"Invalid inference type: {chosen_inference}")
        except Exception as e:
            print(f"✗ Couldn't load model: {e}")
            raise


class ToolRegistry:
    """Manages tool initialization and organization."""

    @staticmethod
    def load_web_tools(model, browser, text_limit=20000):
        """Initialize and return web-related tools (search, browse, inspect)."""
        return [
            GoogleSearchTool(provider="serper"),
            VisitTool(browser),
            PageUpTool(browser),
            PageDownTool(browser),
            FinderTool(browser),
            FindNextTool(browser),
            ArchiveSearchTool(browser),
            TextInspectorTool(model, text_limit),
        ]

    @staticmethod
    def load_document_tools():
        """
        Initialize and return document processing, i.e. sanitisation and
        indexing, tools.

        Returns:
            List of document tools
        """
        return [
            TextCleanerTool(),
        ]

    @staticmethod
    def load_image_generation_tools():
        """Initialize and return an image generation tool.

        Falls back to the local FluxLoRATool if the remote Space is
        unreachable.
        """
        try:
            return Tool.from_space(
                space_id="xkerser/FLUX.1-dev",
                name="image_generator",
                description="Generates high-quality AgentImage using the FLUX.1-dev model based on text prompts.",
            )
        except Exception as e:
            print(f"✗ Couldn't initialize image generation tool: {e}")
            # Fix: return an *instance*, not the class — create_agent()
            # validates every tool with isinstance(tool, Tool).
            return FluxLoRATool()


# ------------------------ Agent Creation and Execution ------------------------


def create_agent():
    """
    Creates a fresh agent instance with properly configured tools.

    Returns:
        CodeAgent: Configured agent ready for use

    Raises:
        ValueError: If tool validation fails
        RuntimeError: If agent creation fails
    """
    try:
        # Initialize model
        model = LiteLLMModel(
            custom_role_conversions=CUSTOM_ROLE_CONVERSIONS,
            model_id="openrouter/google/gemini-2.0-flash-001",
        )

        # Initialize tools
        text_limit = 30000
        browser = SimpleTextBrowser(**BROWSER_CONFIG)

        # Collect all tools in a single list
        web_tools = ToolRegistry.load_web_tools(model, browser, text_limit)
        doc_tools = ToolRegistry.load_document_tools()
        image_generator = ToolRegistry.load_image_generation_tools()

        # Combine all tools into a single list
        all_tools = [visualizer] + web_tools + doc_tools + [image_generator]

        # Validate tools before creating agent
        for tool in all_tools:
            if not isinstance(tool, Tool):
                raise ValueError(
                    f"Invalid tool type: {type(tool)}. "
                    f"All tools must be instances of Tool class."
                )

        return CodeAgent(
            model=model,
            tools=all_tools,
            max_steps=12,
            verbosity_level=2,
            additional_authorized_imports=AUTHORIZED_IMPORTS,
            planning_interval=2,
        )
    except (ValueError, RuntimeError) as e:
        print(f"Failed to create agent: {e}")
        raise RuntimeError(f"Agent creation failed: {e}")


def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Runs an agent with the given task and streams messages as Gradio ChatMessages."""
    for step_log in agent.run(
        task, stream=True, reset=reset_agent_memory, additional_args=additional_args
    ):
        for message in pull_messages_from_step(step_log):
            yield message

    # Process final answer : the last step_log yielded by run() is the final
    # answer; render it by media type.
    final_answer = step_log  # Last log is the run's final_answer
    final_answer = handle_agent_output_types(final_answer)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(
            role="assistant",
            content=f"**Final answer:**\n{final_answer.to_string()}\n",
        )
    elif isinstance(final_answer, AgentImage):
        # Send as Gradio-compatible file object
        yield gr.ChatMessage(
            role="assistant",
            content={"image": final_answer.to_string(), "type": "file"},
        )
    elif isinstance(final_answer, AgentAudio):
        # Send as Gradio-compatible file object
        yield gr.ChatMessage(
            role="assistant",
            content={"audio": final_answer.to_string(), "type": "file"},
        )
    else:
        yield gr.ChatMessage(
            role="assistant", content=f"**Final answer:** {str(final_answer)}"
        )


# ------------------------ Gradio UI Components ------------------------


class GradioUI:
    """A one-line interface to launch your agent in Gradio."""

    def __init__(self, file_upload_folder: str | None = None):
        """Initialize the Gradio UI with optional file upload functionality."""
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            # makedirs(exist_ok=True) avoids the exists-check race and also
            # creates missing parent directories (os.mkdir would fail).
            os.makedirs(file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages, session_state):
        """Main interaction handler with the agent."""
        # Get or create session-specific agent
        if "agent" not in session_state:
            session_state["agent"] = create_agent()

        # Adding monitoring
        try:
            # Log the existence of agent memory
            has_memory = hasattr(session_state["agent"], "memory")
            print(f"Agent has memory: {has_memory}")
            if has_memory:
                print(f"Memory type: {type(session_state['agent'].memory)}")

            messages.append(gr.ChatMessage(role="user", content=prompt))
            yield messages

            for msg in stream_to_gradio(
                session_state["agent"], task=prompt, reset_agent_memory=False
            ):
                messages.append(msg)
                yield messages  # Yield messages after each step

            yield messages  # Yield messages one last time
        except Exception as e:
            print(f"Error in interaction: {str(e)}")
            raise

    def upload_file(
        self,
        file,
        file_uploads_log,
    ):
        """Handle file uploads with proper validation and security.

        Validates MIME type against ALLOWED_FILE_TYPES, sanitizes the file
        name, normalizes the extension to the MIME type, enforces a 50 MB
        limit, then copies the file into self.file_upload_folder.

        Returns:
            (status Textbox, updated upload log list)
        """
        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        # None (unguessable type) is rejected here too, since it is not in
        # the allow-list.
        if mime_type not in ALLOWED_FILE_TYPES:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Sanitize file name
        original_name = os.path.basename(file.name)
        sanitized_name = re.sub(
            r"[^\w\-.]", "_", original_name
        )  # Replace invalid chars with underscores

        # Ensure the extension correlates to the mime type. types_map maps
        # ".ext" -> type, so the extension already carries its dot.
        type_to_ext = {}
        for ext, t in mimetypes.types_map.items():
            if t not in type_to_ext:
                type_to_ext[t] = ext

        # Build sanitized filename with proper extension. Fix: for a name
        # with no dot, split(".")[:-1] is empty — fall back to the whole
        # sanitized name so the file is not saved as a bare extension.
        stem = "".join(sanitized_name.split(".")[:-1]) or sanitized_name
        extension = type_to_ext.get(mime_type, "")
        sanitized_name = stem + extension

        # Limit File Size, and Throw Error
        max_file_size_mb = 50  # Define the limit
        file_size_mb = os.path.getsize(file.name) / (1024 * 1024)  # Size in MB
        if file_size_mb > max_file_size_mb:
            return (
                gr.Textbox(
                    f"File size exceeds {max_file_size_mb} MB limit.", visible=True
                ),
                file_uploads_log,
            )

        # Save the uploaded file to the specified folder
        file_path = os.path.join(self.file_upload_folder, sanitized_name)
        shutil.copy(file.name, file_path)

        return gr.Textbox(
            f"File uploaded: {file_path}", visible=True
        ), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        """Process user message and handle file references."""
        message = text_input
        if len(file_uploads_log) > 0:
            message += f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"  # Added file list
        return (
            message,
            gr.Textbox(
                value="",
                interactive=False,
                placeholder="Processing...",  # Changed placeholder.
            ),
            gr.Button(interactive=False),
        )

    def detect_device(self, request: gr.Request):
        """Detect whether the user is on mobile or desktop device."""
        if not request:
            return "Unknown device"  # Handle case where request is none.

        # Method 1: Check sec-ch-ua-mobile header
        is_mobile_header = request.headers.get("sec-ch-ua-mobile")
        if is_mobile_header:
            return "Mobile" if "?1" in is_mobile_header else "Desktop"

        # Method 2: Check user-agent string
        user_agent = request.headers.get("user-agent", "").lower()
        mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"]
        if any(keyword in user_agent for keyword in mobile_keywords):
            return "Mobile"

        # Method 3: Check platform
        platform = request.headers.get("sec-ch-ua-platform", "").lower()
        if platform:
            if platform in ['"android"', '"ios"']:
                return "Mobile"
            if platform in ['"windows"', '"macos"', '"linux"']:
                return "Desktop"

        # Default case if no clear indicators
        return "Desktop"

    def launch(self, **kwargs):
        """Launch the Gradio UI with responsive layout."""
        with gr.Blocks(theme="ocean", fill_height=True) as demo:
            # Different layouts for mobile and computer devices
            @gr.render()
            def layout(request: gr.Request):
                device = self.detect_device(request)
                print(f"device - {device}")
                # Render layout with sidebar
                if device == "Desktop":
                    return self._create_desktop_layout()
                return self._create_mobile_layout()

        demo.queue(max_size=20).launch(
            debug=True, **kwargs
        )  # Add queue with reasonable size

    def _create_desktop_layout(self):
        """Create the desktop layout with sidebar."""
        with gr.Blocks(fill_height=True) as sidebar_demo:
            with gr.Sidebar():
                gr.Markdown(
                    """#OpenDeepResearch - 3theSmolagents!
                    Model_id: google/gemini-2.0-flash-001"""
                )
                with gr.Group():
                    gr.Markdown("**What's on your mind mate?**", container=True)
                    text_input = gr.Textbox(
                        lines=3,
                        label="Your request",
                        container=False,
                        placeholder="Enter your prompt here and press Shift+Enter or press the button",
                    )
                    launch_research_btn = gr.Button("Run", variant="primary")

                # If an upload folder is provided, enable the upload feature
                if self.file_upload_folder is not None:
                    upload_file = gr.File(label="Upload a file")
                    upload_status = gr.Textbox(
                        label="Upload Status", interactive=False, visible=False
                    )
                    file_uploads_log = gr.State([])
                    upload_file.change(
                        self.upload_file,
                        [upload_file, file_uploads_log],
                        [upload_status, file_uploads_log],
                    )

                # NOTE(review): the original HTML markup for this footer was
                # lost when the file was mangled; only the visible text
                # ("Powered by:") survived. Reconstructed minimally — confirm
                # against the upstream version.
                gr.HTML("<br><br><h4><center>Powered by:</center></h4>")
                with gr.Row():
                    # NOTE(review): markup reconstructed; original showed a
                    # logo image linking to huggingface/smolagents.
                    gr.HTML(
                        """<a target="_blank" href="https://github.com/huggingface/smolagents">
                        <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" alt="logo" width="100">
                        huggingface/smolagents</a>"""
                    )

            # Add session state to store session-specific data
            session_state = gr.State({})  # Initialize empty state for each session
            stored_messages = gr.State([])
            if "file_uploads_log" not in locals():
                file_uploads_log = gr.State([])

            chatbot = gr.Chatbot(
                label="Research-Assistant",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=False,
                scale=1,
                elem_id="my-chatbot",
            )

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

        return sidebar_demo

    def _create_mobile_layout(self):
        """Create the mobile layout (simpler without sidebar)."""
        with gr.Blocks(fill_height=True) as simple_demo:
            gr.Markdown("""#OpenDeepResearch - free the AI agents!""")

            # Add session state to store session-specific data
            session_state = gr.State({})
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])

            chatbot = gr.Chatbot(
                label="Research-Assistant",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png",
                ),
                resizeable=True,
                scale=1,
            )

            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(
                    label="Upload Status", interactive=False, visible=False
                )
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )

            text_input = gr.Textbox(
                lines=1,
                label="What's on your mind mate?",
                placeholder="Chuck in a question and we'll take care of the rest",
            )
            launch_research_btn = gr.Button("Run", variant="primary")

            self._connect_event_handlers(
                text_input,
                launch_research_btn,
                file_uploads_log,
                stored_messages,
                chatbot,
                session_state,
            )

        return simple_demo

    def _connect_event_handlers(
        self,
        text_input,
        launch_research_btn,
        file_uploads_log,
        stored_messages,
        chatbot,
        session_state,
    ):
        """Connect the event handlers for input elements.

        The textbox submit event and the button click event share an
        identical three-stage chain (log message -> run agent -> re-enable
        inputs), so it is wired once in a loop instead of duplicated.
        """
        for trigger in (text_input.submit, launch_research_btn.click):
            trigger(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input, launch_research_btn],
            ).then(
                self.interact_with_agent,
                [stored_messages, chatbot, session_state],
                [chatbot],
            ).then(
                lambda: (
                    gr.Textbox(
                        interactive=True,
                        placeholder="Enter your prompt here and press the button",
                    ),
                    gr.Button(interactive=True),
                ),
                None,
                [text_input, launch_research_btn],
            )


# ------------------------ Execution ------------------------


def main():
    """Main entry point for the application."""
    # Initialize environment
    setup_environment()

    # Ensure downloads folder exists
    os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True)

    # Launch UI
    GradioUI(file_upload_folder="uploaded_files").launch()


if __name__ == "__main__":
    main()