Spaces:
Sleeping
Sleeping
| """ | |
| AutoGPT Hugging Face Space Entry Point | |
| Main application file for the Docker Space with Google Gemini integration | |
| """ | |
| import os | |
| import json | |
| import time | |
| from typing import Optional, List, Dict, Any | |
| import gradio as gr | |
| # Try to import Google Gemini - prefer new google-genai package | |
| # Falls back to deprecated google-generativeai if needed | |
| GEMINI_AVAILABLE = False | |
| genai = None | |
| try: | |
| import google.genai as genai | |
| GEMINI_AVAILABLE = True | |
| except (ImportError, Exception) as e: | |
| try: | |
| # Fallback to deprecated package | |
| import google.generativeai as genai | |
| GEMINI_AVAILABLE = True | |
| except (ImportError, Exception): | |
| GEMINI_AVAILABLE = False | |
| print(f"Warning: Google Gemini library not available: {e}") | |
| print("Install with: pip install google-generativeai") | |
| # Environment variables for API keys | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
| GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-pro") | |
class AutoGPTAgent:
    """
    Autonomous agent that plans a task into steps and executes them
    iteratively via Google Gemini.

    Uses the legacy ``google.generativeai`` API surface
    (``genai.configure`` + ``genai.GenerativeModel``).  Whenever no usable
    client exists (missing library or API key), every method degrades to a
    deterministic demo/fallback response so the UI keeps working without
    credentials.
    """

    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None, temperature: float = 0.7):
        """
        Initialize the AutoGPT agent with Google Gemini.

        Args:
            api_key: Google Gemini API key (falls back to GOOGLE_API_KEY).
            model: Gemini model name (falls back to GEMINI_MODEL / gemini-pro).
            temperature: Temperature for LLM generation (0.0-2.0).
        """
        self.temperature = temperature
        self.api_key = api_key or GOOGLE_API_KEY
        self.model = model or GEMINI_MODEL
        # None signals "demo mode": all methods return canned fallbacks.
        self.client = None
        if GEMINI_AVAILABLE and self.api_key and genai is not None:
            try:
                # NOTE(review): configure()/GenerativeModel() only exist on the
                # legacy google.generativeai package; if the newer google.genai
                # package was imported instead, this raises and we stay in
                # demo mode instead of crashing.
                genai.configure(api_key=self.api_key)
                self.client = genai.GenerativeModel(self.model)
                self.model_name = self.model
            except Exception as e:
                print(f"Warning: Could not initialize Gemini client: {e}")
                self.client = None

    def _generate_response(self, prompt: str, system_instruction: Optional[str] = None) -> str:
        """
        Generate a response using the Gemini API.

        Args:
            prompt: User prompt.
            system_instruction: Optional system instruction; prepended to the
                prompt because the legacy API has no separate system slot.

        Returns:
            Generated response text, stripped of surrounding whitespace.

        Raises:
            RuntimeError: If the API call fails or returns no usable text
                (e.g. a blocked candidate, where ``response.text`` raises).
        """
        if not self.client:
            return "API client not initialized. Please configure GOOGLE_API_KEY."
        try:
            full_prompt = f"{system_instruction}\n\n{prompt}" if system_instruction else prompt
            response = self.client.generate_content(
                full_prompt,
                generation_config={
                    "temperature": self.temperature,
                    "max_output_tokens": 2048,
                },
            )
            return response.text.strip()
        except Exception as e:
            # Chain the cause so callers/logs can see the original failure.
            raise RuntimeError(f"Gemini API error: {e}") from e

    def plan_task(self, task: str) -> List[str]:
        """
        Break down a task into actionable steps.

        Args:
            task: The main task description.

        Returns:
            Non-empty list of step descriptions.  Falls back to a generic
            four-step plan in demo mode, and to ``["Execute: <task>"]``
            when the model response cannot be parsed at all.
        """
        if not self.client:
            # Demo-mode fallback: simple generic plan.
            return [
                f"Analyze the task: {task}",
                f"Research relevant information about: {task}",
                f"Execute the main objective: {task}",
                "Review and refine the results",
            ]
        try:
            prompt = f"""Break down the following task into 3-5 specific, actionable steps.
Task: {task}
Return ONLY a JSON array of step descriptions, no other text.
Example format: ["Step 1 description", "Step 2 description", ...]"""
            system_instruction = "You are a task planning assistant. Return only valid JSON arrays."
            response_text = self._generate_response(prompt, system_instruction)
            try:
                cleaned = response_text.strip()
                if cleaned.startswith("```"):
                    # Strip markdown code fences (``` or ```json).
                    lines = cleaned.split("\n")
                    cleaned = "\n".join(lines[1:-1]) if len(lines) > 2 else cleaned
                    if cleaned.startswith("json"):
                        cleaned = cleaned[4:].strip()
                steps = json.loads(cleaned)
                if isinstance(steps, list):
                    # Coerce to strings in case the model returned objects.
                    return [str(s) for s in steps]
            except json.JSONDecodeError:
                # Fallback 1: extract the first [...] span and split on commas.
                import re
                match = re.search(r'\[(.*?)\]', response_text, re.DOTALL)
                if match:
                    array_content = match.group(1)
                    steps = [s.strip().strip('"\'') for s in array_content.split(",") if s.strip()]
                    if steps:
                        return steps
                # Fallback 2: treat non-comment lines as steps (max 5).
                lines = [l.strip() for l in response_text.split("\n") if l.strip() and not l.strip().startswith("#")]
                if lines:
                    return lines[:5]
            # All parsing failed: single catch-all step.
            return [f"Execute: {task}"]
        except Exception as e:
            print(f"Error in plan_task: {e}")
            return [f"Execute: {task}"]

    def execute_step(self, step: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a single step of the plan.

        Args:
            step: Description of the step to execute.
            context: Previous context and results (must be JSON-serializable).

        Returns:
            Dict with keys ``step``, ``status`` ("completed" or "error"),
            ``result`` and ``reasoning``.
        """
        if not self.client:
            # Demo-mode fallback: simulated execution.
            return {
                "step": step,
                "status": "completed",
                "result": f"Simulated execution of: {step}",
                "reasoning": "This is a placeholder result. Configure GOOGLE_API_KEY for full functionality."
            }
        try:
            context_str = json.dumps(context, indent=2) if context else "No previous context"
            prompt = f"""You are an autonomous AI agent executing a task step.
Previous context:
{context_str}
Current step to execute: {step}
Provide:
1. Your reasoning/approach
2. The action you would take
3. The expected result
Format your response clearly."""
            system_instruction = "You are an autonomous AI agent that executes tasks step by step. Be thorough and detailed in your responses."
            result = self._generate_response(prompt, system_instruction)
            return {
                "step": step,
                "status": "completed",
                "result": result,
                # First line as a short summary; cap single-line blobs at 200.
                "reasoning": result.split("\n")[0] if "\n" in result else result[:200]
            }
        except Exception as e:
            return {
                "step": step,
                "status": "error",
                "result": f"Error executing step: {str(e)}",
                "reasoning": str(e)
            }

    def run(self, task: str, max_iterations: int = 5, progress_callback=None) -> str:
        """
        Run the autonomous agent on a task: plan, execute, summarize.

        Args:
            task: The main task to accomplish.
            max_iterations: Maximum number of planned steps to execute.
            progress_callback: Optional ``callable(str)`` for status updates.

        Returns:
            Markdown-formatted execution report.
        """
        if not self.api_key:
            return """⚠️ **Configuration Required**
To use AutoGPT with Google Gemini, please configure your API key:
1. Go to Space Settings → Repository secrets
2. Add `GOOGLE_API_KEY` with your Google Gemini API key
(Get one at: https://makersuite.google.com/app/apikey)
3. Optionally set `GEMINI_MODEL` (default: gemini-pro)
Available models: gemini-pro, gemini-pro-vision
**Current Status:** Running in demo mode with simulated responses."""
        results = []
        output_lines = []
        # Phase 1: plan the task.
        if progress_callback:
            progress_callback("🔄 Planning task with Gemini...")
        output_lines.append("# 🤖 AutoGPT Execution (Powered by Google Gemini)")
        output_lines.append(f"\n## Task: {task}")
        output_lines.append("\n### Planning Phase")
        try:
            steps = self.plan_task(task)
            output_lines.append(f"\n**Generated {len(steps)} steps:**")
            for i, step in enumerate(steps, 1):
                output_lines.append(f"{i}. {step}")
        except Exception as e:
            output_lines.append(f"\n⚠️ Error in planning: {str(e)}")
            steps = [task]
        # Phase 2: execute at most max_iterations steps.
        output_lines.append("\n### Execution Phase")
        context = {"task": task, "steps": steps}
        iterations = min(max_iterations, len(steps))
        for i, step in enumerate(steps[:iterations], 1):
            if progress_callback:
                progress_callback(f"⚙️ Executing step {i}/{iterations}: {step[:50]}...")
            output_lines.append(f"\n**Step {i}: {step}**")
            step_result = self.execute_step(step, context)
            results.append(step_result)
            output_lines.append(f"Status: {step_result['status']}")
            # Bug fix: only append an ellipsis when the result was truncated.
            result_text = step_result['result']
            suffix = "..." if len(result_text) > 300 else ""
            output_lines.append(f"Result: {result_text[:300]}{suffix}")
            context[f"step_{i}"] = step_result
            time.sleep(0.5)  # small delay so streamed progress stays readable
        # Phase 3: summarize.
        output_lines.append("\n### Summary")
        output_lines.append(f"\n✅ Completed {len(results)} steps")
        output_lines.append("\n**Final Result:**")
        if results:
            output_lines.append(results[-1].get("result", "Task completed"))
        else:
            output_lines.append("No results generated.")
        return "\n".join(output_lines)
def autogpt_interface(
    task: str,
    max_iterations: int = 5,
    temperature: float = 0.7,
    progress=gr.Progress()
) -> str:
    """
    Gradio handler: run the AutoGPT agent on *task* and return markdown.

    Args:
        task: Task description entered by the user.
        max_iterations: Upper bound on the number of executed steps.
        temperature: Sampling temperature forwarded to the agent.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        Markdown-formatted execution report, or an error/usage message.
    """
    cleaned_task = (task or "").strip()
    if not cleaned_task:
        return "⚠️ Please enter a task description."
    try:
        agent = AutoGPTAgent(temperature=temperature)

        def report_status(message: str):
            # Surface intermediate agent status in the progress bar.
            if progress:
                progress(0, desc=message)

        return agent.run(
            task=cleaned_task,
            max_iterations=max_iterations,
            progress_callback=report_status,
        )
    except Exception as e:
        return f"❌ **Error:** {str(e)}\n\nPlease check your API configuration in Space Settings."
# Create enhanced Gradio interface
def create_interface():
    """Create and return the enhanced Gradio interface.

    Builds a two-column Blocks layout: configuration (task textbox,
    advanced sliders, run/clear buttons) on the left, a markdown results
    pane on the right, with examples and event wiring below.
    """
    with gr.Blocks(
        title="AutoGPT Space 🤖 (Google Gemini)"
    ) as demo:
        # Header banner (styled by the .main-header CSS class).
        gr.HTML("""
        <div class="main-header">
            <h1>🤖 AutoGPT Space</h1>
            <p>Autonomous AI Agent Powered by Google Gemini</p>
        </div>
        """)
        # Usage info box.
        # NOTE(review): raw HTML inside gr.Markdown may be sanitized or not
        # rendered depending on the Gradio version — confirm rendering.
        gr.Markdown("""
        <div class="info-box">
        <strong>💡 How to use:</strong><br>
        1. Enter a task you want AutoGPT to accomplish<br>
        2. Adjust max iterations and temperature if needed<br>
        3. Click "Run AutoGPT" and watch it work autonomously<br>
        4. Configure GOOGLE_API_KEY in Space Settings for full functionality<br>
        5. Get your API key at: <a href="https://makersuite.google.com/app/apikey" target="_blank">Google AI Studio</a>
        </div>
        """)
        with gr.Row():
            # Left column: task input and settings.
            with gr.Column(scale=1):
                gr.Markdown("### ⚙️ Configuration")
                task_input = gr.Textbox(
                    label="📝 Task Description",
                    placeholder="Example: Research and summarize the latest AI developments, then create a Python script to analyze the data...",
                    lines=4,
                    value=""
                )
                with gr.Accordion("🔧 Advanced Settings", open=False):
                    max_iter = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=5,
                        step=1,
                        label="Max Iterations",
                        info="Maximum number of steps AutoGPT will take"
                    )
                    temperature = gr.Slider(
                        minimum=0.0,
                        maximum=2.0,
                        value=0.7,
                        step=0.1,
                        label="Temperature",
                        info="Controls randomness (0.0 = deterministic, 2.0 = creative)"
                    )
                submit_btn = gr.Button(
                    "🚀 Run AutoGPT",
                    variant="primary",
                    size="lg"
                )
                clear_btn = gr.Button("🗑️ Clear", variant="secondary")
            # Right column: execution report.
            with gr.Column(scale=1):
                gr.Markdown("### 📊 Results")
                output = gr.Markdown(
                    label="",
                    value="**Ready to execute tasks!**\n\nEnter a task above and click 'Run AutoGPT' to begin.",
                    elem_classes=["output-box"]
                )
        # Clickable example tasks that populate the textbox.
        gr.Markdown("### 💡 Example Tasks")
        gr.Examples(
            examples=[
                ["Research and summarize the latest developments in AI, focusing on large language models"],
                ["Create a Python script to analyze stock market data and generate a report"],
                ["Write a comprehensive blog post about sustainable energy solutions"],
                ["Design a marketing strategy for a new tech product launch"],
                ["Analyze customer feedback data and provide actionable insights"]
            ],
            inputs=task_input,
            label="Click to load example"
        )
        # Event handlers: run the agent / reset both input and output.
        submit_btn.click(
            fn=autogpt_interface,
            inputs=[task_input, max_iter, temperature],
            outputs=output,
            show_progress="full"
        )
        clear_btn.click(
            fn=lambda: ("", "**Ready to execute tasks!**\n\nEnter a task above and click 'Run AutoGPT' to begin."),
            inputs=[],
            outputs=[task_input, output]
        )
        # Footer.
        gr.Markdown("""
        ---
        <div style="text-align: center; color: #666; padding: 20px;">
            <p>🔐 API keys are securely stored in Space Settings → Repository secrets</p>
            <p>Powered by Google Gemini | Built with Gradio</p>
        </div>
        """)
    return demo
| if __name__ == "__main__": | |
| try: | |
| # Get port from environment (Hugging Face Spaces sets this) | |
| port = int(os.environ.get("PORT", 7860)) | |
| # Custom CSS for better styling | |
| custom_css = """ | |
| .gradio-container { | |
| font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; | |
| } | |
| .main-header { | |
| text-align: center; | |
| padding: 20px; | |
| background: linear-gradient(135deg, #4285f4 0%, #34a853 100%); | |
| color: white; | |
| border-radius: 10px; | |
| margin-bottom: 20px; | |
| } | |
| .info-box { | |
| padding: 15px; | |
| background: #e8f0fe; | |
| border-left: 4px solid #4285f4; | |
| border-radius: 5px; | |
| margin: 10px 0; | |
| } | |
| """ | |
| # Launch the interface | |
| demo = create_interface() | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=port, | |
| share=False, | |
| show_error=True, | |
| theme=gr.themes.Soft( | |
| primary_hue="blue", | |
| secondary_hue="green", | |
| ), | |
| css=custom_css | |
| ) | |
| except Exception as e: | |
| print(f"Fatal error starting application: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise | |