"""
MangoMAS — Multi-Agent Cognitive Architecture
==============================================

Interactive HuggingFace Space showcasing:
- 10 Cognitive Cells with NN heads
- MCTS Planning with policy/value networks
- 7M-param MoE Neural Router
- Multi-agent orchestration

Author: MangoMAS Engineering (Ian Shanker)
"""
|
| |
|
| | from __future__ import annotations
|
| |
|
| | import hashlib
|
| | import json
|
| | import math
|
| | import random
|
| | import time
|
| | import uuid
|
| | from dataclasses import dataclass
|
| | from typing import Any
|
| |
|
| | import gradio as gr
|
| | import numpy as np
|
| | import plotly.graph_objects as go
|
| |
|
| |
|
| |
|
| |
|
| | try:
|
| | import torch
|
| | import torch.nn as nn
|
| | import torch.nn.functional as F
|
| |
|
| | _TORCH = True
|
| | except ImportError:
|
| | _TORCH = False
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def featurize64(text: str) -> list[float]:
    """
    Map ``text`` to a deterministic, L2-normalised 64-element feature vector.

    Layout of the vector (before normalisation):
    - [0, 32)  sinusoidal features derived from the SHA-256 digest of the text
    - [32, 48) substring-presence flags for 16 domain tags
    - [48, 56) structural signals (length, punctuation density, word count, ...)
    - [56, 60) keyword-based sentiment signals (slot 58 is a constant 0.5)
    - [60, 64) novelty/complexity signals (lexical diversity, line count, ...)
    """
    lowered = text.lower()
    tokens = text.split()
    char_total = max(len(text), 1)  # guards every per-character ratio against /0

    # --- hash fingerprint: one sinusoid per digest byte -------------------
    digest = hashlib.sha256(text.encode()).hexdigest()
    vector: list[float] = [
        math.sin(int(digest[2 * k : 2 * k + 2], 16) / 255.0 * math.pi * (k + 1))
        for k in range(32)
    ]

    # --- domain tags: plain substring tests on the lower-cased text -------
    tag_names = (
        "code", "function", "class", "api", "security", "threat",
        "architecture", "design", "data", "database", "test", "deploy",
        "optimize", "performance", "research", "analyze",
    )
    vector.extend(1.0 if tag in lowered else 0.0 for tag in tag_names)

    # --- structural signals -------------------------------------------------
    vector.append(min(len(text) / 500.0, 1.0))
    vector.extend(text.count(mark) / char_total * 10 for mark in ".?!,")
    vector.append(len(tokens) / 100.0)
    vector.append(1.0 if any(ch.isupper() for ch in text) else 0.0)
    vector.append(sum(1 for ch in text if ch.isdigit()) / char_total)

    # --- sentiment: share of positive / negative keywords present ----------
    positive = ("good", "great", "excellent", "improve", "best", "optimize")
    negative = ("bad", "fail", "error", "bug", "crash", "threat")
    pos_share = sum(1 for word in positive if word in lowered) / len(positive)
    neg_share = sum(1 for word in negative if word in lowered) / len(negative)
    vector += [pos_share, neg_share, 0.5, abs(pos_share - neg_share)]

    # --- novelty / complexity ----------------------------------------------
    vector.append(len(set(lowered.split())) / max(len(tokens), 1))
    vector.append(min(len(text.split("\n")) / 10.0, 1.0))
    vector.append(text.count("(") / char_total * 20)
    if tokens:
        vector.append(min(max(map(len, tokens)) / 20.0, 1.0))
    else:
        vector.append(0.0)

    # L2-normalise; the epsilon keeps the division safe
    scale = math.sqrt(sum(v * v for v in vector)) + 1e-8
    return [v / scale for v in vector[:64]]
|
| |
|
| |
|
def plot_features(features: list[float], title: str = "64-D Feature Vector") -> go.Figure:
    """Render the 64-dim feature vector as a bar chart coloured by feature group."""
    tag_suffixes = (
        "code", "func", "class", "api", "sec", "threat",
        "arch", "design", "data", "db", "test", "deploy",
        "opt", "perf", "research", "analyze",
    )
    # (label prefix, per-bar suffixes, bar colour) for each group, in vector order
    groups = (
        ("hash", range(32), "#FF6B6B"),
        ("tag", tag_suffixes, "#4ECDC4"),
        ("struct", range(8), "#45B7D1"),
        ("sent", range(4), "#96CEB4"),
        ("novel", range(4), "#FFEAA7"),
    )

    labels: list[str] = []
    colors: list[str] = []
    for prefix, suffixes, color in groups:
        for suffix in suffixes:
            labels.append(f"{prefix}_{suffix}")
            colors.append(color)

    bars = go.Bar(x=labels, y=features, marker_color=colors)
    layout = go.Layout(
        title=title,
        xaxis=dict(title="Feature Dimension", tickangle=-45, tickfont=dict(size=7)),
        yaxis=dict(title="Value"),
        height=350,
        template="plotly_dark",
        margin=dict(b=120),
    )
    return go.Figure(data=[bars], layout=layout)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
class ExpertTower(nn.Module if _TORCH else object):
    """One expert MLP: d_in → h1 → h2 → d_out (64 → 512 → 512 → 256 by default)."""

    def __init__(self, d_in: int = 64, h1: int = 512, h2: int = 512, d_out: int = 256):
        super().__init__()
        # Layers are created in fc1, fc2, fc3 order so seeded initialisation
        # and state-dict keys stay stable.
        self.fc1 = nn.Linear(d_in, h1)
        self.fc2 = nn.Linear(h1, h2)
        self.fc3 = nn.Linear(h2, d_out)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply two ReLU-activated hidden layers followed by a linear output layer."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
