# netgenius-lab / app.py
# (Hugging Face repo-page header preserved as comments so the file parses)
# wuhp's picture — Update app.py — commit 14223ce verified
import gradio as gr
import google.generativeai as genai
from huggingface_hub import HfApi, create_repo, upload_folder, space_info
import os
import time
import json
from datetime import datetime
import tempfile
import shutil
import re
import requests
# Load the per-agent system prompts from prompts.json.
# Falls back to minimal built-in prompts so the app never crashes at import
# time if the file is missing or malformed.
PROMPTS = {}
try:
    # Explicit UTF-8: the default encoding is platform-dependent and the
    # prompts contain non-ASCII characters.
    with open("prompts.json", "r", encoding="utf-8") as f:
        PROMPTS = json.load(f)
    print("βœ… Loaded prompts from prompts.json")
except Exception as e:
    print(f"❌ Error loading prompts.json: {e}")
    # Define minimal fallback prompts if file load fails to prevent crash
    PROMPTS = {
        "architect": "You are the Architect Agent.",
        "developer": "You are the Developer Agent. Ensure you include a get_performance_metrics function with api_name='get_metrics'.",
        "debugger": "You are the Debugger Agent.",
        "tester": "You are the Tester Agent."
    }
class MetricCollector:
    """Collects real metrics from deployed Gradio apps.

    Polls the public ``<owner>-<repo>.hf.space`` URL of a deployed Space and
    tries to pull performance numbers out of its Gradio API endpoints.
    """

    def __init__(self, space_name: str):
        # space_name is "owner/repo"; HF serves it at https://owner-repo.hf.space
        self.space_name = space_name
        self.base_url = f"https://{space_name.replace('/', '-')}.hf.space"

    def wait_for_space_ready(self, timeout: int = 300) -> bool:
        """Wait for space to be accessible and responding.

        Polls the base URL and then the Gradio ``/info`` endpoint until both
        return 200 or *timeout* seconds elapse. Returns True when ready.
        """
        start_time = time.time()
        print(f"πŸ” Waiting for space at {self.base_url}")
        while time.time() - start_time < timeout:
            try:
                response = requests.get(self.base_url, timeout=10)
                if response.status_code == 200:
                    print("βœ“ Space is responding to HTTP requests")
                    # Verify the Gradio API is actually available
                    try:
                        info_response = requests.get(f"{self.base_url}/info", timeout=10)
                        if info_response.status_code == 200:
                            print("βœ“ Gradio API is available")
                            # Give it extra time to fully initialize
                            time.sleep(30)
                            return True
                    except Exception:  # was bare except: don't swallow KeyboardInterrupt
                        print("⏳ Gradio API not ready yet...")
            except Exception as e:
                elapsed = int(time.time() - start_time)
                print(f"⏳ Still waiting... ({elapsed}s) - {str(e)[:50]}")
            time.sleep(15)
        print(f"❌ Timeout after {timeout}s")
        return False

    def test_app_functionality(self) -> bool:
        """Test if the app is actually functional by trying to use it.

        Returns True as soon as any named endpoint answers a call.
        """
        print("πŸ§ͺ Testing app functionality...")
        try:
            # Get available endpoints
            info_response = requests.get(f"{self.base_url}/info", timeout=10)
            if info_response.status_code != 200:
                print("❌ Cannot fetch app info")
                return False
            info = info_response.json()
            endpoints = info.get('named_endpoints', {})
            if not endpoints:
                print("❌ No endpoints found")
                return False
            print(f"βœ“ Found {len(endpoints)} endpoints")
            # Try to call at least one endpoint to verify app works
            for endpoint_path in endpoints.keys():
                endpoint_name = endpoint_path.lstrip('/')
                print(f"πŸ§ͺ Testing endpoint: {endpoint_name}")
                try:
                    test_result = self.call_gradio_api(endpoint_name, [], timeout=30)
                    if test_result is not None:
                        print(f"βœ“ Endpoint {endpoint_name} is functional")
                        return True
                except Exception as e:
                    print(f"⚠️ Endpoint {endpoint_name} failed: {str(e)[:100]}")
                    continue
            print("❌ No functional endpoints found")
            return False
        except Exception as e:
            print(f"❌ Functionality test failed: {str(e)}")
            return False

    def call_gradio_api(self, endpoint: str, data: list = None, timeout: int = 60):
        """Call a Gradio API endpoint with better error handling.

        Returns the ``data`` list from the JSON response, or None on any
        failure (non-200, timeout, connection error).
        """
        try:
            if data is None:
                data = []
            api_url = f"{self.base_url}/api/{endpoint}"
            print(f"πŸ“‘ Calling API: {api_url}")
            response = requests.post(
                api_url,
                json={"data": data},
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            print(f"πŸ“‘ Response status: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                data_result = result.get("data", [])
                print(f"βœ“ API call successful, got {len(data_result)} data items")
                return data_result
            else:
                print(f"⚠️ API call failed: {response.status_code}")
                print(f"Response: {response.text[:200]}")
                return None
        except requests.exceptions.Timeout:
            print(f"⚠️ API call timed out after {timeout}s")
            return None
        except Exception as e:
            print(f"⚠️ Error calling API: {str(e)}")
            return None

    def extract_metrics_from_app(self, max_retries: int = 3):
        """Extract real performance metrics from the deployed app with retries.

        Tries, in priority order: the dedicated /get_metrics endpoint, any
        endpoint whose name suggests metrics, then every endpoint. Returns a
        metrics dict; ``source`` is "real_api" on success, "placeholder"
        (score 50) when all attempts fail.
        """
        # Defaults hoisted out of the loop: also fixes a NameError on the
        # fall-through return when max_retries < 1.
        metrics = {
            "throughput": 0,
            "latency_avg": 0,
            "latency_p50": 0,
            "latency_p95": 0,
            "latency_p99": 0,
            "cpu_efficiency": 0,
            "memory_usage": 0,
            "performance_score": 0,
            "source": "failed"
        }
        for attempt in range(max_retries):
            print(f"\n🎯 Metrics collection attempt {attempt + 1}/{max_retries}")
            # Wait for space to be ready
            if not self.wait_for_space_ready(timeout=300):
                print(f"⚠️ Attempt {attempt + 1}: Space not ready")
                if attempt < max_retries - 1:
                    print("⏳ Waiting 30s before retry...")
                    time.sleep(30)
                continue
            # Test if app is functional
            if not self.test_app_functionality():
                print(f"⚠️ Attempt {attempt + 1}: App not functional")
                if attempt < max_retries - 1:
                    print("⏳ Waiting 30s before retry...")
                    time.sleep(30)
                continue
            try:
                # Try to get the app's info endpoint
                info_response = requests.get(f"{self.base_url}/info", timeout=10)
                if info_response.status_code == 200:
                    info = info_response.json()
                    endpoints = info.get('named_endpoints', {})
                    print(f"βœ“ Found {len(endpoints)} named endpoints: {list(endpoints.keys())}")
                    # Priority 1: Look for the get_metrics endpoint
                    if '/get_metrics' in endpoints:
                        print("🎯 Found get_metrics endpoint!")
                        result = self.call_gradio_api("get_metrics", [], timeout=60)
                        if result:
                            print(f"πŸ“Š Raw result from get_metrics: {str(result)[:200]}")
                            metrics_data = self.parse_metrics_from_result(result)
                            if metrics_data.get("performance_score", 0) > 0:
                                metrics.update(metrics_data)
                                metrics["source"] = "real_api"
                                print(f"βœ… SUCCESS! Collected real metrics: Score {metrics['performance_score']}/100")
                                return metrics
                            else:
                                print("⚠️ Metrics parsed but score is 0")
                        else:
                            print("⚠️ get_metrics returned None")
                    else:
                        print("⚠️ 'get_metrics' endpoint NOT found in named_endpoints!")
                    # Priority 2: Try common metric endpoint patterns
                    metric_keywords = ['metric', 'benchmark', 'performance', 'test', 'stats']
                    for endpoint_path in endpoints:  # values were never used
                        endpoint_name = endpoint_path.lstrip('/')
                        if any(keyword in endpoint_name.lower() for keyword in metric_keywords):
                            print(f"πŸ§ͺ Trying endpoint: {endpoint_name}")
                            result = self.call_gradio_api(endpoint_name, [], timeout=60)
                            if result:
                                print(f"πŸ“Š Raw result from {endpoint_name}: {str(result)[:200]}")
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"βœ… SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics
                    # Priority 3: Try ALL endpoints as a last resort
                    print("πŸ” Trying all available endpoints...")
                    for endpoint_path in endpoints.keys():
                        endpoint_name = endpoint_path.lstrip('/')
                        print(f"πŸ§ͺ Trying endpoint: {endpoint_name}")
                        try:
                            result = self.call_gradio_api(endpoint_name, [], timeout=30)
                            if result:
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"βœ… SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics
                        except Exception as e:
                            print(f"⚠️ Endpoint {endpoint_name} failed: {str(e)[:50]}")
                            continue
            except Exception as e:
                print(f"⚠️ Attempt {attempt + 1} error: {str(e)}")
            # Pause between attempts regardless of how this one failed.
            if attempt < max_retries - 1:
                print("⏳ Waiting 30s before retry...")
                time.sleep(30)
        # All attempts failed
        print("❌ Failed to collect real metrics after all attempts")
        metrics["source"] = "placeholder"
        metrics["performance_score"] = 50  # Default baseline
        return metrics

    def parse_metrics_from_result(self, result):
        """Parse metrics from a Gradio API result.

        Accepts a list (first element inspected), dict, or string; strings
        are tried as JSON first and then fall back to regex extraction.
        """
        metrics = {}
        print(f"πŸ” Parsing result type: {type(result)}")
        # Result could be a list, dict, or string
        if isinstance(result, list) and len(result) > 0:
            item = result[0]
            print(f"πŸ” List item type: {type(item)}")
            if isinstance(item, str):
                print(f"πŸ” String content: {item[:200]}")
                # Try to parse as JSON
                try:
                    parsed = json.loads(item)
                    print(f"βœ“ Parsed as JSON: {parsed}")
                    metrics.update(self.extract_metrics_from_dict(parsed))
                except ValueError:  # json.JSONDecodeError; was bare except
                    print("⚠️ Not valid JSON, trying regex")
                    # Try regex parsing
                    metrics.update(self.extract_metrics_from_text(item))
            elif isinstance(item, dict):
                print(f"βœ“ Direct dict: {item}")
                metrics.update(self.extract_metrics_from_dict(item))
        elif isinstance(result, dict):
            print(f"βœ“ Direct dict result: {result}")
            metrics.update(self.extract_metrics_from_dict(result))
        elif isinstance(result, str):
            print(f"πŸ” String result: {result[:200]}")
            try:
                parsed = json.loads(result)
                metrics.update(self.extract_metrics_from_dict(parsed))
            except ValueError:  # was bare except
                metrics.update(self.extract_metrics_from_text(result))
        print(f"πŸ“Š Extracted metrics: {metrics}")
        return metrics

    def extract_metrics_from_dict(self, data: dict):
        """Extract known metrics from a dict, accepting several alias keys.

        The first alias found per metric wins; values must coerce to float.
        """
        metrics = {}
        # Common metric keys
        key_mappings = {
            'throughput': ['throughput', 'packets_per_sec', 'pps', 'packets_sec'],
            'latency_avg': ['latency_avg', 'avg_latency', 'latency_mean', 'mean_latency'],
            'latency_p50': ['latency_p50', 'p50', 'median_latency', 'latency_median'],
            'latency_p95': ['latency_p95', 'p95', 'latency_95'],
            'latency_p99': ['latency_p99', 'p99', 'latency_99'],
            'cpu_efficiency': ['cpu_efficiency', 'efficiency', 'ops_per_packet', 'cpu_ops'],
            'memory_usage': ['memory_usage', 'memory_mb', 'memory', 'mem_usage'],
            'performance_score': ['performance_score', 'score', 'total_score', 'overall_score']
        }
        for metric_key, possible_keys in key_mappings.items():
            for key in possible_keys:
                if key in data:
                    try:
                        value = float(data[key])
                        metrics[metric_key] = value
                        print(f"βœ“ Found {metric_key} = {value}")
                        break
                    except (ValueError, TypeError):
                        pass
        return metrics

    def extract_metrics_from_text(self, text: str):
        """Extract metrics from free-text output using regex (best effort)."""
        metrics = {}
        patterns = {
            'throughput': r'throughput[:\s]+([0-9,.]+)',
            'latency_avg': r'(?:avg|average|mean)\s*latency[:\s]+([0-9.]+)',
            'latency_p50': r'p50[:\s]+([0-9.]+)',
            'latency_p95': r'p95[:\s]+([0-9.]+)',
            'latency_p99': r'p99[:\s]+([0-9.]+)',
            'cpu_efficiency': r'(?:cpu[_\s]*)?efficiency[:\s]+([0-9.]+)',
            'performance_score': r'(?:performance[_\s]+)?score[:\s]+([0-9]+)'
        }
        for key, pattern in patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    value = match.group(1).replace(',', '')
                    metrics[key] = float(value)
                    print(f"βœ“ Regex found {key} = {value}")
                except ValueError:  # was bare except
                    pass
        return metrics
class MultiAgentSystem:
    """Orchestrates four Gemini agents (architect, developer, debugger,
    tester) that iteratively build, deploy and optimize a Gradio app on a
    Hugging Face Space."""

    def __init__(self, gemini_api_key, hf_token):
        # HF client for Space creation and file uploads.
        self.hf_api = HfApi(token=hf_token)
        self.hf_token = hf_token
        genai.configure(api_key=gemini_api_key)
        # Initialize four specialized agents with prompts loaded from JSON
        self.architect = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("architect", "You are the Architect Agent.")
        )
        self.developer = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("developer", "You are the Developer Agent.")
        )
        self.debugger = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("debugger", "You are the Debugger Agent.")
        )
        self.tester = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("tester", "You are the Tester Agent.")
        )
        self.space_name = None  # set by create_space() on success
        self.iteration = 0
        # Shared blackboard that all phases read and write.
        self.shared_context = {
            "current_code": "",
            "performance_history": [],
            "architecture_decisions": [],
            "known_issues": [],
            "optimization_log": []
        }
        self.temp_dir = tempfile.mkdtemp()  # staging area for uploads
        self.conversation_logs = []  # entries appended by log_conversation()
        self.target_improvement = 0
