import gradio as gr
import google.generativeai as genai
from huggingface_hub import HfApi, create_repo, upload_folder, space_info
import os
import time
import json
from datetime import datetime
import tempfile
import shutil
import re
import requests

# Load agent system prompts from JSON.
PROMPTS = {}
try:
    with open("prompts.json", "r") as f:
        PROMPTS = json.load(f)
    print("โœ… Loaded prompts from prompts.json")
except Exception as e:
    print(f"โŒ Error loading prompts.json: {e}")
    # Define minimal fallback prompts if file load fails to prevent crash
    PROMPTS = {
        "architect": "You are the Architect Agent.",
        "developer": "You are the Developer Agent. Ensure you include a get_performance_metrics function with api_name='get_metrics'.",
        "debugger": "You are the Debugger Agent.",
        "tester": "You are the Tester Agent."
    }


class MetricCollector:
    """Collects real metrics from deployed Gradio apps.

    Polls the public ``*.hf.space`` URL of a Space, discovers callable API
    endpoints via Gradio's ``/info`` route, and parses performance numbers
    out of whatever those endpoints return (JSON, dict, or free text).
    """

    def __init__(self, space_name: str):
        # space_name is "user/repo"; HF serves the app at user-repo.hf.space
        self.space_name = space_name
        self.base_url = f"https://{space_name.replace('/', '-')}.hf.space"

    def wait_for_space_ready(self, timeout: int = 300) -> bool:
        """Wait for space to be accessible and responding.

        Returns True once the root page answers 200 AND the Gradio ``/info``
        API answers 200 (plus a 30s grace period so the app can finish
        initializing). Returns False on timeout.
        """
        start_time = time.time()
        print(f"๐Ÿ” Waiting for space at {self.base_url}")
        while time.time() - start_time < timeout:
            try:
                response = requests.get(self.base_url, timeout=10)
                if response.status_code == 200:
                    print("โœ“ Space is responding to HTTP requests")
                    # Verify the Gradio API is actually available
                    try:
                        info_response = requests.get(f"{self.base_url}/info", timeout=10)
                        if info_response.status_code == 200:
                            print("โœ“ Gradio API is available")
                            # Give it extra time to fully initialize
                            time.sleep(30)
                            return True
                    except requests.exceptions.RequestException:
                        # FIX: was a bare `except:` which also swallowed
                        # KeyboardInterrupt/SystemExit.
                        print("โณ Gradio API not ready yet...")
            except Exception as e:
                elapsed = int(time.time() - start_time)
                print(f"โณ Still waiting... ({elapsed}s) - {str(e)[:50]}")
            time.sleep(15)
        print(f"โŒ Timeout after {timeout}s")
        return False

    def test_app_functionality(self) -> bool:
        """Test if the app is actually functional by trying to use it.

        Returns True as soon as any named endpoint returns a non-None result.
        """
        print("๐Ÿงช Testing app functionality...")
        try:
            # Get available endpoints
            info_response = requests.get(f"{self.base_url}/info", timeout=10)
            if info_response.status_code != 200:
                print("โŒ Cannot fetch app info")
                return False
            info = info_response.json()
            endpoints = info.get('named_endpoints', {})
            if not endpoints:
                print("โŒ No endpoints found")
                return False
            print(f"โœ“ Found {len(endpoints)} endpoints")
            # Try to call at least one endpoint to verify app works
            for endpoint_path in endpoints.keys():
                endpoint_name = endpoint_path.lstrip('/')
                print(f"๐Ÿงช Testing endpoint: {endpoint_name}")
                try:
                    test_result = self.call_gradio_api(endpoint_name, [], timeout=30)
                    if test_result is not None:
                        print(f"โœ“ Endpoint {endpoint_name} is functional")
                        return True
                except Exception as e:
                    print(f"โš ๏ธ Endpoint {endpoint_name} failed: {str(e)[:100]}")
                    continue
            print("โŒ No functional endpoints found")
            return False
        except Exception as e:
            print(f"โŒ Functionality test failed: {str(e)}")
            return False

    def call_gradio_api(self, endpoint: str, data: list = None, timeout: int = 60):
        """Call a Gradio API endpoint with better error handling.

        Returns the endpoint's "data" list on HTTP 200, otherwise None.
        """
        try:
            if data is None:
                data = []
            api_url = f"{self.base_url}/api/{endpoint}"
            print(f"๐Ÿ“ก Calling API: {api_url}")
            response = requests.post(
                api_url,
                json={"data": data},
                timeout=timeout,
                headers={"Content-Type": "application/json"}
            )
            print(f"๐Ÿ“ก Response status: {response.status_code}")
            if response.status_code == 200:
                result = response.json()
                data_result = result.get("data", [])
                print(f"โœ“ API call successful, got {len(data_result)} data items")
                return data_result
            print(f"โš ๏ธ API call failed: {response.status_code}")
            print(f"Response: {response.text[:200]}")
            return None
        except requests.exceptions.Timeout:
            print(f"โš ๏ธ API call timed out after {timeout}s")
            return None
        except Exception as e:
            print(f"โš ๏ธ Error calling API: {str(e)}")
            return None

    def extract_metrics_from_app(self, max_retries: int = 3):
        """Extract real performance metrics from the deployed app with retries.

        Tries, in priority order: the dedicated ``get_metrics`` endpoint,
        endpoints whose names look metric-related, then every endpoint.
        Returns a metrics dict; ``source`` is "real_api" on success or
        "placeholder" (score 50) when nothing usable was collected.
        """
        metrics = {}
        for attempt in range(max_retries):
            print(f"\n๐ŸŽฏ Metrics collection attempt {attempt + 1}/{max_retries}")
            metrics = {
                "throughput": 0,
                "latency_avg": 0,
                "latency_p50": 0,
                "latency_p95": 0,
                "latency_p99": 0,
                "cpu_efficiency": 0,
                "memory_usage": 0,
                "performance_score": 0,
                "source": "failed"
            }
            # Wait for space to be ready
            if not self.wait_for_space_ready(timeout=300):
                print(f"โš ๏ธ Attempt {attempt + 1}: Space not ready")
                if attempt < max_retries - 1:
                    print("โณ Waiting 30s before retry...")
                    time.sleep(30)
                continue
            # Test if app is functional
            if not self.test_app_functionality():
                print(f"โš ๏ธ Attempt {attempt + 1}: App not functional")
                if attempt < max_retries - 1:
                    print("โณ Waiting 30s before retry...")
                    time.sleep(30)
                continue
            try:
                # Try to get the app's info endpoint
                info_response = requests.get(f"{self.base_url}/info", timeout=10)
                if info_response.status_code == 200:
                    info = info_response.json()
                    endpoints = info.get('named_endpoints', {})
                    print(f"โœ“ Found {len(endpoints)} named endpoints: {list(endpoints.keys())}")

                    # Priority 1: Look for the get_metrics endpoint
                    if '/get_metrics' in endpoints:
                        print("๐ŸŽฏ Found get_metrics endpoint!")
                        result = self.call_gradio_api("get_metrics", [], timeout=60)
                        if result:
                            print(f"๐Ÿ“Š Raw result from get_metrics: {str(result)[:200]}")
                            metrics_data = self.parse_metrics_from_result(result)
                            if metrics_data.get("performance_score", 0) > 0:
                                metrics.update(metrics_data)
                                metrics["source"] = "real_api"
                                print(f"โœ… SUCCESS! Collected real metrics: Score {metrics['performance_score']}/100")
                                return metrics
                            else:
                                print("โš ๏ธ Metrics parsed but score is 0")
                        else:
                            print("โš ๏ธ get_metrics returned None")
                    else:
                        print("โš ๏ธ 'get_metrics' endpoint NOT found in named_endpoints!")

                    # Priority 2: Try common metric endpoint patterns
                    metric_keywords = ['metric', 'benchmark', 'performance', 'test', 'stats']
                    for endpoint_path, endpoint_info in endpoints.items():
                        endpoint_name = endpoint_path.lstrip('/')
                        if any(keyword in endpoint_name.lower() for keyword in metric_keywords):
                            print(f"๐Ÿงช Trying endpoint: {endpoint_name}")
                            result = self.call_gradio_api(endpoint_name, [], timeout=60)
                            if result:
                                print(f"๐Ÿ“Š Raw result from {endpoint_name}: {str(result)[:200]}")
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"โœ… SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics

                    # Priority 3: Try ALL endpoints as a last resort
                    print("๐Ÿ” Trying all available endpoints...")
                    for endpoint_path in endpoints.keys():
                        endpoint_name = endpoint_path.lstrip('/')
                        print(f"๐Ÿงช Trying endpoint: {endpoint_name}")
                        try:
                            result = self.call_gradio_api(endpoint_name, [], timeout=30)
                            if result:
                                metrics_data = self.parse_metrics_from_result(result)
                                if metrics_data.get("performance_score", 0) > 0:
                                    metrics.update(metrics_data)
                                    metrics["source"] = "real_api"
                                    print(f"โœ… SUCCESS! Collected metrics from {endpoint_name}")
                                    return metrics
                        except Exception as e:
                            print(f"โš ๏ธ Endpoint {endpoint_name} failed: {str(e)[:50]}")
                            continue
            except Exception as e:
                print(f"โš ๏ธ Attempt {attempt + 1} error: {str(e)}")
                if attempt < max_retries - 1:
                    print("โณ Waiting 30s before retry...")
                    time.sleep(30)

        # All attempts failed
        print("โŒ Failed to collect real metrics after all attempts")
        metrics["source"] = "placeholder"
        metrics["performance_score"] = 50  # Default baseline
        return metrics

    def parse_metrics_from_result(self, result):
        """Parse metrics from Gradio API result with improved handling.

        Accepts a list (first item inspected), dict, or string; strings are
        tried as JSON first, then scanned with regexes.
        """
        metrics = {}
        print(f"๐Ÿ” Parsing result type: {type(result)}")
        # Result could be a list, dict, or string
        if isinstance(result, list) and len(result) > 0:
            item = result[0]
            print(f"๐Ÿ” List item type: {type(item)}")
            if isinstance(item, str):
                print(f"๐Ÿ” String content: {item[:200]}")
                # Try to parse as JSON
                try:
                    parsed = json.loads(item)
                    print(f"โœ“ Parsed as JSON: {parsed}")
                    metrics.update(self.extract_metrics_from_dict(parsed))
                except (ValueError, TypeError):
                    # FIX: narrowed from a bare `except:`; json.loads raises
                    # JSONDecodeError (a ValueError subclass).
                    print("โš ๏ธ Not valid JSON, trying regex")
                    # Try regex parsing
                    metrics.update(self.extract_metrics_from_text(item))
            elif isinstance(item, dict):
                print(f"โœ“ Direct dict: {item}")
                metrics.update(self.extract_metrics_from_dict(item))
        elif isinstance(result, dict):
            print(f"โœ“ Direct dict result: {result}")
            metrics.update(self.extract_metrics_from_dict(result))
        elif isinstance(result, str):
            print(f"๐Ÿ” String result: {result[:200]}")
            try:
                parsed = json.loads(result)
                metrics.update(self.extract_metrics_from_dict(parsed))
            except (ValueError, TypeError):
                metrics.update(self.extract_metrics_from_text(result))
        print(f"๐Ÿ“Š Extracted metrics: {metrics}")
        return metrics

    def extract_metrics_from_dict(self, data: dict):
        """Extract metrics from dictionary data.

        Maps several common key aliases onto the canonical metric names;
        the first alias present (and convertible to float) wins.
        """
        metrics = {}
        # Common metric keys
        key_mappings = {
            'throughput': ['throughput', 'packets_per_sec', 'pps', 'packets_sec'],
            'latency_avg': ['latency_avg', 'avg_latency', 'latency_mean', 'mean_latency'],
            'latency_p50': ['latency_p50', 'p50', 'median_latency', 'latency_median'],
            'latency_p95': ['latency_p95', 'p95', 'latency_95'],
            'latency_p99': ['latency_p99', 'p99', 'latency_99'],
            'cpu_efficiency': ['cpu_efficiency', 'efficiency', 'ops_per_packet', 'cpu_ops'],
            'memory_usage': ['memory_usage', 'memory_mb', 'memory', 'mem_usage'],
            'performance_score': ['performance_score', 'score', 'total_score', 'overall_score']
        }
        for metric_key, possible_keys in key_mappings.items():
            for key in possible_keys:
                if key in data:
                    try:
                        value = float(data[key])
                        metrics[metric_key] = value
                        print(f"โœ“ Found {metric_key} = {value}")
                        break
                    except (ValueError, TypeError):
                        pass
        return metrics

    def extract_metrics_from_text(self, text: str):
        """Extract metrics from text output using regex."""
        metrics = {}
        patterns = {
            'throughput': r'throughput[:\s]+([0-9,.]+)',
            'latency_avg': r'(?:avg|average|mean)\s*latency[:\s]+([0-9.]+)',
            'latency_p50': r'p50[:\s]+([0-9.]+)',
            'latency_p95': r'p95[:\s]+([0-9.]+)',
            'latency_p99': r'p99[:\s]+([0-9.]+)',
            'cpu_efficiency': r'(?:cpu[_\s]*)?efficiency[:\s]+([0-9.]+)',
            'performance_score': r'(?:performance[_\s]+)?score[:\s]+([0-9]+)'
        }
        for key, pattern in patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                try:
                    value = match.group(1).replace(',', '')
                    metrics[key] = float(value)
                    print(f"โœ“ Regex found {key} = {value}")
                except ValueError:
                    # FIX: narrowed from a bare `except:` — only the float
                    # conversion can realistically fail here.
                    pass
        return metrics
