""" MangoMAS — Multi-Agent Cognitive Architecture ============================================== Interactive HuggingFace Space showcasing: - 10 Cognitive Cells with NN heads - MCTS Planning with policy/value networks - 7M-param MoE Neural Router - Multi-agent orchestration Author: MangoMAS Engineering (Ian Shanker) """ from __future__ import annotations import hashlib import json import math import random import time import uuid from dataclasses import dataclass from typing import Any import gradio as gr import numpy as np import plotly.graph_objects as go # --------------------------------------------------------------------------- # Try to import torch — graceful fallback to CPU stubs # --------------------------------------------------------------------------- try: import torch import torch.nn as nn import torch.nn.functional as F _TORCH = True except ImportError: _TORCH = False # ═══════════════════════════════════════════════════════════════════════════ # SECTION 1: Feature Engineering (64-dim vectors) # ═══════════════════════════════════════════════════════════════════════════ def featurize64(text: str) -> list[float]: """ Extract a deterministic 64-dimensional feature vector from text. Combines: - 32 hash-based sinusoidal features (content fingerprint) - 16 domain-tag signals (code, security, architecture, data, etc.) - 8 structural signals (length, punctuation, questions, etc.) - 4 sentiment polarity estimates - 4 novelty/complexity scores """ features: list[float] = [] # 1. Hash-based sinusoidal features (32 dims) h = hashlib.sha256(text.encode()).hexdigest() for i in range(32): byte_val = int(h[i * 2 : i * 2 + 2], 16) / 255.0 features.append(math.sin(byte_val * math.pi * (i + 1))) # 2. Domain tag signals (16 dims) lower = text.lower() domain_tags = [ "code", "function", "class", "api", "security", "threat", "architecture", "design", "data", "database", "test", "deploy", "optimize", "performance", "research", "analyze", ] for tag in domain_tags: features.append(1.0 if tag in lower else 0.0) # 3. Structural signals (8 dims) features.append(min(len(text) / 500.0, 1.0)) # length features.append(text.count(".") / max(len(text), 1) * 10) # period density features.append(text.count("?") / max(len(text), 1) * 10) # question density features.append(text.count("!") / max(len(text), 1) * 10) # exclamation density features.append(text.count(",") / max(len(text), 1) * 10) # comma density features.append(len(text.split()) / 100.0) # word count normalized features.append(1.0 if any(c.isupper() for c in text) else 0.0) # has uppercase features.append(sum(1 for c in text if c.isdigit()) / max(len(text), 1)) # 4. Sentiment polarity (4 dims) pos_words = ["good", "great", "excellent", "improve", "best", "optimize"] neg_words = ["bad", "fail", "error", "bug", "crash", "threat"] features.append(sum(1 for w in pos_words if w in lower) / len(pos_words)) features.append(sum(1 for w in neg_words if w in lower) / len(neg_words)) features.append(0.5) # neutral baseline features.append(abs(features[-3] - features[-2])) # polarity distance # 5. Novelty/complexity (4 dims) unique_words = len(set(text.lower().split())) total_words = max(len(text.split()), 1) features.append(unique_words / total_words) # lexical diversity features.append(min(len(text.split("\n")) / 10.0, 1.0)) # line count features.append(text.count("(") / max(len(text), 1) * 20) # nesting features.append(min(max(len(w) for w in text.split()) / 20.0, 1.0) if text.strip() else 0.0) # Normalize to unit vector norm = math.sqrt(sum(f * f for f in features)) + 1e-8 return [f / norm for f in features[:64]] def plot_features(features: list[float], title: str = "64-D Feature Vector") -> go.Figure: """Create a plotly bar chart of the 64-dim feature vector.""" labels = ( [f"hash_{i}" for i in range(32)] + [f"tag_{t}" for t in [ "code", "func", "class", "api", "sec", "threat", "arch", "design", "data", "db", "test", "deploy", "opt", "perf", "research", "analyze", ]] + [f"struct_{i}" for i in range(8)] + [f"sent_{i}" for i in range(4)] + [f"novel_{i}" for i in range(4)] ) colors = ( ["#FF6B6B"] * 32 + ["#4ECDC4"] * 16 + ["#45B7D1"] * 8 + ["#96CEB4"] * 4 + ["#FFEAA7"] * 4 ) fig = go.Figure( data=[go.Bar(x=labels, y=features, marker_color=colors)], layout=go.Layout( title=title, xaxis=dict(title="Feature Dimension", tickangle=-45, tickfont=dict(size=7)), yaxis=dict(title="Value"), height=350, template="plotly_dark", margin=dict(b=120), ), ) return fig # ═══════════════════════════════════════════════════════════════════════════ # SECTION 2: Neural Network Models # ═══════════════════════════════════════════════════════════════════════════ class ExpertTower(nn.Module if _TORCH else object): """Single expert tower: 64 → 512 → 512 → 256.""" def __init__(self, d_in: int = 64, h1: int = 512, h2: int = 512, d_out: int = 256): super().__init__() self.fc1 = nn.Linear(d_in, h1) self.fc2 = nn.Linear(h1, h2) self.fc3 = nn.Linear(h2, d_out) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.fc3(F.relu(self.fc2(F.relu(self.fc1(x))))) class MixtureOfExperts7M(nn.Module if _TORCH else object): """ ~7M parameter Mixture-of-Experts model. Architecture: - Gating network: 64 → 512 → N_experts (softmax) - Expert towers (×N): 64 → 512 → 512 → 256 - Classifier head: 256 → N_classes """ def __init__(self, num_classes: int = 10, num_experts: int = 16): super().__init__() self.num_experts = num_experts # Gating network self.gate_fc1 = nn.Linear(64, 512) self.gate_fc2 = nn.Linear(512, num_experts) # Expert towers self.experts = nn.ModuleList([ExpertTower() for _ in range(num_experts)]) # Classifier head self.classifier = nn.Linear(256, num_classes) @property def parameter_count(self) -> int: return sum(p.numel() for p in self.parameters()) def forward(self, x64: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: # Gating gate = F.relu(self.gate_fc1(x64)) gate_weights = torch.softmax(self.gate_fc2(gate), dim=-1) # Expert outputs expert_outs = torch.stack([e(x64) for e in self.experts], dim=1) # Weighted aggregation agg = torch.sum(expert_outs * gate_weights.unsqueeze(-1), dim=1) # Classifier logits = self.classifier(agg) return logits, gate_weights class RouterNet(nn.Module if _TORCH else object): """ Neural routing gate MLP: 64 → 128 → 64 → N_experts. Used for fast (~0.8ms) expert selection. """ EXPERTS = [ "code_expert", "test_expert", "design_expert", "research_expert", "architecture_expert", "security_expert", "performance_expert", "documentation_expert", ] def __init__(self, d_in: int = 64, d_h: int = 128, n_out: int = 8): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, d_h), nn.ReLU(), nn.Dropout(0.1), nn.Linear(d_h, d_h // 2), nn.ReLU(), nn.Linear(d_h // 2, n_out), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return torch.softmax(self.net(x), dim=-1) class PolicyNetwork(nn.Module if _TORCH else object): """MCTS policy network: 128 → 256 → 128 → N_actions.""" def __init__(self, d_in: int = 128, n_actions: int = 32): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, 256), nn.ReLU(), nn.Linear(256, 128), nn.ReLU(), nn.Linear(128, n_actions), nn.Softmax(dim=-1), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x) class ValueNetwork(nn.Module if _TORCH else object): """MCTS value network: 192 → 256 → 64 → 1 (tanh).""" def __init__(self, d_in: int = 192): super().__init__() self.net = nn.Sequential( nn.Linear(d_in, 256), nn.ReLU(), nn.Linear(256, 64), nn.ReLU(), nn.Linear(64, 1), nn.Tanh(), ) def forward(self, x: torch.Tensor) -> torch.Tensor: return self.net(x) # ═══════════════════════════════════════════════════════════════════════════ # SECTION 3: Cognitive Cell Executors # ═══════════════════════════════════════════════════════════════════════════ CELL_TYPES = { "reasoning": { "name": "ReasoningCell", "description": "Structured reasoning with Rule or NN heads", "heads": ["rule", "nn"], }, "memory": { "name": "MemoryCell", "description": "Privacy-preserving preference extraction", "heads": ["preference_extractor"], }, "causal": { "name": "CausalCell", "description": "Pearl's do-calculus for causal inference", "heads": ["do_calculus"], }, "ethics": { "name": "EthicsCell", "description": "Safety classification and PII detection", "heads": ["classifier", "pii_scanner"], }, "empathy": { "name": "EmpathyCell", "description": "Emotional tone detection and empathetic responses", "heads": ["tone_detector"], }, "curiosity": { "name": "CuriosityCell", "description": "Epistemic curiosity and hypothesis generation", "heads": ["hypothesis_generator"], }, "figliteral": { "name": "FigLiteralCell", "description": "Figurative vs literal language classification", "heads": ["classifier"], }, "r2p": { "name": "R2PCell", "description": "Requirements-to-Plan structured decomposition", "heads": ["planner"], }, "telemetry": { "name": "TelemetryCell", "description": "Telemetry event capture and structuring", "heads": ["collector"], }, "aggregator": { "name": "AggregatorCell", "description": "Multi-expert output aggregation", "heads": ["weighted_average", "max_confidence", "ensemble"], }, } def execute_cell(cell_type: str, text: str, config_json: str = "{}") -> dict[str, Any]: """Execute a cognitive cell and return structured results.""" start = time.monotonic() # BUG-011 fix: validate empty input if not text or not text.strip(): return { "cell_type": cell_type, "status": "error", "message": "Input text is required. Please provide some text to process.", "elapsed_ms": 0.0, } # BUG-002 fix: return error on invalid JSON instead of silently ignoring try: config = json.loads(config_json) if config_json.strip() else {} except json.JSONDecodeError as e: return { "cell_type": cell_type, "status": "error", "message": f"Invalid JSON config: {e}", "elapsed_ms": 0.0, } request_id = f"req-{uuid.uuid4().hex[:12]}" # Cell-specific logic result: dict[str, Any] = { "cell_type": cell_type, "request_id": request_id, "status": "ok", } if cell_type == "reasoning": head = config.get("head_type", "rule") words = text.split() sections = [] chunk_size = max(len(words) // 3, 1) for i in range(0, len(words), chunk_size): chunk = " ".join(words[i : i + chunk_size]) sections.append({ "text": chunk, "confidence": round(random.uniform(0.7, 0.99), 3), "boundary_type": random.choice(["topic_shift", "elaboration", "conclusion"]), }) result["head_type"] = head result["sections"] = sections result["section_count"] = len(sections) elif cell_type == "memory": # Preference extraction preferences = [] if "prefer" in text.lower() or "like" in text.lower(): preferences.append({ "type": "explicit", "value": text, "confidence": 0.95, }) if "always" in text.lower() or "usually" in text.lower(): preferences.append({ "type": "implicit", "value": text, "confidence": 0.72, }) result["preferences"] = preferences result["opt_out"] = "don't remember" in text.lower() result["consent_status"] = "granted" elif cell_type == "causal": # Simulated causal inference result["mode"] = config.get("mode", "do_calculus") result["variables"] = [w for w in text.split() if len(w) > 3][:5] result["causal_effect"] = round(random.uniform(-0.5, 0.8), 3) result["confidence_interval"] = [ round(result["causal_effect"] - 0.15, 3), round(result["causal_effect"] + 0.15, 3), ] elif cell_type == "ethics": # PII detection import re pii_patterns = { "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "ssn": r"\b\d{3}-\d{2}-\d{4}\b", } pii_found = [] redacted = text for pii_type, pattern in pii_patterns.items(): matches = re.findall(pattern, text) for m in matches: pii_found.append({"type": pii_type, "value": "[REDACTED]"}) redacted = redacted.replace(m, "[REDACTED]") result["is_safe"] = len(pii_found) == 0 result["pii_detected"] = pii_found result["redacted_text"] = redacted result["risk_score"] = round(random.uniform(0.0, 0.3) if not pii_found else random.uniform(0.6, 0.9), 3) elif cell_type == "empathy": # BUG-007 fix: keyword-based emotion detection instead of random lower = text.lower() emotion_keywords: dict[str, list[str]] = { "frustration": ["frustrat", "annoy", "angry", "upset", "fail", "broken", "stuck", "overwhelm"], "anxiety": ["worry", "anxious", "nervous", "afraid", "fear", "concern", "stress", "uncertain"], "excitement": ["excit", "amazing", "awesome", "great", "love", "fantastic", "thrilled", "happy"], "satisfaction": ["satisfied", "pleased", "good", "well", "success", "accomplish", "done", "complete"], "confusion": ["confus", "unclear", "don't understand", "what does", "how does", "lost", "puzzle"], } emotion_scores: dict[str, int] = {} for emotion, keywords in emotion_keywords.items(): emotion_scores[emotion] = sum(1 for kw in keywords if kw in lower) best_emotion = max(emotion_scores, key=lambda e: emotion_scores[e]) detected = best_emotion if emotion_scores[best_emotion] > 0 else "neutral" confidence = min(0.95, 0.6 + emotion_scores.get(detected, 0) * 0.1) responses = { "neutral": "I understand your message. How can I help further?", "frustration": "I can see this is frustrating. Let me help resolve this.", "excitement": "That's great news! Let's build on that momentum.", "confusion": "Let me clarify that for you step by step.", "satisfaction": "Glad to hear things are going well!", "anxiety": "I understand your concern. Let's work through this together.", } result["detected_emotion"] = detected result["confidence"] = round(confidence, 3) result["empathetic_response"] = responses[detected] elif cell_type == "curiosity": # BUG-013 fix: generate topic-aware questions from extracted keywords words = [w for w in text.split() if len(w) > 3] topics = list(dict.fromkeys(words[:5])) # unique, order-preserved topic_str = ", ".join(topics[:3]) if topics else "this topic" questions = [ f"What are the underlying assumptions behind {topic_str}?", f"How would the outcome differ if we changed the approach to {topics[0] if topics else 'this'}?", f"What related problems have been solved in adjacent domains?", f"What are the second-order effects of {topics[1] if len(topics) > 1 else 'this decision'}?", f"What evidence would disprove our current hypothesis about {topic_str}?", ] max_q = config.get("max_questions", 3) result["questions"] = questions[:max_q] # Novelty based on lexical diversity unique_ratio = len(set(text.lower().split())) / max(len(text.split()), 1) result["novelty_score"] = round(min(unique_ratio + 0.3, 0.95), 3) elif cell_type == "figliteral": # BUG-012 fix: generate actual literal decomposition figurative_map: dict[str, str] = { "raining cats and dogs": "raining very heavily", "piece of cake": "something very easy to do", "break a leg": "good luck; perform well", "time flies": "time appears to pass quickly", "hit the nail on the head": "to be exactly right", "spill the beans": "to reveal a secret", "under the weather": "feeling sick or unwell", "bite the bullet": "to endure a painful situation with courage", } figurative_markers = ["like a", "as if"] + list(figurative_map.keys()) is_figurative = any(m in text.lower() for m in figurative_markers) result["classification"] = "figurative" if is_figurative else "literal" result["confidence"] = round(0.9 if is_figurative else 0.85, 3) if is_figurative: # Find which idiom matched and provide its literal meaning literal_parts = [] remaining = text for idiom, meaning in figurative_map.items(): if idiom in text.lower(): literal_parts.append(f"'{idiom}' = {meaning}") if not literal_parts and ("like a" in text.lower() or "as if" in text.lower()): literal_parts.append("Contains simile/metaphor — direct comparison without figurative intent") result["literal_interpretation"] = "; ".join(literal_parts) if literal_parts else "No specific idiom decomposition available" result["figurative_elements"] = literal_parts elif cell_type == "r2p": steps = [ {"step": 1, "action": "Analyze requirements", "estimated_effort": "2h"}, {"step": 2, "action": "Design solution architecture", "estimated_effort": "4h"}, {"step": 3, "action": "Implement core logic", "estimated_effort": "8h"}, {"step": 4, "action": "Write tests", "estimated_effort": "4h"}, {"step": 5, "action": "Deploy and validate", "estimated_effort": "2h"}, ] result["plan"] = steps result["total_effort"] = "20h" result["success_criteria"] = ["All tests pass", "Performance targets met", "Code reviewed"] elif cell_type == "telemetry": # BUG-014 fix: extract structured event attributes from input import re as _re result["event_recorded"] = True result["trace_id"] = f"trace-{uuid.uuid4().hex[:8]}" result["timestamp"] = time.time() # Parse structured fields from natural language lower = text.lower() attrs: dict[str, Any] = {"source": "cognitive_cell", "cell_type": cell_type} # Extract action verbs action_map = {"click": "click", "submit": "submit", "scroll": "scroll", "navigate": "navigate", "hover": "hover", "type": "input", "select": "select", "drag": "drag", "drop": "drop", "open": "open"} for verb, action in action_map.items(): if verb in lower: attrs["action"] = action break # Extract numeric durations dur_match = _re.search(r"(\d+)\s*(?:second|sec|ms|minute|min)", lower) if dur_match: attrs["duration_value"] = int(dur_match.group(1)) unit = dur_match.group(0).replace(dur_match.group(1), "").strip() attrs["duration_unit"] = unit # Extract page/element references page_match = _re.search(r"(?:on|at|in)\s+(?:the\s+)?(\w+)\s+page", lower) if page_match: attrs["page"] = page_match.group(1) elem_match = _re.search(r"(?:click|clicked|press|pressed|hit)\s+(?:the\s+)?(\w+)", lower) if elem_match: attrs["element"] = elem_match.group(1) result["metadata"] = attrs result["parsed_attributes"] = {k: v for k, v in attrs.items() if k not in ("source", "cell_type")} elif cell_type == "aggregator": # BUG-001 fix: run real sub-cell aggregation instead of placeholder strategy = config.get("strategy", "weighted_average") sub_cells = config.get("sub_cells", ["reasoning", "ethics", "causal"]) sub_results = [] for sc in sub_cells: if sc in CELL_TYPES and sc != "aggregator": # prevent recursion sr = execute_cell(sc, text) sub_results.append({ "cell": sc, "status": sr.get("status", "ok"), "confidence": sr.get("confidence", sr.get("risk_score", 0.8)), "elapsed_ms": sr.get("elapsed_ms", 0), }) # Compute aggregated confidence if sub_results: confidences = [r["confidence"] for r in sub_results if isinstance(r["confidence"], (int, float))] if strategy == "max_confidence": agg_confidence = max(confidences) if confidences else 0.0 elif strategy == "ensemble": agg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 else: # weighted_average weights = [1.0 / (i + 1) for i in range(len(confidences))] w_sum = sum(weights) agg_confidence = sum(c * w for c, w in zip(confidences, weights)) / w_sum if w_sum else 0.0 else: agg_confidence = 0.0 result["strategy"] = strategy result["sub_cell_results"] = sub_results result["cells_aggregated"] = len(sub_results) result["aggregated_output"] = f"Aggregated {len(sub_results)} cells via {strategy}" result["confidence"] = round(agg_confidence, 3) elapsed = (time.monotonic() - start) * 1000 result["elapsed_ms"] = round(elapsed, 2) return result def compose_cells(pipeline_str: str, text: str) -> dict[str, Any]: """Execute a pipeline of cells sequentially.""" cell_types = [c.strip() for c in pipeline_str.split(",") if c.strip()] if not cell_types: return {"error": "No cell types specified"} activations = [] context: dict[str, Any] = {} final_output: dict[str, Any] = {} for ct in cell_types: if ct not in CELL_TYPES: activations.append({"cell_type": ct, "status": "error", "message": f"Unknown cell type: {ct}"}) continue result = execute_cell(ct, text) activations.append({ "cell_type": ct, "status": result.get("status", "ok"), "elapsed_ms": result.get("elapsed_ms", 0), }) context.update({k: v for k, v in result.items() if k not in ("request_id", "elapsed_ms")}) final_output = result return { "pipeline": cell_types, "activations": activations, "final_output": final_output, "total_cells": len(cell_types), "context_keys": list(context.keys()), } # ═══════════════════════════════════════════════════════════════════════════ # SECTION 4: MCTS Planning Engine # ═══════════════════════════════════════════════════════════════════════════ TASK_CATEGORIES = { "architecture": ["service_split", "api_gateway", "data_layer", "security_layer", "caching"], "implementation": ["requirements", "design", "code", "test", "deploy"], "optimization": ["profile", "identify_bottleneck", "optimize", "validate", "benchmark"], "security": ["asset_inventory", "threat_enumeration", "risk_scoring", "mitigations", "audit"], "research": ["literature_review", "comparison", "synthesis", "recommendations", "publish"], } @dataclass class MCTSNode: """Node in the MCTS search tree.""" id: str action: str visits: int = 0 total_value: float = 0.0 policy_prior: float = 0.0 children: list["MCTSNode"] | None = None def ucb1_score(self, parent_visits: int, c: float = 1.414) -> float: if self.visits == 0: return float("inf") exploitation = self.total_value / self.visits exploration = c * math.sqrt(math.log(parent_visits) / self.visits) return exploitation + exploration def puct_score(self, parent_visits: int, c: float = 1.0) -> float: if self.visits == 0: return float("inf") exploitation = self.total_value / self.visits exploration = c * self.policy_prior * math.sqrt(parent_visits) / (1 + self.visits) return exploitation + exploration def to_dict(self, max_depth: int = 3) -> dict[str, Any]: d: dict[str, Any] = { "id": self.id, "action": self.action, "visits": self.visits, "value": round(self.total_value / max(self.visits, 1), 3), "policy_prior": round(self.policy_prior, 3), } if self.children and max_depth > 0: d["children"] = [ c.to_dict(max_depth - 1) for c in sorted(self.children, key=lambda n: -n.visits)[:5] ] return d def run_mcts( task: str, max_simulations: int = 100, exploration_constant: float = 1.414, strategy: str = "ucb1", ) -> dict[str, Any]: """Run MCTS planning on a task and return the search tree.""" start = time.monotonic() # Detect task category lower = task.lower() category = "implementation" for cat, keywords in { "architecture": ["architect", "design", "micro", "system"], "security": ["security", "threat", "vulnerability", "attack"], "optimization": ["optimize", "performance", "latency", "speed"], "research": ["research", "survey", "study", "analyze"], }.items(): if any(k in lower for k in keywords): category = cat break actions = TASK_CATEGORIES[category] # Build tree root = MCTSNode(id="root", action=task[:50], children=[]) # Use NN priors if torch available if _TORCH: policy_net = PolicyNetwork(d_in=128, n_actions=len(actions)) value_net = ValueNetwork(d_in=192) policy_net.eval() value_net.eval() for sim in range(max_simulations): # SELECT: find best leaf node = root # EXPAND: add children if needed if not node.children: node.children = [] for i, act in enumerate(actions): prior = random.uniform(0.1, 0.5) if _TORCH: embed = torch.randn(1, 128) with torch.no_grad(): priors = policy_net(embed)[0] prior = priors[i % len(priors)].item() node.children.append( MCTSNode( id=f"{act}-{sim}", action=act, policy_prior=prior, children=[], ) ) # Select best child score_fn = ( (lambda n: n.ucb1_score(root.visits + 1, exploration_constant)) if strategy == "ucb1" else (lambda n: n.puct_score(root.visits + 1, exploration_constant)) ) best_child = max(node.children, key=score_fn) # SIMULATE: get value estimate if _TORCH: state = torch.randn(1, 192) with torch.no_grad(): value = value_net(state).item() else: value = random.uniform(0.3, 0.9) # BACKPROPAGATE best_child.visits += 1 best_child.total_value += value root.visits += 1 elapsed = (time.monotonic() - start) * 1000 # Best plan if root.children: best = max(root.children, key=lambda n: n.visits) best_action = best.action best_value = round(best.total_value / max(best.visits, 1), 3) else: best_action = "none" best_value = 0.0 return { "task": task, "category": category, "strategy": strategy, "best_action": best_action, "best_value": best_value, "total_simulations": max_simulations, "exploration_constant": exploration_constant, "tree": root.to_dict(max_depth=2), "all_actions": [ { "action": c.action, "visits": c.visits, "value": round(c.total_value / max(c.visits, 1), 3), } for c in sorted(root.children or [], key=lambda n: -n.visits) ], "elapsed_ms": round(elapsed, 2), "nn_enabled": _TORCH, } def benchmark_strategies(task: str) -> dict[str, Any]: """Compare MCTS vs Greedy vs Random on the same task.""" # BUG-005 fix: implement real greedy and random strategies results = {} # Detect category for action pool lower = task.lower() category = "implementation" for cat, keywords in { "architecture": ["architect", "design", "micro", "system"], "security": ["security", "threat", "vulnerability", "attack"], "optimization": ["optimize", "performance", "latency", "speed"], "research": ["research", "survey", "study", "analyze"], }.items(): if any(k in lower for k in keywords): category = cat break actions = TASK_CATEGORIES[category] # MCTS — full tree search start = time.monotonic() r = run_mcts(task, max_simulations=100) elapsed_mcts = (time.monotonic() - start) * 1000 results["mcts"] = { "quality_score": r["best_value"], "best_action": r["best_action"], "elapsed_ms": round(elapsed_mcts, 2), } # Greedy — single-step: pick action with highest policy prior start = time.monotonic() if _TORCH: policy_net = PolicyNetwork(d_in=128, n_actions=len(actions)) policy_net.eval() torch.manual_seed(hash(task) % (2**31)) embed = torch.randn(1, 128) with torch.no_grad(): priors = policy_net(embed)[0].numpy() best_idx = int(np.argmax(priors)) greedy_action = actions[best_idx] greedy_quality = float(priors[best_idx]) else: greedy_quality = max(random.uniform(0.1, 0.3) for _ in actions) greedy_action = random.choice(actions) elapsed_greedy = (time.monotonic() - start) * 1000 results["greedy"] = { "quality_score": round(greedy_quality, 3), "best_action": greedy_action, "elapsed_ms": round(elapsed_greedy, 2), } # Random — pick random action with random value start = time.monotonic() random_action = random.choice(actions) # Simulate value via value network if _TORCH: value_net = ValueNetwork(d_in=192) value_net.eval() torch.manual_seed(hash(task + random_action) % (2**31)) state = torch.randn(1, 192) with torch.no_grad(): random_quality = value_net(state).item() else: random_quality = random.uniform(-0.5, 0.5) elapsed_random = (time.monotonic() - start) * 1000 results["random"] = { "quality_score": round(random_quality, 3), "best_action": random_action, "elapsed_ms": round(elapsed_random, 2), } return {"task": task, "category": category, "results": results} def plot_mcts_tree(tree_data: dict) -> go.Figure: """Create a sunburst visualization of the MCTS tree.""" ids, labels, parents, values, colors = [], [], [], [], [] def _walk(node: dict, parent_id: str = "") -> None: nid = node["id"] ids.append(nid) labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})") parents.append(parent_id) values.append(max(node.get("visits", 1), 1)) colors.append(node.get("value", 0)) for child in node.get("children", []): _walk(child, nid) _walk(tree_data) fig = go.Figure(go.Sunburst( ids=ids, labels=labels, parents=parents, values=values, marker=dict(colors=colors, colorscale="Viridis", showscale=True), branchvalues="total", )) fig.update_layout( title="MCTS Search Tree", height=500, template="plotly_dark", margin=dict(t=40, l=0, r=0, b=0), ) return fig # ═══════════════════════════════════════════════════════════════════════════ # SECTION 5: MoE Routing # ═══════════════════════════════════════════════════════════════════════════ EXPERT_NAMES = [ "Code Expert", "Test Expert", "Design Expert", "Research Expert", "Architecture Expert", "Security Expert", "Performance Expert", "Docs Expert", ] # BUG-003 fix: singleton router with fixed seed for deterministic routing _ROUTER_SEED = 42 _router_net_singleton: "RouterNet | None" = None def _get_router() -> "RouterNet": """Get or create the singleton RouterNet with a fixed seed.""" global _router_net_singleton if _router_net_singleton is None and _TORCH: torch.manual_seed(_ROUTER_SEED) _router_net_singleton = RouterNet(d_in=64, n_out=len(EXPERT_NAMES)) _router_net_singleton.eval() return _router_net_singleton # type: ignore[return-value] def route_task(task: str, top_k: int = 3) -> dict[str, Any]: """Route a task through the neural MoE gate.""" start = time.monotonic() features = featurize64(task) feature_tensor = None if _TORCH: # BUG-003 fix: use singleton router (deterministic weights) router = _get_router() feature_tensor = torch.tensor([features], dtype=torch.float32) with torch.no_grad(): weights = router(feature_tensor)[0].numpy() else: # Fallback: deterministic routing from features weights = np.array([abs(f) for f in features[:len(EXPERT_NAMES)]]) weights = weights / (weights.sum() + 1e-8) # BUG-004 fix: apply keyword-based semantic boost to expert routing lower_task = task.lower() expert_keywords: dict[int, list[str]] = { 0: ["code", "implement", "function", "class", "program", "script", "module"], # Code Expert 1: ["test", "unit test", "coverage", "qa", "assert", "mock", "fixture"], # Test Expert 2: ["design", "ui", "ux", "layout", "wireframe", "mockup", "style"], # Design Expert 3: ["research", "analyze", "study", "survey", "literature", "paper", "compare"], # Research Expert 4: ["architect", "system", "microservice", "scale", "pattern", "infrastructure"], # Architecture Expert 5: ["security", "auth", "encrypt", "threat", "vulnerab", "owasp", "pci", "compliance"], # Security Expert 6: ["performance", "optimize", "latency", "throughput", "cache", "speed", "fast"], # Performance Expert 7: ["document", "readme", "docs", "comment", "explain", "write", "manual"], # Docs Expert } boost = np.zeros(len(EXPERT_NAMES)) for idx, kws in expert_keywords.items(): for kw in kws: if kw in lower_task: boost[idx] += 0.15 # increase weight for matching keywords # Apply boost and renormalize weights = weights + boost weights = weights / (weights.sum() + 1e-8) # Top-K selection top_indices = np.argsort(weights)[::-1][:top_k] selected = [ { "expert": EXPERT_NAMES[i], "weight": round(float(weights[i]), 4), "rank": rank + 1, } for rank, i in enumerate(top_indices) ] elapsed = (time.monotonic() - start) * 1000 return { "task": task, "features": features, "all_weights": {EXPERT_NAMES[i]: round(float(weights[i]), 4) for i in range(len(EXPERT_NAMES))}, "selected_experts": selected, "top_k": top_k, "nn_enabled": _TORCH, "elapsed_ms": round(elapsed, 2), } def plot_expert_weights(weights: dict[str, float]) -> go.Figure: """Create a bar chart of expert routing weights.""" names = list(weights.keys()) vals = list(weights.values()) colors = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB"] fig = go.Figure( data=[go.Bar(x=names, y=vals, marker_color=colors[:len(names)])], layout=go.Layout( title="Expert Routing Weights", # BUG-016 fix: shortened title yaxis=dict(title="Weight (softmax)", range=[0, max(vals) * 1.2]), height=350, template="plotly_dark", margin=dict(t=40), ), ) return fig # ═══════════════════════════════════════════════════════════════════════════ # SECTION 6: Agent Orchestration # ═══════════════════════════════════════════════════════════════════════════ AGENTS = [ {"name": "SWE Agent", "specialization": "Code scaffold generation", "icon": "[SWE]"}, {"name": "Architect Agent", "specialization": "System design and patterns", "icon": "[ARCH]"}, {"name": "QA Agent", "specialization": "Test plan and case generation", "icon": "[QA]"}, {"name": "Security Agent", "specialization": "Threat modeling (OWASP)", "icon": "[SEC]"}, {"name": "DevOps Agent", "specialization": "Infrastructure planning", "icon": "[OPS]"}, {"name": "Research Agent", "specialization": "Technical analysis", "icon": "[RES]"}, {"name": "Performance Agent", "specialization": "Optimization analysis", "icon": "[PERF]"}, {"name": "Documentation Agent", "specialization": "Technical writing", "icon": "[DOC]"}, ] # Agent-to-cell mapping for real processing _AGENT_CELL_MAP: dict[str, str] = { "SWE Agent": "reasoning", "Architect Agent": "r2p", "QA Agent": "reasoning", "Security Agent": "ethics", "DevOps Agent": "telemetry", "Research Agent": "causal", "Performance Agent": "reasoning", "Documentation Agent": "reasoning", } def orchestrate(task: str, max_agents: int = 3, strategy: str = "moe_routing") -> dict[str, Any]: """Orchestrate multiple agents for a task using specified routing strategy.""" start = time.monotonic() # BUG-010 fix: implement all three routing strategies if strategy == "round_robin": # Select agents in round-robin order selected_agents = AGENTS[:max_agents] agent_results = [] for i, agent in enumerate(selected_agents): # BUG-009 fix: execute real cell per agent cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": round(1.0 / max_agents, 4), "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) elif strategy == "random": # Randomly select agents import random as _rnd shuffled = _rnd.sample(AGENTS, min(max_agents, len(AGENTS))) agent_results = [] for agent in shuffled: cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": round(1.0 / max_agents, 4), "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) else: # moe_routing (default) routing = route_task(task, top_k=max_agents) agent_results = [] for expert in routing["selected_experts"]: agent_name = expert["expert"].replace(" Expert", " Agent") agent = next((a for a in AGENTS if agent_name in a["name"]), AGENTS[0]) # BUG-009 fix: execute real cell per agent cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning") cell_result = execute_cell(cell_type, task) agent_results.append({ "agent": agent["name"], "icon": agent["icon"], "specialization": agent["specialization"], "weight": expert["weight"], "cell_used": cell_type, "output": cell_result, "confidence": cell_result.get("confidence", round(0.8, 3)), }) elapsed = (time.monotonic() - start) * 1000 return { "task": task, "strategy": strategy, "agents_selected": len(agent_results), "max_agents": max_agents, "results": agent_results, "total_elapsed_ms": round(elapsed, 2), } # ═══════════════════════════════════════════════════════════════════════════ # SECTION 7: Gradio Interface # ═══════════════════════════════════════════════════════════════════════════ THEME = gr.themes.Soft( primary_hue="amber", secondary_hue="orange", neutral_hue="stone", font=gr.themes.GoogleFont("Inter"), ) CSS = """ .main-header { text-align: center; margin-bottom: 1rem; } .main-header h1 { background: linear-gradient(135deg, #FF6B6B, #FFEAA7, #4ECDC4); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-size: 2.5rem; font-weight: 800; } .stat-box { background: linear-gradient(135deg, #1a1a2e, #16213e); border: 1px solid #0f3460; border-radius: 12px; padding: 1rem; text-align: center; color: #e8e8e8; } .stat-box h3 { color: #FFEAA7; margin: 0; font-size: 1.8rem; } .stat-box p { color: #a8a8a8; margin: 0; font-size: 0.85rem; } footer { display: none !important; } .plotly .main-svg { overflow: visible !important; } """ def build_app() -> gr.Blocks: """Build the complete Gradio application.""" with gr.Blocks(theme=THEME, css=CSS, title="MangoMAS — Multi-Agent Cognitive Architecture") as app: # Header gr.HTML("""

