Spaces:
Build error
Build error
| import gradio as gr | |
| import json | |
| import time | |
| import asyncio | |
| import subprocess | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import tempfile | |
| import shutil | |
| import zipfile | |
| from pathlib import Path | |
| import requests | |
| import yaml | |
class ColabNotebookManager:
    """In-memory store of per-agent notebooks and the cells logged to them."""

    def __init__(self):
        # notebook id -> notebook dict (shape is defined in create_notebook)
        self.notebooks = {}
        self.current_notebook = None

    def create_notebook(self, name: str, agent_type: str) -> str:
        """Register an empty notebook for an agent and return its generated id."""
        notebook_id = str(uuid.uuid4())
        self.notebooks[notebook_id] = {
            "id": notebook_id,
            "name": f"{agent_type}_{name}",
            "agent_type": agent_type,
            "cells": [],
            "created_at": datetime.now().isoformat(),
            "metadata": {
                "agent_outputs": [],
                "generated_code": [],
                "tools_used": [],
                "api_calls": [],
            },
        }
        return notebook_id

    def add_cell(self, notebook_id: str, cell_type: str, content: str, metadata: Dict = None):
        """Append a timestamped cell to a notebook; unknown ids are ignored."""
        target = self.notebooks.get(notebook_id)
        if target is None:
            return
        target["cells"].append(
            {
                "cell_type": cell_type,
                "source": content,
                "metadata": metadata or {},
                "timestamp": datetime.now().isoformat(),
            }
        )

    def get_notebook_content(self, notebook_id: str) -> str:
        """Render a notebook as Markdown text.

        Code cells become fenced ``python`` blocks and markdown cells are
        emitted verbatim; cells of any other type are left out. Headings are
        numbered by the cell's position among all cells of the notebook.
        """
        record = self.notebooks.get(notebook_id)
        if record is None:
            return "# Notebook not found"

        pieces = [
            f"# {record['name']}\n\n",
            f"**Agent Type:** {record['agent_type']}\n",
            f"**Created:** {record['created_at']}\n\n",
            "---\n\n",
        ]
        for position, entry in enumerate(record["cells"], start=1):
            kind = entry["cell_type"]
            if kind == "code":
                pieces.extend(
                    (f"## Code Block {position}\n", "```python\n", entry["source"], "\n```\n\n")
                )
            elif kind == "markdown":
                pieces.extend((f"## Notes {position}\n", entry["source"], "\n\n"))
        return "".join(pieces)
