# Source: "Deploy Gradio app with multiple files" (uploaded by knicole,
# revision 567af2d, verified).
import gradio as gr
import json
import time
import asyncio
import subprocess
import os
import uuid
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
import tempfile
import shutil
import zipfile
from pathlib import Path
import requests
import yaml
class ColabNotebookManager:
    """In-memory store of per-agent notebooks that can be rendered as Markdown.

    Notebooks are kept in ``self.notebooks`` keyed by a UUID string. Nothing
    is written to disk by this class.
    """

    def __init__(self):
        # notebook_id -> notebook dict (layout is defined in create_notebook).
        self.notebooks = {}
        self.current_notebook = None

    def create_notebook(self, name: str, agent_type: str) -> str:
        """Create an empty notebook for an agent and return its id.

        Args:
            name: Agent name; combined with ``agent_type`` to form the title.
            agent_type: Agent kind, stored on the notebook and used as the
                title prefix.

        Returns:
            The new notebook's id (a UUID4 string).
        """
        notebook_id = str(uuid.uuid4())
        self.notebooks[notebook_id] = {
            "id": notebook_id,
            "name": f"{agent_type}_{name}",
            "agent_type": agent_type,
            "cells": [],
            "created_at": datetime.now().isoformat(),
            "metadata": {
                "agent_outputs": [],
                "generated_code": [],
                "tools_used": [],
                "api_calls": [],
            },
        }
        return notebook_id

    def add_cell(
        self,
        notebook_id: str,
        cell_type: str,
        content: str,
        metadata: Optional[Dict] = None,
    ) -> None:
        """Append a cell to a notebook.

        Unknown ``notebook_id`` values are ignored (no cell is added and no
        error is raised).

        Args:
            notebook_id: Id returned by ``create_notebook``.
            cell_type: ``"code"`` or ``"markdown"``; other values are stored
                but skipped when the notebook is rendered.
            content: Cell source text.
            metadata: Optional per-cell metadata; defaults to an empty dict.
        """
        notebook = self.notebooks.get(notebook_id)
        if notebook is None:
            return
        notebook["cells"].append(
            {
                "cell_type": cell_type,
                "source": content,
                "metadata": metadata or {},
                "timestamp": datetime.now().isoformat(),
            }
        )

    def get_notebook_content(self, notebook_id: str) -> str:
        """Render a notebook as a Markdown document.

        Args:
            notebook_id: Id returned by ``create_notebook``.

        Returns:
            The Markdown text, or ``"# Notebook not found"`` when the id is
            unknown.
        """
        notebook = self.notebooks.get(notebook_id)
        if notebook is None:
            return "# Notebook not found"

        parts = [
            f"# {notebook['name']}\n\n",
            f"**Agent Type:** {notebook['agent_type']}\n",
            f"**Created:** {notebook['created_at']}\n\n",
            "---\n\n",
        ]
        # Headings are numbered by the cell's position among ALL cells, so
        # code and note numbers share one sequence.
        for index, cell in enumerate(notebook["cells"], 1):
            if cell["cell_type"] == "code":
                parts.append(
                    f"## Code Block {index}\n```python\n{cell['source']}\n```\n\n"
                )
            elif cell["cell_type"] == "markdown":
                parts.append(f"## Notes {index}\n{cell['source']}\n\n")
        return "".join(parts)
