Spaces:
Runtime error
Runtime error
| """Main application for the OpenDeepResearch Gradio interface.""" | |
| import mimetypes | |
| import os | |
| import re | |
| import shutil | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from huggingface_hub import login | |
| import gradio as gr | |
| from scripts.text_inspector_tool import TextInspectorTool | |
| from scripts.text_web_browser import ( | |
| ArchiveSearchTool, | |
| FinderTool, | |
| FindNextTool, | |
| PageDownTool, | |
| PageUpTool, | |
| SimpleTextBrowser, | |
| VisitTool, | |
| ) | |
| from scripts.visual_qa import visualizer | |
| from scripts.text_cleaner_tool import TextCleanerTool | |
| from smolagents import ( | |
| CodeAgent, | |
| HfApiModel, | |
| LiteLLMModel, | |
| OpenAIServerModel, | |
| TransformersModel, | |
| GoogleSearchTool, | |
| Tool, | |
| ) | |
| from smolagents.agent_types import AgentText, AgentImage, AgentAudio | |
| from smolagents.gradio_ui import pull_messages_from_step, handle_agent_output_types | |
| # Constants and configurations | |
| AUTHORIZED_IMPORTS = [ | |
| "requests", | |
| "zipfile", | |
| "pandas", | |
| "numpy", | |
| "sympy", | |
| "json", | |
| "bs4", | |
| "pubchempy", | |
| "xml", | |
| "yahoo_finance", | |
| "Bio", | |
| "sklearn", | |
| "scipy", | |
| "pydub", | |
| "PIL", | |
| "chess", | |
| "PyPDF2", | |
| "pptx", | |
| "torch", | |
| "datetime", | |
| "fractions", | |
| "csv", | |
| "clean-text", | |
| ] | |
| USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0" | |
| BROWSER_CONFIG = { | |
| "viewport_size": 1024 * 5, | |
| "downloads_folder": "downloads_folder", | |
| "request_kwargs": { | |
| "headers": {"User-Agent": USER_AGENT}, | |
| "timeout": 300, | |
| }, | |
| "serpapi_key": os.getenv("SERPAPI_API_KEY"), | |
| } | |
| CUSTOM_ROLE_CONVERSIONS = {"tool-call": "assistant", "tool-response": "user"} | |
| ALLOWED_FILE_TYPES = [ | |
| "application/pdf", | |
| "application/vnd.openxmlformats-officedocument.wordprocessingml.document", | |
| "text/plain", | |
| "text/markdown", # Added Markdown support | |
| "application/json", # Added JSON support | |
| "image/png", | |
| "image/webp", | |
| "image/jpeg", # Added JPEG support | |
| "image/gif", # Added GIF support | |
| "video/mp4", | |
| "audio/mpeg", # Added MP3 support | |
| "audio/wav", # Added WAV support | |
| "audio/ogg", # Added OGG support | |
| ] | |
| def setup_environment(): | |
| """Initialize environment variables and authentication.""" | |
| load_dotenv(override=True) | |
| hf_token = os.getenv("HF_TOKEN") | |
| if hf_token: # Check if token is actually set | |
| login(hf_token) | |
| print("HF_TOKEN (last 10 characters):", hf_token[-10:]) | |
| else: | |
| print("HF_TOKEN not found in environment variables.") | |
| class ModelManager: | |
| """Manages model loading and initialization.""" | |
| def load_model(chosen_inference: str, model_id: str, key_manager=None): | |
| """Load the specified model with appropriate configuration.""" | |
| try: | |
| if chosen_inference == "hf_api": | |
| return HfApiModel(model_id=model_id) | |
| if chosen_inference == "hf_api_provider": | |
| return HfApiModel(provider="together") | |
| if chosen_inference == "litellm": | |
| return LiteLLMModel(model_id=model_id) | |
| if chosen_inference == "openai": | |
| if not key_manager: | |
| raise ValueError("Key manager required for OpenAI model") | |
| return OpenAIServerModel( | |
| model_id=model_id, api_key=key_manager.get_key("openai_api_key") | |
| ) | |
| if chosen_inference == "transformers": | |
| return TransformersModel( | |
| model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct", | |
| device_map="auto", | |
| max_new_tokens=1000, | |
| ) | |
| raise ValueError(f"Invalid inference type: {chosen_inference}") | |
| except Exception as e: | |
| print(f"✗ Couldn't load model: {e}") | |
| raise | |
| class ToolRegistry: | |
| """Manages tool initialization and organization.""" | |
| def load_web_tools(model, browser, text_limit=20000): | |
| """Initialize and return web-related tools.""" | |
| return [ | |
| GoogleSearchTool(provider="serper"), | |
| VisitTool(browser), | |
| PageUpTool(browser), | |
| PageDownTool(browser), | |
| FinderTool(browser), | |
| FindNextTool(browser), | |
| ArchiveSearchTool(browser), | |
| TextInspectorTool(model, text_limit), | |
| ] | |
| def load_image_generation_tools(): | |
| """Initialize and return image generation tools.""" | |
| try: | |
| return Tool.from_space( | |
| space_id="xkerser/FLUX.1-dev", | |
| name="image_generator", | |
| description="Generates high-quality AgentImage with text prompt (77 token limit).", | |
| ) | |
| except Exception as e: | |
| print(f"✗ Couldn't initialize image generation tool: {e}") | |
| raise | |
| def load_clean_text_tool(): | |
| """Initialize and return image generation tools.""" | |
| try: | |
| return TextCleanerTool | |
| except Exception as e: | |
| print(f"✗ Couldn't initialize clean text tool: {e}") | |
| raise | |
| def create_agent(): | |
| """Creates a fresh agent instance with properly configured tools.""" | |
| # Initialize model | |
| model = LiteLLMModel( | |
| custom_role_conversions=CUSTOM_ROLE_CONVERSIONS, | |
| model_id="openrouter/deepseek/deepseek-chat-v3-0324:free", # currently serving: | |
| ) # DEEPSEEK = openrouter/perplexity/r1-1776 <--- boss model | |
| # Initialize tools | |
| text_limit = 30000 | |
| browser = SimpleTextBrowser(**BROWSER_CONFIG) | |
| # Collect all tools in a single list | |
| web_tools = ToolRegistry.load_web_tools(model, browser, text_limit) | |
| image_generator = ToolRegistry.load_image_generation_tools() | |
| clean_text = TextCleanerTool() # Instantiate TextCleanerTool | |
| # Combine all tools into a single list (not a tuple) | |
| all_tools = [visualizer] + web_tools + [image_generator] + [clean_text] | |
| # Validate tools before creating agent | |
| for tool in all_tools: | |
| if not isinstance(tool, Tool): | |
| raise ValueError( | |
| "Invalid tool type: " | |
| f"{type(tool)}. All tools must be instances of Tool class." | |
| ) | |
| return CodeAgent( | |
| model=model, | |
| tools=all_tools, # Pass a single list containing all tools | |
| max_steps=12, | |
| verbosity_level=2, | |
| additional_authorized_imports=AUTHORIZED_IMPORTS, | |
| planning_interval=4, | |
| ) | |
| def stream_to_gradio( | |
| agent, | |
| task: str, | |
| reset_agent_memory: bool = False, | |
| additional_args: Optional[dict] = None, | |
| ): | |
| """Runs an agent with the given task and streams messages as Gradio ChatMessages.""" | |
| for step_log in agent.run( | |
| task, stream=True, reset=reset_agent_memory, additional_args=additional_args | |
| ): | |
| yield from pull_messages_from_step(step_log) | |
| # Process final answer : Use a more comprehensive media output | |
| final_answer = step_log # Last log is the run's final_answer | |
| final_answer = handle_agent_output_types(final_answer) | |
| if isinstance(final_answer, AgentText): | |
| yield gr.ChatMessage( | |
| role="assistant", | |
| content=f"**Final answer:**\n{final_answer.to_string()}\n", | |
| ) | |
| elif isinstance(final_answer, AgentImage): | |
| yield gr.ChatMessage( | |
| role="assistant", | |
| content={"image": final_answer.to_string(), "type": "file"}, | |
| ) # Send as Gradio-compatible file object: | |
| elif isinstance(final_answer, AgentAudio): | |
| yield gr.ChatMessage( | |
| role="assistant", | |
| content={"audio": final_answer.to_string(), "type": "file"}, | |
| ) # Send as Gradio-compatible file object | |
| else: | |
| yield gr.ChatMessage( | |
| role="assistant", content=f"**Final answer:** {str(final_answer)}" | |
| ) | |
| class GradioUI: | |
| """A one-line interface to launch your agent in Gradio.""" | |
| def __init__(self, file_upload_folder: str | None = None): | |
| """Initialize the Gradio UI with optional file upload functionality.""" | |
| self.file_upload_folder = file_upload_folder | |
| if self.file_upload_folder is not None: | |
| os.makedirs(file_upload_folder, exist_ok=True) | |
| def interact_with_agent(self, prompt, messages, session_state): | |
| """Main interaction handler with the agent.""" | |
| # Get or create session-specific agent | |
| if "agent" not in session_state: | |
| session_state["agent"] = create_agent() | |
| # Adding monitoring | |
| try: | |
| # Log the existence of agent memory | |
| has_memory = hasattr(session_state["agent"], "memory") | |
| print(f"Agent has memory: {has_memory}") | |
| if has_memory: | |
| print(f"Memory type: {type(session_state['agent'].memory)}") | |
| messages.append(gr.ChatMessage(role="user", content=prompt)) | |
| yield messages | |
| for msg in stream_to_gradio( | |
| session_state["agent"], task=prompt, reset_agent_memory=False | |
| ): | |
| messages.append(msg) | |
| yield messages # Yield messages after each step | |
| yield messages # Yield messages one last time | |
| except Exception as e: | |
| print(f"Error in interaction: {str(e)}") | |
| raise | |
| def upload_file( | |
| self, | |
| file, | |
| file_uploads_log, | |
| ): | |
| """Handle file uploads with proper validation and security.""" | |
| if file is None: | |
| return gr.Textbox("No file uploaded", visible=True), file_uploads_log | |
| try: | |
| mime_type, _ = mimetypes.guess_type(file.name) | |
| except Exception as e: | |
| return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log | |
| if mime_type not in ALLOWED_FILE_TYPES: | |
| return gr.Textbox("File type disallowed", visible=True), file_uploads_log | |
| # Sanitize file name | |
| original_name = os.path.basename(file.name) | |
| sanitized_name = re.sub( | |
| r"[^\w\-.]", "_", original_name | |
| ) # Replace invalid chars with underscores | |
| # Ensure the extension correlates to the mime type | |
| type_to_ext = {} | |
| for ext, t in mimetypes.types_map.items(): | |
| if t not in type_to_ext: | |
| type_to_ext[t] = ext | |
| # Build sanitized filename with proper extension | |
| name_parts = sanitized_name.split(".")[:-1] | |
| extension = type_to_ext.get(mime_type, "") | |
| sanitized_name = "".join(name_parts) + extension | |
| # Limit File Size, and Throw Error | |
| max_file_size_mb = 50 # Define the limit | |
| file_size_mb = os.path.getsize(file.name) / (1024 * 1024) # Size in MB | |
| if file_size_mb > max_file_size_mb: | |
| return ( | |
| gr.Textbox( | |
| f"File size exceeds {max_file_size_mb} MB limit.", visible=True | |
| ), | |
| file_uploads_log, | |
| ) | |
| # Save the uploaded file to the specified folder | |
| file_path = os.path.join(self.file_upload_folder, sanitized_name) | |
| shutil.copy(file.name, file_path) | |
| return gr.Textbox( | |
| f"File uploaded: {file_path}", visible=True | |
| ), file_uploads_log + [file_path] | |
| def log_user_message(self, text_input, file_uploads_log): | |
| """Process user message and handle file references.""" | |
| message = text_input | |
| if file_uploads_log: | |
| message += f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}" # Added file list | |
| return ( | |
| message, | |
| gr.Textbox( | |
| value="", | |
| interactive=False, | |
| placeholder="Processing...", # Changed placeholder. | |
| ), | |
| gr.Button(interactive=False), | |
| ) | |
| def detect_device(self, request: gr.Request): | |
| """Detect whether the user is on mobile or desktop device.""" | |
| if not request: | |
| return "Unknown device" # Handle case where request is none. | |
| # Method 1: Check sec-ch-ua-mobile header | |
| is_mobile_header = request.headers.get("sec-ch-ua-mobile") | |
| if is_mobile_header: | |
| return "Mobile" if "?1" in is_mobile_header else "Desktop" | |
| # Method 2: Check user-agent string | |
| user_agent = request.headers.get("user-agent", "").lower() | |
| mobile_keywords = ["android", "iphone", "ipad", "mobile", "phone"] | |
| if any(keyword in user_agent for keyword in mobile_keywords): | |
| return "Mobile" | |
| # Method 3: Check platform | |
| platform = request.headers.get("sec-ch-ua-platform", "").lower() | |
| if platform: | |
| if platform in ['"android"', '"ios"']: | |
| return "Mobile" | |
| elif platform in ['"windows"', '"macos"', '"linux"']: | |
| return "Desktop" | |
| # Default case if no clear indicators | |
| return "Desktop" | |
| def launch(self, **kwargs): | |
| """Launch the Gradio UI with responsive layout.""" | |
| with gr.Blocks(theme="ocean", fill_height=True) as demo: | |
| # Different layouts for mobile and computer devices | |
| def layout(request: gr.Request): | |
| device = self.detect_device(request) | |
| print(f"device - {device}") | |
| # Render layout with sidebar | |
| if device == "Desktop": | |
| return self._create_desktop_layout() | |
| else: | |
| return self._create_mobile_layout() | |
| demo.queue(max_size=20).launch( | |
| debug=True, **kwargs | |
| ) # Add queue with reasonable size | |
| def _create_desktop_layout(self): | |
| """Create the desktop layout with sidebar.""" | |
| with gr.Blocks(fill_height=True) as sidebar_demo: | |
| with gr.Sidebar(): | |
| gr.Markdown( | |
| """#OpenDeepResearch - 3theSmolagents! | |
| Model_id: deepseek/deepseek-chat-v3-0324:free""" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("**What's on your mind mate?**", container=True) | |
| text_input = gr.Textbox( | |
| lines=3, | |
| label="Your request", | |
| container=False, | |
| placeholder="Enter your prompt here and press Shift+Enter or press the button", | |
| ) | |
| launch_research_btn = gr.Button("Run", variant="primary") | |
| # If an upload folder is provided, enable the upload feature | |
| if self.file_upload_folder is not None: | |
| upload_file = gr.File(label="Upload a file") | |
| upload_status = gr.Textbox( | |
| label="Upload Status", interactive=False, visible=False | |
| ) | |
| file_uploads_log = gr.State([]) | |
| upload_file.change( | |
| self.upload_file, | |
| [upload_file, file_uploads_log], | |
| [upload_status, file_uploads_log], | |
| ) | |
| gr.HTML("<br><br><h4><center>Powered by:</center></h4>") | |
| with gr.Row(): | |
| gr.HTML( | |
| """ | |
| <div style="display: flex; align-items: center; gap: 8px; font-family: system-ui, -apple-system, sans-serif;"> | |
| <img src="https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png" | |
| style="width: 32px; height: 32px; object-fit: contain;" alt="logo"> | |
| <a target="_blank" href="https://github.com/huggingface/smolagents"> | |
| <b>huggingface/smolagents</b> | |
| </a> | |
| </div> | |
| """ | |
| ) | |
| # Add session state to store session-specific data | |
| session_state = gr.State({}) # Initialize empty state for each session | |
| stored_messages = gr.State([]) | |
| if "file_uploads_log" not in locals(): | |
| file_uploads_log = gr.State([]) | |
| chatbot = gr.Chatbot( | |
| label="open-Deep-Research", | |
| type="messages", | |
| avatar_images=( | |
| None, | |
| "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", | |
| ), | |
| resizeable=False, | |
| scale=1, | |
| elem_id="my-chatbot", | |
| ) | |
| self._connect_event_handlers( | |
| text_input, | |
| launch_research_btn, | |
| file_uploads_log, | |
| stored_messages, | |
| chatbot, | |
| session_state, | |
| ) | |
| return sidebar_demo | |
| def _create_mobile_layout(self): | |
| """Create the mobile layout (simpler without sidebar).""" | |
| with gr.Blocks(fill_height=True) as simple_demo: | |
| gr.Markdown("""#OpenDeepResearch - free the AI agents!""") | |
| # Add session state to store session-specific data | |
| session_state = gr.State({}) | |
| stored_messages = gr.State([]) | |
| file_uploads_log = gr.State([]) | |
| chatbot = gr.Chatbot( | |
| label="ODR", | |
| type="messages", | |
| avatar_images=( | |
| None, | |
| "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/smolagents/mascot_smol.png", | |
| ), | |
| resizeable=True, | |
| scale=1, | |
| ) | |
| # If an upload folder is provided, enable the upload feature | |
| if self.file_upload_folder is not None: | |
| upload_file = gr.File(label="Upload a file") | |
| upload_status = gr.Textbox( | |
| label="Upload Status", interactive=False, visible=False | |
| ) | |
| upload_file.change( | |
| self.upload_file, | |
| [upload_file, file_uploads_log], | |
| [upload_status, file_uploads_log], | |
| ) | |
| text_input = gr.Textbox( | |
| lines=1, | |
| label="What's on your mind mate?", | |
| placeholder="Chuck in a question and we'll take care of the rest", | |
| ) | |
| launch_research_btn = gr.Button("Run", variant="primary") | |
| self._connect_event_handlers( | |
| text_input, | |
| launch_research_btn, | |
| file_uploads_log, | |
| stored_messages, | |
| chatbot, | |
| session_state, | |
| ) | |
| return simple_demo | |
| def _connect_event_handlers( | |
| self, | |
| text_input, | |
| launch_research_btn, | |
| file_uploads_log, | |
| stored_messages, | |
| chatbot, | |
| session_state, | |
| ): | |
| """Connect the event handlers for input elements.""" | |
| # Connect text input submit event | |
| text_input.submit( | |
| self.log_user_message, | |
| [text_input, file_uploads_log], | |
| [stored_messages, text_input, launch_research_btn], | |
| ).then( | |
| self.interact_with_agent, | |
| [stored_messages, chatbot, session_state], | |
| [chatbot], | |
| ).then( | |
| lambda: ( | |
| gr.Textbox( | |
| interactive=True, | |
| placeholder="Enter your prompt here and press the button", | |
| ), | |
| gr.Button(interactive=True), | |
| ), | |
| None, | |
| [text_input, launch_research_btn], | |
| ) | |
| # Connect button click event | |
| launch_research_btn.click( | |
| self.log_user_message, | |
| [text_input, file_uploads_log], | |
| [stored_messages, text_input, launch_research_btn], | |
| ).then( | |
| self.interact_with_agent, | |
| [stored_messages, chatbot, session_state], | |
| [chatbot], | |
| ).then( | |
| lambda: ( | |
| gr.Textbox( | |
| interactive=True, | |
| placeholder="Enter your prompt here and press the button", | |
| ), | |
| gr.Button(interactive=True), | |
| ), | |
| None, | |
| [text_input, launch_research_btn], | |
| ) | |
| def main(): | |
| """Main entry point for the application.""" | |
| # Initialize environment | |
| setup_environment() | |
| # Ensure downloads folder exists | |
| os.makedirs(f"./{BROWSER_CONFIG['downloads_folder']}", exist_ok=True) | |
| # Launch UI | |
| GradioUI(file_upload_folder="uploaded_files").launch() | |
| if __name__ == "__main__": | |
| main() | |