class AIAgent:
    """A simulated agent that answers tasks with canned code and logs to a notebook.

    Each agent owns one notebook (created through the notebook manager passed
    in) and records every executed task in ``history``.
    """

    def __init__(self, name: str, agent_type: str, notebook_manager: "ColabNotebookManager"):
        self.name = name
        self.agent_type = agent_type
        self.notebook_manager = notebook_manager
        self.notebook_id = notebook_manager.create_notebook(name, agent_type)
        self.tools = []
        self.history = []

    def execute_task(self, task: str, context: Dict = None) -> Dict[str, Any]:
        """Execute a task, log it and its result to the notebook, and return the result.

        Args:
            task: Free-text task description.
            context: Optional extra data forwarded to the type-specific handler.

        Returns:
            The handler's result dict (``{"status": "error", ...}`` for an
            unknown agent type).
        """
        start_time = time.time()
        started_at = datetime.now().isoformat()

        # Add task to notebook. The timestamp is computed up front so the
        # f-string stays on one line (a replacement field split across lines in
        # a single-quoted f-string is a SyntaxError before Python 3.12).
        self.notebook_manager.add_cell(
            self.notebook_id,
            "markdown",
            f"## Task: {task}\n\n**Time:** {started_at}",
        )

        # Simulate task execution
        result = self._process_task(task, context or {})

        # Add result to notebook
        self.notebook_manager.add_cell(
            self.notebook_id,
            "markdown",
            f"## Result\n\n```\n{json.dumps(result, indent=2)}\n```",
        )

        # Add to history
        self.history.append(
            {
                "task": task,
                "agent_type": self.agent_type,
                "result": result,
                "timestamp": started_at,
                "duration": time.time() - start_time,
            }
        )
        return result

    def _process_task(self, task: str, context: Dict) -> Dict:
        """Dispatch the task to the handler matching ``self.agent_type``."""
        handlers = {
            "coder": self._code_agent_task,
            "data_processor": self._data_processor_task,
            "api_caller": self._api_caller_task,
            "server_manager": self._server_manager_task,
            "workflow_orchestrator": self._workflow_task,
        }
        handler = handlers.get(self.agent_type)
        if handler is None:
            return {"status": "error", "message": "Unknown agent type"}
        return handler(task, context)

    def _code_agent_task(self, task: str, context: Dict) -> Dict:
        """Handle code generation tasks by logging a sample script."""
        code_samples = {
            "api": """
# Generated API Endpoint
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get("/")
async def root():
    return {"message": "Hello World"}
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
""",
            "data": """
# Data Processing Script
import pandas as pd
import numpy as np
def process_data(file_path):
    df = pd.read_csv(file_path)
    # Add your processing logic here
    return df
result = process_data("data.csv")
print(result.head())
""",
            "web": """
# Web Application with Gradio
import gradio as gr
def greet(name):
    return f"Hello {name}!"
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()
""",
        }
        # NOTE(review): the task text is not consulted; the "web" sample is
        # always chosen and the fallback string is never used. Kept as-is so
        # callers see the same output -- confirm whether keyword-based
        # selection was intended.
        code = code_samples.get("web", "# Generated code\nprint('Task completed')")

        # Add code to notebook
        self.notebook_manager.add_cell(self.notebook_id, "code", code)
        return {
            "status": "success",
            "generated_code": code,
            "language": "python",
            "agent": self.name,
        }

    def _data_processor_task(self, task: str, context: Dict) -> Dict:
        """Handle data processing tasks by logging a pandas pipeline script."""
        processing_code = """
# Data Processing Pipeline
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
def load_and_process_data(file_path):
    # Load data
    df = pd.read_csv(file_path)
    # Clean data
    df = df.dropna()
    # Transform data
    scaler = StandardScaler()
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    df[numeric_columns] = scaler.fit_transform(df[numeric_columns])
    return df
# Execute processing
processed_data = load_and_process_data("input.csv")
processed_data.to_csv("processed_output.csv", index=False)
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", processing_code)
        return {
            "status": "success",
            "operation": "data_processing",
            "output_format": "csv",
            "agent": self.name,
        }

    def _api_caller_task(self, task: str, context: Dict) -> Dict:
        """Handle API calling tasks by logging a small ``requests`` client."""
        api_code = """
# API Integration Code
import requests
import json
class APIClient:
    def __init__(self, base_url):
        self.base_url = base_url
        self.headers = {"Content-Type": "application/json"}
    def get(self, endpoint):
        response = requests.get(f"{self.base_url}{endpoint}", headers=self.headers)
        return response.json()
    def post(self, endpoint, data):
        response = requests.post(f"{self.base_url}{endpoint}",
                                 data=json.dumps(data),
                                 headers=self.headers)
        return response.json()