class AIAgent:
    """Simulated agent that answers tasks with canned code snippets.

    Each agent owns one notebook in the shared notebook manager. Executing a
    task logs the task, any generated snippet, and the JSON result to that
    notebook, and records an entry in ``self.history``.
    """

    def __init__(self, name: str, agent_type: str, notebook_manager: "ColabNotebookManager"):
        """Create the agent and its notebook.

        Args:
            name: Display name of the agent.
            agent_type: One of ``"coder"``, ``"data_processor"``,
                ``"api_caller"``, ``"server_manager"`` or
                ``"workflow_orchestrator"``; other values make every task
                return an error result.
            notebook_manager: Object providing ``create_notebook`` and
                ``add_cell``.
        """
        self.name = name
        self.agent_type = agent_type
        self.notebook_manager = notebook_manager
        self.notebook_id = notebook_manager.create_notebook(name, agent_type)
        self.tools = []
        self.history = []

    def execute_task(self, task: str, context: Optional[Dict] = None) -> Dict[str, Any]:
        """Execute a task, log it to the notebook, and record it in history.

        Args:
            task: Free-text task description.
            context: Optional extra data passed to the handler; defaults to
                an empty dict.

        Returns:
            The handler's result dict (always contains ``"status"``).
        """
        start_time = time.time()
        started_at = datetime.now().isoformat()

        # Add task to notebook
        self.notebook_manager.add_cell(
            self.notebook_id,
            "markdown",
            f"## Task: {task}\n\n**Time:** {started_at}",
        )

        # Simulate task execution
        result = self._process_task(task, context or {})

        # Add result to notebook
        self.notebook_manager.add_cell(
            self.notebook_id,
            "markdown",
            f"## Result\n\n```\n{json.dumps(result, indent=2)}\n```",
        )

        # Add to history
        self.history.append(
            {
                "task": task,
                "agent_type": self.agent_type,
                "result": result,
                "timestamp": started_at,
                "execution_time": time.time() - start_time,
            }
        )
        return result

    def _process_task(self, task: str, context: Dict) -> Dict:
        """Dispatch the task to the handler matching ``self.agent_type``."""
        handlers = {
            "coder": self._code_agent_task,
            "data_processor": self._data_processor_task,
            "api_caller": self._api_caller_task,
            "server_manager": self._server_manager_task,
            "workflow_orchestrator": self._workflow_task,
        }
        handler = handlers.get(self.agent_type)
        if handler is None:
            return {"status": "error", "message": "Unknown agent type"}
        return handler(task, context)

    def _code_agent_task(self, task: str, context: Dict) -> Dict:
        """Handle code generation tasks.

        Picks the first sample whose key (``api``, ``data``, ``web``) occurs
        in the lower-cased task text; falls back to a generic stub otherwise.
        """
        code_samples = {
            "api": """
# Generated API Endpoint
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.get("/")
async def root():
    return {"message": "Hello World"}
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
""",
            "data": """
# Data Processing Script
import pandas as pd
import numpy as np
def process_data(file_path):
    df = pd.read_csv(file_path)
    # Add your processing logic here
    return df
result = process_data("data.csv")
print(result.head())
""",
            "web": """
# Web Application with Gradio
import gradio as gr
def greet(name):
    return f"Hello {name}!"
demo = gr.Interface(fn=greet, inputs="text", outputs="text")
demo.launch()
""",
        }

        # Generate code based on task keywords
        task_text = task.lower()
        code = next(
            (sample for keyword, sample in code_samples.items() if keyword in task_text),
            "# Generated code\nprint('Task completed')",
        )

        # Add code to notebook
        self.notebook_manager.add_cell(self.notebook_id, "code", code)
        return {
            "status": "success",
            "generated_code": code,
            "language": "python",
            "agent": self.name,
        }

    def _data_processor_task(self, task: str, context: Dict) -> Dict:
        """Handle data processing tasks by logging a fixed pandas pipeline."""
        processing_code = """
# Data Processing Pipeline
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
def load_and_process_data(file_path):
    # Load data
    df = pd.read_csv(file_path)
    # Clean data
    df = df.dropna()
    # Transform data
    scaler = StandardScaler()
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    df[numeric_columns] = scaler.fit_transform(df[numeric_columns])
    return df
# Execute processing
processed_data = load_and_process_data("input.csv")
processed_data.to_csv("processed_output.csv", index=False)
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", processing_code)
        return {
            "status": "success",
            "operation": "data_processing",
            "output_format": "csv",
            "agent": self.name,
        }

    def _api_caller_task(self, task: str, context: Dict) -> Dict:
        """Handle API calling tasks by logging a fixed ``requests`` client."""
        api_code = """
# API Integration Code
import requests
import json
class APIClient:
    def __init__(self, base_url):
        self.base_url = base_url
        self.headers = {"Content-Type": "application/json"}
    def get(self, endpoint):
        response = requests.get(f"{self.base_url}{endpoint}", headers=self.headers)
        return response.json()
    def post(self, endpoint, data):
        response = requests.post(f"{self.base_url}{endpoint}",
                                 data=json.dumps(data),
                                 headers=self.headers)
        return response.json()