class MultiAgentSystem:
    """Orchestrates four Gemini agents (architect, developer, debugger,
    tester) that iteratively design, implement, deploy, and optimize a
    packet-simulation Gradio app on a HuggingFace Space.
    """

    def __init__(self, gemini_api_key, hf_token):
        self.hf_api = HfApi(token=hf_token)
        self.hf_token = hf_token
        genai.configure(api_key=gemini_api_key)
        # Initialize four specialized agents with prompts loaded from JSON
        self.architect = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("architect", "You are the Architect Agent.")
        )
        self.developer = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("developer", "You are the Developer Agent.")
        )
        self.debugger = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("debugger", "You are the Debugger Agent.")
        )
        self.tester = genai.GenerativeModel(
            model_name='gemini-2.5-flash',
            system_instruction=PROMPTS.get("tester", "You are the Tester Agent.")
        )
        self.space_name = None
        self.iteration = 0
        # Shared state passed between agent phases across iterations.
        self.shared_context = {
            "current_code": "",
            "performance_history": [],
            "architecture_decisions": [],
            "known_issues": [],
            "optimization_log": []
        }
        self.temp_dir = tempfile.mkdtemp()
        self.conversation_logs = []
        self.target_improvement = 0

    def log_conversation(self, agent_name, input_msg, output_msg):
        """Log inter-agent communication (truncated to keep logs bounded)."""
        self.conversation_logs.append({
            "timestamp": datetime.now().isoformat(),
            "agent": agent_name,
            "input": input_msg[:1000],
            "output": output_msg[:2000]
        })

    def create_space(self):
        """Create a new HuggingFace Space named after the current timestamp."""
        try:
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
            # Create a simplified space name
            name_suffix = f"optimizer-{timestamp}"
            user_info = self.hf_api.whoami()
            username = user_info['name']
            # Full repo ID
            repo_id = f"{username}/packet-{name_suffix}"
            print(f"Creating Space: {repo_id}")
            create_repo(
                repo_id=repo_id,
                repo_type="space",
                space_sdk="gradio",
                token=self.hf_token,
                private=False
            )
            self.space_name = repo_id
            return f"โœ“ Created Space: {repo_id}"
        except Exception as e:
            return f"โœ— Error creating space: {str(e)}"

    def upload_files_to_space(self, files):
        """Upload files (dict of filename -> content) to the created space."""
        if not self.space_name:
            return "โœ— No space created"
        try:
            # Create temp directory for upload
            upload_dir = os.path.join(self.temp_dir, "upload")
            if os.path.exists(upload_dir):
                shutil.rmtree(upload_dir)
            os.makedirs(upload_dir)
            # Write files
            for filename, content in files.items():
                with open(os.path.join(upload_dir, filename), "w") as f:
                    f.write(content)
            # Upload
            print(f"Uploading to {self.space_name}...")
            self.hf_api.upload_folder(
                folder_path=upload_dir,
                repo_id=self.space_name,
                repo_type="space",
                token=self.hf_token
            )
            return f"โœ“ Deployed to {self.space_name}"
        except Exception as e:
            return f"โœ— Error uploading: {str(e)}"

    def get_space_logs(self):
        """Get logs from the space (best effort)."""
        if not self.space_name:
            return {"error": "No space"}
        try:
            info = space_info(self.space_name, token=self.hf_token)
            runtime = info.runtime
            return {
                "stage": runtime.stage if runtime else "UNKNOWN",
                "error": getattr(runtime, "error", None),
                "message": "Check HF Space logs for details"
            }
        except Exception as e:
            return {"error": str(e)}

    def validate_code_locally(self, code):
        """Validate code syntax and required endpoints locally before deploying.

        Returns (is_valid, message). Checks, in order: non-empty code, the
        mandatory api_name="get_metrics" hook, absence of placeholder
        fragments, and finally a compile() syntax check.
        """
        try:
            if not code or len(code.strip()) == 0:
                return False, "Code is empty"
            # Check for API endpoint requirement
            if 'api_name="get_metrics"' not in code and "api_name='get_metrics'" not in code:
                return False, "Code is missing the required api_name=\"get_metrics\" argument in the Gradio button."
            # Check for placeholders that cause syntax errors
            # NOTE: a literal "..." anywhere (even inside a string) trips this
            # check — deliberate, since LLM output with ellipses is suspect.
            if "..." in code or "pass # Implement" in code:
                return False, "Code contains placeholders (...) or incomplete blocks which will cause runtime errors."
            # Check for valid syntax
            compile(code, '', 'exec')
            return True, "Valid Python Syntax"
        except Exception as e:
            return False, f"Syntax Error: {str(e)}"

    def collect_real_metrics(self):
        """Collect real metrics from deployed space with better validation."""
        if not self.space_name:
            print("โš ๏ธ No space deployed yet")
            return {
                "throughput": 0,
                "latency_avg": 0,
                "performance_score": 0,
                "source": "no_space"
            }
        print(f"\n{'='*80}")
        print(f"๐Ÿ” COLLECTING REAL METRICS FROM: {self.space_name}")
        print(f"{'='*80}\n")
        collector = MetricCollector(self.space_name)
        metrics = collector.extract_metrics_from_app(max_retries=3)
        print(f"\n{'='*80}")
        print("๐Ÿ“Š FINAL COLLECTED METRICS:")
        print(f"  Source: {metrics.get('source', 'unknown')}")
        print(f"  Throughput: {metrics.get('throughput', 0):,.0f} packets/sec")
        print(f"  Avg Latency: {metrics.get('latency_avg', 0):.2f} ms")
        print(f"  P95 Latency: {metrics.get('latency_p95', 0):.2f} ms")
        print(f"  Performance Score: {metrics.get('performance_score', 0)}/100")
        print(f"{'='*80}\n")
        # If we got placeholder metrics, try one more time after a longer wait
        if metrics.get("source") == "placeholder":
            print("โš ๏ธ Got placeholder metrics, trying once more after 60s wait...")
            time.sleep(60)
            metrics = collector.extract_metrics_from_app(max_retries=1)
        return metrics

    def wait_for_space_build(self, timeout=300):
        """Wait for space to build and verify it's functional.

        Returns (success, message). RUNNING alone is not enough — the app
        must also pass a functional endpoint probe.
        """
        if not self.space_name:
            return False, "No space created"
        start_time = time.time()
        print(f"\nโณ Waiting for space to build (timeout: {timeout}s)...")
        while time.time() - start_time < timeout:
            try:
                info = space_info(self.space_name, token=self.hf_token)
                runtime = info.runtime if hasattr(info, 'runtime') else None
                if runtime and hasattr(runtime, 'stage'):
                    stage = runtime.stage
                    elapsed = int(time.time() - start_time)
                    print(f"  Stage: {stage} ({elapsed}s elapsed)")
                    if stage == "RUNNING":
                        print("โœ“ Space is RUNNING!")
                        # Additional verification: test if app is actually functional
                        print("๐Ÿงช Verifying app functionality...")
                        time.sleep(15)  # Give it time to fully start
                        collector = MetricCollector(self.space_name)
                        if collector.test_app_functionality():
                            print("โœ… Space is running AND functional!")
                            return True, "Space is running and functional"
                        print("โš ๏ธ Space is running but app may not be functional yet, waiting...")
                        time.sleep(20)
                        continue
                    elif stage in ["RUNTIME_ERROR", "BUILD_ERROR"]:
                        error_msg = getattr(runtime, 'error', 'Unknown error')
                        return False, f"Space error: {stage} - {error_msg}"
                time.sleep(15)
            except Exception as e:
                print(f"  Error checking status: {str(e)[:50]}")
                time.sleep(15)
                continue
        return False, f"Timeout after {timeout}s"

    def automatic_error_fix_cycle(self, error_info):
        """Automatically fix errors detected in deployed space.

        Returns (log_text, fixed_code_or_None, requirements_or_None).
        """
        log = "\n๐Ÿ”ง AUTOMATIC ERROR DETECTION AND FIX\n" + "="*80 + "\n"
        log += f"Error detected: {error_info}\n\n"
        log += "๐Ÿ” PHASE 1: DEBUGGER ANALYZING ERROR...\n"
        debugger_prompt = f"""The deployed space has encountered an error:

ERROR INFO:
{json.dumps(error_info, indent=2)}

CURRENT CODE:
```python
{self.shared_context['current_code']}
```

Analyze:
1. What is causing the error
2. Root cause analysis
3. Specific fixes needed

Provide detailed debugging information."""
        try:
            debugger_response = self.debugger.generate_content(debugger_prompt)
            debugger_output = debugger_response.text
            self.log_conversation("DEBUGGER (ERROR)", debugger_prompt, debugger_output)
            log += "โœ“ Error analysis complete\n\n"
        except Exception as e:
            return log + f"โœ— Error during analysis: {str(e)}\n", None, None

        log += "๐Ÿ’ป PHASE 2: DEVELOPER FIXING ERROR...\n"
        fix_prompt = f"""DEBUGGER'S ERROR ANALYSIS:
{debugger_output}

ERROR INFO:
{json.dumps(error_info, indent=2)}

CURRENT CODE:
```python
{self.shared_context['current_code']}
```

Fix the error and provide corrected code. Ensure:
1. The specific error is resolved
2. Code is syntactically correct
3. All imports are included
4. The app will run without errors
5. The get_performance_metrics() function with api_name="get_metrics" is included
6. CRITICAL: DO NOT use placeholders like '...'"""
        try:
            fix_response = self.developer.generate_content(fix_prompt)
            fix_output = fix_response.text
            self.log_conversation("DEVELOPER (ERROR FIX)", fix_prompt, fix_output)
            fixed_code = self.extract_code(fix_output, "APP_PY")
            requirements = self.extract_code(fix_output, "REQUIREMENTS")

            # Local Validation Loop: up to 3 regeneration rounds on syntax errors.
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(fixed_code)
                if is_valid:
                    break
                attempts += 1
                print(f"โš ๏ธ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}")
                refix_prompt = f"""The fixed code still has a SYNTAX ERROR:
ERROR: {error_msg}

You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                fix_response = self.developer.generate_content(refix_prompt)
                fix_output = fix_response.text
                self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output)
                fixed_code = self.extract_code(fix_output, "APP_PY")

            if not fixed_code:
                fixed_code = self.shared_context['current_code']
            if not requirements:
                requirements = "gradio==4.44.0\nnumpy>=1.24.0"
            self.shared_context['current_code'] = fixed_code
            log += "โœ“ Error fix implemented\n\n"
            return log, fixed_code, requirements
        except Exception as e:
            return log + f"โœ— Error during fix: {str(e)}\n", None, None

    def extract_metrics_from_output(self, text):
        """Extract performance metrics from agent outputs (fallback)."""
        metrics = {
            "throughput": 0,
            "latency_avg": 0,
            "latency_p95": 0,
            "cpu_efficiency": 0,
            "performance_score": 0
        }
        throughput_match = re.search(r'Throughput[:\s]+([0-9,.]+)', text, re.IGNORECASE)
        if throughput_match:
            try:
                metrics["throughput"] = float(throughput_match.group(1).replace(',', ''))
            except ValueError:
                # FIX: narrowed from bare `except:` — only float() can fail.
                pass
        latency_match = re.search(r'Avg\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE)
        if latency_match:
            try:
                metrics["latency_avg"] = float(latency_match.group(1))
            except ValueError:
                pass
        p95_match = re.search(r'P95\s+Latency[:\s]+([0-9.]+)', text, re.IGNORECASE)
        if p95_match:
            try:
                metrics["latency_p95"] = float(p95_match.group(1))
            except ValueError:
                pass
        score_match = re.search(r'PERFORMANCE[_\s]+SCORE[:\s]+([0-9]+)', text, re.IGNORECASE)
        if score_match:
            try:
                metrics["performance_score"] = int(score_match.group(1))
            except ValueError:
                pass
        return metrics

    def initial_design_phase(self):
        """Phase 1: Architect designs initial system."""
        self.iteration += 1
        architect_prompt = """Design the initial packet simulation algorithm system optimized for high performance.