# Usage example
client = APIClient("https://api.example.com")
result = client.get("/users")
print(result)
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", api_code)
        return {
            "status": "success",
            "operation": "api_integration",
            "endpoints": ["GET", "POST"],
            "agent": self.name,
        }

    def _server_manager_task(self, task: str, context: Dict) -> Dict:
        """Handle server management tasks by logging a Docker setup script."""
        # The doubled backslashes below keep a literal "\n" escape in the
        # generated script instead of a real newline inside its string.
        server_code = """
# Server Setup Script
import os
import subprocess
import docker
def setup_server():
    # Create Dockerfile
    dockerfile_content = '''
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]
'''
    with open("Dockerfile", "w") as f:
        f.write(dockerfile_content)
    # Create requirements.txt
    with open("requirements.txt", "w") as f:
        f.write("fastapi\\nuvicorn\\ngradio\\n")
    # Create docker-compose.yml
    docker_compose = '''
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENV=production
'''
    with open("docker-compose.yml", "w") as f:
        f.write(docker_compose)
    return "Server setup complete"
# Execute setup
setup_server()
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", server_code)
        return {
            "status": "success",
            "operation": "server_setup",
            "containerization": "docker",
            "agent": self.name,
        }

    def _workflow_task(self, task: str, context: Dict) -> Dict:
        """Handle workflow orchestration tasks by logging an asyncio workflow script."""
        # Delimited with ''' because the template itself contains """docstrings""";
        # a """-delimited literal would be terminated by the first of them.
        workflow_code = '''
# Workflow Orchestration
import asyncio
from typing import List, Dict, Any
class WorkflowManager:
    def __init__(self):
        self.tasks = []
        self.results = {}
    async def execute_task(self, task_name: str, task_func, *args, **kwargs):
        """Execute a single task"""
        print(f"Executing task: {task_name}")
        result = await task_func(*args, **kwargs)
        self.results[task_name] = result
        return result
    async def run_workflow(self, workflow_config: List[Dict]):
        """Run a complete workflow"""
        for task in workflow_config:
            await self.execute_task(
                task["name"],
                task["function"],
                *task.get("args", []),
                **task.get("kwargs", {})
            )
        return self.results
# Example workflow
async def data_processing_task():
    # Simulate data processing
    await asyncio.sleep(1)
    return {"processed": 1000, "status": "complete"}
async def model_training_task():
    # Simulate model training
    await asyncio.sleep(2)
    return {"accuracy": 0.95, "model": "trained"}
# Define workflow
workflow = [
    {"name": "data_processing", "function": data_processing_task},
    {"name": "model_training", "function": model_training_task}
]
# Execute workflow
manager = WorkflowManager()
results = await manager.run_workflow(workflow)
print(results)
'''
        self.notebook_manager.add_cell(self.notebook_id, "code", workflow_code)
        return {
            "status": "success",
            "operation": "workflow_orchestration",
            # NOTE(review): this is the number of pieces produced by splitting
            # on "async def" (occurrences + 1), not a count of executed tasks.
            # Value kept unchanged for existing consumers.
            "tasks_executed": len(workflow_code.split("async def")),
            "agent": self.name,
        }