# Usage example
client = APIClient("https://api.example.com")
result = client.get("/users")
print(result)
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", api_code)
        return {
            "status": "success",
            "operation": "api_integration",
            "endpoints": ["GET", "POST"],
            "agent": self.name,
        }

    def _server_manager_task(self, task: str, context: Dict) -> Dict:
        """Handle server management tasks by logging a Docker setup script.

        The script is only stored in the notebook; it is not executed here.
        """
        server_code = """
# Server Setup Script
import os
import subprocess
import docker
def setup_server():
    # Create Dockerfile
    dockerfile_content = '''
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]
'''
    with open("Dockerfile", "w") as f:
        f.write(dockerfile_content)
    # Create requirements.txt
    with open("requirements.txt", "w") as f:
        f.write("fastapi\\nuvicorn\\ngradio\\n")
    # Create docker-compose.yml
    docker_compose = '''
version: '3.8'
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - ENV=production
'''
    with open("docker-compose.yml", "w") as f:
        f.write(docker_compose)
    return "Server setup complete"
# Execute setup
setup_server()
"""
        self.notebook_manager.add_cell(self.notebook_id, "code", server_code)
        return {
            "status": "success",
            "operation": "server_setup",
            "containerization": "docker",
            "agent": self.name,
        }

    def _workflow_task(self, task: str, context: Dict) -> Dict:
        """Handle workflow orchestration tasks by logging an asyncio example."""
        # Delimited with ''' because the snippet itself contains """ docstrings,
        # which would otherwise terminate the literal early.
        workflow_code = '''
# Workflow Orchestration
import asyncio
from typing import List, Dict, Any
class WorkflowManager:
    def __init__(self):
        self.tasks = []
        self.results = {}
    async def execute_task(self, task_name: str, task_func, *args, **kwargs):
        """Execute a single task"""
        print(f"Executing task: {task_name}")
        result = await task_func(*args, **kwargs)
        self.results[task_name] = result
        return result
    async def run_workflow(self, workflow_config: List[Dict]):
        """Run a complete workflow"""
        for task in workflow_config:
            await self.execute_task(
                task["name"],
                task["function"],
                *task.get("args", []),
                **task.get("kwargs", {})
            )
        return self.results
# Example workflow
async def data_processing_task():
    # Simulate data processing
    await asyncio.sleep(1)
    return {"processed": 1000, "status": "complete"}
async def model_training_task():
    # Simulate model training
    await asyncio.sleep(2)
    return {"accuracy": 0.95, "model": "trained"}
# Define workflow
workflow = [
    {"name": "data_processing", "function": data_processing_task},
    {"name": "model_training", "function": model_training_task}
]
# Execute workflow
manager = WorkflowManager()
results = await manager.run_workflow(workflow)
print(results)
'''
        self.notebook_manager.add_cell(self.notebook_id, "code", workflow_code)
        return {
            "status": "success",
            "operation": "workflow_orchestration",
            # NOTE(review): placeholder metric - this is the number of pieces
            # produced by splitting the snippet on "async def" (occurrences
            # + 1), not a count of tasks that actually ran.
            "tasks_executed": len(workflow_code.split("async def")),
            "agent": self.name,
        }