MangoMAS

Multi-Agent Cognitive Architecture — Interactive Demo

""") # Stats bar with gr.Row(): for label, value in [ ("Cognitive Cells", "10"), ("MoE Params", "~7M"), ("MCTS Strategies", "UCB1 + PUCT"), ("Expert Agents", "8"), ]: gr.HTML(f'

{value}

{label}

') # ── TAB 1: Cognitive Cells ───────────────────────────────────────── with gr.Tab("Cognitive Cells", id="cells"): gr.Markdown("### Execute any of the 10 biologically-inspired cognitive cells") with gr.Row(): cell_type = gr.Dropdown( choices=list(CELL_TYPES.keys()), value="reasoning", label="Cell Type", info="Select a cognitive cell to execute", ) cell_info = gr.Textbox( label="Description", value=CELL_TYPES["reasoning"]["description"], interactive=False, ) cell_input = gr.Textbox( label="Input Text", placeholder="Enter text to process through the cell...", value="Design a scalable microservices architecture with event-driven communication", lines=3, ) cell_config = gr.Textbox( label="Config (JSON, optional)", placeholder='{"head_type": "nn"}', value="{}", lines=1, ) cell_btn = gr.Button("Execute Cell", variant="primary") cell_output = gr.JSON(label="Cell Output") gr.Markdown("---\n### Cell Composition Pipeline") pipeline_input = gr.Textbox( label="Pipeline (comma-separated cell types)", value="ethics, reasoning, aggregator", placeholder="ethics, reasoning, memory", ) pipeline_text = gr.Textbox( label="Input Text", value="Analyze the security implications of this API design: user@example.com", lines=2, ) pipeline_btn = gr.Button("Run Pipeline", variant="secondary") pipeline_output = gr.JSON(label="Pipeline Result") # Wiring def on_cell_select(ct: str) -> str: return CELL_TYPES.get(ct, {}).get("description", "Unknown cell type") cell_type.change(on_cell_select, inputs=cell_type, outputs=cell_info) cell_btn.click(execute_cell, inputs=[cell_type, cell_input, cell_config], outputs=cell_output) pipeline_btn.click(compose_cells, inputs=[pipeline_input, pipeline_text], outputs=pipeline_output) # ── TAB 2: MCTS Planning ────────────────────────────────────────── with gr.Tab("MCTS Planning", id="mcts"): gr.Markdown("### Monte Carlo Tree Search with Policy/Value Neural Networks") with gr.Row(): mcts_task = gr.Textbox( label="Task to Plan", value="Design a secure, scalable REST API with authentication", lines=2, scale=3, ) with gr.Column(scale=1): mcts_sims = gr.Slider(10, 500, value=100, step=10, label="Simulations") mcts_c = gr.Slider(0.1, 3.0, value=1.414, step=0.1, label="Exploration Constant (C)") mcts_strat = gr.Radio(["ucb1", "puct"], value="ucb1", label="Selection Strategy") mcts_btn = gr.Button("Run MCTS", variant="primary") with gr.Row(): mcts_tree_plot = gr.Plot(label="Search Tree Visualization") mcts_json = gr.JSON(label="MCTS Result") gr.Markdown("---\n### Strategy Benchmark") bench_task = gr.Textbox( label="Benchmark Task", value="Optimize database query performance for high-throughput system", ) bench_btn = gr.Button("Run Benchmark", variant="secondary") bench_output = gr.JSON(label="Benchmark Results (MCTS vs Greedy vs Random)") def run_and_plot(task, sims, c, strat): result = run_mcts(task, int(sims), c, strat) fig = plot_mcts_tree(result["tree"]) return fig, result mcts_btn.click(run_and_plot, inputs=[mcts_task, mcts_sims, mcts_c, mcts_strat], outputs=[mcts_tree_plot, mcts_json]) bench_btn.click(benchmark_strategies, inputs=bench_task, outputs=bench_output) # ── TAB 3: MoE Router ───────────────────────────────────────────── with gr.Tab("MoE Router", id="moe"): gr.Markdown("### Neural Mixture-of-Experts Routing Gate") gr.Markdown( "The RouterNet MLP extracts 64-dimensional features from text, " "then routes to the top-K most relevant expert agents." ) with gr.Row(): moe_task = gr.Textbox( label="Task to Route", value="Implement a threat detection system with real-time alerting", lines=2, scale=3, ) moe_topk = gr.Slider(1, 8, value=3, step=1, label="Top-K Experts", scale=1) moe_btn = gr.Button("Route Task", variant="primary") with gr.Row(): moe_features_plot = gr.Plot(label="64-D Feature Vector") moe_weights_plot = gr.Plot(label="Expert Routing Weights") moe_json = gr.JSON(label="Routing Result") def route_and_plot(task, top_k): result = route_task(task, int(top_k)) feat_fig = plot_features(result["features"]) weight_fig = plot_expert_weights(result["all_weights"]) # Don't send features array to JSON (too large) display = {k: v for k, v in result.items() if k != "features"} return feat_fig, weight_fig, display moe_btn.click(route_and_plot, inputs=[moe_task, moe_topk], outputs=[moe_features_plot, moe_weights_plot, moe_json]) # ── TAB 4: Agent Orchestration ───────────────────────────────────── with gr.Tab("Agents", id="agents"): gr.Markdown("### Multi-Agent Orchestration with MoE Routing") with gr.Row(): orch_task = gr.Textbox( label="Task", value="Build a secure payment processing microservice with PCI compliance", lines=2, scale=3, ) with gr.Column(scale=1): orch_agents = gr.Slider(1, 8, value=3, step=1, label="Max Agents") orch_strat = gr.Dropdown( ["moe_routing", "round_robin", "random"], value="moe_routing", label="Routing Strategy", ) orch_btn = gr.Button("Orchestrate", variant="primary") orch_output = gr.JSON(label="Orchestration Result") gr.Markdown("---\n### Available Agents") agent_table = gr.Dataframe( value=[[a["icon"], a["name"], a["specialization"]] for a in AGENTS], headers=["", "Agent", "Specialization"], interactive=False, ) orch_btn.click(orchestrate, inputs=[orch_task, orch_agents, orch_strat], outputs=orch_output) # ── TAB 5: Architecture ──────────────────────────────────────────── with gr.Tab("Architecture", id="arch"): gr.Markdown(""" ### MangoMAS System Architecture ``` ┌──────────────────────────────────────────────────────────┐ │ FastAPI Gateway │ │ (Auth / Tenant Middleware) │ ├──────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌───────────────────────────┐ │ │ │ MoE Input │────▶│ RouterNet (Neural Gate) │ │ │ │ Parser │ │ 64-dim → MLP → Softmax │ │ │ └──────────────┘ └─────────┬─────────────────┘ │ │ │ │ │ ┌───────┬───────┬───────┼───────┬───────┐ │ │ ▼ ▼ ▼ ▼ ▼ ▼ │ │ Expert Expert Expert Expert Expert Expert │ │ │ │ │ │ │ │ │ │ Agent Agent Agent Agent Agent Agent │ │ │ │ │ │ │ │ │ │ ┌─────┴───────┴───────┴───────┴───────┴───────┘ │ │ │ Cognitive Cell Layer │ │ │ [Reasoning│Memory│Ethics│Causal│Empathy│...] │ │ └─────────────────────┬────────────────────────┘ │ │ ▼ │ │ Aggregator Cell │ │ (weighted / ensemble / ranking) │ │ │ │ │ Feedback Loop → Router Update │ │ │ │ │ Response + Metrics + Traces │ └──────────────────────────────────────────────────────────┘ ``` ### Neural Network Components | Component | Architecture | Parameters | Latency | |-----------|-------------|------------|---------| | **MixtureOfExperts7M** | 16 Expert Towers (64→512→512→256) + Gate | ~7M | ~5ms | | **RouterNet** | MLP (64→128→64→8) + Softmax | ~17K | <1ms | | **PolicyNetwork** | MLP (128→256→128→32) + Softmax | ~70K | <1ms | | **ValueNetwork** | MLP (192→256→64→1) + Tanh | ~66K | <1ms | | **ReasoningCell NN Head** | Lightweight transformer | ~500K | ~50ms | ### Cognitive Cell Lifecycle ``` preprocess() → infer() → postprocess() → publish() │ │ │ │ Validate Core Logic Format Emit Event Normalize NN/Rule Filter (Event Bus) Enrich Inference Enrich ``` """) # ── TAB 6: Metrics ───────────────────────────────────────────────── with gr.Tab("Metrics", id="metrics"): gr.Markdown("### Live Performance Benchmarks") metrics_btn = gr.Button("Run All Benchmarks", variant="primary") with gr.Row(): metrics_routing = gr.Plot(label="Routing Latency by Expert Count") metrics_cells = gr.Plot(label="Cell Execution Latency") metrics_json = gr.JSON(label="Raw Metrics") def run_benchmarks(): # BUG-008 fix: warmup run to stabilize timing, then average over more iterations # Warmup: discard first run route_task("warmup", top_k=1) execute_cell("reasoning", "warmup") # Routing latency vs top-K ks = list(range(1, 9)) latencies = [] for k in ks: times = [] for _ in range(10): # increased from 5 to 10 for stability r = route_task("Test routing benchmark task", top_k=k) times.append(r["elapsed_ms"]) # Use median instead of mean to reduce outlier impact times.sort() latencies.append(times[len(times) // 2]) fig_routing = go.Figure( data=[go.Scatter(x=ks, y=latencies, mode="lines+markers", name="Routing Latency")], layout=go.Layout( title="Routing Latency vs Top-K", xaxis_title="Top-K Experts", yaxis_title="Latency (ms)", height=350, template="plotly_dark", ), ) # Cell execution latency cell_times: dict[str, float] = {} for ct in CELL_TYPES: times = [] for _ in range(5): # increased from 3 to 5 r = execute_cell(ct, "Benchmark test input for cell") times.append(r["elapsed_ms"]) times.sort() cell_times[ct] = times[len(times) // 2] # median fig_cells = go.Figure( data=[go.Bar( x=list(cell_times.keys()), y=list(cell_times.values()), marker_color=["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB", "#FFA07A", "#98FB98"], )], layout=go.Layout( title="Cell Execution Latency", xaxis=dict( title="Cell Type", tickangle=-30, # BUG-017 fix: reduce rotation angle tickfont=dict(size=9), ), yaxis_title="Latency (ms)", height=400, # slightly taller for label room template="plotly_dark", margin=dict(b=100), # BUG-017 fix: more bottom margin ), ) summary = { "torch_available": _TORCH, "routing_latency_p50_ms": round(sorted(latencies)[len(latencies) // 2], 3), "cell_latency_avg_ms": round(sum(cell_times.values()) / len(cell_times), 3), "total_nn_parameters": "~7.15M" if _TORCH else "N/A (CPU fallback)", } return fig_routing, fig_cells, summary metrics_btn.click(run_benchmarks, outputs=[metrics_routing, metrics_cells, metrics_json]) return app # ═══════════════════════════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════════════════════════ if __name__ == "__main__": app = build_app() app.launch( server_name="0.0.0.0", server_port=7860, share=False, )