class MultiAgentSystem:
    """Registry of agents and workflows that runs workflows step by step."""

    def __init__(self):
        self.notebook_manager = ColabNotebookManager()
        self.agents = {}            # agent name -> AIAgent
        self.workflows = []         # every workflow ever created, in creation order
        self.active_workflows = {}  # workflow id -> workflow, only while running

    def create_agent(self, name: str, agent_type: str) -> str:
        """Create a new agent (replacing any agent with the same name) and return a status message."""
        agent = AIAgent(name, agent_type, self.notebook_manager)
        self.agents[name] = agent
        return f"Agent {name} created successfully"

    def create_workflow(self, name: str, agents: List[str], tasks: List[str]) -> Dict:
        """Create a multi-agent workflow.

        ``agents`` and ``tasks`` are paired positionally when the workflow is
        executed. Returns the stored workflow dict.
        """
        workflow_id = str(uuid.uuid4())
        workflow = {
            "id": workflow_id,
            "name": name,
            "agents": agents,
            "tasks": tasks,
            "status": "created",
            "created_at": datetime.now().isoformat(),
        }
        self.workflows.append(workflow)
        return workflow

    async def execute_workflow(self, workflow_id: str) -> Dict:
        """Execute a multi-agent workflow.

        Each (agent, task) pair is run in order. A step whose agent is not
        registered yields an error entry so that ``results`` stays aligned
        with the workflow's tasks.

        Returns:
            ``{"status": "error", "message": ...}`` when the workflow id is
            unknown; otherwise a dict with the per-step ``results`` and the
            elapsed ``execution_time`` in seconds.
        """
        workflow = next((w for w in self.workflows if w["id"] == workflow_id), None)
        if not workflow:
            return {"status": "error", "message": "Workflow not found"}

        started = time.time()
        workflow["status"] = "running"
        self.active_workflows[workflow_id] = workflow
        results = []
        try:
            pairs = zip(workflow["agents"], workflow["tasks"])
            for step, (agent_name, task) in enumerate(pairs, start=1):
                agent = self.agents.get(agent_name)
                if agent is None:
                    results.append(
                        {"status": "error", "message": f"Agent '{agent_name}' not found"}
                    )
                    continue
                results.append(agent.execute_task(task))
                # Add workflow step to notebook
                self.notebook_manager.add_cell(
                    agent.notebook_id,
                    "markdown",
                    f"### Workflow Step {step}\n**Agent:** {agent_name}\n**Task:** {task}",
                )
        except Exception:
            workflow["status"] = "failed"
            raise
        finally:
            # The workflow is no longer running, whatever the outcome.
            self.active_workflows.pop(workflow_id, None)

        workflow["status"] = "completed"
        workflow["results"] = results
        return {
            "status": "success",
            "workflow_id": workflow_id,
            "results": results,
            # Elapsed seconds, not the current epoch timestamp.
            "execution_time": time.time() - started,
        }