class MultiAgentSystem:
    """Registry of agents and workflows sharing one notebook manager.

    All state is held in memory on the instance.
    """

    def __init__(self):
        self.notebook_manager = ColabNotebookManager()
        self.agents = {}  # agent name -> AIAgent
        self.workflows = []  # every workflow ever created, in creation order
        self.active_workflows = {}  # workflow id -> workflow currently running

    def create_agent(self, name: str, agent_type: str) -> str:
        """Create a new agent and return a human-readable status message.

        An existing agent with the same name is left untouched, because
        replacing it would orphan its notebook and history.
        """
        if name in self.agents:
            return f"Agent {name} already exists"
        self.agents[name] = AIAgent(name, agent_type, self.notebook_manager)
        return f"Agent {name} created successfully"

    def create_workflow(self, name: str, agents: List[str], tasks: List[str]) -> Dict:
        """Create a multi-agent workflow and return its record.

        ``agents[i]`` is paired with ``tasks[i]`` at execution time; if the
        lists differ in length the extra entries are never executed.
        """
        workflow_id = str(uuid.uuid4())
        workflow = {
            "id": workflow_id,
            "name": name,
            "agents": agents,
            "tasks": tasks,
            "status": "created",
            "created_at": datetime.now().isoformat(),
        }
        self.workflows.append(workflow)
        return workflow

    async def execute_workflow(self, workflow_id: str) -> Dict:
        """Execute a workflow's steps in order and return a summary.

        Args:
            workflow_id: Id of a workflow returned by ``create_workflow``.

        Returns:
            ``{"status": "error", "message": ...}`` if the id is unknown.
            Otherwise a dict with ``status``, ``workflow_id``, ``results``
            (one entry per step; steps naming an unknown agent yield an
            error entry) and ``execution_time`` (elapsed seconds).
        """
        workflow = next((w for w in self.workflows if w["id"] == workflow_id), None)
        if not workflow:
            return {"status": "error", "message": "Workflow not found"}

        started = time.time()
        workflow["status"] = "running"
        self.active_workflows[workflow_id] = workflow
        results = []
        try:
            steps = zip(workflow["agents"], workflow["tasks"])
            for step_number, (agent_name, task) in enumerate(steps, 1):
                agent = self.agents.get(agent_name)
                if agent is None:
                    # Report the gap instead of silently dropping the step.
                    results.append(
                        {
                            "status": "error",
                            "message": f"Agent '{agent_name}' not found",
                            "agent": agent_name,
                        }
                    )
                    continue
                results.append(agent.execute_task(task))
                # Add workflow step to notebook
                self.notebook_manager.add_cell(
                    agent.notebook_id,
                    "markdown",
                    f"### Workflow Step {step_number}\n**Agent:** {agent_name}\n**Task:** {task}",
                )
        except Exception:
            workflow["status"] = "failed"
            raise
        finally:
            # The workflow is no longer running, whatever the outcome.
            self.active_workflows.pop(workflow_id, None)

        workflow["status"] = "completed"
        workflow["results"] = results
        return {
            "status": "success",
            "workflow_id": workflow_id,
            "results": results,
            "execution_time": time.time() - started,
        }