Consider:
- Multiple algorithm approaches (basic, batched, optimized)
- Performance metrics to track (throughput, latency, efficiency)
- Benchmarking capabilities built into the UI
- Clear performance visualizations

CRITICAL: The app MUST include a get_performance_metrics() function that returns JSON metrics.

Focus on creating a baseline that we can iteratively improve.
Target initial performance score: 60/100"""
        try:
            architect_response = self.architect.generate_content(architect_prompt)
            architect_output = architect_response.text
            self.log_conversation("ARCHITECT", architect_prompt, architect_output)
            self.shared_context["architecture_decisions"].append({
                "iteration": self.iteration,
                "design": architect_output,
                "timestamp": datetime.now().isoformat()
            })
            return architect_output
        except Exception as e:
            return f"โœ— Architect error: {str(e)}"

    def development_phase(self, architect_specs, previous_metrics=None):
        """Phase 2: Developer implements with performance focus.

        Returns (raw_agent_output, app_code, requirements_txt).
        """
        context = f"""ARCHITECT'S SPECIFICATIONS:
{architect_specs}

PERFORMANCE CONTEXT:
"""
        if previous_metrics:
            context += f"Previous iteration metrics: {json.dumps(previous_metrics, indent=2)}\n"
            context += "Your goal is to exceed these metrics.\n"
        else:
            context += "This is the initial implementation. Focus on creating a solid baseline with good instrumentation.\n"

        developer_prompt = f"""{context}