# Initialize the system
# Module-level instance that the UI callbacks in create_interface() read and mutate.
agent_system = MultiAgentSystem()
def create_interface():
    """Build the Gradio UI for the multi-agent environment.

    The layout has one tab each for agent management, workflow building,
    workflow execution, notebook viewing, API/proxy tools, data processing and
    server setup. All callbacks operate on the module-level ``agent_system``.

    Returns:
        The assembled ``gr.Blocks`` app; the caller is responsible for
        launching it.
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="AI Agent Development Environment") as demo:
        gr.HTML("""
        <div style='text-align: center; padding: 20px;'>
            <h1>🤖 Multi-Agent Development Environment</h1>
            <p style='font-size: 18px; color: #666;'>
                Powerful AI Agent System with Workflow Automation •
                <a href='https://huggingface.co/spaces/akhaliq/anycoder' target='_blank'>Built with anycoder</a>
            </p>
        </div>
        """)

        with gr.Tabs() as tabs:
            # Agent Management Tab
            with gr.TabItem("🎯 Agent Management"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Create New Agent")
                        agent_name = gr.Textbox(label="Agent Name", placeholder="Enter agent name...")
                        agent_type = gr.Dropdown(
                            choices=["coder", "data_processor", "api_caller", "server_manager", "workflow_orchestrator"],
                            label="Agent Type",
                            value="coder"
                        )
                        create_agent_btn = gr.Button("Create Agent", variant="primary")
                        agent_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column(scale=2):
                        gr.Markdown("### Active Agents")
                        agents_display = gr.JSON(label="Agents", value={})

                def list_agent_names():
                    return list(agent_system.agents)

                def create_agent(name, atype):
                    name = (name or "").strip()
                    if not name:
                        return "Please enter an agent name", list_agent_names()
                    return agent_system.create_agent(name, atype), list_agent_names()

            # Workflow Builder Tab
            with gr.TabItem("🔄 Workflow Builder"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Create Workflow")
                        workflow_name = gr.Textbox(label="Workflow Name")
                        workflow_agents = gr.CheckboxGroup(
                            label="Select Agents",
                            choices=[]
                        )
                        workflow_tasks = gr.Textbox(
                            label="Tasks (one per line)",
                            lines=5,
                            placeholder="Task 1\nTask 2\nTask 3..."
                        )
                        create_workflow_btn = gr.Button("Create Workflow", variant="primary")
                        workflow_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Active Workflows")
                        workflows_display = gr.JSON(label="Workflows", value=[])

                def update_agents_list():
                    return gr.CheckboxGroup(choices=list(agent_system.agents.keys()))

                def create_workflow(name, agents, tasks_str):
                    # On a validation error keep showing the existing workflows
                    # instead of blanking the display.
                    if not name or not agents or not tasks_str:
                        return "Please fill all fields", agent_system.workflows
                    tasks = [task.strip() for task in tasks_str.split('\n') if task.strip()]
                    if len(tasks) != len(agents):
                        return "Number of tasks must match number of agents", agent_system.workflows
                    agent_system.create_workflow(name, agents, tasks)
                    return f"Workflow '{name}' created successfully", agent_system.workflows

            # Execution Tab
            with gr.TabItem("⚡ Execute"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Execute Workflow")
                        workflow_selector = gr.Dropdown(
                            label="Select Workflow",
                            choices=[]
                        )
                        execute_btn = gr.Button("Execute Workflow", variant="primary")
                        execution_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Execution Results")
                        execution_results = gr.JSON(label="Results", value={})

                def update_workflows_list():
                    if agent_system.workflows:
                        choices = [(w["name"], w["id"]) for w in agent_system.workflows]
                        return gr.Dropdown(choices=choices, value=agent_system.workflows[0]["id"])
                    return gr.Dropdown(choices=[])

                async def execute_workflow(workflow_id):
                    if not workflow_id:
                        return "Please select a workflow", {}
                    result = await agent_system.execute_workflow(workflow_id)
                    # Report failures instead of always claiming success.
                    if result.get("status") != "success":
                        return result.get("message", "Workflow execution failed"), result
                    return "Workflow executed successfully", result

                execute_btn.click(
                    execute_workflow,
                    inputs=[workflow_selector],
                    outputs=[execution_status, execution_results]
                )

            # Colab Notebooks Tab
            with gr.TabItem("📓 Colab Notebooks"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Agent Notebooks")
                        notebook_selector = gr.Dropdown(
                            label="Select Agent Notebook",
                            choices=[]
                        )
                        refresh_notebooks_btn = gr.Button("Refresh", variant="secondary")
                    with gr.Column():
                        gr.Markdown("### Notebook Content")
                        notebook_content = gr.Markdown(label="Content")

                def update_notebooks_list():
                    if agent_system.agents:
                        choices = [(agent.name, agent.notebook_id) for agent in agent_system.agents.values()]
                        return gr.Dropdown(choices=choices)
                    return gr.Dropdown(choices=[])

                def display_notebook(notebook_id):
                    if notebook_id:
                        return agent_system.notebook_manager.get_notebook_content(notebook_id)
                    return "Select a notebook to view content"

                refresh_notebooks_btn.click(
                    update_notebooks_list,
                    outputs=[notebook_selector]
                )
                notebook_selector.change(
                    display_notebook,
                    inputs=[notebook_selector],
                    outputs=[notebook_content]
                )

            # Tools & APIs Tab
            with gr.TabItem("🛠️ Tools & APIs"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### API Configuration")
                        api_endpoint = gr.Textbox(label="API Endpoint", placeholder="https://api.example.com")
                        api_key = gr.Textbox(label="API Key", type="password")
                        api_method = gr.Dropdown(
                            choices=["GET", "POST", "PUT", "DELETE"],
                            label="HTTP Method",
                            value="GET"
                        )
                        api_body = gr.JSON(label="Request Body (JSON)", value={})
                        test_api_btn = gr.Button("Test API", variant="primary")
                        api_response = gr.Textbox(label="Response", lines=10, interactive=False)
                    with gr.Column():
                        gr.Markdown("### Proxy Configuration")
                        proxy_host = gr.Textbox(label="Proxy Host", placeholder="proxy.example.com")
                        proxy_port = gr.Number(label="Proxy Port", value=8080)
                        proxy_username = gr.Textbox(label="Proxy Username")
                        proxy_password = gr.Textbox(label="Proxy Password", type="password")
                        save_proxy_btn = gr.Button("Save Proxy Settings", variant="secondary")
                        proxy_status = gr.Textbox(label="Status", interactive=False)

                def test_api(endpoint, api_key, method, body):
                    # NOTE(review): this issues a server-side request to a
                    # user-supplied URL; restrict allowed hosts before exposing
                    # the app publicly.
                    if not endpoint:
                        return "Please enter an API endpoint"
                    timeout_seconds = 10  # never let a dead endpoint hang the UI worker
                    try:
                        headers = {}
                        if api_key:
                            headers["Authorization"] = f"Bearer {api_key}"
                        if method == "GET":
                            response = requests.get(endpoint, headers=headers, timeout=timeout_seconds)
                        elif method == "POST":
                            response = requests.post(endpoint, json=body, headers=headers, timeout=timeout_seconds)
                        else:
                            return "Method not implemented in demo"
                        return f"Status: {response.status_code}\n\nResponse:\n{response.text}"
                    except Exception as e:
                        # UI boundary: surface any failure as text in the response box.
                        return f"Error: {str(e)}"

                test_api_btn.click(
                    test_api,
                    inputs=[api_endpoint, api_key, api_method, api_body],
                    outputs=[api_response]
                )

                def save_proxy(host, port, username, password):
                    # In a real implementation, save to config
                    # gr.Number yields a float; show the port as an integer.
                    port_text = int(port) if port is not None else ""
                    return f"Proxy settings saved: {host}:{port_text}"

                save_proxy_btn.click(
                    save_proxy,
                    inputs=[proxy_host, proxy_port, proxy_username, proxy_password],
                    outputs=[proxy_status]
                )

            # Data Processing Tab
            with gr.TabItem("📊 Data Processing"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Upload Data")
                        data_file = gr.File(label="Upload CSV/JSON file", file_types=[".csv", ".json"])
                        processing_type = gr.Dropdown(
                            choices=["Clean Data", "Transform Data", "Analyze Data", "Visualize Data"],
                            label="Processing Type",
                            value="Clean Data"
                        )
                        process_data_btn = gr.Button("Process Data", variant="primary")
                        processing_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Processing Results")
                        processing_results = gr.JSON(label="Results", value={})
                        download_processed = gr.File(label="Download Processed Data", visible=False)

                def process_data(file, ptype):
                    if not file:
                        return "Please upload a file", {}
                    # The upload may arrive as a path string or as an object
                    # exposing the path via ``.name``.
                    source_path = str(getattr(file, "name", file))
                    # Use only the base name: the upload path is a full
                    # temp-directory path and "processed_" + full path is not a
                    # usable filename.
                    output_name = f"processed_{Path(source_path).name}"

                    # Simulate processing
                    processing_code = f"""
