# NOTE: "Spaces: Sleeping" below was HuggingFace page-scrape residue (Space
# status header), not part of app.py; preserved here as a comment.
| import gradio as gr | |
| import google.generativeai as genai | |
| from huggingface_hub import HfApi, create_repo, upload_folder, space_info | |
| import os | |
| import time | |
| import json | |
| from datetime import datetime | |
| import tempfile | |
| import shutil | |
| import re | |
| import requests | |
# Load agent system prompts from JSON. If prompts.json is missing or corrupt
# we fall back to minimal built-in prompts so startup never crashes.
PROMPTS = {}
try:
    # Explicit UTF-8: prompt files may contain non-ASCII characters and the
    # platform default encoding (e.g. cp1252 on Windows) would break them.
    with open("prompts.json", "r", encoding="utf-8") as f:
        PROMPTS = json.load(f)
    print("β Loaded prompts from prompts.json")
except Exception as e:
    print(f"β Error loading prompts.json: {e}")
    # Define minimal fallback prompts if file load fails to prevent crash
    PROMPTS = {
        "architect": "You are the Architect Agent.",
        "developer": "You are the Developer Agent. Ensure you include a get_performance_metrics function with api_name='get_metrics'.",
        "debugger": "You are the Debugger Agent.",
        "tester": "You are the Tester Agent."
    }