|
| |
|
| |
|
class MixtureOfExperts7M(nn.Module if _TORCH else object):
    """
    Dense Mixture-of-Experts classifier (roughly 7M parameters with the defaults).

    Components:
    - Gating network: 64 → 512 → num_experts, softmax over experts
    - ``num_experts`` expert towers, each 64 → 512 → 512 → 256
    - Classifier head: 256 → num_classes
    """

    def __init__(self, num_classes: int = 10, num_experts: int = 16):
        super().__init__()
        self.num_experts = num_experts

        # Gate
        self.gate_fc1 = nn.Linear(64, 512)
        self.gate_fc2 = nn.Linear(512, num_experts)

        # Experts
        towers = []
        for _ in range(num_experts):
            towers.append(ExpertTower())
        self.experts = nn.ModuleList(towers)

        # Head
        self.classifier = nn.Linear(256, num_classes)

    @property
    def parameter_count(self) -> int:
        """Total number of elements across all parameters of the model."""
        total = 0
        for param in self.parameters():
            total += param.numel()
        return total

    def forward(self, x64: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Return ``(logits, gate_weights)`` for a batch of 64-dim inputs."""
        # Softmax gate: one weight per expert
        gate_hidden = F.relu(self.gate_fc1(x64))
        gate_weights = torch.softmax(self.gate_fc2(gate_hidden), dim=-1)

        # Every expert sees every input; outputs are stacked on a new axis 1
        per_expert = [tower(x64) for tower in self.experts]
        expert_outs = torch.stack(per_expert, dim=1)

        # Gate-weighted sum over the expert axis
        weighted = expert_outs * gate_weights.unsqueeze(-1)
        agg = torch.sum(weighted, dim=1)

        return self.classifier(agg), gate_weights
|
| |
|
| |
|
class RouterNet(nn.Module if _TORCH else object):
    """
    Routing-gate MLP: d_in → d_h → d_h // 2 → n_out (64 → 128 → 64 → 8 by default).

    ``forward`` returns a softmax distribution over the output slots.
    """

    EXPERTS = [
        "code_expert", "test_expert", "design_expert", "research_expert",
        "architecture_expert", "security_expert", "performance_expert",
        "documentation_expert",
    ]

    def __init__(self, d_in: int = 64, d_h: int = 128, n_out: int = 8):
        super().__init__()
        half = d_h // 2
        # Positions inside the Sequential are kept as-is so state-dict keys
        # (net.0, net.3, net.5) do not change.
        layers = [
            nn.Linear(d_in, d_h),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(d_h, half),
            nn.ReLU(),
            nn.Linear(half, n_out),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return softmax-normalised routing weights along the last dimension."""
        logits = self.net(x)
        return torch.softmax(logits, dim=-1)
|
| |
|
| |
|
class PolicyNetwork(nn.Module if _TORCH else object):
    """Policy MLP: d_in → 256 → 128 → n_actions, ending in a softmax."""

    def __init__(self, d_in: int = 128, n_actions: int = 32):
        super().__init__()
        layers = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
            nn.Softmax(dim=-1),  # output is a distribution over actions
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return action probabilities for the given state embedding(s)."""
        probabilities = self.net(x)
        return probabilities
|
| |
|
| |
|
class ValueNetwork(nn.Module if _TORCH else object):
    """Value MLP: d_in → 256 → 64 → 1, squashed to (-1, 1) by tanh."""

    def __init__(self, d_in: int = 192):
        super().__init__()
        layers = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return one scalar value estimate per input row."""
        estimate = self.net(x)
        return estimate
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Registry of cognitive cells: key -> display name, description and the
# head identifiers each cell advertises. Insertion order is the display order.
CELL_TYPES = {
    key: {"name": name, "description": description, "heads": heads}
    for key, name, description, heads in (
        ("reasoning", "ReasoningCell",
         "Structured reasoning with Rule or NN heads",
         ["rule", "nn"]),
        ("memory", "MemoryCell",
         "Privacy-preserving preference extraction",
         ["preference_extractor"]),
        ("causal", "CausalCell",
         "Pearl's do-calculus for causal inference",
         ["do_calculus"]),
        ("ethics", "EthicsCell",
         "Safety classification and PII detection",
         ["classifier", "pii_scanner"]),
        ("empathy", "EmpathyCell",
         "Emotional tone detection and empathetic responses",
         ["tone_detector"]),
        ("curiosity", "CuriosityCell",
         "Epistemic curiosity and hypothesis generation",
         ["hypothesis_generator"]),
        ("figliteral", "FigLiteralCell",
         "Figurative vs literal language classification",
         ["classifier"]),
        ("r2p", "R2PCell",
         "Requirements-to-Plan structured decomposition",
         ["planner"]),
        ("telemetry", "TelemetryCell",
         "Telemetry event capture and structuring",
         ["collector"]),
        ("aggregator", "AggregatorCell",
         "Multi-expert output aggregation",
         ["weighted_average", "max_confidence", "ensemble"]),
    )
}
|
| |
|
| |
|
def _cell_reasoning(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Split the text into roughly three word chunks with mock confidences."""
    words = text.split()
    chunk_size = max(len(words) // 3, 1)
    sections = []
    for i in range(0, len(words), chunk_size):
        sections.append({
            "text": " ".join(words[i : i + chunk_size]),
            "confidence": round(random.uniform(0.7, 0.99), 3),
            "boundary_type": random.choice(["topic_shift", "elaboration", "conclusion"]),
        })
    return {
        "head_type": config.get("head_type", "rule"),
        "sections": sections,
        "section_count": len(sections),
    }


def _cell_memory(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Flag explicit/implicit preference statements via keyword matching."""
    lower = text.lower()
    preferences = []
    if "prefer" in lower or "like" in lower:
        preferences.append({"type": "explicit", "value": text, "confidence": 0.95})
    if "always" in lower or "usually" in lower:
        preferences.append({"type": "implicit", "value": text, "confidence": 0.72})
    return {
        "preferences": preferences,
        "opt_out": "don't remember" in lower,
        "consent_status": "granted",
    }


def _cell_causal(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Return a randomly drawn causal effect with a fixed ±0.15 interval."""
    effect = round(random.uniform(-0.5, 0.8), 3)
    return {
        "mode": config.get("mode", "do_calculus"),
        "variables": [w for w in text.split() if len(w) > 3][:5],
        "causal_effect": effect,
        "confidence_interval": [round(effect - 0.15, 3), round(effect + 0.15, 3)],
    }


def _cell_ethics(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Detect e-mail / phone / SSN patterns and redact them from the text."""
    import re

    pii_patterns = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    }
    pii_found = []
    redacted = text
    for pii_type, pattern in pii_patterns.items():
        for match in re.findall(pattern, text):
            # The matched value itself is never echoed back to the caller.
            pii_found.append({"type": pii_type, "value": "[REDACTED]"})
            redacted = redacted.replace(match, "[REDACTED]")

    risk = random.uniform(0.6, 0.9) if pii_found else random.uniform(0.0, 0.3)
    return {
        "is_safe": not pii_found,
        "pii_detected": pii_found,
        "redacted_text": redacted,
        "risk_score": round(risk, 3),
    }


def _cell_empathy(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Pick the emotion with the most keyword hits and a canned response."""
    lower = text.lower()
    emotion_keywords: dict[str, list[str]] = {
        "frustration": ["frustrat", "annoy", "angry", "upset", "fail", "broken", "stuck", "overwhelm"],
        "anxiety": ["worry", "anxious", "nervous", "afraid", "fear", "concern", "stress", "uncertain"],
        "excitement": ["excit", "amazing", "awesome", "great", "love", "fantastic", "thrilled", "happy"],
        "satisfaction": ["satisfied", "pleased", "good", "well", "success", "accomplish", "done", "complete"],
        "confusion": ["confus", "unclear", "don't understand", "what does", "how does", "lost", "puzzle"],
    }
    emotion_scores = {
        emotion: sum(1 for kw in keywords if kw in lower)
        for emotion, keywords in emotion_keywords.items()
    }
    # Ties resolve to the first emotion in declaration order (max() semantics).
    best_emotion = max(emotion_scores, key=lambda e: emotion_scores[e])
    detected = best_emotion if emotion_scores[best_emotion] > 0 else "neutral"
    confidence = min(0.95, 0.6 + emotion_scores.get(detected, 0) * 0.1)
    responses = {
        "neutral": "I understand your message. How can I help further?",
        "frustration": "I can see this is frustrating. Let me help resolve this.",
        "excitement": "That's great news! Let's build on that momentum.",
        "confusion": "Let me clarify that for you step by step.",
        "satisfaction": "Glad to hear things are going well!",
        "anxiety": "I understand your concern. Let's work through this together.",
    }
    return {
        "detected_emotion": detected,
        "confidence": round(confidence, 3),
        "empathetic_response": responses[detected],
    }


def _cell_curiosity(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Generate templated follow-up questions and a lexical novelty score."""
    long_words = [w for w in text.split() if len(w) > 3]
    topics = list(dict.fromkeys(long_words[:5]))  # de-duplicate, keep order
    topic_str = ", ".join(topics[:3]) if topics else "this topic"
    questions = [
        f"What are the underlying assumptions behind {topic_str}?",
        f"How would the outcome differ if we changed the approach to {topics[0] if topics else 'this'}?",
        "What related problems have been solved in adjacent domains?",
        f"What are the second-order effects of {topics[1] if len(topics) > 1 else 'this decision'}?",
        f"What evidence would disprove our current hypothesis about {topic_str}?",
    ]
    max_q = config.get("max_questions", 3)
    # A negative or non-integer limit would mis-slice (or raise); use the default.
    if not isinstance(max_q, int) or max_q < 0:
        max_q = 3
    unique_ratio = len(set(text.lower().split())) / max(len(text.split()), 1)
    return {
        "questions": questions[:max_q],
        "novelty_score": round(min(unique_ratio + 0.3, 0.95), 3),
    }


def _cell_figliteral(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Classify text as figurative when it contains a known idiom or simile marker."""
    figurative_map: dict[str, str] = {
        "raining cats and dogs": "raining very heavily",
        "piece of cake": "something very easy to do",
        "break a leg": "good luck; perform well",
        "time flies": "time appears to pass quickly",
        "hit the nail on the head": "to be exactly right",
        "spill the beans": "to reveal a secret",
        "under the weather": "feeling sick or unwell",
        "bite the bullet": "to endure a painful situation with courage",
    }
    lower = text.lower()
    has_simile = "like a" in lower or "as if" in lower
    idioms_found = [
        f"'{idiom}' = {meaning}"
        for idiom, meaning in figurative_map.items()
        if idiom in lower
    ]
    is_figurative = has_simile or bool(idioms_found)

    payload: dict[str, Any] = {
        "classification": "figurative" if is_figurative else "literal",
        "confidence": round(0.9 if is_figurative else 0.85, 3),
    }
    if is_figurative:
        literal_parts = idioms_found
        if not literal_parts and has_simile:
            literal_parts = ["Contains simile/metaphor — direct comparison without figurative intent"]
        payload["literal_interpretation"] = (
            "; ".join(literal_parts) if literal_parts else "No specific idiom decomposition available"
        )
        payload["figurative_elements"] = literal_parts
    return payload


def _cell_r2p(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Return the fixed five-step implementation plan template."""
    return {
        "plan": [
            {"step": 1, "action": "Analyze requirements", "estimated_effort": "2h"},
            {"step": 2, "action": "Design solution architecture", "estimated_effort": "4h"},
            {"step": 3, "action": "Implement core logic", "estimated_effort": "8h"},
            {"step": 4, "action": "Write tests", "estimated_effort": "4h"},
            {"step": 5, "action": "Deploy and validate", "estimated_effort": "2h"},
        ],
        "total_effort": "20h",
        "success_criteria": ["All tests pass", "Performance targets met", "Code reviewed"],
    }


def _cell_telemetry(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Record a telemetry event and parse action/duration/page/element hints."""
    import re

    lower = text.lower()
    attrs: dict[str, Any] = {"source": "cognitive_cell", "cell_type": "telemetry"}

    # First verb (in declaration order) found anywhere in the text wins.
    action_map = {
        "click": "click", "submit": "submit", "scroll": "scroll",
        "navigate": "navigate", "hover": "hover", "type": "input",
        "select": "select", "drag": "drag", "drop": "drop", "open": "open",
    }
    for verb, action in action_map.items():
        if verb in lower:
            attrs["action"] = action
            break

    dur_match = re.search(r"(\d+)\s*(second|sec|ms|minute|min)", lower)
    if dur_match:
        attrs["duration_value"] = int(dur_match.group(1))
        attrs["duration_unit"] = dur_match.group(2)

    page_match = re.search(r"(?:on|at|in)\s+(?:the\s+)?(\w+)\s+page", lower)
    if page_match:
        attrs["page"] = page_match.group(1)

    elem_match = re.search(r"(?:click|clicked|press|pressed|hit)\s+(?:the\s+)?(\w+)", lower)
    if elem_match:
        attrs["element"] = elem_match.group(1)

    return {
        "event_recorded": True,
        "trace_id": f"trace-{uuid.uuid4().hex[:8]}",
        "timestamp": time.time(),
        "metadata": attrs,
        "parsed_attributes": {k: v for k, v in attrs.items() if k not in ("source", "cell_type")},
    }


def _cell_aggregator(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Run the configured sub-cells on the text and combine their confidences."""
    strategy = config.get("strategy", "weighted_average")
    sub_cells = config.get("sub_cells", ["reasoning", "ethics", "causal"])

    sub_results = []
    for sc in sub_cells:
        # Unknown names are skipped; nested aggregators are refused to avoid recursion.
        if sc in CELL_TYPES and sc != "aggregator":
            sr = execute_cell(sc, text)
            sub_results.append({
                "cell": sc,
                "status": sr.get("status", "ok"),
                "confidence": sr.get("confidence", sr.get("risk_score", 0.8)),
                "elapsed_ms": sr.get("elapsed_ms", 0),
            })

    confidences = [
        r["confidence"] for r in sub_results if isinstance(r["confidence"], (int, float))
    ]
    if not confidences:
        agg_confidence = 0.0
    elif strategy == "max_confidence":
        agg_confidence = max(confidences)
    elif strategy == "ensemble":
        agg_confidence = sum(confidences) / len(confidences)
    else:
        # Weighted average with harmonic weights: earlier sub-cells count more.
        weights = [1.0 / (i + 1) for i in range(len(confidences))]
        agg_confidence = sum(c * w for c, w in zip(confidences, weights)) / sum(weights)

    return {
        "strategy": strategy,
        "sub_cell_results": sub_results,
        "cells_aggregated": len(sub_results),
        "aggregated_output": f"Aggregated {len(sub_results)} cells via {strategy}",
        "confidence": round(agg_confidence, 3),
    }


# cell_type -> handler(text, config) returning the cell-specific result fields
_CELL_HANDLERS = {
    "reasoning": _cell_reasoning,
    "memory": _cell_memory,
    "causal": _cell_causal,
    "ethics": _cell_ethics,
    "empathy": _cell_empathy,
    "curiosity": _cell_curiosity,
    "figliteral": _cell_figliteral,
    "r2p": _cell_r2p,
    "telemetry": _cell_telemetry,
    "aggregator": _cell_aggregator,
}


def execute_cell(cell_type: str, text: str, config_json: str = "{}") -> dict[str, Any]:
    """
    Execute one cognitive cell on ``text`` and return a structured result.

    Args:
        cell_type: Key of the cell to run (e.g. ``"ethics"``, ``"empathy"``).
        text: Input text; must contain at least one non-whitespace character.
        config_json: JSON object with per-cell options. Empty, blank or ``None``
            means "no options".

    Returns:
        On success, a dict with ``cell_type``, ``request_id``, ``status == "ok"``,
        the cell-specific fields and ``elapsed_ms``. On failure (blank text,
        invalid or non-object JSON config, unknown cell type), a dict with
        ``status == "error"``, a ``message`` and ``elapsed_ms == 0.0``.
    """
    start = time.monotonic()

    def _error(message: str) -> dict[str, Any]:
        return {
            "cell_type": cell_type,
            "status": "error",
            "message": message,
            "elapsed_ms": 0.0,
        }

    if not text or not text.strip():
        return _error("Input text is required. Please provide some text to process.")

    try:
        config = json.loads(config_json) if config_json and config_json.strip() else {}
    except json.JSONDecodeError as e:
        return _error(f"Invalid JSON config: {e}")
    # Valid JSON that is not an object (list, number, ...) has no .get().
    if not isinstance(config, dict):
        return _error("Invalid JSON config: expected a JSON object")

    handler = _CELL_HANDLERS.get(cell_type)
    if handler is None:
        # Previously an unknown type fell through and reported status "ok".
        return _error(f"Unknown cell type: {cell_type}")

    result: dict[str, Any] = {
        "cell_type": cell_type,
        "request_id": f"req-{uuid.uuid4().hex[:12]}",
        "status": "ok",
    }
    result.update(handler(text, config))
    result["elapsed_ms"] = round((time.monotonic() - start) * 1000, 2)
    return result
|
| |
|
| |
|
def compose_cells(pipeline_str: str, text: str) -> dict[str, Any]:
    """
    Run a comma-separated list of cells, one after another, on the same text.

    Unknown cell names are recorded as error activations and skipped. The
    result of the last cell that ran is returned as ``final_output``.
    """
    stripped = (part.strip() for part in pipeline_str.split(","))
    cell_types = [name for name in stripped if name]
    if not cell_types:
        return {"error": "No cell types specified"}

    bookkeeping_keys = ("request_id", "elapsed_ms")
    activations: list[dict[str, Any]] = []
    context: dict[str, Any] = {}
    final_output: dict[str, Any] = {}

    for name in cell_types:
        if name in CELL_TYPES:
            outcome = execute_cell(name, text)
            activations.append({
                "cell_type": name,
                "status": outcome.get("status", "ok"),
                "elapsed_ms": outcome.get("elapsed_ms", 0),
            })
            # Accumulate every result field except per-request bookkeeping
            for key, value in outcome.items():
                if key not in bookkeeping_keys:
                    context[key] = value
            final_output = outcome
        else:
            activations.append({
                "cell_type": name,
                "status": "error",
                "message": f"Unknown cell type: {name}",
            })

    return {
        "pipeline": cell_types,
        "activations": activations,
        "final_output": final_output,
        "total_cells": len(cell_types),
        "context_keys": list(context),
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Candidate planning actions per task category; run_mcts and
# benchmark_strategies look actions up here by category name.
TASK_CATEGORIES = dict(
    architecture=["service_split", "api_gateway", "data_layer", "security_layer", "caching"],
    implementation=["requirements", "design", "code", "test", "deploy"],
    optimization=["profile", "identify_bottleneck", "optimize", "validate", "benchmark"],
    security=["asset_inventory", "threat_enumeration", "risk_scoring", "mitigations", "audit"],
    research=["literature_review", "comparison", "synthesis", "recommendations", "publish"],
)
|
| |
|
| |
|
@dataclass
class MCTSNode:
    """
    Node in the MCTS search tree.

    Holds visit statistics, the policy prior assigned at expansion time and
    an optional list of child nodes.
    """

    id: str
    action: str
    visits: int = 0
    total_value: float = 0.0
    policy_prior: float = 0.0
    children: list["MCTSNode"] | None = None

    def ucb1_score(self, parent_visits: int, c: float = 1.414) -> float:
        """
        Return the UCB1 score: mean value + c * sqrt(ln(parent_visits) / visits).

        Unvisited nodes score ``inf`` so they are always tried first.
        """
        if self.visits == 0:
            return float("inf")
        exploitation = self.total_value / self.visits
        # Clamp to 1 so a parent with no recorded visits gives ln(1) = 0
        # instead of a math domain error.
        log_parent = math.log(max(parent_visits, 1))
        exploration = c * math.sqrt(log_parent / self.visits)
        return exploitation + exploration

    def puct_score(self, parent_visits: int, c: float = 1.0) -> float:
        """
        Return the PUCT score: mean value + c * prior * sqrt(parent_visits) / (1 + visits).

        Unvisited nodes score ``inf``, so the prior only influences selection
        once a node has been visited at least once.
        """
        if self.visits == 0:
            return float("inf")
        exploitation = self.total_value / self.visits
        # Clamp to 0 so a negative count cannot raise inside sqrt.
        sqrt_parent = math.sqrt(max(parent_visits, 0))
        exploration = c * self.policy_prior * sqrt_parent / (1 + self.visits)
        return exploitation + exploration

    def to_dict(self, max_depth: int = 3) -> dict[str, Any]:
        """
        Serialise the node (and up to ``max_depth`` levels of children) to a dict.

        At each level only the five most-visited children are included.
        """
        d: dict[str, Any] = {
            "id": self.id,
            "action": self.action,
            "visits": self.visits,
            "value": round(self.total_value / max(self.visits, 1), 3),
            "policy_prior": round(self.policy_prior, 3),
        }
        if self.children and max_depth > 0:
            most_visited = sorted(self.children, key=lambda n: -n.visits)[:5]
            d["children"] = [child.to_dict(max_depth - 1) for child in most_visited]
        return d
|
| |
|
| |
|
def run_mcts(
    task: str,
    max_simulations: int = 100,
    exploration_constant: float = 1.414,
    strategy: str = "ucb1",
) -> dict[str, Any]:
    """
    Run a one-level MCTS over the actions of the task's category.

    The category is picked by keyword matching on ``task`` (default
    ``"implementation"``). The root is expanded once with one child per
    action; each simulation then selects a child by UCB1 (``strategy ==
    "ucb1"``) or PUCT (any other value), evaluates it and backs the value up.
    When torch is available the priors and values come from freshly
    initialised networks fed random inputs; otherwise they are drawn from
    ``random``.

    Returns:
        A dict with the chosen category, best action/value, the serialised
        tree, per-action statistics, timing and whether NN heads were used.
    """
    start = time.monotonic()

    # Pick the first category (in declaration order) with a keyword hit.
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break

    actions = TASK_CATEGORIES[category]
    root = MCTSNode(id="root", action=task[:50], children=[])

    if _TORCH:
        policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
        value_net = ValueNetwork(d_in=192)
        policy_net.eval()
        value_net.eval()

    use_ucb1 = strategy == "ucb1"

    def score(child: MCTSNode) -> float:
        # +1 keeps the parent count positive on the very first simulation
        parent_visits = root.visits + 1
        if use_ucb1:
            return child.ucb1_score(parent_visits, exploration_constant)
        return child.puct_score(parent_visits, exploration_constant)

    for sim in range(max_simulations):
        # Expansion: happens once, on the first simulation.
        if not root.children:
            if _TORCH:
                # A single forward pass so the priors form one softmax
                # distribution over the actions.
                with torch.no_grad():
                    priors = policy_net(torch.randn(1, 128))[0].tolist()
            else:
                priors = [random.uniform(0.1, 0.5) for _ in actions]
            root.children = [
                MCTSNode(id=f"{act}-{sim}", action=act, policy_prior=prior, children=[])
                for act, prior in zip(actions, priors)
            ]

        # Selection
        best_child = max(root.children, key=score)

        # Evaluation
        if _TORCH:
            with torch.no_grad():
                value = value_net(torch.randn(1, 192)).item()
        else:
            value = random.uniform(0.3, 0.9)

        # Backup
        best_child.visits += 1
        best_child.total_value += value
        root.visits += 1

    elapsed = (time.monotonic() - start) * 1000

    # The recommended action is the most-visited child.
    if root.children:
        best = max(root.children, key=lambda n: n.visits)
        best_action = best.action
        best_value = round(best.total_value / max(best.visits, 1), 3)
    else:
        best_action = "none"
        best_value = 0.0

    return {
        "task": task,
        "category": category,
        "strategy": strategy,
        "best_action": best_action,
        "best_value": best_value,
        "total_simulations": max_simulations,
        "exploration_constant": exploration_constant,
        "tree": root.to_dict(max_depth=2),
        "all_actions": [
            {
                "action": c.action,
                "visits": c.visits,
                "value": round(c.total_value / max(c.visits, 1), 3),
            }
            for c in sorted(root.children or [], key=lambda n: -n.visits)
        ],
        "elapsed_ms": round(elapsed, 2),
        "nn_enabled": _TORCH,
    }
|
| |
|
| |
|
def _benchmark_seed(text: str) -> int:
    """Derive a 31-bit seed from ``text`` that is stable across processes."""
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % (2**31)


def benchmark_strategies(task: str) -> dict[str, Any]:
    """
    Compare MCTS, greedy and random action selection on the same task.

    Returns:
        ``{"task", "category", "results"}`` where ``results`` maps
        ``"mcts"``, ``"greedy"`` and ``"random"`` to a dict with
        ``quality_score``, ``best_action`` and ``elapsed_ms``.
    """
    results = {}

    # Same keyword-based categorisation as run_mcts.
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break
    actions = TASK_CATEGORIES[category]

    # --- MCTS ---------------------------------------------------------------
    start = time.monotonic()
    r = run_mcts(task, max_simulations=100)
    elapsed_mcts = (time.monotonic() - start) * 1000
    results["mcts"] = {
        "quality_score": r["best_value"],
        "best_action": r["best_action"],
        "elapsed_ms": round(elapsed_mcts, 2),
    }

    # --- Greedy: take the single highest-scoring action ----------------------
    start = time.monotonic()
    if _TORCH:
        # Seed inside a forked RNG scope so both the network weights and the
        # embedding are reproducible per task and the global torch RNG is
        # left untouched. hash(str) is not used: it varies between processes.
        with torch.random.fork_rng(devices=[]):
            torch.manual_seed(_benchmark_seed(task))
            policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
            policy_net.eval()
            embed = torch.randn(1, 128)
            with torch.no_grad():
                priors = policy_net(embed)[0].numpy()
        best_idx = int(np.argmax(priors))
        greedy_action = actions[best_idx]
        greedy_quality = float(priors[best_idx])
    else:
        # Score every action, then report the action that owns the best score.
        scores = [random.uniform(0.1, 0.3) for _ in actions]
        best_idx = max(range(len(scores)), key=scores.__getitem__)
        greedy_action = actions[best_idx]
        greedy_quality = scores[best_idx]
    elapsed_greedy = (time.monotonic() - start) * 1000
    results["greedy"] = {
        "quality_score": round(greedy_quality, 3),
        "best_action": greedy_action,
        "elapsed_ms": round(elapsed_greedy, 2),
    }

    # --- Random: pick any action and evaluate it -----------------------------
    start = time.monotonic()
    random_action = random.choice(actions)
    if _TORCH:
        with torch.random.fork_rng(devices=[]):
            torch.manual_seed(_benchmark_seed(task + random_action))
            value_net = ValueNetwork(d_in=192)
            value_net.eval()
            state = torch.randn(1, 192)
            with torch.no_grad():
                random_quality = value_net(state).item()
    else:
        random_quality = random.uniform(-0.5, 0.5)
    elapsed_random = (time.monotonic() - start) * 1000
    results["random"] = {
        "quality_score": round(random_quality, 3),
        "best_action": random_action,
        "elapsed_ms": round(elapsed_random, 2),
    }

    return {"task": task, "category": category, "results": results}
|
| |
|
| |
|
def plot_mcts_tree(tree_data: dict) -> go.Figure:
    """
    Create a sunburst visualization of a serialised MCTS tree.

    Args:
        tree_data: Nested dict with ``id`` and ``action`` keys and optional
            ``visits``, ``value`` and ``children`` keys (the shape produced
            by ``MCTSNode.to_dict``).

    Returns:
        A plotly figure whose sectors are sized by visit count and coloured
        by mean value.
    """
    ids, labels, parents, values, colors = [], [], [], [], []

    def _walk(node: dict, parent_id: str = "") -> int:
        """Append ``node`` and its subtree in pre-order; return the node's sector size."""
        nid = node["id"]
        slot = len(ids)
        ids.append(nid)
        labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})")
        parents.append(parent_id)
        values.append(0)  # placeholder, filled in once the children are sized
        colors.append(node.get("value", 0))

        child_total = sum(_walk(child, nid) for child in node.get("children", []))
        # branchvalues="total" needs parent >= sum(children). Zero-visit
        # children are shown with size 1, so the raw visit count alone can
        # be too small.
        size = max(node.get("visits", 1), child_total, 1)
        values[slot] = size
        return size

    _walk(tree_data)

    fig = go.Figure(go.Sunburst(
        ids=ids, labels=labels, parents=parents, values=values,
        marker=dict(colors=colors, colorscale="Viridis", showscale=True),
        branchvalues="total",
    ))
    fig.update_layout(
        title="MCTS Search Tree",
        height=500,
        template="plotly_dark",
        margin=dict(t=40, l=0, r=0, b=0),
    )
    return fig
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Display names of the routing targets, in router output-index order.
EXPERT_NAMES = [
    f"{area} Expert"
    for area in (
        "Code", "Test", "Design", "Research",
        "Architecture", "Security", "Performance", "Docs",
    )
]

# Seed applied before building the shared router, and the lazily created
# router instance itself (None until first use).
_ROUTER_SEED = 42
_router_net_singleton: "RouterNet | None" = None
|
| |
|
| |
|
def _get_router() -> "RouterNet":
    """
    Return the shared RouterNet, building it on first use.

    The network is created after seeding torch with ``_ROUTER_SEED`` and put
    in eval mode. When torch is unavailable nothing is built and the stored
    value (``None`` unless set elsewhere) is returned.
    """
    global _router_net_singleton
    already_built = _router_net_singleton is not None
    if already_built or not _TORCH:
        return _router_net_singleton

    torch.manual_seed(_ROUTER_SEED)
    _router_net_singleton = RouterNet(d_in=64, n_out=len(EXPERT_NAMES))
    _router_net_singleton.eval()
    return _router_net_singleton
|
| |
|
| |
|
def route_task(task: str, top_k: int = 3) -> dict[str, Any]:
    """
    Route a task to experts using the neural gate plus keyword boosts.

    Args:
        task: Free-text task description.
        top_k: Number of experts to select. Coerced to ``int`` and clamped to
            ``[0, len(EXPERT_NAMES)]``; the clamped value is what the result
            reports under ``"top_k"``.

    Returns:
        A dict with the input task, its 64-dim features, the normalised
        weight of every expert, the ranked ``selected_experts``, the
        effective ``top_k``, whether the NN gate was used, and timing.
    """
    start = time.monotonic()

    n_experts = len(EXPERT_NAMES)
    # A negative k would slice as [:-k] and return almost every expert.
    top_k = max(0, min(int(top_k), n_experts))

    features = featurize64(task)

    if _TORCH:
        router = _get_router()
        feature_tensor = torch.tensor([features], dtype=torch.float32)
        with torch.no_grad():
            weights = router(feature_tensor)[0].numpy()
    else:
        # No torch: fall back to the magnitudes of the leading features.
        weights = np.array([abs(f) for f in features[:n_experts]])
        weights = weights / (weights.sum() + 1e-8)

    # Keyword boost: +0.15 per matching keyword (substring match) for that expert.
    lower_task = task.lower()
    expert_keywords: dict[int, list[str]] = {
        0: ["code", "implement", "function", "class", "program", "script", "module"],
        1: ["test", "unit test", "coverage", "qa", "assert", "mock", "fixture"],
        2: ["design", "ui", "ux", "layout", "wireframe", "mockup", "style"],
        3: ["research", "analyze", "study", "survey", "literature", "paper", "compare"],
        4: ["architect", "system", "microservice", "scale", "pattern", "infrastructure"],
        5: ["security", "auth", "encrypt", "threat", "vulnerab", "owasp", "pci", "compliance"],
        6: ["performance", "optimize", "latency", "throughput", "cache", "speed", "fast"],
        7: ["document", "readme", "docs", "comment", "explain", "write", "manual"],
    }
    boost = np.zeros(n_experts)
    for idx, kws in expert_keywords.items():
        boost[idx] += 0.15 * sum(1 for kw in kws if kw in lower_task)

    # Re-normalise so the boosted weights sum to ~1 again.
    weights = weights + boost
    weights = weights / (weights.sum() + 1e-8)

    top_indices = np.argsort(weights)[::-1][:top_k]
    selected = [
        {
            "expert": EXPERT_NAMES[i],
            "weight": round(float(weights[i]), 4),
            "rank": rank + 1,
        }
        for rank, i in enumerate(top_indices)
    ]

    elapsed = (time.monotonic() - start) * 1000

    return {
        "task": task,
        "features": features,
        "all_weights": {EXPERT_NAMES[i]: round(float(weights[i]), 4) for i in range(n_experts)},
        "selected_experts": selected,
        "top_k": top_k,
        "nn_enabled": _TORCH,
        "elapsed_ms": round(elapsed, 2),
    }
|
| |
|
| |
|
def plot_expert_weights(weights: dict[str, float]) -> go.Figure:
    """Create a bar chart of expert routing weights.

    Args:
        weights: Mapping of expert name to routing weight; bars are drawn in
            the mapping's iteration order.  May be empty.

    Returns:
        A dark-themed Plotly figure with one bar per expert.  The y-axis runs
        from 0 to 1.2x the largest weight, or to 1.0 when there is no
        positive weight to scale by.
    """
    names = list(weights)
    vals = list(weights.values())

    # Cycle the palette so every bar gets a colour even with more than eight
    # experts (a plain slice would leave the extra bars without one).
    palette = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB"]
    bar_colors = [palette[i % len(palette)] for i in range(len(names))]

    # max() on an empty list raises, and a zero peak would collapse the axis.
    peak = max(vals, default=0.0)
    y_max = peak * 1.2 if peak > 0 else 1.0

    fig = go.Figure(
        data=[go.Bar(x=names, y=vals, marker_color=bar_colors)],
        layout=go.Layout(
            title="Expert Routing Weights",
            yaxis=dict(title="Weight (softmax)", range=[0, y_max]),
            height=350,
            template="plotly_dark",
            margin=dict(t=40),
        ),
    )
    return fig
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Agent roster shown in the UI and used by the orchestrator.  Each entry is
# built from a (name, specialization, icon) row so the columns line up.
AGENTS = [
    {"name": name, "specialization": specialization, "icon": icon}
    for name, specialization, icon in (
        ("SWE Agent", "Code scaffold generation", "[SWE]"),
        ("Architect Agent", "System design and patterns", "[ARCH]"),
        ("QA Agent", "Test plan and case generation", "[QA]"),
        ("Security Agent", "Threat modeling (OWASP)", "[SEC]"),
        ("DevOps Agent", "Infrastructure planning", "[OPS]"),
        ("Research Agent", "Technical analysis", "[RES]"),
        ("Performance Agent", "Optimization analysis", "[PERF]"),
        ("Documentation Agent", "Technical writing", "[DOC]"),
    )
]
|
| |
|
| |
|
| |
|
# Which cognitive cell each agent runs its task through, keyed by agent name.
_AGENT_CELL_MAP: dict[str, str] = dict(
    (
        ("SWE Agent", "reasoning"),
        ("Architect Agent", "r2p"),
        ("QA Agent", "reasoning"),
        ("Security Agent", "ethics"),
        ("DevOps Agent", "telemetry"),
        ("Research Agent", "causal"),
        ("Performance Agent", "reasoning"),
        ("Documentation Agent", "reasoning"),
    )
)
|
| |
|
| |
|
# Router expert labels that do not become an agent name by swapping the
# " Expert" suffix for " Agent".  Without these the lookup silently fell back
# to the first agent for every one of them.
_EXPERT_AGENT_ALIASES: dict[str, str] = {
    "Code Expert": "SWE Agent",
    "Test Expert": "QA Agent",
    "Architecture Expert": "Architect Agent",
    "Docs Expert": "Documentation Agent",
}


def _agent_for_expert(expert_name: str) -> dict[str, str]:
    """Resolve a router expert label to its AGENTS entry.

    Labels without a matching agent (e.g. "Design Expert") keep the previous
    behaviour and resolve to the first agent in AGENTS.
    """
    agent_name = _EXPERT_AGENT_ALIASES.get(
        expert_name, expert_name.replace(" Expert", " Agent")
    )
    return next((a for a in AGENTS if agent_name in a["name"]), AGENTS[0])


def _run_agent(agent: dict[str, str], weight: float, task: str) -> dict[str, Any]:
    """Run *task* through the agent's cognitive cell and package the result."""
    cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning")
    cell_result = execute_cell(cell_type, task)
    return {
        "agent": agent["name"],
        "icon": agent["icon"],
        "specialization": agent["specialization"],
        "weight": weight,
        "cell_used": cell_type,
        "output": cell_result,
        # Cells that report no confidence are given a default of 0.8.
        "confidence": cell_result.get("confidence", 0.8),
    }


def orchestrate(task: str, max_agents: int = 3, strategy: str = "moe_routing") -> dict[str, Any]:
    """Orchestrate multiple agents for a task using specified routing strategy.

    Args:
        task: Free-text task handed to every selected agent.
        max_agents: Upper bound on the number of agents.  Coerced to ``int``
            and clamped to ``[0, len(AGENTS)]`` before use, so float, negative
            or oversized values no longer break slicing/sampling or skew the
            reported weights.
        strategy: ``"round_robin"`` takes the first agents in AGENTS order,
            ``"random"`` samples agents without replacement, and any other
            value (including the default ``"moe_routing"``) selects agents
            from the experts returned by ``route_task``.

    Returns:
        A dict with ``task``, ``strategy``, ``agents_selected``, the
        ``max_agents`` value as passed in, the per-agent ``results`` and
        ``total_elapsed_ms``.
    """
    start = time.monotonic()
    count = max(0, min(int(max_agents), len(AGENTS)))

    if strategy in ("round_robin", "random"):
        if strategy == "round_robin":
            chosen = AGENTS[:count]
        else:
            chosen = random.sample(AGENTS, count)
        # Equal share based on how many agents actually run.
        share = round(1.0 / len(chosen), 4) if chosen else 0.0
        agent_results = [_run_agent(agent, share, task) for agent in chosen]
    else:
        routing = route_task(task, top_k=count)
        agent_results = [
            _run_agent(_agent_for_expert(expert["expert"]), expert["weight"], task)
            for expert in routing["selected_experts"]
        ]

    elapsed = (time.monotonic() - start) * 1000

    return {
        "task": task,
        "strategy": strategy,
        "agents_selected": len(agent_results),
        "max_agents": max_agents,
        "results": agent_results,
        "total_elapsed_ms": round(elapsed, 2),
    }
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Gradio theme passed to gr.Blocks in build_app: the Soft base theme with
# amber / orange / stone hues and the "Inter" Google font.
THEME = gr.themes.Soft(
    primary_hue="amber",
    secondary_hue="orange",
    neutral_hue="stone",
    font=gr.themes.GoogleFont("Inter"),
)
|
| |
|
# Custom stylesheet passed to gr.Blocks in build_app.  It styles the
# "main-header" banner and the "stat-box" tiles emitted by build_app, hides
# the page <footer>, and lets Plotly's main SVG overflow its container.
CSS = """
.main-header { text-align: center; margin-bottom: 1rem; }
.main-header h1 { background: linear-gradient(135deg, #FF6B6B, #FFEAA7, #4ECDC4);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
font-size: 2.5rem; font-weight: 800; }
.stat-box { background: linear-gradient(135deg, #1a1a2e, #16213e);
border: 1px solid #0f3460; border-radius: 12px; padding: 1rem;
text-align: center; color: #e8e8e8; }
.stat-box h3 { color: #FFEAA7; margin: 0; font-size: 1.8rem; }
.stat-box p { color: #a8a8a8; margin: 0; font-size: 0.85rem; }
footer { display: none !important; }
.plotly .main-svg { overflow: visible !important; }
"""
|
| |
|
| |
|
def build_app() -> gr.Blocks:
    """Build the complete Gradio application.

    Creates a ``gr.Blocks`` container styled with ``THEME`` and ``CSS`` that
    holds a header, a row of stat tiles and six tabs: Cognitive Cells, MCTS
    Planning, MoE Router, Agents, Architecture and Metrics.  Each interactive
    tab wires its button to the matching backend function.

    NOTE(review): the nesting of layout containers below was reconstructed
    from a copy of this file whose indentation had been stripped -- confirm
    which components belong inside each Row/Column.

    Returns:
        The assembled (not yet launched) Blocks app.
    """
    with gr.Blocks(theme=THEME, css=CSS, title="MangoMAS — Multi-Agent Cognitive Architecture") as app:

        # ---- Page header ----
        gr.HTML("""
<div class="main-header">
<h1>MangoMAS</h1>
<p style="color: #a8a8a8; font-size: 1.1rem;">
Multi-Agent Cognitive Architecture — Interactive Demo
</p>
</div>
""")

        # ---- Headline stats (static text, not derived from the data) ----
        with gr.Row():
            for label, value in [
                ("Cognitive Cells", "10"), ("MoE Params", "~7M"),
                ("MCTS Strategies", "UCB1 + PUCT"), ("Expert Agents", "8"),
            ]:
                gr.HTML(f'<div class="stat-box"><h3>{value}</h3><p>{label}</p></div>')

        # ---- Tab 1: run a single cell or a comma-separated pipeline ----
        with gr.Tab("Cognitive Cells", id="cells"):
            gr.Markdown("### Execute any of the 10 biologically-inspired cognitive cells")

            with gr.Row():
                cell_type = gr.Dropdown(
                    choices=list(CELL_TYPES.keys()),
                    value="reasoning",
                    label="Cell Type",
                    info="Select a cognitive cell to execute",
                )
                cell_info = gr.Textbox(
                    label="Description",
                    value=CELL_TYPES["reasoning"]["description"],
                    interactive=False,
                )

            cell_input = gr.Textbox(
                label="Input Text",
                placeholder="Enter text to process through the cell...",
                value="Design a scalable microservices architecture with event-driven communication",
                lines=3,
            )
            cell_config = gr.Textbox(
                label="Config (JSON, optional)",
                placeholder='{"head_type": "nn"}',
                value="{}",
                lines=1,
            )
            cell_btn = gr.Button("Execute Cell", variant="primary")
            cell_output = gr.JSON(label="Cell Output")

            gr.Markdown("---\n### Cell Composition Pipeline")
            pipeline_input = gr.Textbox(
                label="Pipeline (comma-separated cell types)",
                value="ethics, reasoning, aggregator",
                placeholder="ethics, reasoning, memory",
            )
            pipeline_text = gr.Textbox(
                label="Input Text",
                value="Analyze the security implications of this API design: user@example.com",
                lines=2,
            )
            pipeline_btn = gr.Button("Run Pipeline", variant="secondary")
            pipeline_output = gr.JSON(label="Pipeline Result")

            def on_cell_select(ct: str) -> str:
                """Return the description of the chosen cell type, or a placeholder if unknown."""
                return CELL_TYPES.get(ct, {}).get("description", "Unknown cell type")

            # Keep the read-only description in sync with the dropdown.
            cell_type.change(on_cell_select, inputs=cell_type, outputs=cell_info)
            # The config textbox value is passed to execute_cell as its third argument.
            cell_btn.click(execute_cell, inputs=[cell_type, cell_input, cell_config], outputs=cell_output)
            pipeline_btn.click(compose_cells, inputs=[pipeline_input, pipeline_text], outputs=pipeline_output)

        # ---- Tab 2: MCTS planning and strategy benchmark ----
        with gr.Tab("MCTS Planning", id="mcts"):
            gr.Markdown("### Monte Carlo Tree Search with Policy/Value Neural Networks")

            with gr.Row():
                mcts_task = gr.Textbox(
                    label="Task to Plan",
                    value="Design a secure, scalable REST API with authentication",
                    lines=2,
                    scale=3,
                )
                with gr.Column(scale=1):
                    mcts_sims = gr.Slider(10, 500, value=100, step=10, label="Simulations")
                    mcts_c = gr.Slider(0.1, 3.0, value=1.414, step=0.1, label="Exploration Constant (C)")
                    mcts_strat = gr.Radio(["ucb1", "puct"], value="ucb1", label="Selection Strategy")

            mcts_btn = gr.Button("Run MCTS", variant="primary")

            with gr.Row():
                mcts_tree_plot = gr.Plot(label="Search Tree Visualization")
                mcts_json = gr.JSON(label="MCTS Result")

            gr.Markdown("---\n### Strategy Benchmark")
            bench_task = gr.Textbox(
                label="Benchmark Task",
                value="Optimize database query performance for high-throughput system",
            )
            bench_btn = gr.Button("Run Benchmark", variant="secondary")
            bench_output = gr.JSON(label="Benchmark Results (MCTS vs Greedy vs Random)")

            def run_and_plot(task, sims, c, strat):
                """Run MCTS and return (tree figure, raw result) for the two outputs."""
                # The slider value is cast to int before being used as the simulation count.
                result = run_mcts(task, int(sims), c, strat)
                fig = plot_mcts_tree(result["tree"])
                return fig, result

            mcts_btn.click(run_and_plot, inputs=[mcts_task, mcts_sims, mcts_c, mcts_strat], outputs=[mcts_tree_plot, mcts_json])
            bench_btn.click(benchmark_strategies, inputs=bench_task, outputs=bench_output)

        # ---- Tab 3: MoE routing with feature and weight plots ----
        with gr.Tab("MoE Router", id="moe"):
            gr.Markdown("### Neural Mixture-of-Experts Routing Gate")
            gr.Markdown(
                "The RouterNet MLP extracts 64-dimensional features from text, "
                "then routes to the top-K most relevant expert agents."
            )

            with gr.Row():
                moe_task = gr.Textbox(
                    label="Task to Route",
                    value="Implement a threat detection system with real-time alerting",
                    lines=2,
                    scale=3,
                )
                moe_topk = gr.Slider(1, 8, value=3, step=1, label="Top-K Experts", scale=1)

            moe_btn = gr.Button("Route Task", variant="primary")

            with gr.Row():
                moe_features_plot = gr.Plot(label="64-D Feature Vector")
                moe_weights_plot = gr.Plot(label="Expert Routing Weights")

            moe_json = gr.JSON(label="Routing Result")

            def route_and_plot(task, top_k):
                """Route the task and return (feature figure, weight figure, result without features)."""
                result = route_task(task, int(top_k))
                feat_fig = plot_features(result["features"])
                weight_fig = plot_expert_weights(result["all_weights"])

                # The feature vector is already plotted, so leave it out of the JSON view.
                display = {k: v for k, v in result.items() if k != "features"}
                return feat_fig, weight_fig, display

            moe_btn.click(route_and_plot, inputs=[moe_task, moe_topk], outputs=[moe_features_plot, moe_weights_plot, moe_json])

        # ---- Tab 4: multi-agent orchestration ----
        with gr.Tab("Agents", id="agents"):
            gr.Markdown("### Multi-Agent Orchestration with MoE Routing")

            with gr.Row():
                orch_task = gr.Textbox(
                    label="Task",
                    value="Build a secure payment processing microservice with PCI compliance",
                    lines=2,
                    scale=3,
                )
                with gr.Column(scale=1):
                    orch_agents = gr.Slider(1, 8, value=3, step=1, label="Max Agents")
                    orch_strat = gr.Dropdown(
                        ["moe_routing", "round_robin", "random"],
                        value="moe_routing",
                        label="Routing Strategy",
                    )

            orch_btn = gr.Button("Orchestrate", variant="primary")
            orch_output = gr.JSON(label="Orchestration Result")

            gr.Markdown("---\n### Available Agents")
            # Read-only roster built from AGENTS (icon, name, specialization).
            agent_table = gr.Dataframe(
                value=[[a["icon"], a["name"], a["specialization"]] for a in AGENTS],
                headers=["", "Agent", "Specialization"],
                interactive=False,
            )

            # NOTE(review): the slider value is handed to orchestrate without an
            # int() cast, unlike the other tabs -- confirm the type it delivers.
            orch_btn.click(orchestrate, inputs=[orch_task, orch_agents, orch_strat], outputs=orch_output)

        # ---- Tab 5: static architecture documentation ----
        with gr.Tab("Architecture", id="arch"):
            gr.Markdown("""
### MangoMAS System Architecture

```
┌──────────────────────────────────────────────────────────┐
│ FastAPI Gateway │
│ (Auth / Tenant Middleware) │
├──────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌───────────────────────────┐ │
│ │ MoE Input │────▶│ RouterNet (Neural Gate) │ │
│ │ Parser │ │ 64-dim → MLP → Softmax │ │
│ └──────────────┘ └─────────┬─────────────────┘ │
│ │ │
│ ┌───────┬───────┬───────┼───────┬───────┐ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ Expert Expert Expert Expert Expert Expert │
│ │ │ │ │ │ │ │
│ Agent Agent Agent Agent Agent Agent │
│ │ │ │ │ │ │ │
│ ┌─────┴───────┴───────┴───────┴───────┴───────┘ │
│ │ Cognitive Cell Layer │
│ │ [Reasoning│Memory│Ethics│Causal│Empathy│...] │
│ └─────────────────────┬────────────────────────┘ │
│ ▼ │
│ Aggregator Cell │
│ (weighted / ensemble / ranking) │
│ │ │
│ Feedback Loop → Router Update │
│ │ │
│ Response + Metrics + Traces │
└──────────────────────────────────────────────────────────┘
```

### Neural Network Components

| Component | Architecture | Parameters | Latency |
|-----------|-------------|------------|---------|
| **MixtureOfExperts7M** | 16 Expert Towers (64→512→512→256) + Gate | ~7M | ~5ms |
| **RouterNet** | MLP (64→128→64→8) + Softmax | ~17K | <1ms |
| **PolicyNetwork** | MLP (128→256→128→32) + Softmax | ~70K | <1ms |
| **ValueNetwork** | MLP (192→256→64→1) + Tanh | ~66K | <1ms |
| **ReasoningCell NN Head** | Lightweight transformer | ~500K | ~50ms |

### Cognitive Cell Lifecycle

```
preprocess() → infer() → postprocess() → publish()
│ │ │ │
Validate Core Logic Format Emit Event
Normalize NN/Rule Filter (Event Bus)
Enrich Inference Enrich
```
""")

        # ---- Tab 6: on-demand latency benchmarks ----
        with gr.Tab("Metrics", id="metrics"):
            gr.Markdown("### Live Performance Benchmarks")

            metrics_btn = gr.Button("Run All Benchmarks", variant="primary")

            with gr.Row():
                metrics_routing = gr.Plot(label="Routing Latency by Expert Count")
                metrics_cells = gr.Plot(label="Cell Execution Latency")

            metrics_json = gr.JSON(label="Raw Metrics")

            def run_benchmarks():
                """Time routing and cell execution; return (routing figure, cell figure, summary dict)."""
                # Untimed warm-up calls, so first-call setup cost is not measured.
                route_task("warmup", top_k=1)
                execute_cell("reasoning", "warmup")

                # Routing: 10 samples for each top-k from 1 to 8, keeping the
                # middle element of the sorted samples.
                ks = list(range(1, 9))
                latencies = []
                for k in ks:
                    times = []
                    for _ in range(10):
                        r = route_task("Test routing benchmark task", top_k=k)
                        times.append(r["elapsed_ms"])

                    times.sort()
                    latencies.append(times[len(times) // 2])

                fig_routing = go.Figure(
                    data=[go.Scatter(x=ks, y=latencies, mode="lines+markers", name="Routing Latency")],
                    layout=go.Layout(
                        title="Routing Latency vs Top-K",
                        xaxis_title="Top-K Experts",
                        yaxis_title="Latency (ms)",
                        height=350,
                        template="plotly_dark",
                    ),
                )

                # Cells: 5 samples per cell type, keeping the median.
                cell_times: dict[str, float] = {}
                for ct in CELL_TYPES:
                    times = []
                    for _ in range(5):
                        r = execute_cell(ct, "Benchmark test input for cell")
                        times.append(r["elapsed_ms"])
                    times.sort()
                    cell_times[ct] = times[len(times) // 2]

                fig_cells = go.Figure(
                    data=[go.Bar(
                        x=list(cell_times.keys()),
                        y=list(cell_times.values()),
                        # Fixed list of ten colours, one per bar in order.
                        marker_color=["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7",
                                      "#DDA0DD", "#F0E68C", "#87CEEB", "#FFA07A", "#98FB98"],
                    )],
                    layout=go.Layout(
                        title="Cell Execution Latency",
                        xaxis=dict(
                            title="Cell Type",
                            tickangle=-30,
                            tickfont=dict(size=9),
                        ),
                        yaxis_title="Latency (ms)",
                        height=400,
                        template="plotly_dark",
                        margin=dict(b=100),
                    ),
                )

                summary = {
                    "torch_available": _TORCH,
                    # Middle value of the per-top-k medians computed above.
                    "routing_latency_p50_ms": round(sorted(latencies)[len(latencies) // 2], 3),
                    # Mean of the per-cell medians.
                    "cell_latency_avg_ms": round(sum(cell_times.values()) / len(cell_times), 3),
                    # Static label, not computed from the loaded models.
                    "total_nn_parameters": "~7.15M" if _TORCH else "N/A (CPU fallback)",
                }

                return fig_routing, fig_cells, summary

            metrics_btn.click(run_benchmarks, outputs=[metrics_routing, metrics_cells, metrics_json])

    return app
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Script entry point: build the UI and start the Gradio server when this file
# is executed directly (nothing is launched on import).
if __name__ == "__main__":
    app = build_app()
    app.launch(
        # "0.0.0.0" listens on every network interface rather than loopback only.
        server_name="0.0.0.0",
        server_port=7860,
        # No public Gradio share link is requested.
        share=False,
    )
|
| |
|