# Data Processing: {ptype}
import pandas as pd
import numpy as np
# Load data
df = pd.read_csv("{source_path}")
# Processing steps based on type
if "{ptype}" == "Clean Data":
    df = df.dropna()
    df = df.drop_duplicates()
elif "{ptype}" == "Transform Data":
    # Add transformation logic
    pass
elif "{ptype}" == "Analyze Data":
    # Add analysis logic
    pass
# Save processed data
df.to_csv("{output_name}", index=False)
print(f"Processed {{len(df)}} rows")
"""
                    # Add to a notebook
                    if agent_system.agents:
                        first_agent = next(iter(agent_system.agents.values()))
                        first_agent.notebook_manager.add_cell(
                            first_agent.notebook_id,
                            "code",
                            processing_code
                        )
                    return f"Data processed successfully with {ptype}", {
                        "rows_processed": 1000,  # simulated figure; nothing is actually executed
                        "processing_type": ptype,
                        "output_file": output_name
                    }

                process_data_btn.click(
                    process_data,
                    inputs=[data_file, processing_type],
                    outputs=[processing_status, processing_results]
                )

            # Server Setup Tab
            with gr.TabItem("🖥️ Server Setup"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Server Configuration")
                        server_type = gr.Dropdown(
                            choices=["FastAPI", "Flask", "Docker", "Kubernetes"],
                            label="Server Type",
                            value="FastAPI"
                        )
                        server_port = gr.Number(label="Port", value=8000)
                        server_env = gr.Dropdown(
                            choices=["Development", "Production", "Staging"],
                            label="Environment",
                            value="Development"
                        )
                        setup_server_btn = gr.Button("Setup Server", variant="primary")
                        server_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Generated Configuration")
                        server_config = gr.Code(label="Server Code", language="python", lines=20)
                        download_config = gr.File(label="Download Config", visible=False)

                def setup_server(stype, port, env):
                    # gr.Number yields a float; the generated code needs an
                    # integer port (e.g. port=8000, not port=8000.0).
                    port = int(port) if port is not None else 8000
                    debug = env == "Development"
                    config_code = f"""
