# MangoMAS
# Multi-Agent Cognitive Architecture — Interactive Demo
""" MangoMAS — Multi-Agent Cognitive Architecture ============================================== Interactive HuggingFace Space showcasing: - 10 Cognitive Cells with NN heads - MCTS Planning with policy/value networks - 7M-param MoE Neural Router - Multi-agent orchestration Author: MangoMAS Engineering (Ian Shanker) """ from __future__ import annotations import hashlib import json import math import random import time import uuid from dataclasses import dataclass from typing import Any import gradio as gr import numpy as np import plotly.graph_objects as go # --------------------------------------------------------------------------- # Try to import torch — graceful fallback to CPU stubs # --------------------------------------------------------------------------- try: import torch import torch.nn as nn import torch.nn.functional as F _TORCH = True except ImportError: _TORCH = False # ═══════════════════════════════════════════════════════════════════════════ # SECTION 1: Feature Engineering (64-dim vectors) # ═══════════════════════════════════════════════════════════════════════════ def featurize64(text: str) -> list[float]: """ Extract a deterministic 64-dimensional feature vector from text. Combines: - 32 hash-based sinusoidal features (content fingerprint) - 16 domain-tag signals (code, security, architecture, data, etc.) - 8 structural signals (length, punctuation, questions, etc.) - 4 sentiment polarity estimates - 4 novelty/complexity scores """ features: list[float] = [] # 1. Hash-based sinusoidal features (32 dims) h = hashlib.sha256(text.encode()).hexdigest() for i in range(32): byte_val = int(h[i * 2 : i * 2 + 2], 16) / 255.0 features.append(math.sin(byte_val * math.pi * (i + 1))) # 2. 
Domain tag signals (16 dims) lower = text.lower() domain_tags = [ "code", "function", "class", "api", "security", "threat", "architecture", "design", "data", "database", "test", "deploy", "optimize", "performance", "research", "analyze", ] for tag in domain_tags: features.append(1.0 if tag in lower else 0.0) # 3. Structural signals (8 dims) features.append(min(len(text) / 500.0, 1.0)) # length features.append(text.count(".") / max(len(text), 1) * 10) # period density features.append(text.count("?") / max(len(text), 1) * 10) # question density features.append(text.count("!") / max(len(text), 1) * 10) # exclamation density features.append(text.count(",") / max(len(text), 1) * 10) # comma density features.append(len(text.split()) / 100.0) # word count normalized features.append(1.0 if any(c.isupper() for c in text) else 0.0) # has uppercase features.append(sum(1 for c in text if c.isdigit()) / max(len(text), 1)) # 4. Sentiment polarity (4 dims) pos_words = ["good", "great", "excellent", "improve", "best", "optimize"] neg_words = ["bad", "fail", "error", "bug", "crash", "threat"] features.append(sum(1 for w in pos_words if w in lower) / len(pos_words)) features.append(sum(1 for w in neg_words if w in lower) / len(neg_words)) features.append(0.5) # neutral baseline features.append(abs(features[-3] - features[-2])) # polarity distance # 5. 
Novelty/complexity (4 dims) unique_words = len(set(text.lower().split())) total_words = max(len(text.split()), 1) features.append(unique_words / total_words) # lexical diversity features.append(min(len(text.split("\n")) / 10.0, 1.0)) # line count features.append(text.count("(") / max(len(text), 1) * 20) # nesting features.append(min(max(len(w) for w in text.split()) / 20.0, 1.0) if text.strip() else 0.0) # Normalize to unit vector norm = math.sqrt(sum(f * f for f in features)) + 1e-8 return [f / norm for f in features[:64]] def plot_features(features: list[float], title: str = "64-D Feature Vector") -> go.Figure: """Create a plotly bar chart of the 64-dim feature vector.""" labels = ( [f"hash_{i}" for i in range(32)] + [f"tag_{t}" for t in [ "code", "func", "class", "api", "sec", "threat", "arch", "design", "data", "db", "test", "deploy", "opt", "perf", "research", "analyze", ]] + [f"struct_{i}" for i in range(8)] + [f"sent_{i}" for i in range(4)] + [f"novel_{i}" for i in range(4)] ) colors = ( ["#FF6B6B"] * 32 + ["#4ECDC4"] * 16 + ["#45B7D1"] * 8 + ["#96CEB4"] * 4 + ["#FFEAA7"] * 4 ) fig = go.Figure( data=[go.Bar(x=labels, y=features, marker_color=colors)], layout=go.Layout( title=title, xaxis=dict(title="Feature Dimension", tickangle=-45, tickfont=dict(size=7)), yaxis=dict(title="Value"), height=350, template="plotly_dark", margin=dict(b=120), ), ) return fig # ═══════════════════════════════════════════════════════════════════════════ # SECTION 2: Neural Network Models # ═══════════════════════════════════════════════════════════════════════════ class ExpertTower(nn.Module if _TORCH else object): """Single expert tower: 64 → 512 → 512 → 256.""" def __init__(self, d_in: int = 64, h1: int = 512, h2: int = 512, d_out: int = 256): super().__init__() self.fc1 = nn.Linear(d_in, h1) self.fc2 = nn.Linear(h1, h2) self.fc3 = nn.Linear(h2, d_out) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.fc3(F.relu(self.fc2(F.relu(self.fc1(x))))) class 
class MixtureOfExperts7M(nn.Module if _TORCH else object):
    """
    ~7M parameter Mixture-of-Experts model.

    Architecture:
    - Gating network: 64 → 512 → N_experts (softmax)
    - Expert towers (×N): 64 → 512 → 512 → 256
    - Classifier head: 256 → N_classes
    """

    def __init__(self, num_classes: int = 10, num_experts: int = 16):
        super().__init__()
        self.num_experts = num_experts
        # Gating network
        self.gate_fc1 = nn.Linear(64, 512)
        self.gate_fc2 = nn.Linear(512, num_experts)
        # Expert towers
        self.experts = nn.ModuleList([ExpertTower() for _ in range(num_experts)])
        # Classifier head
        self.classifier = nn.Linear(256, num_classes)

    @property
    def parameter_count(self) -> int:
        """Total number of trainable parameters across all submodules."""
        return sum(p.numel() for p in self.parameters())

    def forward(self, x64: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Return (class logits, per-expert gate weights) for a 64-dim input."""
        # Gating: softmax over experts
        gate = F.relu(self.gate_fc1(x64))
        gate_weights = torch.softmax(self.gate_fc2(gate), dim=-1)
        # Expert outputs stacked on a new expert axis
        expert_outs = torch.stack([e(x64) for e in self.experts], dim=1)
        # Weighted aggregation across experts
        agg = torch.sum(expert_outs * gate_weights.unsqueeze(-1), dim=1)
        # Classifier head
        logits = self.classifier(agg)
        return logits, gate_weights
""" EXPERTS = [ "code_expert", "test_expert", "design_expert", "research_expert", "architecture_expert", "security_expert", "performance_expert", "documentation_expert", ] def __init__(self, d_in: int = 64, d_h: int = 128, n_out: int = 8): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, d_h), nn.ReLU(), nn.Dropout(0.1), nn.Linear(d_h, d_h // 2), nn.ReLU(), nn.Linear(d_h // 2, n_out), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return torch.softmax(self.net(x), dim=-1) class PolicyNetwork(nn.Module if _TORCH else object): """MCTS policy network: 128 → 256 → 128 → N_actions.""" def __init__(self, d_in: int = 128, n_actions: int = 32): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, 256), nn.ReLU(), nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, n_actions), nn.Softmax(dim=-1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x) class ValueNetwork(nn.Module if _TORCH else object): """MCTS value network: 192 → 256 → 64 → 1 (tanh).""" def __init__(self, d_in: int = 192): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, 256), nn.ReLU(), nn.Linear(256, 64), nn.ReLU(), nn.Linear(64, 1), nn.Tanh(), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x) # ═══════════════════════════════════════════════════════════════════════════ # SECTION 3: Cognitive Cell Executors # ═══════════════════════════════════════════════════════════════════════════ CELL_TYPES = { "reasoning": { "name": "ReasoningCell", "description": "Structured reasoning with Rule or NN heads", "heads": ["rule", "nn"], }, "memory": { "name": "MemoryCell", "description": "Privacy-preserving preference extraction", "heads": ["preference_extractor"], }, "causal": { "name": "CausalCell", "description": "Pearl's do-calculus for causal inference", "heads": ["do_calculus"], }, "ethics": { "name": "EthicsCell", "description": "Safety classification and PII detection", "heads": ["classifier", "pii_scanner"], }, "empathy": { "name": 
"EmpathyCell", "description": "Emotional tone detection and empathetic responses", "heads": ["tone_detector"], }, "curiosity": { "name": "CuriosityCell", "description": "Epistemic curiosity and hypothesis generation", "heads": ["hypothesis_generator"], }, "figliteral": { "name": "FigLiteralCell", "description": "Figurative vs literal language classification", "heads": ["classifier"], }, "r2p": { "name": "R2PCell", "description": "Requirements-to-Plan structured decomposition", "heads": ["planner"], }, "telemetry": { "name": "TelemetryCell", "description": "Telemetry event capture and structuring", "heads": ["collector"], }, "aggregator": { "name": "AggregatorCell", "description": "Multi-expert output aggregation", "heads": ["weighted_average", "max_confidence", "ensemble"], }, } def execute_cell(cell_type: str, text: str, config_json: str = "{}") -> dict[str, Any]: """Execute a cognitive cell and return structured results.""" start = time.monotonic() # BUG-011 fix: validate empty input if not text or not text.strip(): return { "cell_type": cell_type, "status": "error", "message": "Input text is required. 
Please provide some text to process.", "elapsed_ms": 0.0, } # BUG-002 fix: return error on invalid JSON instead of silently ignoring try: config = json.loads(config_json) if config_json.strip() else {} except json.JSONDecodeError as e: return { "cell_type": cell_type, "status": "error", "message": f"Invalid JSON config: {e}", "elapsed_ms": 0.0, } request_id = f"req-{uuid.uuid4().hex[:12]}" # Cell-specific logic result: dict[str, Any] = { "cell_type": cell_type, "request_id": request_id, "status": "ok", } if cell_type == "reasoning": head = config.get("head_type", "rule") words = text.split() sections = [] chunk_size = max(len(words) // 3, 1) for i in range(0, len(words), chunk_size): chunk = " ".join(words[i : i + chunk_size]) sections.append({ "text": chunk, "confidence": round(random.uniform(0.7, 0.99), 3), "boundary_type": random.choice(["topic_shift", "elaboration", "conclusion"]), }) result["head_type"] = head result["sections"] = sections result["section_count"] = len(sections) elif cell_type == "memory": # Preference extraction preferences = [] if "prefer" in text.lower() or "like" in text.lower(): preferences.append({ "type": "explicit", "value": text, "confidence": 0.95, }) if "always" in text.lower() or "usually" in text.lower(): preferences.append({ "type": "implicit", "value": text, "confidence": 0.72, }) result["preferences"] = preferences result["opt_out"] = "don't remember" in text.lower() result["consent_status"] = "granted" elif cell_type == "causal": # Simulated causal inference result["mode"] = config.get("mode", "do_calculus") result["variables"] = [w for w in text.split() if len(w) > 3][:5] result["causal_effect"] = round(random.uniform(-0.5, 0.8), 3) result["confidence_interval"] = [ round(result["causal_effect"] - 0.15, 3), round(result["causal_effect"] + 0.15, 3), ] elif cell_type == "ethics": # PII detection import re pii_patterns = { "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", 
"ssn": r"\b\d{3}-\d{2}-\d{4}\b", } pii_found = [] redacted = text for pii_type, pattern in pii_patterns.items(): matches = re.findall(pattern, text) for m in matches: pii_found.append({"type": pii_type, "value": "[REDACTED]"}) redacted = redacted.replace(m, "[REDACTED]") result["is_safe"] = len(pii_found) == 0 result["pii_detected"] = pii_found result["redacted_text"] = redacted result["risk_score"] = round(random.uniform(0.0, 0.3) if not pii_found else random.uniform(0.6, 0.9), 3) elif cell_type == "empathy": # BUG-007 fix: keyword-based emotion detection instead of random lower = text.lower() emotion_keywords: dict[str, list[str]] = { "frustration": ["frustrat", "annoy", "angry", "upset", "fail", "broken", "stuck", "overwhelm"], "anxiety": ["worry", "anxious", "nervous", "afraid", "fear", "concern", "stress", "uncertain"], "excitement": ["excit", "amazing", "awesome", "great", "love", "fantastic", "thrilled", "happy"], "satisfaction": ["satisfied", "pleased", "good", "well", "success", "accomplish", "done", "complete"], "confusion": ["confus", "unclear", "don't understand", "what does", "how does", "lost", "puzzle"], } emotion_scores: dict[str, int] = {} for emotion, keywords in emotion_keywords.items(): emotion_scores[emotion] = sum(1 for kw in keywords if kw in lower) best_emotion = max(emotion_scores, key=lambda e: emotion_scores[e]) detected = best_emotion if emotion_scores[best_emotion] > 0 else "neutral" confidence = min(0.95, 0.6 + emotion_scores.get(detected, 0) * 0.1) responses = { "neutral": "I understand your message. How can I help further?", "frustration": "I can see this is frustrating. Let me help resolve this.", "excitement": "That's great news! Let's build on that momentum.", "confusion": "Let me clarify that for you step by step.", "satisfaction": "Glad to hear things are going well!", "anxiety": "I understand your concern. 
Let's work through this together.", } result["detected_emotion"] = detected result["confidence"] = round(confidence, 3) result["empathetic_response"] = responses[detected] elif cell_type == "curiosity": # BUG-013 fix: generate topic-aware questions from extracted keywords words = [w for w in text.split() if len(w) > 3] topics = list(dict.fromkeys(words[:5])) # unique, order-preserved topic_str = ", ".join(topics[:3]) if topics else "this topic" questions = [ f"What are the underlying assumptions behind {topic_str}?", f"How would the outcome differ if we changed the approach to {topics[0] if topics else 'this'}?", f"What related problems have been solved in adjacent domains?", f"What are the second-order effects of {topics[1] if len(topics) > 1 else 'this decision'}?", f"What evidence would disprove our current hypothesis about {topic_str}?", ] max_q = config.get("max_questions", 3) result["questions"] = questions[:max_q] # Novelty based on lexical diversity unique_ratio = len(set(text.lower().split())) / max(len(text.split()), 1) result["novelty_score"] = round(min(unique_ratio + 0.3, 0.95), 3) elif cell_type == "figliteral": # BUG-012 fix: generate actual literal decomposition figurative_map: dict[str, str] = { "raining cats and dogs": "raining very heavily", "piece of cake": "something very easy to do", "break a leg": "good luck; perform well", "time flies": "time appears to pass quickly", "hit the nail on the head": "to be exactly right", "spill the beans": "to reveal a secret", "under the weather": "feeling sick or unwell", "bite the bullet": "to endure a painful situation with courage", } figurative_markers = ["like a", "as if"] + list(figurative_map.keys()) is_figurative = any(m in text.lower() for m in figurative_markers) result["classification"] = "figurative" if is_figurative else "literal" result["confidence"] = round(0.9 if is_figurative else 0.85, 3) if is_figurative: # Find which idiom matched and provide its literal meaning literal_parts = [] 
remaining = text for idiom, meaning in figurative_map.items(): if idiom in text.lower(): literal_parts.append(f"'{idiom}' = {meaning}") if not literal_parts and ("like a" in text.lower() or "as if" in text.lower()): literal_parts.append("Contains simile/metaphor — direct comparison without figurative intent") result["literal_interpretation"] = "; ".join(literal_parts) if literal_parts else "No specific idiom decomposition available" result["figurative_elements"] = literal_parts elif cell_type == "r2p": steps = [ {"step": 1, "action": "Analyze requirements", "estimated_effort": "2h"}, {"step": 2, "action": "Design solution architecture", "estimated_effort": "4h"}, {"step": 3, "action": "Implement core logic", "estimated_effort": "8h"}, {"step": 4, "action": "Write tests", "estimated_effort": "4h"}, {"step": 5, "action": "Deploy and validate", "estimated_effort": "2h"}, ] result["plan"] = steps result["total_effort"] = "20h" result["success_criteria"] = ["All tests pass", "Performance targets met", "Code reviewed"] elif cell_type == "telemetry": # BUG-014 fix: extract structured event attributes from input import re as _re result["event_recorded"] = True result["trace_id"] = f"trace-{uuid.uuid4().hex[:8]}" result["timestamp"] = time.time() # Parse structured fields from natural language lower = text.lower() attrs: dict[str, Any] = {"source": "cognitive_cell", "cell_type": cell_type} # Extract action verbs action_map = {"click": "click", "submit": "submit", "scroll": "scroll", "navigate": "navigate", "hover": "hover", "type": "input", "select": "select", "drag": "drag", "drop": "drop", "open": "open"} for verb, action in action_map.items(): if verb in lower: attrs["action"] = action break # Extract numeric durations dur_match = _re.search(r"(\d+)\s*(?:second|sec|ms|minute|min)", lower) if dur_match: attrs["duration_value"] = int(dur_match.group(1)) unit = dur_match.group(0).replace(dur_match.group(1), "").strip() attrs["duration_unit"] = unit # Extract page/element 
references page_match = _re.search(r"(?:on|at|in)\s+(?:the\s+)?(\w+)\s+page", lower) if page_match: attrs["page"] = page_match.group(1) elem_match = _re.search(r"(?:click|clicked|press|pressed|hit)\s+(?:the\s+)?(\w+)", lower) if elem_match: attrs["element"] = elem_match.group(1) result["metadata"] = attrs result["parsed_attributes"] = {k: v for k, v in attrs.items() if k not in ("source", "cell_type")} elif cell_type == "aggregator": # BUG-001 fix: run real sub-cell aggregation instead of placeholder strategy = config.get("strategy", "weighted_average") sub_cells = config.get("sub_cells", ["reasoning", "ethics", "causal"]) sub_results = [] for sc in sub_cells: if sc in CELL_TYPES and sc != "aggregator": # prevent recursion sr = execute_cell(sc, text) sub_results.append({ "cell": sc, "status": sr.get("status", "ok"), "confidence": sr.get("confidence", sr.get("risk_score", 0.8)), "elapsed_ms": sr.get("elapsed_ms", 0), }) # Compute aggregated confidence if sub_results: confidences = [r["confidence"] for r in sub_results if isinstance(r["confidence"], (int, float))] if strategy == "max_confidence": agg_confidence = max(confidences) if confidences else 0.0 elif strategy == "ensemble": agg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 else: # weighted_average weights = [1.0 / (i + 1) for i in range(len(confidences))] w_sum = sum(weights) agg_confidence = sum(c * w for c, w in zip(confidences, weights)) / w_sum if w_sum else 0.0 else: agg_confidence = 0.0 result["strategy"] = strategy result["sub_cell_results"] = sub_results result["cells_aggregated"] = len(sub_results) result["aggregated_output"] = f"Aggregated {len(sub_results)} cells via {strategy}" result["confidence"] = round(agg_confidence, 3) elapsed = (time.monotonic() - start) * 1000 result["elapsed_ms"] = round(elapsed, 2) return result def compose_cells(pipeline_str: str, text: str) -> dict[str, Any]: """Execute a pipeline of cells sequentially.""" cell_types = [c.strip() for c in 
pipeline_str.split(",") if c.strip()] if not cell_types: return {"error": "No cell types specified"} activations = [] context: dict[str, Any] = {} final_output: dict[str, Any] = {} for ct in cell_types: if ct not in CELL_TYPES: activations.append({"cell_type": ct, "status": "error", "message": f"Unknown cell type: {ct}"}) continue result = execute_cell(ct, text) activations.append({ "cell_type": ct, "status": result.get("status", "ok"), "elapsed_ms": result.get("elapsed_ms", 0), }) context.update({k: v for k, v in result.items() if k not in ("request_id", "elapsed_ms")}) final_output = result return { "pipeline": cell_types, "activations": activations, "final_output": final_output, "total_cells": len(cell_types), "context_keys": list(context.keys()), } # ═══════════════════════════════════════════════════════════════════════════ # SECTION 4: MCTS Planning Engine # ═══════════════════════════════════════════════════════════════════════════ TASK_CATEGORIES = { "architecture": ["service_split", "api_gateway", "data_layer", "security_layer", "caching"], "implementation": ["requirements", "design", "code", "test", "deploy"], "optimization": ["profile", "identify_bottleneck", "optimize", "validate", "benchmark"], "security": ["asset_inventory", "threat_enumeration", "risk_scoring", "mitigations", "audit"], "research": ["literature_review", "comparison", "synthesis", "recommendations", "publish"], } @dataclass class MCTSNode: """Node in the MCTS search tree.""" id: str action: str visits: int = 0 total_value: float = 0.0 policy_prior: float = 0.0 children: list["MCTSNode"] | None = None def ucb1_score(self, parent_visits: int, c: float = 1.414) -> float: if self.visits == 0: return float("inf") exploitation = self.total_value / self.visits exploration = c * math.sqrt(math.log(parent_visits) / self.visits) return exploitation + exploration def puct_score(self, parent_visits: int, c: float = 1.0) -> float: if self.visits == 0: return float("inf") exploitation = 
self.total_value / self.visits exploration = c * self.policy_prior * math.sqrt(parent_visits) / (1 + self.visits) return exploitation + exploration def to_dict(self, max_depth: int = 3) -> dict[str, Any]: d: dict[str, Any] = { "id": self.id, "action": self.action, "visits": self.visits, "value": round(self.total_value / max(self.visits, 1), 3), "policy_prior": round(self.policy_prior, 3), } if self.children and max_depth > 0: d["children"] = [ c.to_dict(max_depth - 1) for c in sorted(self.children, key=lambda n: -n.visits)[:5] ] return d def run_mcts( task: str, max_simulations: int = 100, exploration_constant: float = 1.414, strategy: str = "ucb1", ) -> dict[str, Any]: """Run MCTS planning on a task and return the search tree.""" start = time.monotonic() # Detect task category lower = task.lower() category = "implementation" for cat, keywords in { "architecture": ["architect", "design", "micro", "system"], "security": ["security", "threat", "vulnerability", "attack"], "optimization": ["optimize", "performance", "latency", "speed"], "research": ["research", "survey", "study", "analyze"], }.items(): if any(k in lower for k in keywords): category = cat break actions = TASK_CATEGORIES[category] # Build tree root = MCTSNode(id="root", action=task[:50], children=[]) # Use NN priors if torch available if _TORCH: policy_net = PolicyNetwork(d_in=128, n_actions=len(actions)) value_net = ValueNetwork(d_in=192) policy_net.eval() value_net.eval() for sim in range(max_simulations): # SELECT: find best leaf node = root # EXPAND: add children if needed if not node.children: node.children = [] for i, act in enumerate(actions): prior = random.uniform(0.1, 0.5) if _TORCH: embed = torch.randn(1, 128) with torch.no_grad(): priors = policy_net(embed)[0] prior = priors[i % len(priors)].item() node.children.append( MCTSNode( id=f"{act}-{sim}", action=act, policy_prior=prior, children=[], ) ) # Select best child score_fn = ( (lambda n: n.ucb1_score(root.visits + 1, exploration_constant)) 
if strategy == "ucb1" else (lambda n: n.puct_score(root.visits + 1, exploration_constant)) ) best_child = max(node.children, key=score_fn) # SIMULATE: get value estimate if _TORCH: state = torch.randn(1, 192) with torch.no_grad(): value = value_net(state).item() else: value = random.uniform(0.3, 0.9) # BACKPROPAGATE best_child.visits += 1 best_child.total_value += value root.visits += 1 elapsed = (time.monotonic() - start) * 1000 # Best plan if root.children: best = max(root.children, key=lambda n: n.visits) best_action = best.action best_value = round(best.total_value / max(best.visits, 1), 3) else: best_action = "none" best_value = 0.0 return { "task": task, "category": category, "strategy": strategy, "best_action": best_action, "best_value": best_value, "total_simulations": max_simulations, "exploration_constant": exploration_constant, "tree": root.to_dict(max_depth=2), "all_actions": [ { "action": c.action, "visits": c.visits, "value": round(c.total_value / max(c.visits, 1), 3), } for c in sorted(root.children or [], key=lambda n: -n.visits) ], "elapsed_ms": round(elapsed, 2), "nn_enabled": _TORCH, } def benchmark_strategies(task: str) -> dict[str, Any]: """Compare MCTS vs Greedy vs Random on the same task.""" # BUG-005 fix: implement real greedy and random strategies results = {} # Detect category for action pool lower = task.lower() category = "implementation" for cat, keywords in { "architecture": ["architect", "design", "micro", "system"], "security": ["security", "threat", "vulnerability", "attack"], "optimization": ["optimize", "performance", "latency", "speed"], "research": ["research", "survey", "study", "analyze"], }.items(): if any(k in lower for k in keywords): category = cat break actions = TASK_CATEGORIES[category] # MCTS — full tree search start = time.monotonic() r = run_mcts(task, max_simulations=100) elapsed_mcts = (time.monotonic() - start) * 1000 results["mcts"] = { "quality_score": r["best_value"], "best_action": r["best_action"], 
"elapsed_ms": round(elapsed_mcts, 2), } # Greedy — single-step: pick action with highest policy prior start = time.monotonic() if _TORCH: policy_net = PolicyNetwork(d_in=128, n_actions=len(actions)) policy_net.eval() torch.manual_seed(hash(task) % (2**31)) embed = torch.randn(1, 128) with torch.no_grad(): priors = policy_net(embed)[0].numpy() best_idx = int(np.argmax(priors)) greedy_action = actions[best_idx] greedy_quality = float(priors[best_idx]) else: greedy_quality = max(random.uniform(0.1, 0.3) for _ in actions) greedy_action = random.choice(actions) elapsed_greedy = (time.monotonic() - start) * 1000 results["greedy"] = { "quality_score": round(greedy_quality, 3), "best_action": greedy_action, "elapsed_ms": round(elapsed_greedy, 2), } # Random — pick random action with random value start = time.monotonic() random_action = random.choice(actions) # Simulate value via value network if _TORCH: value_net = ValueNetwork(d_in=192) value_net.eval() torch.manual_seed(hash(task + random_action) % (2**31)) state = torch.randn(1, 192) with torch.no_grad(): random_quality = value_net(state).item() else: random_quality = random.uniform(-0.5, 0.5) elapsed_random = (time.monotonic() - start) * 1000 results["random"] = { "quality_score": round(random_quality, 3), "best_action": random_action, "elapsed_ms": round(elapsed_random, 2), } return {"task": task, "category": category, "results": results} def plot_mcts_tree(tree_data: dict) -> go.Figure: """Create a sunburst visualization of the MCTS tree.""" ids, labels, parents, values, colors = [], [], [], [], [] def _walk(node: dict, parent_id: str = "") -> None: nid = node["id"] ids.append(nid) labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})") parents.append(parent_id) values.append(max(node.get("visits", 1), 1)) colors.append(node.get("value", 0)) for child in node.get("children", []): _walk(child, nid) _walk(tree_data) fig = go.Figure(go.Sunburst( ids=ids, labels=labels, parents=parents, 
def plot_mcts_tree(tree_data: dict) -> go.Figure:
    """Create a sunburst visualization of the MCTS tree (dict from MCTSNode.to_dict)."""
    ids, labels, parents, values, colors = [], [], [], [], []

    def _walk(node: dict, parent_id: str = "") -> None:
        # Depth-first flatten into the parallel lists plotly's Sunburst expects.
        nid = node["id"]
        ids.append(nid)
        labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})")
        parents.append(parent_id)
        values.append(max(node.get("visits", 1), 1))
        colors.append(node.get("value", 0))
        for child in node.get("children", []):
            _walk(child, nid)

    _walk(tree_data)
    fig = go.Figure(go.Sunburst(
        ids=ids,
        labels=labels,
        parents=parents,
        values=values,
        marker=dict(colors=colors, colorscale="Viridis", showscale=True),
        branchvalues="total",
    ))
    fig.update_layout(
        title="MCTS Search Tree",
        height=500,
        template="plotly_dark",
        margin=dict(t=40, l=0, r=0, b=0),
    )
    return fig