class MetricCollector:
    """Collects real performance metrics from a deployed Gradio Space.

    Given a ``user/repo`` space name, probes the public ``*.hf.space`` URL,
    waits for the app to come up, then calls its Gradio API endpoints and
    parses benchmark numbers out of whatever they return.
    """

    def __init__(self, space_name: str):
        # space_name is "user/repo"; HF serves the app at user-repo.hf.space
        self.space_name = space_name
        self.base_url = f"https://{space_name.replace('/', '-')}.hf.space"

    def wait_for_space_ready(self, timeout: int = 300) -> bool:
        """Wait for space to be accessible and responding.

        Polls the root URL until it returns 200, then confirms the Gradio
        ``/info`` endpoint answers before declaring the app ready.
        Returns False on timeout.
        """
        start_time = time.time()
        print(f"π Waiting for space at {self.base_url}")
        while time.time() - start_time < timeout:
            try:
                response = requests.get(self.base_url, timeout=10)
                if response.status_code == 200:
                    print("β Space is responding to HTTP requests")
                    # Verify the Gradio API is actually available
                    try:
                        info_response = requests.get(f"{self.base_url}/info", timeout=10)
                        if info_response.status_code == 200:
                            print("β Gradio API is available")
                            # Give it extra time to fully initialize
                            time.sleep(30)
                            return True
                    # was a bare `except:`; narrowed so KeyboardInterrupt etc.
                    # still propagate
                    except requests.RequestException:
                        print("β³ Gradio API not ready yet...")
            except Exception as e:
                elapsed = int(time.time() - start_time)
                print(f"β³ Still waiting... ({elapsed}s) - {str(e)[:50]}")
            time.sleep(15)
        print(f"β Timeout after {timeout}s")
        return False

    def test_app_functionality(self) -> bool:
        """Test if the app is actually functional by trying to use it.

        Fetches ``/info`` and calls endpoints until one returns data.
        """
        print("π§ͺ Testing app functionality...")
        try:
            # Get available endpoints
            info_response = requests.get(f"{self.base_url}/info", timeout=10)
            if info_response.status_code != 200:
                print("β Cannot fetch app info")
                return False
            info = info_response.json()
            endpoints = info.get('named_endpoints', {})
            if not endpoints:
                print("β No endpoints found")
                return False
            print(f"β Found {len(endpoints)} endpoints")
            # Try to call at least one endpoint to verify app works
            for endpoint_path in endpoints.keys():
                endpoint_name = endpoint_path.lstrip('/')
                print(f"π§ͺ Testing endpoint: {endpoint_name}")
                try:
                    test_result = self.call_gradio_api(endpoint_name, [], timeout=30)
                    if test_result is not None:
                        print(f"β Endpoint {endpoint_name} is functional")
                        return True
                except Exception as e:
                    print(f"β οΈ Endpoint {endpoint_name} failed: {str(e)[:100]}")
                    continue
            print("β No functional endpoints found")
            return False
        except Exception as e:
            print(f"β Functionality test failed: {str(e)}")
            return False

    def call_gradio_api(self, endpoint: str, data: list = None, timeout: int = 60):
        """Call a Gradio API endpoint with better error handling.

        POSTs ``{"data": data}`` to ``/api/<endpoint>``; returns the
        response's ``data`` list on success, None on any failure.
        """
        try:
            if data is None:
                data = []
            api_url = f"{self.base_url}/api/{endpoint}"
            print(f"π‘ Calling API: {api_url}")
            response = requests.post(
                api_url,
                json={"data": data},
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            print(f"π‘ Response status: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                data_result = result.get("data", [])
                print(f"β API call successful, got {len(data_result)} data items")
                return data_result
            else:
                print(f"β οΈ API call failed: {response.status_code}")
                print(f"Response: {response.text[:200]}")
                return None
        except requests.exceptions.Timeout:
            print(f"β οΈ API call timed out after {timeout}s")
            return None
        except Exception as e:
            print(f"β οΈ Error calling API: {str(e)}")
            return None

    def extract_metrics_from_app(self, max_retries: int = 3):
        """Extract real performance metrics from the deployed app with retries.

        Tries, in priority order: the dedicated ``get_metrics`` endpoint,
        endpoints whose names look metric-related, then every endpoint.
        Returns a metrics dict whose ``source`` is ``"real_api"`` on success
        or ``"placeholder"`` (score 50) when all attempts fail.
        """
        metrics = {}
        for attempt in range(max_retries):
            print(f"\nπ― Metrics collection attempt {attempt + 1}/{max_retries}")
            metrics = {
                "throughput": 0,
                "latency_avg": 0,
                "latency_p50": 0,
                "latency_p95": 0,
                "latency_p99": 0,
                "cpu_efficiency": 0,
                "memory_usage": 0,
                "performance_score": 0,
                "source": "failed"
            }
            # Wait for space to be ready. On the FINAL attempt we fall
            # through and try to scrape anyway instead of giving up.
            if not self.wait_for_space_ready(timeout=300):
                print(f"β οΈ Attempt {attempt + 1}: Space not ready")
                if attempt < max_retries - 1:
                    print("β³ Waiting 30s before retry...")
                    time.sleep(30)
                    continue
            # Test if app is functional (same last-attempt fallthrough)
            if not self.test_app_functionality():
                print(f"β οΈ Attempt {attempt + 1}: App not functional")
                if attempt < max_retries - 1:
                    print("β³ Waiting 30s before retry...")
                    time.sleep(30)
                    continue
            try:
                # Try to get the app's info endpoint
                info_response = requests.get(f"{self.base_url}/info", timeout=10)
                if info_response.status_code == 200:
                    info = info_response.json()
                    endpoints = info.get('named_endpoints', {})
                    print(f"β Found {len(endpoints)} named endpoints: {list(endpoints.keys())}")
                    # Priority 1: Look for the get_metrics endpoint
                    if '/get_metrics' in endpoints:
                        print("π― Found get_metrics endpoint!")
                        result = self.call_gradio_api("get_metrics", [], timeout=60)
                        if result:
                            print(f"π Raw result from get_metrics: {str(result)[:200]}")
                            metrics_data = self.parse_metrics_from_result(result)
                            if metrics_data.get("performance_score", 0) > 0:
                                metrics.update(metrics_data)
                                metrics["source"] = "real_api"
                                print(f"β SUCCESS! Collected real metrics: Score {metrics['performance_score']}/100")
                                return metrics
                            else:
                                print("β οΈ Metrics parsed but score is 0")
                        else:
                            print("β οΈ get_metrics returned None")
                    else:
                        print("β οΈ 'get_metrics' endpoint NOT found in named_endpoints!")
                    # Priority 2: Try common metric endpoint patterns
                    metric_keywords = ['metric', 'benchmark', 'performance', 'test', 'stats']
                    for endpoint_path, endpoint_info in endpoints.items():
                        endpoint_name = endpoint_path.lstrip('/')
                        if any(keyword in endpoint_name.lower() for keyword in metric_keywords):
                            print(f"π§ͺ Trying endpoint: {endpoint_name}")
                            result = self.call_gradio_api(endpoint_name, [], timeout=60)
                            if result:
                                print(f"π Raw result from {endpoint_name}: {str(result)[:200]}")
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"β SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics
                    # Priority 3: Try ALL endpoints as a last resort
                    print("π Trying all available endpoints...")
                    for endpoint_path in endpoints.keys():
                        endpoint_name = endpoint_path.lstrip('/')
                        print(f"π§ͺ Trying endpoint: {endpoint_name}")
                        try:
                            result = self.call_gradio_api(endpoint_name, [], timeout=30)
                            if result:
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"β SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics
                        except Exception as e:
                            print(f"β οΈ Endpoint {endpoint_name} failed: {str(e)[:50]}")
                            continue
            except Exception as e:
                print(f"β οΈ Attempt {attempt + 1} error: {str(e)}")
                if attempt < max_retries - 1:
                    print("β³ Waiting 30s before retry...")
                    time.sleep(30)
        # All attempts failed
        print("β Failed to collect real metrics after all attempts")
        metrics["source"] = "placeholder"
        metrics["performance_score"] = 50  # Default baseline
        return metrics

    def parse_metrics_from_result(self, result):
        """Parse metrics from a Gradio API result.

        ``result`` may be a list (first item inspected), a dict, or a
        string; JSON parsing is attempted first, then regex scraping.
        """
        metrics = {}
        print(f"π Parsing result type: {type(result)}")
        # Result could be a list, dict, or string
        if isinstance(result, list) and len(result) > 0:
            item = result[0]
            print(f"π List item type: {type(item)}")
            if isinstance(item, str):
                print(f"π String content: {item[:200]}")
                # Try to parse as JSON
                try:
                    parsed = json.loads(item)
                    print(f"β Parsed as JSON: {parsed}")
                    metrics.update(self.extract_metrics_from_dict(parsed))
                # was bare `except:`; json.loads raises a ValueError subclass
                except (ValueError, TypeError):
                    print("β οΈ Not valid JSON, trying regex")
                    # Try regex parsing
                    metrics.update(self.extract_metrics_from_text(item))
            elif isinstance(item, dict):
                print(f"β Direct dict: {item}")
                metrics.update(self.extract_metrics_from_dict(item))
        elif isinstance(result, dict):
            print(f"β Direct dict result: {result}")
            metrics.update(self.extract_metrics_from_dict(result))
        elif isinstance(result, str):
            print(f"π String result: {result[:200]}")
            try:
                parsed = json.loads(result)
                metrics.update(self.extract_metrics_from_dict(parsed))
            except (ValueError, TypeError):  # was bare `except:`
                metrics.update(self.extract_metrics_from_text(result))
        print(f"π Extracted metrics: {metrics}")
        return metrics

    def extract_metrics_from_dict(self, data: dict):
        """Extract metrics from dictionary data.

        Maps a variety of plausible key spellings onto our canonical
        metric names; non-numeric values are skipped.
        """
        metrics = {}
        # Common metric keys
        key_mappings = {
            'throughput': ['throughput', 'packets_per_sec', 'pps', 'packets_sec'],
            'latency_avg': ['latency_avg', 'avg_latency', 'latency_mean', 'mean_latency'],
            'latency_p50': ['latency_p50', 'p50', 'median_latency', 'latency_median'],
            'latency_p95': ['latency_p95', 'p95', 'latency_95'],
            'latency_p99': ['latency_p99', 'p99', 'latency_99'],
            'cpu_efficiency': ['cpu_efficiency', 'efficiency', 'ops_per_packet', 'cpu_ops'],
            'memory_usage': ['memory_usage', 'memory_mb', 'memory', 'mem_usage'],
            'performance_score': ['performance_score', 'score', 'total_score', 'overall_score']
        }
        for metric_key, possible_keys in key_mappings.items():
            for key in possible_keys:
                if key in data:
                    try:
                        value = float(data[key])
                        metrics[metric_key] = value
                        print(f"β Found {metric_key} = {value}")
                        break
                    except (ValueError, TypeError):
                        pass
        return metrics

    def extract_metrics_from_text(self, text: str):
        """Extract metrics from free-form text output using regex."""
        metrics = {}
        patterns = {
            'throughput': r'throughput[:\s]+([0-9,.]+)',
            'latency_avg': r'(?:avg|average|mean)\s*latency[:\s]+([0-9.]+)',
            'latency_p50': r'p50[:\s]+([0-9.]+)',
            'latency_p95': r'p95[:\s]+([0-9.]+)',
            'latency_p99': r'p99[:\s]+([0-9.]+)',
            'cpu_efficiency': r'(?:cpu[_\s]*)?efficiency[:\s]+([0-9.]+)',
            'performance_score': r'(?:performance[_\s]+)?score[:\s]+([0-9]+)'
        }
        for key, pattern in patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    value = match.group(1).replace(',', '')
                    metrics[key] = float(value)
                    print(f"β Regex found {key} = {value}")
                # was bare `except:`; only float() can fail here
                except ValueError:
                    pass
        return metrics
| class MultiAgentSystem: | |
| def __init__(self, gemini_api_key, hf_token): | |
| self.hf_api = HfApi(token=hf_token) | |
| self.hf_token = hf_token | |
| genai.configure(api_key=gemini_api_key) | |
| # Initialize four specialized agents with prompts loaded from JSON | |
| self.architect = genai.GenerativeModel( | |
| model_name='gemini-2.5-flash', | |
| system_instruction=PROMPTS.get("architect", "You are the Architect Agent.") | |
| ) | |
| self.developer = genai.GenerativeModel( | |
| model_name='gemini-2.5-flash', | |
| system_instruction=PROMPTS.get("developer", "You are the Developer Agent.") | |
| ) | |
| self.debugger = genai.GenerativeModel( | |
| model_name='gemini-2.5-flash', | |
| system_instruction=PROMPTS.get("debugger", "You are the Debugger Agent.") | |
| ) | |
| self.tester = genai.GenerativeModel( | |
| model_name='gemini-2.5-flash', | |
| system_instruction=PROMPTS.get("tester", "You are the Tester Agent.") | |
| ) | |
| self.space_name = None | |
| self.iteration = 0 | |
| self.shared_context = { | |
| "current_code": "", | |
| "performance_history": [], | |
| "architecture_decisions": [], | |
| "known_issues": [], | |
| "optimization_log": [] | |
| } | |
| self.temp_dir = tempfile.mkdtemp() | |
| self.conversation_logs = [] | |
| self.target_improvement = 0 | |
| def log_conversation(self, agent_name, input_msg, output_msg): | |
| """Log inter-agent communication""" | |
| self.conversation_logs.append({ | |
| "timestamp": datetime.now().isoformat(), | |
| "agent": agent_name, | |
| "input": input_msg[:1000], | |
| "output": output_msg[:2000] | |
| }) | |
| def create_space(self): | |
| """Create a new HuggingFace Space""" | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") | |
| # Create a simplified space name | |
| name_suffix = f"optimizer-{timestamp}" | |
| user_info = self.hf_api.whoami() | |
| username = user_info['name'] | |
| # Full repo ID | |
| repo_id = f"{username}/packet-{name_suffix}" | |
| print(f"Creating Space: {repo_id}") | |
| create_repo( | |
| repo_id=repo_id, | |
| repo_type="space", | |
| space_sdk="gradio", | |
| token=self.hf_token, | |
| private=False | |
| ) | |
| self.space_name = repo_id | |
| return f"β Created Space: {repo_id}" | |
| except Exception as e: | |
| return f"β Error creating space: {str(e)}" | |
| def upload_files_to_space(self, files): | |
| """Upload files to the created space""" | |
| if not self.space_name: | |
| return "β No space created" | |
| try: | |
| # Create temp directory for upload | |
| upload_dir = os.path.join(self.temp_dir, "upload") | |
| if os.path.exists(upload_dir): | |
| shutil.rmtree(upload_dir) | |
| os.makedirs(upload_dir) | |
| # Write files | |
| for filename, content in files.items(): | |
| with open(os.path.join(upload_dir, filename), "w") as f: | |
| f.write(content) | |
| # Upload | |
| print(f"Uploading to {self.space_name}...") | |
| self.hf_api.upload_folder( | |
| folder_path=upload_dir, | |
| repo_id=self.space_name, | |
| repo_type="space", | |
| token=self.hf_token | |
| ) | |
| return f"β Deployed to {self.space_name}" | |
| except Exception as e: | |
| return f"β Error uploading: {str(e)}" | |
| def get_space_logs(self): | |
| """Get logs from the space (best effort)""" | |
| if not self.space_name: | |
| return {"error": "No space"} | |
| try: | |
| info = space_info(self.space_name, token=self.hf_token) | |
| runtime = info.runtime | |
| return { | |
| "stage": runtime.stage if runtime else "UNKNOWN", | |
| "error": getattr(runtime, "error", None), | |
| "message": "Check HF Space logs for details" | |
| } | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def validate_code_locally(self, code): | |
| """Validate code syntax and required endpoints locally before deploying""" | |
| try: | |
| if not code or len(code.strip()) == 0: | |
| return False, "Code is empty" | |
| # Check for API endpoint requirement | |
| if 'api_name="get_metrics"' not in code and "api_name='get_metrics'" not in code: | |
| return False, "Code is missing the required api_name=\"get_metrics\" argument in the Gradio button." | |
| # Check for placeholders that cause syntax errors | |
| if "..." in code or "pass # Implement" in code: | |
| return False, "Code contains placeholders (...) or incomplete blocks which will cause runtime errors." | |
| # Check for valid syntax | |
| compile(code, '<string>', 'exec') | |
| return True, "Valid Python Syntax" | |
| except Exception as e: | |
| return False, f"Syntax Error: {str(e)}" | |
| def collect_real_metrics(self): | |
| """Collect real metrics from deployed space with better validation""" | |
| if not self.space_name: | |
| print("β οΈ No space deployed yet") | |
| return { | |
| "throughput": 0, | |
| "latency_avg": 0, | |
| "performance_score": 0, | |
| "source": "no_space" | |
| } | |
| print(f"\n{'='*80}") | |
| print(f"π COLLECTING REAL METRICS FROM: {self.space_name}") | |
| print(f"{'='*80}\n") | |
| collector = MetricCollector(self.space_name) | |
| metrics = collector.extract_metrics_from_app(max_retries=3) | |
| print(f"\n{'='*80}") | |
| print("π FINAL COLLECTED METRICS:") | |
| print(f" Source: {metrics.get('source', 'unknown')}") | |
| print(f" Throughput: {metrics.get('throughput', 0):,.0f} packets/sec") | |
| print(f" Avg Latency: {metrics.get('latency_avg', 0):.2f} ms") | |
| print(f" P95 Latency: {metrics.get('latency_p95', 0):.2f} ms") | |
| print(f" Performance Score: {metrics.get('performance_score', 0)}/100") | |
| print(f"{'='*80}\n") | |
| # If we got placeholder metrics, try one more time after a longer wait | |
| if metrics.get("source") == "placeholder": | |
| print("β οΈ Got placeholder metrics, trying once more after 60s wait...") | |
| time.sleep(60) | |
| metrics = collector.extract_metrics_from_app(max_retries=1) | |
| return metrics | |
| def wait_for_space_build(self, timeout=300): | |
| """Wait for space to build and verify it's functional""" | |
| if not self.space_name: | |
| return False, "No space created" | |
| start_time = time.time() | |
| print(f"\nβ³ Waiting for space to build (timeout: {timeout}s)...") | |
| while time.time() - start_time < timeout: | |
| try: | |
| info = space_info(self.space_name, token=self.hf_token) | |
| runtime = info.runtime if hasattr(info, 'runtime') else None | |
| if runtime and hasattr(runtime, 'stage'): | |
| stage = runtime.stage | |
| elapsed = int(time.time() - start_time) | |
| print(f" Stage: {stage} ({elapsed}s elapsed)") | |
| if stage == "RUNNING": | |
| print("β Space is RUNNING!") | |
| # Additional verification: test if app is actually functional | |
| print("π§ͺ Verifying app functionality...") | |
| time.sleep(15) # Give it time to fully start | |
| collector = MetricCollector(self.space_name) | |
| if collector.test_app_functionality(): | |
| print("β Space is running AND functional!") | |
| return True, "Space is running and functional" | |
| else: | |
| print("β οΈ Space is running but app may not be functional yet, waiting...") | |
| time.sleep(20) | |
| continue | |
| elif stage in ["RUNTIME_ERROR", "BUILD_ERROR"]: | |
| error_msg = getattr(runtime, 'error', 'Unknown error') | |
| return False, f"Space error: {stage} - {error_msg}" | |
| time.sleep(15) | |
| except Exception as e: | |
| print(f" Error checking status: {str(e)[:50]}") | |
| time.sleep(15) | |
| continue | |
| return False, f"Timeout after {timeout}s" | |
| def automatic_error_fix_cycle(self, error_info): | |
| """Automatically fix errors detected in deployed space""" | |
| log = "\nπ§ AUTOMATIC ERROR DETECTION AND FIX\n" + "="*80 + "\n" | |
| log += f"Error detected: {error_info}\n\n" | |
| log += "π PHASE 1: DEBUGGER ANALYZING ERROR...\n" | |
| debugger_prompt = f"""The deployed space has encountered an error: | |
| ERROR INFO: | |
| {json.dumps(error_info, indent=2)} | |
| CURRENT CODE: | |
| ```python | |
| {self.shared_context['current_code']} | |
| ``` | |
| Analyze: | |
| 1. What is causing the error | |
| 2. Root cause analysis | |
| 3. Specific fixes needed | |
| Provide detailed debugging information.""" | |
| try: | |
| debugger_response = self.debugger.generate_content(debugger_prompt) | |
| debugger_output = debugger_response.text | |
| self.log_conversation("DEBUGGER (ERROR)", debugger_prompt, debugger_output) | |
| log += "β Error analysis complete\n\n" | |
| except Exception as e: | |
| return log + f"β Error during analysis: {str(e)}\n", None, None | |
| log += "π» PHASE 2: DEVELOPER FIXING ERROR...\n" | |
| fix_prompt = f"""DEBUGGER'S ERROR ANALYSIS: | |
| {debugger_output} | |
| ERROR INFO: | |
| {json.dumps(error_info, indent=2)} | |
| CURRENT CODE: | |
| ```python | |
| {self.shared_context['current_code']} | |
| ``` | |
| Fix the error and provide corrected code. Ensure: | |
| 1. The specific error is resolved | |
| 2. Code is syntactically correct | |
| 3. All imports are included | |
| 4. The app will run without errors | |
| 5. The get_performance_metrics() function with api_name="get_metrics" is included | |
| 6. CRITICAL: DO NOT use placeholders like '...'""" | |
| try: | |
| fix_response = self.developer.generate_content(fix_prompt) | |
| fix_output = fix_response.text | |
| self.log_conversation("DEVELOPER (ERROR FIX)", fix_prompt, fix_output) | |
| fixed_code = self.extract_code(fix_output, "APP_PY") | |
| requirements = self.extract_code(fix_output, "REQUIREMENTS") | |
| # Local Validation Loop | |
| attempts = 0 | |
| while attempts < 3: | |
| is_valid, error_msg = self.validate_code_locally(fixed_code) | |
| if is_valid: | |
| break | |
| attempts += 1 | |
| print(f"β οΈ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}") | |
| refix_prompt = f"""The fixed code still has a SYNTAX ERROR: | |
| ERROR: {error_msg} | |
| You likely used placeholders like '...' or left incomplete blocks. | |
| REWRITE the COMPLETE code fixing this error. Do NOT use placeholders.""" | |
| fix_response = self.developer.generate_content(refix_prompt) | |
| fix_output = fix_response.text | |
| self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output) | |
| fixed_code = self.extract_code(fix_output, "APP_PY") | |
| if not fixed_code: | |
| fixed_code = self.shared_context['current_code'] | |
| if not requirements: | |
| requirements = "gradio==4.44.0\nnumpy>=1.24.0" | |
| self.shared_context['current_code'] = fixed_code | |
| log += "β Error fix implemented\n\n" | |
| return log, fixed_code, requirements | |
| except Exception as e: | |
| return log + f"β Error during fix: {str(e)}\n", None, None | |
| def extract_metrics_from_output(self, text): | |
| """Extract performance metrics from agent outputs (fallback)""" | |
| metrics = { | |
| "throughput": 0, | |
| "latency_avg": 0, | |
| "latency_p95": 0, | |
| "cpu_efficiency": 0, | |
| "performance_score": 0 | |
| } | |
| throughput_match = re.search(r'Throughput[:\s]+([0-9,.]+)', text, re.IGNORECASE) | |
| if throughput_match: | |
| try: | |
| metrics["throughput"] = float(throughput_match.group(1).replace(',', '')) | |
| except: | |
| pass | |
| latency_match = re.search(r'Avg\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE) | |
| if latency_match: | |
| try: | |
| metrics["latency_avg"] = float(latency_match.group(1)) | |
| except: | |
| pass | |
| p95_match = re.search(r'P95\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE) | |
| if p95_match: | |
| try: | |
| metrics["latency_p95"] = float(p95_match.group(1)) | |
| except: | |
| pass | |
| score_match = re.search(r'PERFORMANCE[_\s]+SCORE[:\s]+([0-9]+)', text, re.IGNORECASE) | |
| if score_match: | |
| try: | |
| metrics["performance_score"] = int(score_match.group(1)) | |
| except: | |
| pass | |
| return metrics | |
| def initial_design_phase(self): | |
| """Phase 1: Architect designs initial system""" | |
| self.iteration += 1 | |
| architect_prompt = """Design the initial packet simulation algorithm system optimized for high performance. | |
| Consider: | |
| - Multiple algorithm approaches (basic, batched, optimized) | |
| - Performance metrics to track (throughput, latency, efficiency) | |
| - Benchmarking capabilities built into the UI | |
| - Clear performance visualizations | |
| CRITICAL: The app MUST include a get_performance_metrics() function that returns JSON metrics. | |
| Focus on creating a baseline that we can iteratively improve. | |
| Target initial performance score: 60/100""" | |
| try: | |
| architect_response = self.architect.generate_content(architect_prompt) | |
| architect_output = architect_response.text | |
| self.log_conversation("ARCHITECT", architect_prompt, architect_output) | |
| self.shared_context["architecture_decisions"].append({ | |
| "iteration": self.iteration, | |
| "design": architect_output, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| return architect_output | |
| except Exception as e: | |
| return f"β Architect error: {str(e)}" | |
| def development_phase(self, architect_specs, previous_metrics=None): | |
| """Phase 2: Developer implements with performance focus""" | |
| context = f"""ARCHITECT'S SPECIFICATIONS: | |
| {architect_specs} | |
| PERFORMANCE CONTEXT: | |
| """ | |
| if previous_metrics: | |
| context += f"Previous iteration metrics: {json.dumps(previous_metrics, indent=2)}\n" | |
| context += "Your goal is to exceed these metrics.\n" | |
| else: | |
| context += "This is the initial implementation. Focus on creating a solid baseline with good instrumentation.\n" | |
| developer_prompt = f"""{context} | |
| Implement a complete Gradio application with: | |
| 1. Multiple packet simulation algorithms (at least 3) | |
| 2. A get_performance_metrics() function with api_name="get_metrics" that returns JSON | |
| 3. Built-in benchmarking tools | |
| 4. Real-time performance visualization | |
| 5. Detailed timing measurements | |
| CRITICAL: Include the get_performance_metrics() function exactly as specified in your system prompt. | |
| CRITICAL: Write the FULL, COMPLETE code. Do not use placeholders like '...' or 'pass' for incomplete blocks. | |
| Remember: Simulate packets safely (NO real network operations)""" | |
| try: | |
| developer_response = self.developer.generate_content(developer_prompt) | |
| developer_output = developer_response.text | |
| self.log_conversation("DEVELOPER", developer_prompt, developer_output) | |
| code = self.extract_code(developer_output, "APP_PY") | |
| requirements = self.extract_code(developer_output, "REQUIREMENTS") | |
| # Local Validation Loop | |
| attempts = 0 | |
| while attempts < 3: | |
| is_valid, error_msg = self.validate_code_locally(code) | |
| if is_valid: | |
| break | |
| attempts += 1 | |
| print(f"β οΈ Generated code has syntax errors (Attempt {attempts}/3): {error_msg}") | |
| fix_syntax_prompt = f"""The code you wrote has a SYNTAX ERROR and cannot run. | |
| ERROR: {error_msg} | |
| You likely used placeholders like '...' or left incomplete blocks. | |
| REWRITE the COMPLETE code fixing this error. Do NOT use placeholders.""" | |
| developer_response = self.developer.generate_content(fix_syntax_prompt) | |
| developer_output = developer_response.text | |
| self.log_conversation(f"DEVELOPER (SYNTAX FIX {attempts})", fix_syntax_prompt, developer_output) | |
| code = self.extract_code(developer_output, "APP_PY") | |
| # requirements usually stay same, but could re-extract if needed | |
| if not requirements: | |
| requirements = "gradio==4.44.0\nnumpy>=1.24.0" | |
| self.shared_context["current_code"] = code | |
| return developer_output, code, requirements | |
| except Exception as e: | |
| return f"β Developer error: {str(e)}", "", "" | |
| def testing_phase(self, code): | |
| """Phase 2.5: Tester runs benchmarks and collects metrics""" | |
| tester_prompt = f"""Analyze this packet simulation code and generate a comprehensive test plan: | |
| CODE: | |
| ```python | |
| {code} | |
| ``` | |
| Generate: | |
| 1. Specific test scenarios to run | |
| 2. Expected performance characteristics | |
| 3. Metrics to collect | |
| 4. Benchmark methodology | |
| Provide hypothetical but realistic performance numbers based on the algorithm's design.""" | |
| try: | |
| tester_response = self.tester.generate_content(tester_prompt) | |
| tester_output = tester_response.text | |
| self.log_conversation("TESTER", tester_prompt, tester_output) | |
| metrics = self.extract_metrics_from_output(tester_output) | |
| return tester_output, metrics | |
| except Exception as e: | |
| return f"β Tester error: {str(e)}", {} | |
| def debugging_phase(self, code, architect_specs, test_metrics, real_metrics=None): | |
| """Phase 3: Debugger validates and measures REAL performance""" | |
| # Use real metrics if available, otherwise use test estimates | |
| metrics_to_use = real_metrics if real_metrics and real_metrics.get("source") == "real_api" else test_metrics | |
| debugger_prompt = f"""ARCHITECT'S SPECIFICATIONS: | |
| {architect_specs} | |
| METRICS (Source: {metrics_to_use.get('source', 'test_estimates') if isinstance(metrics_to_use, dict) else 'test_estimates'}): | |
| {json.dumps(metrics_to_use, indent=2)} | |
| DEVELOPER'S CODE: | |
| ```python | |
| {code} | |
| ``` | |
| Validate: | |
| 1. Code correctness and safety | |
| 2. Performance against specifications | |
| 3. Whether optimizations are effective | |
| 4. Any bugs or issues | |
| Provide a performance score (0-100) and recommendations.""" | |
| try: | |
| debugger_response = self.debugger.generate_content(debugger_prompt) | |
| debugger_output = debugger_response.text | |
| self.log_conversation("DEBUGGER", debugger_prompt, debugger_output) | |
| # Extract any additional metrics from debugger | |
| debugger_metrics = self.extract_metrics_from_output(debugger_output) | |
| # Merge metrics: real > debugger > test | |
| final_metrics = {**test_metrics} | |
| if debugger_metrics.get("performance_score", 0) > 0: | |
| final_metrics.update(debugger_metrics) | |
| if real_metrics and real_metrics.get("source") == "real_api": | |
| final_metrics.update(real_metrics) | |
| # Update performance history with REAL metrics if available | |
| self.shared_context["performance_history"].append({ | |
| "iteration": self.iteration, | |
| "metrics": final_metrics, | |
| "real_metrics": real_metrics if real_metrics else {}, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| ready = "READY_FOR_DEPLOYMENT: YES" in debugger_output or final_metrics.get("performance_score", 0) >= 70 | |
| return debugger_output, ready, final_metrics | |
| except Exception as e: | |
| return f"β Debugger error: {str(e)}", False, {} | |
def fix_phase(self, code, debugger_findings):
    """Phase 4: Developer fixes issues reported by the Debugger.

    Asks the developer model to repair the current code, then runs up to
    three local syntax-validation rounds. If no valid code can be produced,
    falls back to the last known-good ``code`` so broken code is never
    stored as the current version.

    Args:
        code: The current (pre-fix) application source.
        debugger_findings: The Debugger agent's textual report.

    Returns:
        (fix_output, fixed_code) — the raw model response and the code to
        use going forward. On any exception, an error string and the
        original ``code`` are returned.
    """
    fix_prompt = f"""DEBUGGER'S FINDINGS:
{debugger_findings}
CURRENT CODE:
```python
{code}
```
Fix all identified issues while maintaining or improving performance.
Ensure the get_performance_metrics() function with api_name="get_metrics" is present.
CRITICAL: DO NOT use placeholders like '...'. Write the FULL code."""
    try:
        fix_response = self.developer.generate_content(fix_prompt)
        fix_output = fix_response.text
        self.log_conversation("DEVELOPER (FIX)", fix_prompt, fix_output)
        fixed_code = self.extract_code(fix_output, "APP_PY")
        # Local validation loop: up to 3 re-fix rounds for syntax errors.
        attempts = 0
        is_valid, error_msg = self.validate_code_locally(fixed_code)
        while not is_valid and attempts < 3:
            attempts += 1
            print(f"β οΈ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}")
            # BUG FIX: include the broken code in the re-fix prompt — the
            # original prompt only contained the error message, so the model
            # was asked to rewrite code it had never been shown.
            refix_prompt = f"""The fixed code still has a SYNTAX ERROR:
ERROR: {error_msg}
BROKEN CODE:
```python
{fixed_code}
```
You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
            fix_response = self.developer.generate_content(refix_prompt)
            fix_output = fix_response.text
            self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output)
            fixed_code = self.extract_code(fix_output, "APP_PY")
            # Re-validate the regenerated code (the original never validated
            # the final attempt before accepting it).
            is_valid, error_msg = self.validate_code_locally(fixed_code)
        if not fixed_code or not is_valid:
            # Fall back to the last known-good code rather than deploying
            # empty or still-invalid code.
            fixed_code = code
        self.shared_context["current_code"] = fixed_code
        return fix_output, fixed_code
    except Exception as e:
        return f"β Developer fix error: {str(e)}", code
def autonomous_improvement_cycle(self):
    """Run one fully autonomous improvement cycle driven by REAL metrics.

    Seven sequential phases: (1) tester analyzes current performance,
    (2) architect designs optimizations, (3) developer implements,
    (4) tester estimates benchmarks, (5) deploy and collect real metrics
    from the live Space, (6) debugger validates, (7) optional fix +
    re-deploy pass. Results are appended to the shared optimization log.

    Returns:
        (log, architect_output, dev_output, debug_output, code,
         requirements, final_metrics)
    """
    self.iteration += 1
    # Get previous performance metrics (empty dict on the very first cycle)
    prev_metrics = self.shared_context["performance_history"][-1] if self.shared_context["performance_history"] else {}
    prev_score = prev_metrics.get("metrics", {}).get("performance_score", 0)
    log = f"\nπ AUTONOMOUS IMPROVEMENT CYCLE {self.iteration}\n{'='*80}\n"
    log += f"Previous Performance Score: {prev_score}/100\n"
    # Each cycle aims for a +10 point improvement over the previous score.
    log += f"Target: {prev_score + 10}/100\n\n"
    # Phase 1: Tester analyzes current performance
    log += "π§ͺ PHASE 1: TESTER ANALYZING CURRENT PERFORMANCE...\n"
    tester_prompt = f"""Analyze the current system performance:
CURRENT CODE:
```python
{self.shared_context['current_code']}
```
PERFORMANCE HISTORY:
{json.dumps(self.shared_context['performance_history'][-3:], indent=2)}
Identify:
1. Performance bottlenecks
2. Optimization opportunities
3. Specific metrics to improve
4. Realistic performance targets"""
    tester_output = self.tester.generate_content(tester_prompt).text
    self.log_conversation("TESTER", tester_prompt, tester_output)
    log += "β Performance analysis complete\n\n"
    # Phase 2: Architect designs optimizations from the tester's analysis
    log += "ποΈ PHASE 2: ARCHITECT DESIGNING OPTIMIZATIONS...\n"
    architect_prompt = f"""TESTER'S PERFORMANCE ANALYSIS:
{tester_output}
CURRENT PERFORMANCE METRICS:
{json.dumps(prev_metrics.get('metrics', {}), indent=2)}
OPTIMIZATION HISTORY:
{json.dumps(self.shared_context['optimization_log'][-2:], indent=2)}
Design specific optimizations to improve performance score by at least 10 points.
Focus on the biggest bottlenecks identified."""
    architect_output = self.architect.generate_content(architect_prompt).text
    self.log_conversation("ARCHITECT", architect_prompt, architect_output)
    self.shared_context["architecture_decisions"].append({
        "iteration": self.iteration,
        "design": architect_output
    })
    log += "β Optimization strategy complete\n\n"
    # Phase 3: Developer implements the architect's design
    log += "π» PHASE 3: DEVELOPER IMPLEMENTING OPTIMIZATIONS...\n"
    dev_output, code, requirements = self.development_phase(
        architect_output,
        prev_metrics.get('metrics', {})
    )
    log += "β Implementation complete\n\n"
    # Phase 4: Tester benchmarks new version (estimates only, pre-deploy)
    log += "π§ͺ PHASE 4: TESTER BENCHMARKING NEW VERSION...\n"
    test_output, test_metrics = self.testing_phase(code)
    log += "β Benchmarking complete\n\n"
    # Phase 5: Deploy and collect REAL metrics from the live Space
    log += "π PHASE 5: DEPLOYING TO COLLECT REAL METRICS...\n"
    deploy_status = self.deploy_to_space(code, requirements)
    log += f"{deploy_status}\n"
    log += "β³ Waiting for space to build...\n"
    success, build_msg = self.wait_for_space_build(timeout=180)
    log += f"{build_msg}\n"
    # Collect real metrics only if the Space actually built
    real_metrics = None
    if success:
        log += "\nπ COLLECTING REAL METRICS FROM DEPLOYED APP...\n"
        time.sleep(10)  # Extra time for app to stabilize
        real_metrics = self.collect_real_metrics()
        if real_metrics.get("source") == "real_api":
            log += f"β Real metrics collected successfully!\n"
            log += f" Performance Score: {real_metrics.get('performance_score', 0)}/100\n"
        else:
            log += "β οΈ Could not collect real metrics, using estimates\n"
    else:
        log += "β οΈ Space failed to build, using test estimates\n"
    log += "\n"
    # Phase 6: Debugger validates with REAL metrics (None falls back to
    # the estimated test_metrics inside debugging_phase)
    log += "π PHASE 6: DEBUGGER VALIDATING IMPROVEMENTS...\n"
    debug_output, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics, real_metrics)
    log += "β Validation complete\n\n"
    # Phase 7: Fix if needed, then re-deploy and re-validate once
    if not ready:
        log += "π§ PHASE 7: DEVELOPER FIXING ISSUES...\n"
        fix_output, code = self.fix_phase(code, debug_output)
        log += "β Fixes applied\n\n"
        log += "π RE-DEPLOYING AND RE-TESTING...\n"
        deploy_status = self.deploy_to_space(code, requirements)
        success, build_msg = self.wait_for_space_build(timeout=180)
        if success:
            time.sleep(10)
            real_metrics = self.collect_real_metrics()
            # NOTE(review): re-test/re-validate assumed to run only on a
            # successful rebuild (mirrors run_initial_development) — the
            # original indentation was lost; confirm against history.
            test_output_2, test_metrics_2 = self.testing_phase(code)
            debug_output_2, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics_2, real_metrics)
        log += "β Re-testing complete\n\n"
    # Log optimization results so future cycles can reference them
    new_score = final_metrics.get("performance_score", 0)
    improvement = new_score - prev_score
    self.shared_context["optimization_log"].append({
        "iteration": self.iteration,
        "previous_score": prev_score,
        "new_score": new_score,
        "improvement": improvement,
        "optimizations": architect_output[:500],
        "real_metrics": real_metrics.get("source") == "real_api" if real_metrics else False
    })
    log += f"π RESULTS:\n"
    log += f"Previous Score: {prev_score}/100\n"
    log += f"New Score: {new_score}/100\n"
    log += f"Improvement: {'+' if improvement >= 0 else ''}{improvement} points\n"
    log += f"Metrics Source: {final_metrics.get('source', 'estimates')}\n"
    return log, architect_output, dev_output, debug_output, code, requirements, final_metrics
def extract_code(self, text, marker):
    """Extract a code block from an agent response.

    Args:
        text: Raw agent response text (may be None/malformed).
        marker: "APP_PY" to extract a ```python fenced block, or
            "REQUIREMENTS" to extract the block following "REQUIREMENTS:".

    Returns:
        The extracted content, stripped of surrounding whitespace, or ""
        when the marker is absent or extraction fails.
    """
    try:
        if marker == "APP_PY":
            if "```python" in text:
                start = text.find("```python") + len("```python")
                end = text.find("```", start)
                # BUG FIX: an unterminated fence made find() return -1 and
                # text[start:-1] silently dropped the final character;
                # take the rest of the text instead.
                if end == -1:
                    return text[start:].strip()
                return text[start:end].strip()
        elif marker == "REQUIREMENTS":
            if "REQUIREMENTS:" in text:
                start = text.find("REQUIREMENTS:") + len("REQUIREMENTS:")
                if "```" in text[start:]:
                    start = text.find("```", start) + 3
                    end = text.find("```", start)
                    if end == -1:
                        # Same unterminated-fence fix as above.
                        return text[start:].strip()
                    return text[start:end].strip()
                # Unfenced requirements: take everything after the marker.
                return text[start:].strip()
        return ""
    except Exception:
        # Defensive: malformed input (e.g. text is None) yields "".
        return ""
def deploy_to_space(self, code, requirements):
    """Push the generated app.py and requirements.txt to the HF Space.

    The README is deliberately excluded from the payload so a deploy
    never overwrites it.
    """
    return self.upload_files_to_space({
        "app.py": code,
        "requirements.txt": requirements,
    })
def get_performance_summary(self):
    """Return a formatted report of every recorded performance iteration.

    Reads self.shared_context["performance_history"]; each entry carries
    estimated metrics plus (optionally) real metrics whose "source" field
    tells whether values came from the deployed app or from estimates.
    """
    if not self.shared_context["performance_history"]:
        return "No performance data yet"
    summary = "π PERFORMANCE HISTORY\n" + "="*80 + "\n\n"
    for entry in self.shared_context["performance_history"]:
        metrics = entry["metrics"]
        # real_metrics may be absent for early iterations; default to {}
        real_metrics = entry.get("real_metrics", {})
        source = real_metrics.get("source", "estimates")
        summary += f"""
Iteration {entry['iteration']} - {entry['timestamp']}
{'='*80}
Metrics Source: {source}
Performance Score: {metrics.get('performance_score', 'N/A')}/100
Throughput: {metrics.get('throughput', 'N/A')} packets/sec
Avg Latency: {metrics.get('latency_avg', 'N/A')} ms
P95 Latency: {metrics.get('latency_p95', 'N/A')} ms
CPU Efficiency: {metrics.get('cpu_efficiency', 'N/A')}
{'='*80}
"""
    return summary
def get_agent_communications(self):
    """Return a formatted log of recent inter-agent communications.

    Only the 10 most recent entries are shown; each entry's input is
    truncated to 300 characters and its output to 500 to keep the UI
    textbox readable.
    """
    if not self.conversation_logs:
        return "No communications yet"
    log = "π¬ AGENT COMMUNICATION LOG\n" + "="*80 + "\n\n"
    for entry in self.conversation_logs[-10:]:
        log += f"""
{'='*80}
β° {entry['timestamp']}
π€ AGENT: {entry['agent']}
π₯ INPUT:
{entry['input'][:300]}...
π€ OUTPUT:
{entry['output'][:500]}...
{'='*80}
"""
    return log
def cleanup(self):
    """Best-effort removal of the temporary working directory.

    Uses shutil.rmtree(ignore_errors=True) so a missing or already-removed
    directory never raises; the surrounding except Exception (narrowed from
    the original bare except, which also swallowed KeyboardInterrupt /
    SystemExit) additionally covers an unset/None temp_dir attribute.
    """
    try:
        shutil.rmtree(self.temp_dir, ignore_errors=True)
    except Exception:
        # temp_dir may never have been created; cleanup must not crash.
        pass
# Global system instance
# Single MultiAgentSystem shared by all Gradio callbacks below;
# None until initialize_system() is called from the UI.
system = None
def initialize_system(gemini_key, hf_token):
    """Create the global MultiAgentSystem and report a status message.

    Args:
        gemini_key: Google AI Studio API key.
        hf_token: Hugging Face token with write access.

    Returns:
        A success banner listing the agents, or an error string when
        construction fails.
    """
    global system
    try:
        system = MultiAgentSystem(gemini_key, hf_token)
    except Exception as e:
        return f"β Error initializing: {str(e)}"
    return "β Multi-agent system initialized!\n\nπ€ Agents:\nποΈ Architect\nπ» Developer\nπ Debugger\nπ§ͺ Tester"
def run_initial_development():
    """Run the initial development cycle with automatic error fixing and REAL metrics.

    Orchestrates: space creation, architect design, developer
    implementation, estimated benchmarks, deploy, up to 3 automatic
    build-error fix attempts, real-metric collection, debugger validation,
    and one optional fix + re-deploy pass.

    Returns a 7-tuple matching the Gradio outputs:
    (log, architect_output, dev_output, debug_output, code, deploy_status,
     performance summary string).
    """
    if system is None:
        return "β Initialize system first!", "", "", "", "", "", ""
    space_status = system.create_space()
    # "β" is the error marker prefix emitted by create_space on failure
    if "β" in space_status:
        return space_status, "", "", "", "", "", ""
    log = f"π ITERATION 1: INITIAL DEVELOPMENT\n{'='*80}\n\n"
    log += f"{space_status}\n\n"
    log += "ποΈ PHASE 1: ARCHITECT DESIGNING SYSTEM...\n"
    architect_output = system.initial_design_phase()
    log += f"β Architecture complete\n\n"
    log += "π» PHASE 2: DEVELOPER IMPLEMENTING...\n"
    dev_output, code, requirements = system.development_phase(architect_output)
    log += f"β Implementation complete\n\n"
    log += "π§ͺ PHASE 3: TESTER BENCHMARKING (ESTIMATES)...\n"
    test_output, test_metrics = system.testing_phase(code)
    log += f"β Benchmarking complete\n\n"
    log += "π PHASE 4: DEPLOYING...\n"
    deploy_status = system.deploy_to_space(code, requirements)
    log += f"{deploy_status}\n\n"
    log += "β³ WAITING FOR SPACE TO BUILD...\n"
    success, build_msg = system.wait_for_space_build(timeout=180)
    log += f"{build_msg}\n\n"
    # Auto-fix build errors: up to 3 rounds of log-driven repair + redeploy
    max_fix_attempts = 3
    fix_attempt = 0
    while not success and fix_attempt < max_fix_attempts:
        fix_attempt += 1
        log += f"\nπ§ AUTOMATIC FIX ATTEMPT {fix_attempt}/{max_fix_attempts}\n"
        error_info = system.get_space_logs()
        fix_log, fixed_code, fixed_req = system.automatic_error_fix_cycle(error_info)
        log += fix_log
        if fixed_code:
            log += "\nπ RE-DEPLOYING WITH FIXES...\n"
            deploy_status = system.deploy_to_space(fixed_code, fixed_req)
            log += f"{deploy_status}\n\n"
            log += "β³ WAITING FOR SPACE TO BUILD...\n"
            success, build_msg = system.wait_for_space_build(timeout=180)
            log += f"{build_msg}\n\n"
            if success:
                # Keep the repaired version as the official code going forward
                code = fixed_code
                requirements = fixed_req
                break
        else:
            # The fix cycle produced nothing usable; give up on auto-repair
            log += "β Could not generate fix\n"
            break
    # Collect REAL metrics from deployed app (only if the build succeeded)
    real_metrics = None
    if success:
        log += "\nπ COLLECTING REAL METRICS FROM DEPLOYED APP...\n"
        time.sleep(10)  # Give app time to stabilize
        real_metrics = system.collect_real_metrics()
        if real_metrics.get("source") == "real_api":
            log += f"β Real metrics collected successfully!\n\n"
        else:
            log += "β οΈ Could not collect real metrics, using estimates\n\n"
    log += "π PHASE 5: DEBUGGER VALIDATING WITH REAL METRICS...\n"
    debug_output, ready, metrics = system.debugging_phase(code, architect_output, test_metrics, real_metrics)
    log += f"β Validation complete\n\n"
    # One optional fix + re-deploy + re-validate pass if not deploy-ready
    if not ready:
        log += "π§ PHASE 6: DEVELOPER FIXING ISSUES...\n"
        fix_output, code = system.fix_phase(code, debug_output)
        log += f"β Fixes applied\n\n"
        log += "π RE-DEPLOYING AND RE-TESTING...\n"
        deploy_status = system.deploy_to_space(code, requirements)
        success, build_msg = system.wait_for_space_build(timeout=180)
        if success:
            time.sleep(10)
            real_metrics = system.collect_real_metrics()
            # NOTE(review): re-test/re-validate assumed to run only on a
            # successful rebuild — original indentation was lost; confirm.
            test_output_2, test_metrics_2 = system.testing_phase(code)
            debug_output_2, ready, metrics = system.debugging_phase(code, architect_output, test_metrics_2, real_metrics)
            debug_output += "\n\n--- AFTER FIXES ---\n" + debug_output_2
        log += f"β Re-testing complete\n\n"
    log += f"π Space URL: https://huggingface.co/spaces/{system.space_name}\n\n"
    log += f"π Initial Performance Score: {metrics.get('performance_score', 'N/A')}/100\n"
    log += f"π Metrics Source: {metrics.get('source', 'estimates')}\n"
    return log, architect_output, dev_output, debug_output, code, deploy_status, f"Performance Score: {metrics.get('performance_score', 0)}/100 (Source: {metrics.get('source', 'estimates')})"
def run_autonomous_improvement():
    """Drive one autonomous optimization cycle and format the UI outputs.

    Guards against an uninitialized system and against running before the
    initial development pass, then delegates to
    system.autonomous_improvement_cycle() and formats its results for the
    seven Gradio output components.
    """
    if system is None:
        return "β Initialize system first!", "", "", "", "", "", ""
    if system.iteration == 0:
        return "β Run initial development first!", "", "", "", "", "", ""
    cycle = system.autonomous_improvement_cycle()
    arch_out, dev_out, debug_out, code = cycle[1], cycle[2], cycle[3], cycle[4]
    metrics = cycle[6]
    full_log = cycle[0] + f"\nπ Space URL: https://huggingface.co/spaces/{system.space_name}\n"
    score = metrics.get('performance_score', 0)
    source = metrics.get('source', 'estimates')
    score_summary = f"Performance Score: {score}/100 (Source: {source})"
    return full_log, arch_out, dev_out, debug_out, code, "", score_summary
def get_performance_history():
    """Return the formatted performance history, or a placeholder before init."""
    return "No performance data yet" if system is None else system.get_performance_summary()
def get_communications():
    """Return the inter-agent communication log, or a placeholder before init."""
    return "No communications yet" if system is None else system.get_agent_communications()
# Create Gradio Interface
# Four tabs: setup + initial build, autonomous optimization cycles,
# performance analytics, and the raw agent communication log.
with gr.Blocks(title="Autonomous Multi-Agent Optimizer") as demo:
    gr.Markdown("""
    # π€ Autonomous Multi-Agent Performance Optimizer
    Watch AI agents autonomously optimize packet simulation algorithms through iterative improvements.
    ## The Team:
    - ποΈ **ARCHITECT**: Analyzes metrics and designs optimizations
    - π» **DEVELOPER**: Implements high-performance code
    - π **DEBUGGER**: Validates performance and quality
    - π§ͺ **TESTER**: Runs benchmarks and collects metrics
    **Fully Autonomous**: Agents optimize algorithms based on REAL performance scores from deployed apps!
    """)
    # --- Tab 1: API credentials, agent initialization, and the first build ---
    with gr.Tab("π Setup & Initial Build"):
        gr.Markdown("### Step 1: Initialize Multi-Agent System")
        with gr.Row():
            gemini_api_key = gr.Textbox(
                label="Gemini API Key",
                placeholder="Enter your Google AI Studio API key",
                type="password"
            )
            hf_token = gr.Textbox(
                label="Hugging Face Token",
                placeholder="Enter your HF token (write access)",
                type="password"
            )
        init_btn = gr.Button("π€ Initialize Agent Team", variant="primary", size="lg")
        init_output = gr.Textbox(label="System Status", lines=5)
        init_btn.click(
            initialize_system,
            inputs=[gemini_api_key, hf_token],
            outputs=init_output
        )
        gr.Markdown("### Step 2: Initial Development")
        gr.Markdown("Agents will autonomously design, build, deploy, and measure REAL performance metrics.")
        develop_btn = gr.Button("π― Start Autonomous Development", variant="primary", size="lg")
        dev_log = gr.Textbox(label="Development Log", lines=12)
        performance_display = gr.Textbox(label="Initial Performance Score", lines=2)
        with gr.Row():
            with gr.Column():
                architect_output = gr.Textbox(label="ποΈ Architect's Design", lines=8)
            with gr.Column():
                developer_output = gr.Textbox(label="π» Developer's Implementation", lines=8)
        with gr.Row():
            with gr.Column():
                debugger_output = gr.Textbox(label="π Debugger's Report", lines=8)
            with gr.Column():
                initial_code = gr.Code(label="Final Code", language="python", lines=8)
        deploy_status_1 = gr.Textbox(label="Deployment Status")
        develop_btn.click(
            run_initial_development,
            outputs=[dev_log, architect_output, developer_output, debugger_output, initial_code, deploy_status_1, performance_display]
        )
    # --- Tab 2: repeatable autonomous optimization cycles ---
    with gr.Tab("π Autonomous Optimization"):
        gr.Markdown("""
        ### Fully Autonomous Improvement Cycles with REAL Metrics
        The agents will autonomously:
        1. Tester analyzes current performance and identifies bottlenecks
        2. Architect designs optimizations to improve scores
        3. Developer implements performance improvements
        4. **Deploy and collect REAL metrics from the running app**
        5. Debugger validates improvements using REAL data
        6. Measure actual score improvement
        **Goal**: Continuously improve performance scores through autonomous optimization with real feedback!
        """)
        gr.Markdown("### Current Performance")
        current_perf = gr.Textbox(label="Current Score", lines=2, value="Run initial development first")
        # Mirror Tab 1's score into this tab whenever it changes
        performance_display.change(lambda x: x, inputs=performance_display, outputs=current_perf)
        optimize_btn = gr.Button("π Run Autonomous Optimization Cycle", variant="primary", size="lg")
        optimization_log = gr.Textbox(label="Optimization Log", lines=12)
        new_perf = gr.Textbox(label="New Performance Score", lines=2)
        with gr.Row():
            with gr.Column():
                architect_improve = gr.Textbox(label="ποΈ Optimization Strategy", lines=8)
            with gr.Column():
                developer_improve = gr.Textbox(label="π» Performance Improvements", lines=8)
        with gr.Row():
            with gr.Column():
                debugger_improve = gr.Textbox(label="π Performance Validation", lines=8)
            with gr.Column():
                improved_code = gr.Code(label="Optimized Code", language="python", lines=8)
        deploy_status_2 = gr.Textbox(label="Deployment Status")
        # Keep "Current Score" in sync with the latest cycle's result
        new_perf.change(lambda x: x, inputs=new_perf, outputs=current_perf)
        optimize_btn.click(
            run_autonomous_improvement,
            outputs=[optimization_log, architect_improve, developer_improve, debugger_improve, improved_code, deploy_status_2, new_perf]
        )
        gr.Markdown("### Tip: Run Multiple Cycles")
        gr.Markdown("Keep clicking 'Run Autonomous Optimization Cycle' to watch the agents iteratively improve based on REAL performance data! Each cycle aims to improve the score by 10+ points.")
    # --- Tab 3: cumulative performance history across all iterations ---
    with gr.Tab("π Performance Analytics"):
        gr.Markdown("""
        ### Performance History & Metrics
        View complete performance tracking across all iterations, including REAL vs estimated metrics.
        """)
        refresh_perf_btn = gr.Button("π Refresh Performance Data")
        perf_history = gr.Textbox(label="Performance History", lines=25)
        refresh_perf_btn.click(
            get_performance_history,
            outputs=perf_history
        )
    # --- Tab 4: raw inter-agent conversation log ---
    with gr.Tab("π¬ Agent Communications"):
        gr.Markdown("""
        ### Inter-Agent Communication Log
        See how agents communicate and share context.
        """)
        refresh_btn = gr.Button("π Refresh Communications")
        comms_output = gr.Textbox(label="Agent Communications", lines=30)
        refresh_btn.click(
            get_communications,
            outputs=comms_output
        )
    # Footer: user-facing explanation of the autonomous workflow
    gr.Markdown("""
    ---
    ### How Autonomous Optimization Works:
    **Initial Development:**
    1. Architect designs system architecture
    2. Developer implements code with metrics API endpoint
    3. Deploy to HuggingFace
    4. **Collect REAL metrics from deployed app**
    5. Debugger validates using real performance data
    **Optimization Cycles (Fully Autonomous with REAL metrics):**
    1. Tester analyzes current metrics and identifies bottlenecks
    2. Architect designs optimizations based on data
    3. Developer implements performance improvements
    4. Deploy new version
    5. **Collect REAL metrics from the running app**
    6. Debugger validates improvements using actual measured performance
    7. Compare real performance gains
    **Performance Scoring (0-100):**
    - Throughput (packets/sec)
    - Latency (average, p95, p99)
    - CPU Efficiency
    - Algorithm Complexity
    - Code Quality
    **Key Features:**
    - Fully autonomous optimization (no human input needed)
    - **REAL performance metrics from deployed apps**
    - Performance-driven improvements based on actual data
    - Shared context across all agents
    - Iterative score improvements
    - Complete metric tracking
    - README preservation on deploy
    """)
if __name__ == "__main__":
    # Launch the Gradio UI when run as a script (HF Spaces entry point).
    demo.launch()