# Initialize the system: a single module-level MultiAgentSystem instance that
# the Gradio callbacks defined in create_interface() read and mutate.
agent_system = MultiAgentSystem()
def create_interface():
    """Build the Gradio Blocks UI for the multi-agent demo.

    All callbacks operate on the module-level ``agent_system``. The UI has
    seven tabs: agent management, workflow builder, workflow execution,
    notebook viewer, API/proxy tools, data processing, and server setup.

    Returns:
        The configured ``gr.Blocks`` app (not yet launched).
    """
    # Seconds to wait for the user-supplied endpoint before giving up, so a
    # dead host cannot hang the "Test API" callback indefinitely.
    api_timeout_seconds = 10

    with gr.Blocks(theme=gr.themes.Soft(), title="AI Agent Development Environment") as demo:
        gr.HTML("""
<div style='text-align: center; padding: 20px;'>
<h1>🤖 Multi-Agent Development Environment</h1>
<p style='font-size: 18px; color: #666;'>
Powerful AI Agent System with Workflow Automation •
<a href='https://huggingface.co/spaces/akhaliq/anycoder' target='_blank'>Built with anycoder</a>
</p>
</div>
""")
        with gr.Tabs():
            # Agent Management Tab
            with gr.TabItem("🎯 Agent Management"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Create New Agent")
                        agent_name = gr.Textbox(label="Agent Name", placeholder="Enter agent name...")
                        agent_type = gr.Dropdown(
                            choices=["coder", "data_processor", "api_caller", "server_manager", "workflow_orchestrator"],
                            label="Agent Type",
                            value="coder"
                        )
                        create_agent_btn = gr.Button("Create Agent", variant="primary")
                        agent_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column(scale=2):
                        gr.Markdown("### Active Agents")
                        agents_display = gr.JSON(label="Agents", value={})

                def create_agent(name, atype):
                    """Create an agent; return (status message, list of agent names)."""
                    name = (name or "").strip()
                    if not name:
                        return "Please enter an agent name", list(agent_system.agents.keys())
                    status = agent_system.create_agent(name, atype)
                    return status, list(agent_system.agents.keys())

                # Keep the event so dependent selectors can be refreshed once
                # every component exists (wired at the end of the layout).
                agent_created = create_agent_btn.click(
                    create_agent,
                    inputs=[agent_name, agent_type],
                    outputs=[agent_status, agents_display]
                )

            # Workflow Builder Tab
            with gr.TabItem("🔄 Workflow Builder"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Create Workflow")
                        workflow_name = gr.Textbox(label="Workflow Name")
                        workflow_agents = gr.CheckboxGroup(
                            label="Select Agents",
                            choices=[]
                        )
                        workflow_tasks = gr.Textbox(
                            label="Tasks (one per line)",
                            lines=5,
                            placeholder="Task 1\nTask 2\nTask 3..."
                        )
                        create_workflow_btn = gr.Button("Create Workflow", variant="primary")
                        workflow_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Active Workflows")
                        workflows_display = gr.JSON(label="Workflows", value=[])

                def update_agents_list():
                    """Return a CheckboxGroup update listing the current agent names."""
                    return gr.CheckboxGroup(choices=list(agent_system.agents.keys()))

                def create_workflow(name, agents, tasks_str):
                    """Validate the form and create a workflow.

                    Returns (status message, all workflows). On validation
                    failure the existing workflows are returned unchanged so
                    the display is not wiped.
                    """
                    if not name or not agents or not tasks_str:
                        return "Please fill all fields", agent_system.workflows
                    tasks = [task.strip() for task in tasks_str.split('\n') if task.strip()]
                    if len(tasks) != len(agents):
                        return "Number of tasks must match number of agents", agent_system.workflows
                    agent_system.create_workflow(name, agents, tasks)
                    return f"Workflow '{name}' created successfully", agent_system.workflows

                workflow_created = create_workflow_btn.click(
                    create_workflow,
                    inputs=[workflow_name, workflow_agents, workflow_tasks],
                    outputs=[workflow_status, workflows_display]
                )

            # Execution Tab
            with gr.TabItem("⚡ Execute"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Execute Workflow")
                        workflow_selector = gr.Dropdown(
                            label="Select Workflow",
                            choices=[]
                        )
                        execute_btn = gr.Button("Execute Workflow", variant="primary")
                        execution_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Execution Results")
                        execution_results = gr.JSON(label="Results", value={})

                def update_workflows_list():
                    """Return a Dropdown update of (name, id) pairs, selecting the first workflow."""
                    if agent_system.workflows:
                        choices = [(w["name"], w["id"]) for w in agent_system.workflows]
                        return gr.Dropdown(choices=choices, value=agent_system.workflows[0]["id"])
                    return gr.Dropdown(choices=[])

                async def execute_workflow(workflow_id):
                    """Run the selected workflow; return (status message, result dict)."""
                    if not workflow_id:
                        return "Please select a workflow", {}
                    result = await agent_system.execute_workflow(workflow_id)
                    if result.get("status") != "success":
                        return f"Workflow failed: {result.get('message', 'unknown error')}", result
                    return "Workflow executed successfully", result

                execute_btn.click(
                    execute_workflow,
                    inputs=[workflow_selector],
                    outputs=[execution_status, execution_results]
                )

            # Colab Notebooks Tab
            with gr.TabItem("📓 Colab Notebooks"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Agent Notebooks")
                        notebook_selector = gr.Dropdown(
                            label="Select Agent Notebook",
                            choices=[]
                        )
                        refresh_notebooks_btn = gr.Button("Refresh", variant="secondary")
                    with gr.Column():
                        gr.Markdown("### Notebook Content")
                        notebook_content = gr.Markdown(label="Content")

                def update_notebooks_list():
                    """Return a Dropdown update of (agent name, notebook id) pairs."""
                    if agent_system.agents:
                        choices = [(agent.name, agent.notebook_id) for agent in agent_system.agents.values()]
                        return gr.Dropdown(choices=choices)
                    return gr.Dropdown(choices=[])

                def display_notebook(notebook_id):
                    """Return the selected notebook rendered as Markdown."""
                    if notebook_id:
                        return agent_system.notebook_manager.get_notebook_content(notebook_id)
                    return "Select a notebook to view content"

                refresh_notebooks_btn.click(
                    update_notebooks_list,
                    outputs=[notebook_selector]
                )
                notebook_selector.change(
                    display_notebook,
                    inputs=[notebook_selector],
                    outputs=[notebook_content]
                )

            # Tools & APIs Tab
            with gr.TabItem("🛠️ Tools & APIs"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### API Configuration")
                        api_endpoint = gr.Textbox(label="API Endpoint", placeholder="https://api.example.com")
                        api_key = gr.Textbox(label="API Key", type="password")
                        api_method = gr.Dropdown(
                            choices=["GET", "POST", "PUT", "DELETE"],
                            label="HTTP Method",
                            value="GET"
                        )
                        api_body = gr.JSON(label="Request Body (JSON)", value={})
                        test_api_btn = gr.Button("Test API", variant="primary")
                        api_response = gr.Textbox(label="Response", lines=10, interactive=False)
                    with gr.Column():
                        gr.Markdown("### Proxy Configuration")
                        proxy_host = gr.Textbox(label="Proxy Host", placeholder="proxy.example.com")
                        proxy_port = gr.Number(label="Proxy Port", value=8080)
                        proxy_username = gr.Textbox(label="Proxy Username")
                        proxy_password = gr.Textbox(label="Proxy Password", type="password")
                        save_proxy_btn = gr.Button("Save Proxy Settings", variant="secondary")
                        proxy_status = gr.Textbox(label="Status", interactive=False)

                def test_api(endpoint, api_key, method, body):
                    """Send a GET or POST to ``endpoint`` and return a text summary.

                    NOTE(security): the URL is user-supplied and fetched from
                    the server hosting this app, so anyone who can reach the
                    UI can make it issue requests (SSRF risk). Restrict the
                    allowed hosts before exposing this publicly.
                    """
                    try:
                        headers = {}
                        if api_key:
                            headers["Authorization"] = f"Bearer {api_key}"
                        if method == "GET":
                            response = requests.get(endpoint, headers=headers, timeout=api_timeout_seconds)
                        elif method == "POST":
                            response = requests.post(
                                endpoint, json=body, headers=headers, timeout=api_timeout_seconds
                            )
                        else:
                            return "Method not implemented in demo"
                        return f"Status: {response.status_code}\n\nResponse:\n{response.text}"
                    except Exception as e:
                        # UI boundary: show the failure to the user instead of raising.
                        return f"Error: {str(e)}"

                test_api_btn.click(
                    test_api,
                    inputs=[api_endpoint, api_key, api_method, api_body],
                    outputs=[api_response]
                )

                def save_proxy(host, port, username, password):
                    """Report the proxy settings; nothing is persisted or applied."""
                    # In a real implementation, save to config
                    return f"Proxy settings saved: {host}:{port}"

                save_proxy_btn.click(
                    save_proxy,
                    inputs=[proxy_host, proxy_port, proxy_username, proxy_password],
                    outputs=[proxy_status]
                )

            # Data Processing Tab
            with gr.TabItem("📊 Data Processing"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Upload Data")
                        data_file = gr.File(label="Upload CSV/JSON file", file_types=[".csv", ".json"])
                        processing_type = gr.Dropdown(
                            choices=["Clean Data", "Transform Data", "Analyze Data", "Visualize Data"],
                            label="Processing Type",
                            value="Clean Data"
                        )
                        process_data_btn = gr.Button("Process Data", variant="primary")
                        processing_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Processing Results")
                        processing_results = gr.JSON(label="Results", value={})
                        download_processed = gr.File(label="Download Processed Data", visible=False)

                def process_data(file, ptype):
                    """Simulate processing: log a generated script, return canned stats.

                    The uploaded file is not actually read; the script is only
                    added to the first agent's notebook (if any agent exists).
                    """
                    if not file:
                        return "Please upload a file", {}
                    # Accept both a file-like object exposing .name and a plain path.
                    file_path = getattr(file, "name", None) or str(file)
                    # Prefix only the base name; prefixing the full upload path
                    # would produce an invalid output filename.
                    output_name = f"processed_{os.path.basename(file_path)}"
                    # Simulate processing
                    processing_code = f"""
# Data Processing: {ptype}
import pandas as pd
import numpy as np
# Load data
df = pd.read_csv("{file_path}")
# Processing steps based on type
if "{ptype}" == "Clean Data":
    df = df.dropna()
    df = df.drop_duplicates()
elif "{ptype}" == "Transform Data":
    # Add transformation logic
    pass
elif "{ptype}" == "Analyze Data":
    # Add analysis logic
    pass
# Save processed data
df.to_csv("{output_name}", index=False)
print(f"Processed {{len(df)}} rows")
"""
                    # Add to a notebook
                    if agent_system.agents:
                        first_agent = next(iter(agent_system.agents.values()))
                        first_agent.notebook_manager.add_cell(
                            first_agent.notebook_id,
                            "code",
                            processing_code
                        )
                    return f"Data processed successfully with {ptype}", {
                        "rows_processed": 1000,
                        "processing_type": ptype,
                        "output_file": output_name
                    }

                process_data_btn.click(
                    process_data,
                    inputs=[data_file, processing_type],
                    outputs=[processing_status, processing_results]
                )

            # Server Setup Tab
            with gr.TabItem("🖥️ Server Setup"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Server Configuration")
                        server_type = gr.Dropdown(
                            choices=["FastAPI", "Flask", "Docker", "Kubernetes"],
                            label="Server Type",
                            value="FastAPI"
                        )
                        server_port = gr.Number(label="Port", value=8000)
                        server_env = gr.Dropdown(
                            choices=["Development", "Production", "Staging"],
                            label="Environment",
                            value="Development"
                        )
                        setup_server_btn = gr.Button("Setup Server", variant="primary")
                        server_status = gr.Textbox(label="Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Generated Configuration")
                        server_config = gr.Code(label="Server Code", language="python", lines=20)
                        download_config = gr.File(label="Download Config", visible=False)

                def setup_server(stype, port, env):
                    """Generate (status message, config script) for the chosen server type.

                    The script is returned as text only; nothing is started.
                    """
                    config_code = f"""
# Generated {stype} Server Configuration
# Environment: {env}
# Port: {port}
if "{stype}" == "FastAPI":
    from fastapi import FastAPI
    import uvicorn
    app = FastAPI(title="AI Agent Server", version="1.0.0")
    @app.get("/")
    async def root():
        return {{"message": "AI Agent Server Running", "status": "active"}}
    @app.get("/health")
    async def health_check():
        return {{"status": "healthy"}}
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port={port})
elif "{stype}" == "Docker":
    # Dockerfile content
    dockerfile = '''
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE {port}
CMD ["python", "app.py"]
'''
    print(dockerfile)
elif "{stype}" == "Flask":
    from flask import Flask, jsonify
    app = Flask(__name__)
    @app.route('/')
    def home():
        return jsonify({{"message": "AI Agent Server", "port": {port}}})
    if __name__ == "__main__":
        app.run(host='0.0.0.0', port={port}, debug={env == "Development"})
"""
                    return f"{stype} server configuration generated for {env} environment", config_code

                setup_server_btn.click(
                    setup_server,
                    inputs=[server_type, server_port, server_env],
                    outputs=[server_status, server_config]
                )

        # Keep dependent selectors in sync: they live in other tabs, so they
        # are refreshed after the create events rather than only on page load.
        agent_created.then(
            update_agents_list,
            outputs=[workflow_agents]
        ).then(
            update_notebooks_list,
            outputs=[notebook_selector]
        )
        workflow_created.then(
            update_workflows_list,
            outputs=[workflow_selector]
        )

        # Auto-refresh components on page load
        demo.load(
            update_agents_list,
            outputs=[workflow_agents]
        )
        demo.load(
            update_notebooks_list,
            outputs=[notebook_selector]
        )
        demo.load(
            update_workflows_list,
            outputs=[workflow_selector]
        )
    return demo
# Create and launch the interface
if __name__ == "__main__":
    demo = create_interface()
    # Listen on all interfaces on port 7860; share=True additionally asks
    # Gradio for a public share link.
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)