Implement a complete Gradio application with:
1. Multiple packet simulation algorithms (at least 3)
2. A get_performance_metrics() function with api_name="get_metrics" that returns JSON
3. Built-in benchmarking tools
4. Real-time performance visualization
5. Detailed timing measurements

CRITICAL: Include the get_performance_metrics() function exactly as specified in your system prompt.
CRITICAL: Write the FULL, COMPLETE code. Do not use placeholders like '...' or 'pass' for incomplete blocks.
Remember: Simulate packets safely (NO real network operations)"""
        try:
            developer_response = self.developer.generate_content(developer_prompt)
            developer_output = developer_response.text
            self.log_conversation("DEVELOPER", developer_prompt, developer_output)
            code = self.extract_code(developer_output, "APP_PY")
            requirements = self.extract_code(developer_output, "REQUIREMENTS")

            # Local Validation Loop: up to 3 regeneration rounds on syntax errors.
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(code)
                if is_valid:
                    break
                attempts += 1
                print(f"โš ๏ธ Generated code has syntax errors (Attempt {attempts}/3): {error_msg}")
                fix_syntax_prompt = f"""The code you wrote has a SYNTAX ERROR and cannot run.
ERROR: {error_msg}

You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                developer_response = self.developer.generate_content(fix_syntax_prompt)
                developer_output = developer_response.text
                self.log_conversation(f"DEVELOPER (SYNTAX FIX {attempts})", fix_syntax_prompt, developer_output)
                code = self.extract_code(developer_output, "APP_PY")
                # requirements usually stay same, but could re-extract if needed

            if not requirements:
                requirements = "gradio==4.44.0\nnumpy>=1.24.0"
            self.shared_context["current_code"] = code
            return developer_output, code, requirements
        except Exception as e:
            return f"โœ— Developer error: {str(e)}", "", ""

    def testing_phase(self, code):
        """Phase 2.5: Tester runs benchmarks and collects metrics (estimates)."""
        tester_prompt = f"""Analyze this packet simulation code and generate a comprehensive test plan:

CODE:
```python
{code}
```

Generate:
1. Specific test scenarios to run
2. Expected performance characteristics
3. Metrics to collect
4. Benchmark methodology

Provide hypothetical but realistic performance numbers based on the algorithm's design."""
        try:
            tester_response = self.tester.generate_content(tester_prompt)
            tester_output = tester_response.text
            self.log_conversation("TESTER", tester_prompt, tester_output)
            metrics = self.extract_metrics_from_output(tester_output)
            return tester_output, metrics
        except Exception as e:
            return f"โœ— Tester error: {str(e)}", {}

    def debugging_phase(self, code, architect_specs, test_metrics, real_metrics=None):
        """Phase 3: Debugger validates and measures REAL performance.

        Returns (debugger_output, ready_for_deployment, final_metrics).
        """
        # Use real metrics if available, otherwise use test estimates
        metrics_to_use = real_metrics if real_metrics and real_metrics.get("source") == "real_api" else test_metrics
        debugger_prompt = f"""ARCHITECT'S SPECIFICATIONS:
{architect_specs}

METRICS (Source: {metrics_to_use.get('source', 'test_estimates') if isinstance(metrics_to_use, dict) else 'test_estimates'}):
{json.dumps(metrics_to_use, indent=2)}

DEVELOPER'S CODE:
```python
{code}
```

Validate:
1. Code correctness and safety
2. Performance against specifications
3. Whether optimizations are effective
4. Any bugs or issues

Provide a performance score (0-100) and recommendations."""
        try:
            debugger_response = self.debugger.generate_content(debugger_prompt)
            debugger_output = debugger_response.text
            self.log_conversation("DEBUGGER", debugger_prompt, debugger_output)
            # Extract any additional metrics from debugger
            debugger_metrics = self.extract_metrics_from_output(debugger_output)
            # Merge metrics: real > debugger > test
            final_metrics = {**test_metrics}
            if debugger_metrics.get("performance_score", 0) > 0:
                final_metrics.update(debugger_metrics)
            if real_metrics and real_metrics.get("source") == "real_api":
                final_metrics.update(real_metrics)
            # Update performance history with REAL metrics if available
            self.shared_context["performance_history"].append({
                "iteration": self.iteration,
                "metrics": final_metrics,
                "real_metrics": real_metrics if real_metrics else {},
                "timestamp": datetime.now().isoformat()
            })
            ready = "READY_FOR_DEPLOYMENT: YES" in debugger_output or final_metrics.get("performance_score", 0) >= 70
            return debugger_output, ready, final_metrics
        except Exception as e:
            return f"โœ— Debugger error: {str(e)}", False, {}

    def fix_phase(self, code, debugger_findings):
        """Phase 4: Developer fixes issues. Returns (raw_output, fixed_code)."""
        fix_prompt = f"""DEBUGGER'S FINDINGS:
{debugger_findings}

CURRENT CODE:
```python
{code}
```

Fix all identified issues while maintaining or improving performance.
Ensure the get_performance_metrics() function with api_name="get_metrics" is present.
CRITICAL: DO NOT use placeholders like '...'. Write the FULL code."""
        try:
            fix_response = self.developer.generate_content(fix_prompt)
            fix_output = fix_response.text
            self.log_conversation("DEVELOPER (FIX)", fix_prompt, fix_output)
            fixed_code = self.extract_code(fix_output, "APP_PY")

            # Local Validation Loop: up to 3 regeneration rounds on syntax errors.
            attempts = 0
            while attempts < 3:
                is_valid, error_msg = self.validate_code_locally(fixed_code)
                if is_valid:
                    break
                attempts += 1
                print(f"โš ๏ธ Fixed code still has syntax errors (Attempt {attempts}/3): {error_msg}")
                refix_prompt = f"""The fixed code still has a SYNTAX ERROR:
ERROR: {error_msg}

You likely used placeholders like '...' or left incomplete blocks.
REWRITE the COMPLETE code fixing this error. Do NOT use placeholders."""
                fix_response = self.developer.generate_content(refix_prompt)
                fix_output = fix_response.text
                self.log_conversation(f"DEVELOPER (RE-FIX {attempts})", refix_prompt, fix_output)
                fixed_code = self.extract_code(fix_output, "APP_PY")

            if not fixed_code:
                fixed_code = code
            self.shared_context["current_code"] = fixed_code
            return fix_output, fixed_code
        except Exception as e:
            return f"โœ— Developer fix error: {str(e)}", code

    def autonomous_improvement_cycle(self):
        """Fully autonomous improvement cycle based on REAL performance metrics.

        Runs tester -> architect -> developer -> tester -> deploy -> debugger
        (-> fix/redeploy) and records the score delta in optimization_log.
        """
        self.iteration += 1
        # Get previous performance metrics
        prev_metrics = self.shared_context["performance_history"][-1] if self.shared_context["performance_history"] else {}
        prev_score = prev_metrics.get("metrics", {}).get("performance_score", 0)
        log = f"\n๐Ÿš€ AUTONOMOUS IMPROVEMENT CYCLE {self.iteration}\n{'='*80}\n"
        log += f"Previous Performance Score: {prev_score}/100\n"
        log += f"Target: {prev_score + 10}/100\n\n"

        # Phase 1: Tester analyzes current performance
        log += "๐Ÿงช PHASE 1: TESTER ANALYZING CURRENT PERFORMANCE...\n"
        tester_prompt = f"""Analyze the current system performance:

CURRENT CODE:
```python
{self.shared_context['current_code']}
```

PERFORMANCE HISTORY:
{json.dumps(self.shared_context['performance_history'][-3:], indent=2)}

Identify:
1. Performance bottlenecks
2. Optimization opportunities
3. Specific metrics to improve
4. Realistic performance targets"""
        tester_output = self.tester.generate_content(tester_prompt).text
        self.log_conversation("TESTER", tester_prompt, tester_output)
        log += "โœ“ Performance analysis complete\n\n"

        # Phase 2: Architect designs optimizations
        log += "๐Ÿ›๏ธ PHASE 2: ARCHITECT DESIGNING OPTIMIZATIONS...\n"
        architect_prompt = f"""TESTER'S PERFORMANCE ANALYSIS:
{tester_output}

CURRENT PERFORMANCE METRICS:
{json.dumps(prev_metrics.get('metrics', {}), indent=2)}

OPTIMIZATION HISTORY:
{json.dumps(self.shared_context['optimization_log'][-2:], indent=2)}

Design specific optimizations to improve performance score by at least 10 points.
Focus on the biggest bottlenecks identified."""
        architect_output = self.architect.generate_content(architect_prompt).text
        self.log_conversation("ARCHITECT", architect_prompt, architect_output)
        self.shared_context["architecture_decisions"].append({
            "iteration": self.iteration,
            "design": architect_output
        })
        log += "โœ“ Optimization strategy complete\n\n"

        # Phase 3: Developer implements
        log += "๐Ÿ’ป PHASE 3: DEVELOPER IMPLEMENTING OPTIMIZATIONS...\n"
        dev_output, code, requirements = self.development_phase(
            architect_output, prev_metrics.get('metrics', {})
        )
        log += "โœ“ Implementation complete\n\n"

        # Phase 4: Tester benchmarks new version (estimates)
        log += "๐Ÿงช PHASE 4: TESTER BENCHMARKING NEW VERSION...\n"
        test_output, test_metrics = self.testing_phase(code)
        log += "โœ“ Benchmarking complete\n\n"

        # Phase 5: Deploy and collect REAL metrics
        log += "๐Ÿš€ PHASE 5: DEPLOYING TO COLLECT REAL METRICS...\n"
        deploy_status = self.deploy_to_space(code, requirements)
        log += f"{deploy_status}\n"
        log += "โณ Waiting for space to build...\n"
        success, build_msg = self.wait_for_space_build(timeout=180)
        log += f"{build_msg}\n"

        # Collect real metrics
        real_metrics = None
        if success:
            log += "\n๐Ÿ“Š COLLECTING REAL METRICS FROM DEPLOYED APP...\n"
            time.sleep(10)  # Extra time for app to stabilize
            real_metrics = self.collect_real_metrics()
            if real_metrics.get("source") == "real_api":
                log += f"โœ“ Real metrics collected successfully!\n"
                log += f"  Performance Score: {real_metrics.get('performance_score', 0)}/100\n"
            else:
                log += "โš ๏ธ Could not collect real metrics, using estimates\n"
        else:
            log += "โš ๏ธ Space failed to build, using test estimates\n"
        log += "\n"

        # Phase 6: Debugger validates with REAL metrics
        log += "๐Ÿ” PHASE 6: DEBUGGER VALIDATING IMPROVEMENTS...\n"
        debug_output, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics, real_metrics)
        log += "โœ“ Validation complete\n\n"

        # Phase 7: Fix if needed
        if not ready:
            log += "๐Ÿ”ง PHASE 7: DEVELOPER FIXING ISSUES...\n"
            fix_output, code = self.fix_phase(code, debug_output)
            log += "โœ“ Fixes applied\n\n"
            log += "๐Ÿ”„ RE-DEPLOYING AND RE-TESTING...\n"
            deploy_status = self.deploy_to_space(code, requirements)
            success, build_msg = self.wait_for_space_build(timeout=180)
            if success:
                time.sleep(10)
                real_metrics = self.collect_real_metrics()
            test_output_2, test_metrics_2 = self.testing_phase(code)
            debug_output_2, ready, final_metrics = self.debugging_phase(code, architect_output, test_metrics_2, real_metrics)
            log += "โœ“ Re-testing complete\n\n"

        # Log optimization results
        new_score = final_metrics.get("performance_score", 0)
        improvement = new_score - prev_score
        self.shared_context["optimization_log"].append({
            "iteration": self.iteration,
            "previous_score": prev_score,
            "new_score": new_score,
            "improvement": improvement,
            "optimizations": architect_output[:500],
            "real_metrics": real_metrics.get("source") == "real_api" if real_metrics else False
        })
        log += f"๐Ÿ“ˆ RESULTS:\n"
        log += f"Previous Score: {prev_score}/100\n"
        log += f"New Score: {new_score}/100\n"
        log += f"Improvement: {'+' if improvement >= 0 else ''}{improvement} points\n"
        log += f"Metrics Source: {final_metrics.get('source', 'estimates')}\n"
        return log, architect_output, dev_output, debug_output, code, requirements, final_metrics

    def extract_code(self, text, marker):
        """Extract code blocks from agent responses.

        marker "APP_PY" pulls the first ```python fenced block; "REQUIREMENTS"
        pulls the fenced block after a REQUIREMENTS: label. Returns "" when
        nothing matches.
        """
        try:
            if marker == "APP_PY":
                if "```python" in text:
                    start = text.find("```python") + len("```python")
                    end = text.find("```", start)
                    if end == -1:
                        # FIX: unterminated fence previously sliced text[start:-1],
                        # silently dropping the final character of the code.
                        end = len(text)
                    return text[start:end].strip()
            elif marker == "REQUIREMENTS":
                if "REQUIREMENTS:" in text:
                    start = text.find("REQUIREMENTS:") + len("REQUIREMENTS:")
                    if "```" in text[start:]:
                        start = text.find("```", start) + 3
                        end = text.find("```", start)
                        if end == -1:
                            end = len(text)  # FIX: same unterminated-fence guard
                        return text[start:end].strip()
            return ""
        except Exception:
            # FIX: narrowed from a bare `except:`.
            return ""

    def deploy_to_space(self, code, requirements):
        """Deploy code to HuggingFace Space without overwriting README."""
        files = {
            "app.py": code,
            "requirements.txt": requirements
        }
        return self.upload_files_to_space(files)

    def get_performance_summary(self):
        """Get formatted performance history."""
        if not self.shared_context["performance_history"]:
            return "No performance data yet"
        summary = "๐Ÿ“Š PERFORMANCE HISTORY\n" + "="*80 + "\n\n"
        for entry in self.shared_context["performance_history"]:
            metrics = entry["metrics"]
            real_metrics = entry.get("real_metrics", {})
            source = real_metrics.get("source", "estimates")
            summary += f"""
Iteration {entry['iteration']} - {entry['timestamp']}
{'='*80}
Metrics Source: {source}
Performance Score: {metrics.get('performance_score', 'N/A')}/100
Throughput: {metrics.get('throughput', 'N/A')} packets/sec
Avg Latency: {metrics.get('latency_avg', 'N/A')} ms
P95 Latency: {metrics.get('latency_p95', 'N/A')} ms
CPU Efficiency: {metrics.get('cpu_efficiency', 'N/A')}
{'='*80}
"""
        return summary

    def get_agent_communications(self):
        """Get formatted log of all inter-agent communications (last 10)."""
        if not self.conversation_logs:
            return "No communications yet"
        log = "๐Ÿ’ฌ AGENT COMMUNICATION LOG\n" + "="*80 + "\n\n"
        for entry in self.conversation_logs[-10:]:
            log += f"""
{'='*80}
โฐ {entry['timestamp']}
๐Ÿค– AGENT: {entry['agent']}
๐Ÿ“ฅ INPUT:
{entry['input'][:300]}...
๐Ÿ“ค OUTPUT:
{entry['output'][:500]}...
{'='*80}
"""
        return log
{'='*80} """ return log def cleanup(self): """Cleanup temp directory""" try: shutil.rmtree(self.temp_dir) except: pass # Global system instance system = None def initialize_system(gemini_key, hf_token): """Initialize the multi-agent system""" global system try: system = MultiAgentSystem(gemini_key, hf_token) return "โœ“ Multi-agent system initialized!\n\n๐Ÿค– Agents:\n๐Ÿ›๏ธ Architect\n๐Ÿ’ป Developer\n๐Ÿ” Debugger\n๐Ÿงช Tester" except Exception as e: return f"โœ— Error initializing: {str(e)}" def run_initial_development(): """Run the initial development cycle with automatic error fixing and REAL metrics""" if system is None: return "โœ— Initialize system first!", "", "", "", "", "", "" space_status = system.create_space() if "โœ—" in space_status: return space_status, "", "", "", "", "", "" log = f"๐Ÿš€ ITERATION 1: INITIAL DEVELOPMENT\n{'='*80}\n\n" log += f"{space_status}\n\n" log += "๐Ÿ›๏ธ PHASE 1: ARCHITECT DESIGNING SYSTEM...\n" architect_output = system.initial_design_phase() log += f"โœ“ Architecture complete\n\n" log += "๐Ÿ’ป PHASE 2: DEVELOPER IMPLEMENTING...\n" dev_output, code, requirements = system.development_phase(architect_output) log += f"โœ“ Implementation complete\n\n" log += "๐Ÿงช PHASE 3: TESTER BENCHMARKING (ESTIMATES)...\n" test_output, test_metrics = system.testing_phase(code) log += f"โœ“ Benchmarking complete\n\n" log += "๐Ÿš€ PHASE 4: DEPLOYING...\n" deploy_status = system.deploy_to_space(code, requirements) log += f"{deploy_status}\n\n" log += "โณ WAITING FOR SPACE TO BUILD...\n" success, build_msg = system.wait_for_space_build(timeout=180) log += f"{build_msg}\n\n" # Auto-fix errors max_fix_attempts = 3 fix_attempt = 0 while not success and fix_attempt < max_fix_attempts: fix_attempt += 1 log += f"\n๐Ÿ”ง AUTOMATIC FIX ATTEMPT {fix_attempt}/{max_fix_attempts}\n" error_info = system.get_space_logs() fix_log, fixed_code, fixed_req = system.automatic_error_fix_cycle(error_info) log += fix_log if fixed_code: log += "\n๐Ÿš€ RE-DEPLOYING 
WITH FIXES...\n" deploy_status = system.deploy_to_space(fixed_code, fixed_req) log += f"{deploy_status}\n\n" log += "โณ WAITING FOR SPACE TO BUILD...\n" success, build_msg = system.wait_for_space_build(timeout=180) log += f"{build_msg}\n\n" if success: code = fixed_code requirements = fixed_req break else: log += "โœ— Could not generate fix\n" break # Collect REAL metrics from deployed app real_metrics = None if success: log += "\n๐Ÿ“Š COLLECTING REAL METRICS FROM DEPLOYED APP...\n" time.sleep(10) # Give app time to stabilize real_metrics = system.collect_real_metrics() if real_metrics.get("source") == "real_api": log += f"โœ“ Real metrics collected successfully!\n\n" else: log += "โš ๏ธ Could not collect real metrics, using estimates\n\n" log += "๐Ÿ” PHASE 5: DEBUGGER VALIDATING WITH REAL METRICS...\n" debug_output, ready, metrics = system.debugging_phase(code, architect_output, test_metrics, real_metrics) log += f"โœ“ Validation complete\n\n" if not ready: log += "๐Ÿ”ง PHASE 6: DEVELOPER FIXING ISSUES...\n" fix_output, code = system.fix_phase(code, debug_output) log += f"โœ“ Fixes applied\n\n" log += "๐Ÿ”„ RE-DEPLOYING AND RE-TESTING...\n" deploy_status = system.deploy_to_space(code, requirements) success, build_msg = system.wait_for_space_build(timeout=180) if success: time.sleep(10) real_metrics = system.collect_real_metrics() test_output_2, test_metrics_2 = system.testing_phase(code) debug_output_2, ready, metrics = system.debugging_phase(code, architect_output, test_metrics_2, real_metrics) debug_output += "\n\n--- AFTER FIXES ---\n" + debug_output_2 log += f"โœ“ Re-testing complete\n\n" log += f"๐ŸŒ Space URL: https://huggingface.co/spaces/{system.space_name}\n\n" log += f"๐Ÿ“Š Initial Performance Score: {metrics.get('performance_score', 'N/A')}/100\n" log += f"๐Ÿ“ˆ Metrics Source: {metrics.get('source', 'estimates')}\n" return log, architect_output, dev_output, debug_output, code, deploy_status, f"Performance Score: {metrics.get('performance_score', 
0)}/100 (Source: {metrics.get('source', 'estimates')})" def run_autonomous_improvement(): """Run autonomous improvement cycle with REAL metrics collection""" if system is None: return "โœ— Initialize system first!", "", "", "", "", "", "" if system.iteration == 0: return "โœ— Run initial development first!", "", "", "", "", "", "" log, arch_out, dev_out, debug_out, code, req, metrics = system.autonomous_improvement_cycle() log += f"\n๐ŸŒ Space URL: https://huggingface.co/spaces/{system.space_name}\n" score_summary = f"Performance Score: {metrics.get('performance_score', 0)}/100 (Source: {metrics.get('source', 'estimates')})" return log, arch_out, dev_out, debug_out, code, "", score_summary def get_performance_history(): """Get performance history""" if system is None: return "No performance data yet" return system.get_performance_summary() def get_communications(): """Get inter-agent communications""" if system is None: return "No communications yet" return system.get_agent_communications() # Create Gradio Interface with gr.Blocks(title="Autonomous Multi-Agent Optimizer") as demo: gr.Markdown(""" # ๐Ÿค– Autonomous Multi-Agent Performance Optimizer Watch AI agents autonomously optimize packet simulation algorithms through iterative improvements. ## The Team: - ๐Ÿ›๏ธ **ARCHITECT**: Analyzes metrics and designs optimizations - ๐Ÿ’ป **DEVELOPER**: Implements high-performance code - ๐Ÿ” **DEBUGGER**: Validates performance and quality - ๐Ÿงช **TESTER**: Runs benchmarks and collects metrics **Fully Autonomous**: Agents optimize algorithms based on REAL performance scores from deployed apps! 
""") with gr.Tab("๐Ÿš€ Setup & Initial Build"): gr.Markdown("### Step 1: Initialize Multi-Agent System") with gr.Row(): gemini_api_key = gr.Textbox( label="Gemini API Key", placeholder="Enter your Google AI Studio API key", type="password" ) hf_token = gr.Textbox( label="Hugging Face Token", placeholder="Enter your HF token (write access)", type="password" ) init_btn = gr.Button("๐Ÿค– Initialize Agent Team", variant="primary", size="lg") init_output = gr.Textbox(label="System Status", lines=5) init_btn.click( initialize_system, inputs=[gemini_api_key, hf_token], outputs=init_output ) gr.Markdown("### Step 2: Initial Development") gr.Markdown("Agents will autonomously design, build, deploy, and measure REAL performance metrics.") develop_btn = gr.Button("๐ŸŽฏ Start Autonomous Development", variant="primary", size="lg") dev_log = gr.Textbox(label="Development Log", lines=12) performance_display = gr.Textbox(label="Initial Performance Score", lines=2) with gr.Row(): with gr.Column(): architect_output = gr.Textbox(label="๐Ÿ›๏ธ Architect's Design", lines=8) with gr.Column(): developer_output = gr.Textbox(label="๐Ÿ’ป Developer's Implementation", lines=8) with gr.Row(): with gr.Column(): debugger_output = gr.Textbox(label="๐Ÿ” Debugger's Report", lines=8) with gr.Column(): initial_code = gr.Code(label="Final Code", language="python", lines=8) deploy_status_1 = gr.Textbox(label="Deployment Status") develop_btn.click( run_initial_development, outputs=[dev_log, architect_output, developer_output, debugger_output, initial_code, deploy_status_1, performance_display] ) with gr.Tab("๐Ÿ”„ Autonomous Optimization"): gr.Markdown(""" ### Fully Autonomous Improvement Cycles with REAL Metrics The agents will autonomously: 1. Tester analyzes current performance and identifies bottlenecks 2. Architect designs optimizations to improve scores 3. Developer implements performance improvements 4. **Deploy and collect REAL metrics from the running app** 5. 
Debugger validates improvements using REAL data 6. Measure actual score improvement **Goal**: Continuously improve performance scores through autonomous optimization with real feedback! """) gr.Markdown("### Current Performance") current_perf = gr.Textbox(label="Current Score", lines=2, value="Run initial development first") performance_display.change(lambda x: x, inputs=performance_display, outputs=current_perf) optimize_btn = gr.Button("๐Ÿš€ Run Autonomous Optimization Cycle", variant="primary", size="lg") optimization_log = gr.Textbox(label="Optimization Log", lines=12) new_perf = gr.Textbox(label="New Performance Score", lines=2) with gr.Row(): with gr.Column(): architect_improve = gr.Textbox(label="๐Ÿ›๏ธ Optimization Strategy", lines=8) with gr.Column(): developer_improve = gr.Textbox(label="๐Ÿ’ป Performance Improvements", lines=8) with gr.Row(): with gr.Column(): debugger_improve = gr.Textbox(label="๐Ÿ” Performance Validation", lines=8) with gr.Column(): improved_code = gr.Code(label="Optimized Code", language="python", lines=8) deploy_status_2 = gr.Textbox(label="Deployment Status") new_perf.change(lambda x: x, inputs=new_perf, outputs=current_perf) optimize_btn.click( run_autonomous_improvement, outputs=[optimization_log, architect_improve, developer_improve, debugger_improve, improved_code, deploy_status_2, new_perf] ) gr.Markdown("### Tip: Run Multiple Cycles") gr.Markdown("Keep clicking 'Run Autonomous Optimization Cycle' to watch the agents iteratively improve based on REAL performance data! Each cycle aims to improve the score by 10+ points.") with gr.Tab("๐Ÿ“Š Performance Analytics"): gr.Markdown(""" ### Performance History & Metrics View complete performance tracking across all iterations, including REAL vs estimated metrics. 
""") refresh_perf_btn = gr.Button("๐Ÿ”„ Refresh Performance Data") perf_history = gr.Textbox(label="Performance History", lines=25) refresh_perf_btn.click( get_performance_history, outputs=perf_history ) with gr.Tab("๐Ÿ’ฌ Agent Communications"): gr.Markdown(""" ### Inter-Agent Communication Log See how agents communicate and share context. """) refresh_btn = gr.Button("๐Ÿ”„ Refresh Communications") comms_output = gr.Textbox(label="Agent Communications", lines=30) refresh_btn.click( get_communications, outputs=comms_output ) gr.Markdown(""" --- ### How Autonomous Optimization Works: **Initial Development:** 1. Architect designs system architecture 2. Developer implements code with metrics API endpoint 3. Deploy to HuggingFace 4. **Collect REAL metrics from deployed app** 5. Debugger validates using real performance data **Optimization Cycles (Fully Autonomous with REAL metrics):** 1. Tester analyzes current metrics and identifies bottlenecks 2. Architect designs optimizations based on data 3. Developer implements performance improvements 4. Deploy new version 5. **Collect REAL metrics from the running app** 6. Debugger validates improvements using actual measured performance 7. Compare real performance gains **Performance Scoring (0-100):** - Throughput (packets/sec) - Latency (average, p95, p99) - CPU Efficiency - Algorithm Complexity - Code Quality **Key Features:** - Fully autonomous optimization (no human input needed) - **REAL performance metrics from deployed apps** - Performance-driven improvements based on actual data - Shared context across all agents - Iterative score improvements - Complete metric tracking - README preservation on deploy """) if __name__ == "__main__": demo.launch()