# Generated {stype} Server Configuration
# Environment: {env}
# Port: {port}
if "{stype}" == "FastAPI":
    from fastapi import FastAPI
    import uvicorn
    app = FastAPI(title="AI Agent Server", version="1.0.0")
    @app.get("/")
    async def root():
        return {{"message": "AI Agent Server Running", "status": "active"}}
    @app.get("/health")
    async def health_check():
        return {{"status": "healthy"}}
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port={port})
elif "{stype}" == "Docker":
    # Dockerfile content
    dockerfile = '''
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE {port}
CMD ["python", "app.py"]
'''
    print(dockerfile)
elif "{stype}" == "Flask":
    from flask import Flask, jsonify
    app = Flask(__name__)
    @app.route('/')
    def home():
        return jsonify({{"message": "AI Agent Server", "port": {port}}})
    if __name__ == "__main__":
        app.run(host='0.0.0.0', port={port}, debug={debug})
"""
                    return f"{stype} server configuration generated for {env} environment", config_code

                setup_server_btn.click(
                    setup_server,
                    inputs=[server_type, server_port, server_env],
                    outputs=[server_status, server_config]
                )

        # Events that update components from several tabs are wired here, once
        # every component exists.
        # Creating an agent also refreshes the agent checklist and the notebook
        # dropdown, which otherwise would only change on page reload.
        create_agent_btn.click(
            create_agent,
            inputs=[agent_name, agent_type],
            outputs=[agent_status, agents_display]
        ).then(
            update_agents_list,
            outputs=[workflow_agents]
        ).then(
            update_notebooks_list,
            outputs=[notebook_selector]
        )

        # Creating a workflow also refreshes the execution dropdown.
        create_workflow_btn.click(
            create_workflow,
            inputs=[workflow_name, workflow_agents, workflow_tasks],
            outputs=[workflow_status, workflows_display]
        ).then(
            update_workflows_list,
            outputs=[workflow_selector]
        )

        # Auto-refresh components on page load. The CheckboxGroup update goes
        # to the checklist itself, not to the JSON agents display.
        demo.load(
            list_agent_names,
            outputs=[agents_display]
        )
        demo.load(
            update_agents_list,
            outputs=[workflow_agents]
        )
        demo.load(
            update_notebooks_list,
            outputs=[notebook_selector]
        )
        demo.load(
            update_workflows_list,
            outputs=[workflow_selector]
        )
    return demo
# Create and launch the interface
if __name__ == "__main__":
    # Listen on all interfaces at port 7860 and request a public share link.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "share": True}
    demo = create_interface()
    demo.launch(**launch_options)