def log_conversation(self, agent_name, input_msg, output_msg):
"""Log inter-agent communication"""
self.conversation_logs.append({
"timestamp": datetime.now().isoformat(),
"agent": agent_name,
"input": input_msg[:1000],
"output": output_msg[:2000]
})
    def create_space(self):
        """Create a new HuggingFace Space.

        Names it "<username>/packet-optimizer-<timestamp>" and stores the
        repo id in self.space_name. Returns a status string.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            # Create a simplified space name
            name_suffix = f"optimizer-{timestamp}"
            user_info = self.hf_api.whoami()
            username = user_info['name']
            # Full repo ID
            repo_id = f"{username}/packet-{name_suffix}"
            print(f"Creating Space: {repo_id}")
            create_repo(
                repo_id=repo_id,
                repo_type="space",
                space_sdk="gradio",
                token=self.hf_token,
                private=False
            )
            self.space_name = repo_id
            return f"βœ“ Created Space: {repo_id}"
        except Exception as e:
            return f"βœ— Error creating space: {str(e)}"
    def upload_files_to_space(self, files):
        """Upload files to the created space.

        files: mapping of filename -> text content. Contents are staged in
        a fresh temp subdirectory, then pushed in one upload_folder call.
        Returns a status string.
        """
        if not self.space_name:
            return "βœ— No space created"
        try:
            # Create temp directory for upload (recreated each call so stale
            # files from a previous deploy are not re-uploaded)
            upload_dir = os.path.join(self.temp_dir, "upload")
            if os.path.exists(upload_dir):
                shutil.rmtree(upload_dir)
            os.makedirs(upload_dir)
            # Write files
            for filename, content in files.items():
                with open(os.path.join(upload_dir, filename), "w") as f:
                    f.write(content)
            # Upload
            print(f"Uploading to {self.space_name}...")
            self.hf_api.upload_folder(
                folder_path=upload_dir,
                repo_id=self.space_name,
                repo_type="space",
                token=self.hf_token
            )
            return f"βœ“ Deployed to {self.space_name}"
        except Exception as e:
            return f"βœ— Error uploading: {str(e)}"
    def get_space_logs(self):
        """Get logs from the space (best effort).

        Returns a dict with the runtime stage and error (if any), or
        {"error": ...} when the space is missing or the lookup fails.
        """
        if not self.space_name:
            return {"error": "No space"}
        try:
            info = space_info(self.space_name, token=self.hf_token)
            runtime = info.runtime
            return {
                "stage": runtime.stage if runtime else "UNKNOWN",
                "error": getattr(runtime, "error", None),
                "message": "Check HF Space logs for details"
            }
        except Exception as e:
            return {"error": str(e)}
def validate_code_locally(self, code):
"""Validate code syntax and required endpoints locally before deploying"""
try:
if not code or len(code.strip()) == 0:
return False, "Code is empty"
# Check for API endpoint requirement
if 'api_name="get_metrics"' not in code and "api_name='get_metrics'" not in code:
return False, "Code is missing the required api_name=\"get_metrics\" argument in the Gradio button."
# Check for placeholders that cause syntax errors
if "..." in code or "pass # Implement" in code:
return False, "Code contains placeholders (...) or incomplete blocks which will cause runtime errors."
# Check for valid syntax
compile(code, '<string>', 'exec')
return True, "Valid Python Syntax"
except Exception as e:
return False, f"Syntax Error: {str(e)}"
    def collect_real_metrics(self):
        """Collect real metrics from deployed space with better validation.

        Delegates to MetricCollector; if only placeholder metrics come back,
        waits 60s and retries once. Returns a metrics dict whose "source"
        field records how the numbers were obtained.
        """
        if not self.space_name:
            print("⚠️ No space deployed yet")
            return {
                "throughput": 0,
                "latency_avg": 0,
                "performance_score": 0,
                "source": "no_space"
            }
        print(f"\n{'='*80}")
        print(f"πŸ” COLLECTING REAL METRICS FROM: {self.space_name}")
        print(f"{'='*80}\n")
        collector = MetricCollector(self.space_name)
        metrics = collector.extract_metrics_from_app(max_retries=3)
        print(f"\n{'='*80}")
        print("πŸ“Š FINAL COLLECTED METRICS:")
        print(f" Source: {metrics.get('source', 'unknown')}")
        print(f" Throughput: {metrics.get('throughput', 0):,.0f} packets/sec")
        print(f" Avg Latency: {metrics.get('latency_avg', 0):.2f} ms")
        print(f" P95 Latency: {metrics.get('latency_p95', 0):.2f} ms")
        print(f" Performance Score: {metrics.get('performance_score', 0)}/100")
        print(f"{'='*80}\n")
        # If we got placeholder metrics, try one more time after a longer wait
        if metrics.get("source") == "placeholder":
            print("⚠️ Got placeholder metrics, trying once more after 60s wait...")
            time.sleep(60)
            metrics = collector.extract_metrics_from_app(max_retries=1)
        return metrics
    def wait_for_space_build(self, timeout=300):
        """Wait for space to build and verify it's functional.

        Polls the Space runtime stage every 15s. Returns (ok, message);
        ok is True only when the stage is RUNNING AND a live endpoint
        answered, False on build/runtime error or timeout.
        """
        if not self.space_name:
            return False, "No space created"
        start_time = time.time()
        print(f"\n⏳ Waiting for space to build (timeout: {timeout}s)...")
        while time.time() - start_time < timeout:
            try:
                info = space_info(self.space_name, token=self.hf_token)
                runtime = info.runtime if hasattr(info, 'runtime') else None
                if runtime and hasattr(runtime, 'stage'):
                    stage = runtime.stage
                    elapsed = int(time.time() - start_time)
                    print(f" Stage: {stage} ({elapsed}s elapsed)")
                    if stage == "RUNNING":
                        print("βœ“ Space is RUNNING!")
                        # Additional verification: test if app is actually functional
                        print("πŸ§ͺ Verifying app functionality...")
                        time.sleep(15)  # Give it time to fully start
                        collector = MetricCollector(self.space_name)
                        if collector.test_app_functionality():
                            print("βœ… Space is running AND functional!")
                            return True, "Space is running and functional"
                        else:
                            # RUNNING but app not answering yet: keep polling.
                            print("⚠️ Space is running but app may not be functional yet, waiting...")
                            time.sleep(20)
                            continue
                    elif stage in ["RUNTIME_ERROR", "BUILD_ERROR"]:
                        error_msg = getattr(runtime, 'error', 'Unknown error')
                        return False, f"Space error: {stage} - {error_msg}"
                time.sleep(15)
            except Exception as e:
                print(f" Error checking status: {str(e)[:50]}")
                time.sleep(15)
                continue
        return False, f"Timeout after {timeout}s"
    def automatic_error_fix_cycle(self, error_info):
        """Automatically fix errors detected in deployed space.

        Phase 1: debugger agent analyzes error_info against the current
        code. Phase 2: developer agent rewrites the code, re-validated
        locally up to 3 times. Returns (log, fixed_code, requirements) or
        (log, None, None) when an agent call fails.
        """
        log = "\nπŸ”§ AUTOMATIC ERROR DETECTION AND FIX\n" + "="*80 + "\n"
        log += f"Error detected: {error_info}\n\n"
        log += "πŸ” PHASE 1: DEBUGGER ANALYZING ERROR...\n"
        debugger_prompt = f"""The deployed space has encountered an error:
ERROR INFO:
{json.dumps(error_info, indent=2)}
CURRENT CODE:
```python
{self.shared_context['current_code']}
```
Analyze:
1. What is causing the error
2. Root cause analysis
3. Specific fixes needed
Provide detailed debugging information."""
        try:
            debugger_response = self.debugger.generate_content(debugger_prompt)
            debugger_output = debugger_response.text
            self.log_conversation("DEBUGGER (ERROR)", debugger_prompt, debugger_output)
            log += "βœ“ Error analysis complete\n\n"
        except Exception as e:
            return log + f"βœ— Error during analysis: {str(e)}\n", None, None
        log += "πŸ’» PHASE 2: DEVELOPER FIXING ERROR...\n"
        fix_prompt = f"""DEBUGGER'S ERROR ANALYSIS:
{debugger_output}
ERROR INFO:
{json.dumps(error_info, indent=2)}
CURRENT CODE:
```python
{self.shared_context['current_code']}
```
Fix the error and provide corrected code. Ensure:
1. The specific error is resolved
2. Code is syntactically correct
3. All imports are included
4. The app will run without errors
5. The get_performance_metrics() function with api_name="get_metrics" is included
6. CRITICAL: DO NOT use placeholders like '...'"""
        try:
            fix_response = self.developer.generate_content(fix_prompt)
            fix_output = fix_response.text
            self.log_conversation("DEVELOPER (ERROR FIX)", fix_prompt, fix_output)
            fixed_code = self.extract_code(fix_output, "APP_PY")
            requirements = self.extract_code(fix_output, "REQUIREMENTS")
            # Local Validation Loop: up to 3 regeneration attempts when the
            # candidate fails validate_code_locally().
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(fixed_code)
                if is_valid:
                    break
                attempts += 1
                print(f"⚠️ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}")
                refix_prompt = f"""The fixed code still has a SYNTAX ERROR:
ERROR: {error_msg}
You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                fix_response = self.developer.generate_content(refix_prompt)
                fix_output = fix_response.text
                self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output)
                fixed_code = self.extract_code(fix_output, "APP_PY")
            # Fall back to the previous code / default requirements rather
            # than deploying an empty file.
            if not fixed_code:
                fixed_code = self.shared_context['current_code']
            if not requirements:
                requirements = "gradio==4.44.0\nnumpy>=1.24.0"
            self.shared_context['current_code'] = fixed_code
            log += "βœ“ Error fix implemented\n\n"
            return log, fixed_code, requirements
        except Exception as e:
            return log + f"βœ— Error during fix: {str(e)}\n", None, None
def extract_metrics_from_output(self, text):
"""Extract performance metrics from agent outputs (fallback)"""
metrics = {
"throughput": 0,
"latency_avg": 0,
"latency_p95": 0,
"cpu_efficiency": 0,
"performance_score": 0
}
throughput_match = re.search(r'Throughput[:\s]+([0-9,.]+)', text, re.IGNORECASE)
if throughput_match:
try:
metrics["throughput"] = float(throughput_match.group(1).replace(',', ''))
except:
pass
latency_match = re.search(r'Avg\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE)
if latency_match:
try:
metrics["latency_avg"] = float(latency_match.group(1))
except:
pass
p95_match = re.search(r'P95\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE)
if p95_match:
try:
metrics["latency_p95"] = float(p95_match.group(1))
except:
pass
score_match = re.search(r'PERFORMANCE[_\s]+SCORE[:\s]+([0-9]+)', text, re.IGNORECASE)
if score_match:
try:
metrics["performance_score"] = int(score_match.group(1))
except:
pass
return metrics
    def initial_design_phase(self):
        """Phase 1: Architect designs initial system.

        Increments the iteration counter, asks the architect agent for a
        baseline design, records it in architecture_decisions, and returns
        the agent's text (or an error string).
        """
        self.iteration += 1
        architect_prompt = """Design the initial packet simulation algorithm system optimized for high performance.
Consider:
- Multiple algorithm approaches (basic, batched, optimized)
- Performance metrics to track (throughput, latency, efficiency)
- Benchmarking capabilities built into the UI
- Clear performance visualizations
CRITICAL: The app MUST include a get_performance_metrics() function that returns JSON metrics.
Focus on creating a baseline that we can iteratively improve.
Target initial performance score: 60/100"""
        try:
            architect_response = self.architect.generate_content(architect_prompt)
            architect_output = architect_response.text
            self.log_conversation("ARCHITECT", architect_prompt, architect_output)
            self.shared_context["architecture_decisions"].append({
                "iteration": self.iteration,
                "design": architect_output,
                "timestamp": datetime.now().isoformat()
            })
            return architect_output
        except Exception as e:
            return f"βœ— Architect error: {str(e)}"
    def development_phase(self, architect_specs, previous_metrics=None):
        """Phase 2: Developer implements with performance focus.

        Feeds the architect's specs (plus previous metrics, if any) to the
        developer agent, validates the generated code locally with up to 3
        regeneration attempts, and returns (raw_output, code, requirements).
        On agent failure returns an error string and two empty strings.
        """
        context = f"""ARCHITECT'S SPECIFICATIONS:
{architect_specs}
PERFORMANCE CONTEXT:
"""
        if previous_metrics:
            context += f"Previous iteration metrics: {json.dumps(previous_metrics, indent=2)}\n"
            context += "Your goal is to exceed these metrics.\n"
        else:
            context += "This is the initial implementation. Focus on creating a solid baseline with good instrumentation.\n"
        developer_prompt = f"""{context}
Implement a complete Gradio application with:
1. Multiple packet simulation algorithms (at least 3)
2. A get_performance_metrics() function with api_name="get_metrics" that returns JSON
3. Built-in benchmarking tools
4. Real-time performance visualization
5. Detailed timing measurements
CRITICAL: Include the get_performance_metrics() function exactly as specified in your system prompt.
CRITICAL: Write the FULL, COMPLETE code. Do not use placeholders like '...' or 'pass' for incomplete blocks.
Remember: Simulate packets safely (NO real network operations)"""
        try:
            developer_response = self.developer.generate_content(developer_prompt)
            developer_output = developer_response.text
            self.log_conversation("DEVELOPER", developer_prompt, developer_output)
            code = self.extract_code(developer_output, "APP_PY")
            requirements = self.extract_code(developer_output, "REQUIREMENTS")
            # Local Validation Loop
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(code)
                if is_valid:
                    break
                attempts += 1
                print(f"⚠️ Generated code has syntax errors (Attempt {attempts}/3): {error_msg}")
                fix_syntax_prompt = f"""The code you wrote has a SYNTAX ERROR and cannot run.
ERROR: {error_msg}
You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                developer_response = self.developer.generate_content(fix_syntax_prompt)
                developer_output = developer_response.text
                self.log_conversation(f"DEVELOPER (SYNTAX FIX {attempts})", fix_syntax_prompt, developer_output)
                code = self.extract_code(developer_output, "APP_PY")
                # requirements usually stay same, but could re-extract if needed
            if not requirements:
                requirements = "gradio==4.44.0\nnumpy>=1.24.0"
            self.shared_context["current_code"] = code
            return developer_output, code, requirements
        except Exception as e:
            return f"βœ— Developer error: {str(e)}", "", ""
    def testing_phase(self, code):
        """Phase 2.5: Tester runs benchmarks and collects metrics.

        Asks the tester agent for a test plan with *hypothetical* numbers,
        then parses those numbers out of the reply. Returns
        (tester_text, metrics_dict); on failure, an error string and {}.
        """
        tester_prompt = f"""Analyze this packet simulation code and generate a comprehensive test plan:
CODE:
```python
{code}
```
Generate:
1. Specific test scenarios to run
2. Expected performance characteristics
3. Metrics to collect
4. Benchmark methodology
Provide hypothetical but realistic performance numbers based on the algorithm's design."""
        try:
            tester_response = self.tester.generate_content(tester_prompt)
            tester_output = tester_response.text
            self.log_conversation("TESTER", tester_prompt, tester_output)
            # These are estimates parsed from agent prose, not measurements.
            metrics = self.extract_metrics_from_output(tester_output)
            return tester_output, metrics
        except Exception as e:
            return f"βœ— Tester error: {str(e)}", {}
    def debugging_phase(self, code, architect_specs, test_metrics, real_metrics=None):
        """Phase 3: Debugger validates and measures REAL performance.

        Prefers deployed-app metrics (source == "real_api") over the
        tester's estimates, merges metric sources (real > debugger > test),
        appends the result to performance_history, and returns
        (debugger_text, ready, final_metrics).
        """
        # Use real metrics if available, otherwise use test estimates
        metrics_to_use = real_metrics if real_metrics and real_metrics.get("source") == "real_api" else test_metrics
        debugger_prompt = f"""ARCHITECT'S SPECIFICATIONS:
{architect_specs}
METRICS (Source: {metrics_to_use.get('source', 'test_estimates') if isinstance(metrics_to_use, dict) else 'test_estimates'}):
{json.dumps(metrics_to_use, indent=2)}
DEVELOPER'S CODE:
```python
{code}
```
Validate:
1. Code correctness and safety
2. Performance against specifications
3. Whether optimizations are effective
4. Any bugs or issues
Provide a performance score (0-100) and recommendations."""
        try:
            debugger_response = self.debugger.generate_content(debugger_prompt)
            debugger_output = debugger_response.text
            self.log_conversation("DEBUGGER", debugger_prompt, debugger_output)
            # Extract any additional metrics from debugger
            debugger_metrics = self.extract_metrics_from_output(debugger_output)
            # Merge metrics: real > debugger > test
            final_metrics = {**test_metrics}
            if debugger_metrics.get("performance_score", 0) > 0:
                final_metrics.update(debugger_metrics)
            if real_metrics and real_metrics.get("source") == "real_api":
                final_metrics.update(real_metrics)
            # Update performance history with REAL metrics if available
            self.shared_context["performance_history"].append({
                "iteration": self.iteration,
                "metrics": final_metrics,
                "real_metrics": real_metrics if real_metrics else {},
                "timestamp": datetime.now().isoformat()
            })
            # Ready when the agent says so explicitly OR the score clears 70.
            ready = "READY_FOR_DEPLOYMENT: YES" in debugger_output or final_metrics.get("performance_score", 0) >= 70
            return debugger_output, ready, final_metrics
        except Exception as e:
            return f"βœ— Debugger error: {str(e)}", False, {}
    def fix_phase(self, code, debugger_findings):
        """Phase 4: Developer fixes issues.

        Sends the debugger's findings plus the current code to the
        developer agent, re-validates locally up to 3 times, and returns
        (raw_output, fixed_code). Falls back to the input code when
        extraction yields nothing or the agent call fails.
        """
        fix_prompt = f"""DEBUGGER'S FINDINGS:
{debugger_findings}
CURRENT CODE:
```python
{code}
```
Fix all identified issues while maintaining or improving performance.
Ensure the get_performance_metrics() function with api_name="get_metrics" is present.
CRITICAL: DO NOT use placeholders like '...'. Write the FULL code."""
        try:
            fix_response = self.developer.generate_content(fix_prompt)
            fix_output = fix_response.text
            self.log_conversation("DEVELOPER (FIX)", fix_prompt, fix_output)
            fixed_code = self.extract_code(fix_output, "APP_PY")
            # Local Validation Loop
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(fixed_code)
                if is_valid:
                    break
                attempts += 1
                print(f"⚠️ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}")
                refix_prompt = f"""The fixed code still has a SYNTAX ERROR:
ERROR: {error_msg}
You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                fix_response = self.developer.generate_content(refix_prompt)
                fix_output = fix_response.text
                self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output)
                fixed_code = self.extract_code(fix_output, "APP_PY")
            if not fixed_code:
                fixed_code = code
            self.shared_context["current_code"] = fixed_code
            return fix_output, fixed_code
        except Exception as e:
            return f"βœ— Developer fix error: {str(e)}", code
    def autonomous_improvement_cycle(self):
        """Fully autonomous improvement cycle based on REAL performance metrics.

        Runs one tester -> architect -> developer -> deploy -> debugger
        pass (plus an optional fix-and-redeploy pass when validation says
        the code is not ready), logs the score delta, and returns
        (log, architect_out, dev_out, debug_out, code, requirements, metrics).
        """
        self.iteration += 1
        # Get previous performance metrics
        prev_metrics = self.shared_context["performance_history"][-1] if self.shared_context["performance_history"] else {}
        prev_score = prev_metrics.get("metrics", {}).get("performance_score", 0)
        log = f"\nπŸš€ AUTONOMOUS IMPROVEMENT CYCLE {self.iteration}\n{'='*80}\n"
        log += f"Previous Performance Score: {prev_score}/100\n"
        log += f"Target: {prev_score + 10}/100\n\n"
        # Phase 1: Tester analyzes current performance
        log += "πŸ§ͺ PHASE 1: TESTER ANALYZING CURRENT PERFORMANCE...\n"
        tester_prompt = f"""Analyze the current system performance:
CURRENT CODE:
```python
{self.shared_context['current_code']}
```
PERFORMANCE HISTORY:
{json.dumps(self.shared_context['performance_history'][-3:], indent=2)}
Identify:
1. Performance bottlenecks
2. Optimization opportunities
3. Specific metrics to improve
4. Realistic performance targets"""
        tester_output = self.tester.generate_content(tester_prompt).text
        self.log_conversation("TESTER", tester_prompt, tester_output)
        log += "βœ“ Performance analysis complete\n\n"
        # Phase 2: Architect designs optimizations
        log += "πŸ›οΈ PHASE 2: ARCHITECT DESIGNING OPTIMIZATIONS...\n"
        architect_prompt = f"""TESTER'S PERFORMANCE ANALYSIS:
{tester_output}
CURRENT PERFORMANCE METRICS:
{json.dumps(prev_metrics.get('metrics', {}), indent=2)}
OPTIMIZATION HISTORY:
{json.dumps(self.shared_context['optimization_log'][-2:], indent=2)}
Design specific optimizations to improve performance score by at least 10 points.
Focus on the biggest bottlenecks identified."""
        architect_output = self.architect.generate_content(architect_prompt).text
        self.log_conversation("ARCHITECT", architect_prompt, architect_output)
        self.shared_context["architecture_decisions"].append({
            "iteration": self.iteration,
            "design": architect_output
        })
        log += "βœ“ Optimization strategy complete\n\n"
        # Phase 3: Developer implements
        log += "πŸ’» PHASE 3: DEVELOPER IMPLEMENTING OPTIMIZATIONS...\n"
        dev_output, code, requirements = self.development_phase(
            architect_output,
            prev_metrics.get('metrics', {})
        )
        log += "βœ“ Implementation complete\n\n"
        # Phase 4: Tester benchmarks new version (estimates)
        log += "πŸ§ͺ PHASE 4: TESTER BENCHMARKING NEW VERSION...\n"
        test_output, test_metrics = self.testing_phase(code)
        log += "βœ“ Benchmarking complete\n\n"
        # Phase 5: Deploy and collect REAL metrics
        log += "πŸš€ PHASE 5: DEPLOYING TO COLLECT REAL METRICS...\n"
        deploy_status = self.deploy_to_space(code, requirements)
        log += f"{deploy_status}\n"
        log += "⏳ Waiting for space to build...\n"
        success, build_msg = self.wait_for_space_build(timeout=180)
        log += f"{build_msg}\n"
        # Collect real metrics
        real_metrics = None
        if success:
            log += "\nπŸ“Š COLLECTING REAL METRICS FROM DEPLOYED APP...\n"
            time.sleep(10)  # Extra time for app to stabilize
            real_metrics = self.collect_real_metrics()
            if real_metrics.get("source") == "real_api":
                log += f"βœ“ Real metrics collected successfully!\n"
                log += f" Performance Score: {real_metrics.get('performance_score', 0)}/100\n"
            else:
                log += "⚠️ Could not collect real metrics, using estimates\n"
        else:
            log += "⚠️ Space failed to build, using test estimates\n"
        log += "\n"
        # Phase 6: Debugger validates with REAL metrics
        log += "πŸ” PHASE 6: DEBUGGER VALIDATING IMPROVEMENTS...\n"
        debug_output, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics, real_metrics)
        log += "βœ“ Validation complete\n\n"
        # Phase 7: Fix if needed
        if not ready:
            log += "πŸ”§ PHASE 7: DEVELOPER FIXING ISSUES...\n"
            fix_output, code = self.fix_phase(code, debug_output)
            log += "βœ“ Fixes applied\n\n"
            log += "πŸ”„ RE-DEPLOYING AND RE-TESTING...\n"
            deploy_status = self.deploy_to_space(code, requirements)
            success, build_msg = self.wait_for_space_build(timeout=180)
            if success:
                time.sleep(10)
                real_metrics = self.collect_real_metrics()
            # Re-validation runs even if the redeploy failed (best-effort;
            # estimates are used in that case).
            test_output_2, test_metrics_2 = self.testing_phase(code)
            debug_output_2, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics_2, real_metrics)
            log += "βœ“ Re-testing complete\n\n"
        # Log optimization results
        new_score = final_metrics.get("performance_score", 0)
        improvement = new_score - prev_score
        self.shared_context["optimization_log"].append({
            "iteration": self.iteration,
            "previous_score": prev_score,
            "new_score": new_score,
            "improvement": improvement,
            "optimizations": architect_output[:500],
            "real_metrics": real_metrics.get("source") == "real_api" if real_metrics else False
        })
        log += f"πŸ“ˆ RESULTS:\n"
        log += f"Previous Score: {prev_score}/100\n"
        log += f"New Score: {new_score}/100\n"
        log += f"Improvement: {'+' if improvement >= 0 else ''}{improvement} points\n"
        log += f"Metrics Source: {final_metrics.get('source', 'estimates')}\n"
        return log, architect_output, dev_output, debug_output, code, requirements, final_metrics
def extract_code(self, text, marker):
    """Extract a code block from an agent's raw LLM response.

    Args:
        text: Raw agent output, possibly containing fenced code blocks.
        marker: "APP_PY" to pull the body of a ```python fenced block, or
            "REQUIREMENTS" to pull the text after a "REQUIREMENTS:" label
            (optionally wrapped in a ``` fence).

    Returns:
        The extracted text with surrounding whitespace stripped, or ""
        when nothing matches or the input is malformed.
    """
    try:
        if marker == "APP_PY":
            if "```python" in text:
                start = text.find("```python") + len("```python")
                end = text.find("```", start)
                # Bug fix: with no closing fence, find() returns -1 and
                # text[start:-1] silently dropped the final character.
                # Take everything to the end of the text instead.
                if end == -1:
                    end = len(text)
                return text[start:end].strip()
        elif marker == "REQUIREMENTS":
            if "REQUIREMENTS:" in text:
                start = text.find("REQUIREMENTS:") + len("REQUIREMENTS:")
                # Skip past an optional opening ``` fence.
                if "```" in text[start:]:
                    start = text.find("```", start) + 3
                end = text.find("```", start)
                if end == -1:
                    end = len(text)
                return text[start:end].strip()
        return ""
    except Exception:
        # Defensive: malformed agent output must never crash the pipeline.
        return ""
def deploy_to_space(self, code, requirements):
    """Push app.py and requirements.txt to the Space, leaving README alone."""
    payload = {"app.py": code, "requirements.txt": requirements}
    return self.upload_files_to_space(payload)
def get_performance_summary(self):
    """Return a human-readable report of all recorded performance entries.

    Reads self.shared_context["performance_history"]; each entry is expected
    to carry "iteration", "timestamp", "metrics" and optionally
    "real_metrics" (whose "source" distinguishes real_api from estimates).
    """
    if not self.shared_context["performance_history"]:
        return "No performance data yet"
    summary = "πŸ“Š PERFORMANCE HISTORY\n" + "="*80 + "\n\n"
    for entry in self.shared_context["performance_history"]:
        metrics = entry["metrics"]
        # "real_metrics" is optional on older entries; default to estimates.
        real_metrics = entry.get("real_metrics", {})
        source = real_metrics.get("source", "estimates")
        summary += f"""
Iteration {entry['iteration']} - {entry['timestamp']}
{'='*80}
Metrics Source: {source}
Performance Score: {metrics.get('performance_score', 'N/A')}/100
Throughput: {metrics.get('throughput', 'N/A')} packets/sec
Avg Latency: {metrics.get('latency_avg', 'N/A')} ms
P95 Latency: {metrics.get('latency_p95', 'N/A')} ms
CPU Efficiency: {metrics.get('cpu_efficiency', 'N/A')}
{'='*80}
"""
    return summary
def get_agent_communications(self):
    """Return a formatted log of the 10 most recent inter-agent exchanges.

    Input/output excerpts are truncated (300/500 chars) to keep the UI
    readable; full entries remain in self.conversation_logs.
    """
    if not self.conversation_logs:
        return "No communications yet"
    log = "πŸ’¬ AGENT COMMUNICATION LOG\n" + "="*80 + "\n\n"
    # Only the last 10 entries are shown.
    for entry in self.conversation_logs[-10:]:
        log += f"""
{'='*80}
⏰ {entry['timestamp']}
πŸ€– AGENT: {entry['agent']}
πŸ“₯ INPUT:
{entry['input'][:300]}...
πŸ“€ OUTPUT:
{entry['output'][:500]}...
{'='*80}
"""
    return log
def cleanup(self):
    """Best-effort removal of the temporary working directory.

    Failures (already removed, permissions, unset attribute) are ignored:
    cleanup must never break a teardown path.
    """
    try:
        shutil.rmtree(self.temp_dir)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; everything else stays best-effort.
        pass
# Global system instance.
# Holds the single multi-agent system created by initialize_system();
# stays None until the user initializes it from the UI.
system = None
def initialize_system(gemini_key, hf_token):
    """Create the global multi-agent system from the supplied credentials.

    Returns a status string for the UI: a roster on success, an error
    message on failure.
    """
    global system
    try:
        system = MultiAgentSystem(gemini_key, hf_token)
    except Exception as err:
        return f"βœ— Error initializing: {str(err)}"
    return (
        "βœ“ Multi-agent system initialized!\n\n"
        "πŸ€– Agents:\nπŸ›οΈ Architect\nπŸ’» Developer\nπŸ” Debugger\nπŸ§ͺ Tester"
    )
def run_initial_development():
    """Run the initial development cycle with automatic error fixing and REAL metrics.

    Orchestrates iteration 1 end-to-end: design, implementation, estimated
    benchmarks, deployment, build-failure auto-fixing, real-metric
    collection and debugger validation.

    Returns a 7-tuple of strings for the UI outputs:
    (log, architect output, developer output, debugger output, code,
    deploy status, score summary).
    """
    if system is None:
        return "βœ— Initialize system first!", "", "", "", "", "", ""
    space_status = system.create_space()
    # create_space() reports failure by embedding "βœ—" in its status string.
    if "βœ—" in space_status:
        return space_status, "", "", "", "", "", ""
    log = f"πŸš€ ITERATION 1: INITIAL DEVELOPMENT\n{'='*80}\n\n"
    log += f"{space_status}\n\n"
    # Phase 1: architecture design.
    log += "πŸ›οΈ PHASE 1: ARCHITECT DESIGNING SYSTEM...\n"
    architect_output = system.initial_design_phase()
    log += f"βœ“ Architecture complete\n\n"
    # Phase 2: implementation from the architect's design.
    log += "πŸ’» PHASE 2: DEVELOPER IMPLEMENTING...\n"
    dev_output, code, requirements = system.development_phase(architect_output)
    log += f"βœ“ Implementation complete\n\n"
    # Phase 3: pre-deployment benchmarks (estimates only).
    log += "πŸ§ͺ PHASE 3: TESTER BENCHMARKING (ESTIMATES)...\n"
    test_output, test_metrics = system.testing_phase(code)
    log += f"βœ“ Benchmarking complete\n\n"
    # Phase 4: deploy and wait for the Space build.
    log += "πŸš€ PHASE 4: DEPLOYING...\n"
    deploy_status = system.deploy_to_space(code, requirements)
    log += f"{deploy_status}\n\n"
    log += "⏳ WAITING FOR SPACE TO BUILD...\n"
    success, build_msg = system.wait_for_space_build(timeout=180)
    log += f"{build_msg}\n\n"
    # Auto-fix errors: up to 3 LLM-driven fix/redeploy attempts on build failure.
    max_fix_attempts = 3
    fix_attempt = 0
    while not success and fix_attempt < max_fix_attempts:
        fix_attempt += 1
        log += f"\nπŸ”§ AUTOMATIC FIX ATTEMPT {fix_attempt}/{max_fix_attempts}\n"
        error_info = system.get_space_logs()
        fix_log, fixed_code, fixed_req = system.automatic_error_fix_cycle(error_info)
        log += fix_log
        if fixed_code:
            log += "\nπŸš€ RE-DEPLOYING WITH FIXES...\n"
            deploy_status = system.deploy_to_space(fixed_code, fixed_req)
            log += f"{deploy_status}\n\n"
            log += "⏳ WAITING FOR SPACE TO BUILD...\n"
            success, build_msg = system.wait_for_space_build(timeout=180)
            log += f"{build_msg}\n\n"
            if success:
                # Keep the fixed version as the canonical artifacts.
                code = fixed_code
                requirements = fixed_req
                break
        else:
            log += "βœ— Could not generate fix\n"
            break
    # Collect REAL metrics from deployed app (only if the build succeeded).
    real_metrics = None
    if success:
        log += "\nπŸ“Š COLLECTING REAL METRICS FROM DEPLOYED APP...\n"
        time.sleep(10)  # Give app time to stabilize
        real_metrics = system.collect_real_metrics()
        if real_metrics.get("source") == "real_api":
            log += f"βœ“ Real metrics collected successfully!\n\n"
        else:
            log += "⚠️ Could not collect real metrics, using estimates\n\n"
    # Phase 5: debugger validation against real (or estimated) metrics.
    log += "πŸ” PHASE 5: DEBUGGER VALIDATING WITH REAL METRICS...\n"
    debug_output, ready, metrics = system.debugging_phase(code, architect_output, test_metrics, real_metrics)
    log += f"βœ“ Validation complete\n\n"
    # Phase 6: one fix-and-retest pass when the debugger flags issues.
    if not ready:
        log += "πŸ”§ PHASE 6: DEVELOPER FIXING ISSUES...\n"
        fix_output, code = system.fix_phase(code, debug_output)
        log += f"βœ“ Fixes applied\n\n"
        log += "πŸ”„ RE-DEPLOYING AND RE-TESTING...\n"
        deploy_status = system.deploy_to_space(code, requirements)
        success, build_msg = system.wait_for_space_build(timeout=180)
        if success:
            time.sleep(10)
            real_metrics = system.collect_real_metrics()
            test_output_2, test_metrics_2 = system.testing_phase(code)
            debug_output_2, ready, metrics = system.debugging_phase(code, architect_output, test_metrics_2, real_metrics)
            # Append the post-fix report so both validations stay visible.
            debug_output += "\n\n--- AFTER FIXES ---\n" + debug_output_2
            log += f"βœ“ Re-testing complete\n\n"
    log += f"🌐 Space URL: https://huggingface.co/spaces/{system.space_name}\n\n"
    log += f"πŸ“Š Initial Performance Score: {metrics.get('performance_score', 'N/A')}/100\n"
    log += f"πŸ“ˆ Metrics Source: {metrics.get('source', 'estimates')}\n"
    return log, architect_output, dev_output, debug_output, code, deploy_status, f"Performance Score: {metrics.get('performance_score', 0)}/100 (Source: {metrics.get('source', 'estimates')})"
def run_autonomous_improvement():
    """Run one fully autonomous optimization cycle and package results for the UI."""
    blanks = ("", "", "", "", "", "")
    if system is None:
        return ("βœ— Initialize system first!",) + blanks
    if system.iteration == 0:
        return ("βœ— Run initial development first!",) + blanks
    (log, arch_out, dev_out, debug_out,
     code, req, metrics) = system.autonomous_improvement_cycle()
    log += f"\n🌐 Space URL: https://huggingface.co/spaces/{system.space_name}\n"
    score = metrics.get('performance_score', 0)
    source = metrics.get('source', 'estimates')
    score_summary = f"Performance Score: {score}/100 (Source: {source})"
    return log, arch_out, dev_out, debug_out, code, "", score_summary
def get_performance_history():
    """Return the system's formatted performance history for the UI."""
    if system is not None:
        return system.get_performance_summary()
    return "No performance data yet"
def get_communications():
    """Return the formatted inter-agent communication log for the UI."""
    if system is not None:
        return system.get_agent_communications()
    return "No communications yet"
# Create Gradio Interface.
# Four tabs: setup + initial build, autonomous optimization,
# performance analytics, and the agent communication log.
with gr.Blocks(title="Autonomous Multi-Agent Optimizer") as demo:
    gr.Markdown("""
# πŸ€– Autonomous Multi-Agent Performance Optimizer
Watch AI agents autonomously optimize packet simulation algorithms through iterative improvements.
## The Team:
- πŸ›οΈ **ARCHITECT**: Analyzes metrics and designs optimizations
- πŸ’» **DEVELOPER**: Implements high-performance code
- πŸ” **DEBUGGER**: Validates performance and quality
- πŸ§ͺ **TESTER**: Runs benchmarks and collects metrics
**Fully Autonomous**: Agents optimize algorithms based on REAL performance scores from deployed apps!
""")
    # --- Tab 1: credentials and the first full development cycle ---
    with gr.Tab("πŸš€ Setup & Initial Build"):
        gr.Markdown("### Step 1: Initialize Multi-Agent System")
        with gr.Row():
            gemini_api_key = gr.Textbox(
                label="Gemini API Key",
                placeholder="Enter your Google AI Studio API key",
                type="password"
            )
            hf_token = gr.Textbox(
                label="Hugging Face Token",
                placeholder="Enter your HF token (write access)",
                type="password"
            )
        init_btn = gr.Button("πŸ€– Initialize Agent Team", variant="primary", size="lg")
        init_output = gr.Textbox(label="System Status", lines=5)
        init_btn.click(
            initialize_system,
            inputs=[gemini_api_key, hf_token],
            outputs=init_output
        )
        gr.Markdown("### Step 2: Initial Development")
        gr.Markdown("Agents will autonomously design, build, deploy, and measure REAL performance metrics.")
        develop_btn = gr.Button("🎯 Start Autonomous Development", variant="primary", size="lg")
        dev_log = gr.Textbox(label="Development Log", lines=12)
        performance_display = gr.Textbox(label="Initial Performance Score", lines=2)
        with gr.Row():
            with gr.Column():
                architect_output = gr.Textbox(label="πŸ›οΈ Architect's Design", lines=8)
            with gr.Column():
                developer_output = gr.Textbox(label="πŸ’» Developer's Implementation", lines=8)
        with gr.Row():
            with gr.Column():
                debugger_output = gr.Textbox(label="πŸ” Debugger's Report", lines=8)
            with gr.Column():
                initial_code = gr.Code(label="Final Code", language="python", lines=8)
        deploy_status_1 = gr.Textbox(label="Deployment Status")
        # run_initial_development takes no inputs; it reads the global system.
        develop_btn.click(
            run_initial_development,
            outputs=[dev_log, architect_output, developer_output, debugger_output, initial_code, deploy_status_1, performance_display]
        )
    # --- Tab 2: repeatable autonomous optimization cycles ---
    with gr.Tab("πŸ”„ Autonomous Optimization"):
        gr.Markdown("""
### Fully Autonomous Improvement Cycles with REAL Metrics
The agents will autonomously:
1. Tester analyzes current performance and identifies bottlenecks
2. Architect designs optimizations to improve scores
3. Developer implements performance improvements
4. **Deploy and collect REAL metrics from the running app**
5. Debugger validates improvements using REAL data
6. Measure actual score improvement
**Goal**: Continuously improve performance scores through autonomous optimization with real feedback!
""")
        gr.Markdown("### Current Performance")
        current_perf = gr.Textbox(label="Current Score", lines=2, value="Run initial development first")
        # Mirror the Tab-1 score into this tab whenever it changes.
        performance_display.change(lambda x: x, inputs=performance_display, outputs=current_perf)
        optimize_btn = gr.Button("πŸš€ Run Autonomous Optimization Cycle", variant="primary", size="lg")
        optimization_log = gr.Textbox(label="Optimization Log", lines=12)
        new_perf = gr.Textbox(label="New Performance Score", lines=2)
        with gr.Row():
            with gr.Column():
                architect_improve = gr.Textbox(label="πŸ›οΈ Optimization Strategy", lines=8)
            with gr.Column():
                developer_improve = gr.Textbox(label="πŸ’» Performance Improvements", lines=8)
        with gr.Row():
            with gr.Column():
                debugger_improve = gr.Textbox(label="πŸ” Performance Validation", lines=8)
            with gr.Column():
                improved_code = gr.Code(label="Optimized Code", language="python", lines=8)
        deploy_status_2 = gr.Textbox(label="Deployment Status")
        # Keep the "Current Score" box in sync after each optimization cycle.
        new_perf.change(lambda x: x, inputs=new_perf, outputs=current_perf)
        optimize_btn.click(
            run_autonomous_improvement,
            outputs=[optimization_log, architect_improve, developer_improve, debugger_improve, improved_code, deploy_status_2, new_perf]
        )
        gr.Markdown("### Tip: Run Multiple Cycles")
        gr.Markdown("Keep clicking 'Run Autonomous Optimization Cycle' to watch the agents iteratively improve based on REAL performance data! Each cycle aims to improve the score by 10+ points.")
    # --- Tab 3: score history across iterations ---
    with gr.Tab("πŸ“Š Performance Analytics"):
        gr.Markdown("""
### Performance History & Metrics
View complete performance tracking across all iterations, including REAL vs estimated metrics.
""")
        refresh_perf_btn = gr.Button("πŸ”„ Refresh Performance Data")
        perf_history = gr.Textbox(label="Performance History", lines=25)
        refresh_perf_btn.click(
            get_performance_history,
            outputs=perf_history
        )
    # --- Tab 4: inter-agent conversation log ---
    with gr.Tab("πŸ’¬ Agent Communications"):
        gr.Markdown("""
### Inter-Agent Communication Log
See how agents communicate and share context.
""")
        refresh_btn = gr.Button("πŸ”„ Refresh Communications")
        comms_output = gr.Textbox(label="Agent Communications", lines=30)
        refresh_btn.click(
            get_communications,
            outputs=comms_output
        )
    # Footer: explains the overall workflow and scoring model.
    gr.Markdown("""
---
### How Autonomous Optimization Works:
**Initial Development:**
1. Architect designs system architecture
2. Developer implements code with metrics API endpoint
3. Deploy to HuggingFace
4. **Collect REAL metrics from deployed app**
5. Debugger validates using real performance data
**Optimization Cycles (Fully Autonomous with REAL metrics):**
1. Tester analyzes current metrics and identifies bottlenecks
2. Architect designs optimizations based on data
3. Developer implements performance improvements
4. Deploy new version
5. **Collect REAL metrics from the running app**
6. Debugger validates improvements using actual measured performance
7. Compare real performance gains
**Performance Scoring (0-100):**
- Throughput (packets/sec)
- Latency (average, p95, p99)
- CPU Efficiency
- Algorithm Complexity
- Code Quality
**Key Features:**
- Fully autonomous optimization (no human input needed)
- **REAL performance metrics from deployed apps**
- Performance-driven improvements based on actual data
- Shared context across all agents
- Iterative score improvements
- Complete metric tracking
- README preservation on deploy
""")
# Start the Gradio server when executed directly.
if __name__ == "__main__":
    demo.launch()