# ═══════════════════════════════════════════════════════════════════════════
# SECTION 5: MoE Routing
# ═══════════════════════════════════════════════════════════════════════════

EXPERT_NAMES = [
    "Code Expert", "Test Expert", "Design Expert", "Research Expert",
    "Architecture Expert", "Security Expert", "Performance Expert", "Docs Expert",
]

# BUG-003 fix: singleton router with fixed seed for deterministic routing
_ROUTER_SEED = 42
_router_net_singleton: "RouterNet | None" = None


def _get_router() -> "RouterNet":
    """Get or create the singleton RouterNet with a fixed seed.

    Only constructed when torch is available; callers guard on _TORCH first.
    """
    global _router_net_singleton
    if _router_net_singleton is None and _TORCH:
        torch.manual_seed(_ROUTER_SEED)
        _router_net_singleton = RouterNet(d_in=64, n_out=len(EXPERT_NAMES))
        _router_net_singleton.eval()
    return _router_net_singleton  # type: ignore[return-value]
"layout", "wireframe", "mockup", "style"], # Design Expert 3: ["research", "analyze", "study", "survey", "literature", "paper", "compare"], # Research Expert 4: ["architect", "system", "microservice", "scale", "pattern", "infrastructure"], # Architecture Expert 5: ["security", "auth", "encrypt", "threat", "vulnerab", "owasp", "pci", "compliance"], # Security Expert 6: ["performance", "optimize", "latency", "throughput", "cache", "speed", "fast"], # Performance Expert 7: ["document", "readme", "docs", "comment", "explain", "write", "manual"], # Docs Expert } boost = np.zeros(len(EXPERT_NAMES)) for idx, kws in expert_keywords.items(): for kw in kws: if kw in lower_task: boost[idx] += 0.15 # increase weight for matching keywords # Apply boost and renormalize weights = weights + boost weights = weights / (weights.sum() + 1e-8) # Top-K selection top_indices = np.argsort(weights)[::-1][:top_k] selected = [ { "expert": EXPERT_NAMES[i], "weight": round(float(weights[i]), 4), "rank": rank + 1, } for rank, i in enumerate(top_indices) ] elapsed = (time.monotonic() - start) * 1000 return { "task": task, "features": features, "all_weights": {EXPERT_NAMES[i]: round(float(weights[i]), 4) for i in range(len(EXPERT_NAMES))}, "selected_experts": selected, "top_k": top_k, "nn_enabled": _TORCH, "elapsed_ms": round(elapsed, 2), } def plot_expert_weights(weights: dict[str, float]) -> go.Figure: """Create a bar chart of expert routing weights.""" names = list(weights.keys()) vals = list(weights.values()) colors = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB"] fig = go.Figure( data=[go.Bar(x=names, y=vals, marker_color=colors[:len(names)])], layout=go.Layout( title="Expert Routing Weights", # BUG-016 fix: shortened title yaxis=dict(title="Weight (softmax)", range=[0, max(vals) * 1.2]), height=350, template="plotly_dark", margin=dict(t=40), ), ) return fig # ═══════════════════════════════════════════════════════════════════════════ # SECTION 6: 
Agent Orchestration # ═══════════════════════════════════════════════════════════════════════════ AGENTS = [ {"name": "SWE Agent", "specialization": "Code scaffold generation", "icon": "[SWE]"}, {"name": "Architect Agent", "specialization": "System design and patterns", "icon": "[ARCH]"}, {"name": "QA Agent", "specialization": "Test plan and case generation", "icon": "[QA]"}, {"name": "Security Agent", "specialization": "Threat modeling (OWASP)", "icon": "[SEC]"}, {"name": "DevOps Agent", "specialization": "Infrastructure planning", "icon": "[OPS]"}, {"name": "Research Agent", "specialization": "Technical analysis", "icon": "[RES]"}, {"name": "Performance Agent", "specialization": "Optimization analysis", "icon": "[PERF]"}, {"name": "Documentation Agent", "specialization": "Technical writing", "icon": "[DOC]"}, ] # Agent-to-cell mapping for real processing _AGENT_CELL_MAP: dict[str, str] = { "SWE Agent": "reasoning", "Architect Agent": "r2p", "QA Agent": "reasoning", "Security Agent": "ethics", "DevOps Agent": "telemetry", "Research Agent": "causal", "Performance Agent": "reasoning", "Documentation Agent": "reasoning", } def orchestrate(task: str, max_agents: int = 3, strategy: str = "moe_routing") -> dict[str, Any]: """Orchestrate multiple agents for a task using specified routing strategy.""" start = time.monotonic() # BUG-010 fix: implement all three routing strategies if strategy == "round_robin": # Select agents in round-robin order selected_agents = AGENTS[:max_agents] agent_results = [] for i, agent in enumerate(selected_agents): # BUG-009 fix: execute real cell per agent cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": round(1.0 / max_agents, 4), "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) elif strategy == "random": # 
Randomly select agents import random as _rnd shuffled = _rnd.sample(AGENTS, min(max_agents, len(AGENTS))) agent_results = [] for agent in shuffled: cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": round(1.0 / max_agents, 4), "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) else: # moe_routing (default) routing = route_task(task, top_k=max_agents) agent_results = [] for expert in routing["selected_experts"]: agent_name = expert["expert"].replace(" Expert", " Agent") agent = next((a for a in AGENTS if agent_name in a["name"]), AGENTS[0]) # BUG-009 fix: execute real cell per agent cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": expert["weight"], "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) elapsed = (time.monotonic() - start) * 1000 return { "task": task, "strategy": strategy, "agents_selected": len(agent_results), "max_agents": max_agents, "results": agent_results, "total_elapsed_ms": round(elapsed, 2), } # ═══════════════════════════════════════════════════════════════════════════ # SECTION 7: Gradio Interface # ═══════════════════════════════════════════════════════════════════════════ THEME = gr.themes.Soft( primary_hue="amber", secondary_hue="orange", neutral_hue="stone", font=gr.themes.GoogleFont("Inter"), ) CSS = """ .main-header { text-align: center; margin-bottom: 1rem; } .main-header h1 { background: linear-gradient(135deg, #FF6B6B, #FFEAA7, #4ECDC4); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5rem; font-weight: 800; } .stat-box { background: 
linear-gradient(135deg, #1a1a2e, #16213e); border: 1px solid #0f3460; border-radius: 12px; padding: 1rem; text-align: center; color: #e8e8e8; } .stat-box h3 { color: #FFEAA7; margin: 0; font-size: 1.8rem; } .stat-box p { color: #a8a8a8; margin: 0; font-size: 0.85rem; } footer { display: none !important; } .plotly .main-svg { overflow: visible !important; } """ def build_app() -> gr.Blocks: """Build the complete Gradio application.""" with gr.Blocks(theme=THEME, css=CSS, title="MangoMAS — Multi-Agent Cognitive Architecture") as app: # Header gr.HTML("""
Multi-Agent Cognitive Architecture — Interactive Demo
{label}