# MangoMAS / app.py
# Uploaded by ianshank — commit cd6510c (verified): "Deploy MangoMAS Space via script"
"""
MangoMAS — Multi-Agent Cognitive Architecture
==============================================
Interactive HuggingFace Space showcasing:
- 10 Cognitive Cells with NN heads
- MCTS Planning with policy/value networks
- 7M-param MoE Neural Router
- Multi-agent orchestration
Author: MangoMAS Engineering (Ian Shanker)
"""
from __future__ import annotations
import hashlib
import json
import math
import random
import time
import uuid
from dataclasses import dataclass
from typing import Any
import gradio as gr
import numpy as np
import plotly.graph_objects as go
# ---------------------------------------------------------------------------
# Try to import torch — when it is unavailable, _TORCH is False and the
# functions below fall back to random / feature-based heuristics
# ---------------------------------------------------------------------------
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
_TORCH = True
except ImportError:
_TORCH = False
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 1: Feature Engineering (64-dim vectors)
# ═══════════════════════════════════════════════════════════════════════════
def featurize64(text: str) -> list[float]:
    """
    Map ``text`` to a deterministic, L2-normalised 64-dimensional vector.

    Layout of the returned list:
    - [0, 32)   sinusoidal projections of the SHA-256 digest bytes
    - [32, 48)  binary domain-keyword indicators
    - [48, 56)  structural statistics (length, punctuation, case, digits)
    - [56, 60)  keyword-based sentiment estimates
    - [60, 64)  lexical novelty / complexity estimates
    """
    lowered = text.lower()
    tokens = text.split()
    char_total = max(len(text), 1)  # denominator guard for empty input

    # --- [0, 32): content fingerprint from the first 32 digest bytes
    digest = hashlib.sha256(text.encode()).hexdigest()
    vec: list[float] = [
        math.sin(int(digest[2 * k : 2 * k + 2], 16) / 255.0 * math.pi * (k + 1))
        for k in range(32)
    ]

    # --- [32, 48): one indicator per domain keyword (substring match)
    keywords = (
        "code", "function", "class", "api", "security", "threat",
        "architecture", "design", "data", "database", "test", "deploy",
        "optimize", "performance", "research", "analyze",
    )
    vec.extend(1.0 if kw in lowered else 0.0 for kw in keywords)

    # --- [48, 56): structural statistics
    vec.append(min(len(text) / 500.0, 1.0))
    # period, question, exclamation and comma densities, in that order
    vec.extend(text.count(mark) / char_total * 10 for mark in ".?!,")
    vec.append(len(tokens) / 100.0)
    vec.append(1.0 if any(ch.isupper() for ch in text) else 0.0)
    vec.append(sum(1 for ch in text if ch.isdigit()) / char_total)

    # --- [56, 60): sentiment polarity
    positive = ("good", "great", "excellent", "improve", "best", "optimize")
    negative = ("bad", "fail", "error", "bug", "crash", "threat")
    pos_score = sum(1 for w in positive if w in lowered) / len(positive)
    neg_score = sum(1 for w in negative if w in lowered) / len(negative)
    vec += [pos_score, neg_score, 0.5, abs(pos_score - neg_score)]

    # --- [60, 64): novelty / complexity
    vec.append(len(set(lowered.split())) / max(len(tokens), 1))
    vec.append(min(len(text.split("\n")) / 10.0, 1.0))
    vec.append(text.count("(") / char_total * 20)
    if text.strip():
        longest = max(len(tok) for tok in tokens)
        vec.append(min(longest / 20.0, 1.0))
    else:
        vec.append(0.0)

    # Scale to unit length; the epsilon keeps the division finite.
    scale = math.sqrt(sum(v * v for v in vec)) + 1e-8
    return [v / scale for v in vec[:64]]
def plot_features(features: list[float], title: str = "64-D Feature Vector") -> go.Figure:
    """Render a feature vector as a bar chart, one colour per feature group."""
    tag_names = (
        "code", "func", "class", "api", "sec", "threat",
        "arch", "design", "data", "db", "test", "deploy",
        "opt", "perf", "research", "analyze",
    )
    # (label prefix, bar colour, per-bar suffixes) in vector order
    groups = (
        ("hash", "#FF6B6B", range(32)),
        ("tag", "#4ECDC4", tag_names),
        ("struct", "#45B7D1", range(8)),
        ("sent", "#96CEB4", range(4)),
        ("novel", "#FFEAA7", range(4)),
    )
    labels: list[str] = []
    colors: list[str] = []
    for prefix, color, suffixes in groups:
        for suffix in suffixes:
            labels.append(f"{prefix}_{suffix}")
            colors.append(color)

    bars = go.Bar(x=labels, y=features, marker_color=colors)
    layout = go.Layout(
        title=title,
        xaxis=dict(title="Feature Dimension", tickangle=-45, tickfont=dict(size=7)),
        yaxis=dict(title="Value"),
        height=350,
        template="plotly_dark",
        margin=dict(b=120),
    )
    return go.Figure(data=[bars], layout=layout)
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 2: Neural Network Models
# ═══════════════════════════════════════════════════════════════════════════
class ExpertTower(nn.Module if _TORCH else object):
    """Three-layer feed-forward expert; defaults give 64 → 512 → 512 → 256."""

    def __init__(self, d_in: int = 64, h1: int = 512, h2: int = 512, d_out: int = 256):
        super().__init__()
        self.fc1 = nn.Linear(d_in, h1)
        self.fc2 = nn.Linear(h1, h2)
        self.fc3 = nn.Linear(h2, d_out)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply fc1 and fc2 with ReLU, then the linear output layer fc3."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
class MixtureOfExperts7M(nn.Module if _TORCH else object):
    """
    Dense Mixture-of-Experts classifier (~7M parameters with the defaults).

    Components:
    - gate:       64 → 512 → num_experts, softmax-normalised
    - experts:    num_experts × ExpertTower (64 → 512 → 512 → 256)
    - classifier: 256 → num_classes
    """

    def __init__(self, num_classes: int = 10, num_experts: int = 16):
        super().__init__()
        self.num_experts = num_experts
        # Gate
        self.gate_fc1 = nn.Linear(64, 512)
        self.gate_fc2 = nn.Linear(512, num_experts)
        # Experts
        towers = [ExpertTower() for _ in range(num_experts)]
        self.experts = nn.ModuleList(towers)
        # Output head
        self.classifier = nn.Linear(256, num_classes)

    @property
    def parameter_count(self) -> int:
        """Total number of scalar parameters in the module."""
        total = 0
        for param in self.parameters():
            total += param.numel()
        return total

    def forward(self, x64: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Return ``(logits, gate_weights)`` for the input features.

        Every expert is evaluated; their outputs are blended with the gate's
        softmax weights before the classifier head is applied.
        """
        gate_hidden = F.relu(self.gate_fc1(x64))
        gate_weights = torch.softmax(self.gate_fc2(gate_hidden), dim=-1)

        per_expert = [expert(x64) for expert in self.experts]
        stacked = torch.stack(per_expert, dim=1)

        # Broadcast the gate weights over each expert's output features.
        blended = (stacked * gate_weights.unsqueeze(-1)).sum(dim=1)
        return self.classifier(blended), gate_weights
class RouterNet(nn.Module if _TORCH else object):
    """
    Gating MLP that outputs a softmax distribution over experts.

    Default shape: 64 → 128 → 64 → 8 (the hidden width halves before the
    output layer).
    """

    EXPERTS = [
        "code_expert", "test_expert", "design_expert", "research_expert",
        "architecture_expert", "security_expert", "performance_expert",
        "documentation_expert",
    ]

    def __init__(self, d_in: int = 64, d_h: int = 128, n_out: int = 8):
        super().__init__()
        bottleneck = d_h // 2
        layers = [
            nn.Linear(d_in, d_h),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(d_h, bottleneck),
            nn.ReLU(),
            nn.Linear(bottleneck, n_out),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return per-expert probabilities (softmax over the last dimension)."""
        logits = self.net(x)
        return torch.softmax(logits, dim=-1)
class PolicyNetwork(nn.Module if _TORCH else object):
    """Policy head for MCTS: d_in → 256 → 128 → n_actions, softmax output."""

    def __init__(self, d_in: int = 128, n_actions: int = 32):
        super().__init__()
        layers = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
            nn.Softmax(dim=-1),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return action probabilities for ``x``."""
        probabilities = self.net(x)
        return probabilities
class ValueNetwork(nn.Module if _TORCH else object):
    """Value head for MCTS: d_in → 256 → 64 → 1, squashed by tanh."""

    def __init__(self, d_in: int = 192):
        super().__init__()
        layers = [
            nn.Linear(d_in, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Tanh(),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the tanh-bounded value estimate for ``x``."""
        estimate = self.net(x)
        return estimate
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 3: Cognitive Cell Executors
# ═══════════════════════════════════════════════════════════════════════════
# Registry of cognitive cell types. The keys are the identifiers accepted as
# `cell_type` by execute_cell(); compose_cells() and the aggregator cell use
# membership in this dict to decide whether a requested cell exists.
# Each entry carries a display name, a one-line description and the names of
# the heads the cell advertises.
CELL_TYPES = {
    "reasoning": {
        "name": "ReasoningCell",
        "description": "Structured reasoning with Rule or NN heads",
        "heads": ["rule", "nn"],
    },
    "memory": {
        "name": "MemoryCell",
        "description": "Privacy-preserving preference extraction",
        "heads": ["preference_extractor"],
    },
    "causal": {
        "name": "CausalCell",
        "description": "Pearl's do-calculus for causal inference",
        "heads": ["do_calculus"],
    },
    "ethics": {
        "name": "EthicsCell",
        "description": "Safety classification and PII detection",
        "heads": ["classifier", "pii_scanner"],
    },
    "empathy": {
        "name": "EmpathyCell",
        "description": "Emotional tone detection and empathetic responses",
        "heads": ["tone_detector"],
    },
    "curiosity": {
        "name": "CuriosityCell",
        "description": "Epistemic curiosity and hypothesis generation",
        "heads": ["hypothesis_generator"],
    },
    "figliteral": {
        "name": "FigLiteralCell",
        "description": "Figurative vs literal language classification",
        "heads": ["classifier"],
    },
    "r2p": {
        "name": "R2PCell",
        "description": "Requirements-to-Plan structured decomposition",
        "heads": ["planner"],
    },
    "telemetry": {
        "name": "TelemetryCell",
        "description": "Telemetry event capture and structuring",
        "heads": ["collector"],
    },
    "aggregator": {
        "name": "AggregatorCell",
        "description": "Multi-expert output aggregation",
        "heads": ["weighted_average", "max_confidence", "ensemble"],
    },
}
def _cell_error(cell_type: str, message: str) -> dict[str, Any]:
    """Build the payload returned when a request is rejected before running."""
    return {
        "cell_type": cell_type,
        "status": "error",
        "message": message,
        "elapsed_ms": 0.0,
    }


# Every handler below takes (text, config) and returns the cell-specific
# fields that execute_cell() merges into its result; handlers that have no
# options simply ignore `config`.

def _reasoning_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Split the text into word chunks (a third of the words each) with simulated scores."""
    words = text.split()
    chunk_size = max(len(words) // 3, 1)
    sections = [
        {
            "text": " ".join(words[i : i + chunk_size]),
            "confidence": round(random.uniform(0.7, 0.99), 3),
            "boundary_type": random.choice(["topic_shift", "elaboration", "conclusion"]),
        }
        for i in range(0, len(words), chunk_size)
    ]
    return {
        "head_type": config.get("head_type", "rule"),
        "sections": sections,
        "section_count": len(sections),
    }


def _memory_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Flag explicit/implicit preference statements and an opt-out phrase."""
    lower = text.lower()
    preferences = []
    if "prefer" in lower or "like" in lower:
        preferences.append({"type": "explicit", "value": text, "confidence": 0.95})
    if "always" in lower or "usually" in lower:
        preferences.append({"type": "implicit", "value": text, "confidence": 0.72})
    return {
        "preferences": preferences,
        "opt_out": "don't remember" in lower,
        "consent_status": "granted",
    }


def _causal_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Return a simulated (random) causal effect with a fixed ±0.15 interval."""
    effect = round(random.uniform(-0.5, 0.8), 3)
    return {
        "mode": config.get("mode", "do_calculus"),
        "variables": [w for w in text.split() if len(w) > 3][:5],
        "causal_effect": effect,
        "confidence_interval": [round(effect - 0.15, 3), round(effect + 0.15, 3)],
    }


def _ethics_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Detect e-mail, phone and SSN patterns and return a redacted copy."""
    import re

    pii_patterns = {
        "email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
        "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    }
    pii_found = []
    redacted = text
    for pii_type, pattern in pii_patterns.items():
        for match in re.findall(pattern, text):
            # The matched value itself is never echoed back in the findings.
            pii_found.append({"type": pii_type, "value": "[REDACTED]"})
            redacted = redacted.replace(match, "[REDACTED]")
    risk = random.uniform(0.6, 0.9) if pii_found else random.uniform(0.0, 0.3)
    return {
        "is_safe": not pii_found,
        "pii_detected": pii_found,
        "redacted_text": redacted,
        "risk_score": round(risk, 3),
    }


def _empathy_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Pick the emotion with the most keyword hits (``neutral`` if none)."""
    lower = text.lower()
    emotion_keywords: dict[str, list[str]] = {
        "frustration": ["frustrat", "annoy", "angry", "upset", "fail", "broken", "stuck", "overwhelm"],
        "anxiety": ["worry", "anxious", "nervous", "afraid", "fear", "concern", "stress", "uncertain"],
        "excitement": ["excit", "amazing", "awesome", "great", "love", "fantastic", "thrilled", "happy"],
        "satisfaction": ["satisfied", "pleased", "good", "well", "success", "accomplish", "done", "complete"],
        "confusion": ["confus", "unclear", "don't understand", "what does", "how does", "lost", "puzzle"],
    }
    emotion_scores = {
        emotion: sum(1 for kw in keywords if kw in lower)
        for emotion, keywords in emotion_keywords.items()
    }
    # Ties resolve to the first emotion in declaration order.
    best_emotion = max(emotion_scores, key=emotion_scores.get)
    detected = best_emotion if emotion_scores[best_emotion] > 0 else "neutral"
    confidence = min(0.95, 0.6 + emotion_scores.get(detected, 0) * 0.1)
    responses = {
        "neutral": "I understand your message. How can I help further?",
        "frustration": "I can see this is frustrating. Let me help resolve this.",
        "excitement": "That's great news! Let's build on that momentum.",
        "confusion": "Let me clarify that for you step by step.",
        "satisfaction": "Glad to hear things are going well!",
        "anxiety": "I understand your concern. Let's work through this together.",
    }
    return {
        "detected_emotion": detected,
        "confidence": round(confidence, 3),
        "empathetic_response": responses[detected],
    }


def _curiosity_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Generate follow-up questions seeded with the first long words of the text."""
    words = [w for w in text.split() if len(w) > 3]
    topics = list(dict.fromkeys(words[:5]))  # unique, order-preserved
    topic_str = ", ".join(topics[:3]) if topics else "this topic"
    questions = [
        f"What are the underlying assumptions behind {topic_str}?",
        f"How would the outcome differ if we changed the approach to {topics[0] if topics else 'this'}?",
        "What related problems have been solved in adjacent domains?",
        f"What are the second-order effects of {topics[1] if len(topics) > 1 else 'this decision'}?",
        f"What evidence would disprove our current hypothesis about {topic_str}?",
    ]
    max_q = config.get("max_questions", 3)
    # A non-integer would break the slice and a negative one would silently
    # drop questions from the end, so anything invalid falls back to 3.
    if isinstance(max_q, bool) or not isinstance(max_q, int) or max_q < 0:
        max_q = 3
    unique_ratio = len(set(text.lower().split())) / max(len(text.split()), 1)
    return {
        "questions": questions[:max_q],
        "novelty_score": round(min(unique_ratio + 0.3, 0.95), 3),
    }


def _figliteral_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Classify text as figurative/literal via a small idiom and simile list."""
    lower = text.lower()
    figurative_map: dict[str, str] = {
        "raining cats and dogs": "raining very heavily",
        "piece of cake": "something very easy to do",
        "break a leg": "good luck; perform well",
        "time flies": "time appears to pass quickly",
        "hit the nail on the head": "to be exactly right",
        "spill the beans": "to reveal a secret",
        "under the weather": "feeling sick or unwell",
        "bite the bullet": "to endure a painful situation with courage",
    }
    idiom_hits = [
        f"'{idiom}' = {meaning}"
        for idiom, meaning in figurative_map.items()
        if idiom in lower
    ]
    has_simile = "like a" in lower or "as if" in lower
    is_figurative = has_simile or bool(idiom_hits)

    out: dict[str, Any] = {
        "classification": "figurative" if is_figurative else "literal",
        "confidence": round(0.9 if is_figurative else 0.85, 3),
    }
    if is_figurative:
        # With no idiom hit, the classification must have come from a simile.
        literal_parts = idiom_hits or [
            "Contains simile/metaphor — direct comparison without figurative intent"
        ]
        out["literal_interpretation"] = "; ".join(literal_parts)
        out["figurative_elements"] = literal_parts
    return out


def _r2p_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Return the fixed five-step delivery plan (independent of the input)."""
    steps = [
        {"step": 1, "action": "Analyze requirements", "estimated_effort": "2h"},
        {"step": 2, "action": "Design solution architecture", "estimated_effort": "4h"},
        {"step": 3, "action": "Implement core logic", "estimated_effort": "8h"},
        {"step": 4, "action": "Write tests", "estimated_effort": "4h"},
        {"step": 5, "action": "Deploy and validate", "estimated_effort": "2h"},
    ]
    return {
        "plan": steps,
        "total_effort": "20h",
        "success_criteria": ["All tests pass", "Performance targets met", "Code reviewed"],
    }


def _telemetry_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Record an event and parse action/duration/page/element from the text."""
    import re

    lower = text.lower()
    attrs: dict[str, Any] = {"source": "cognitive_cell", "cell_type": "telemetry"}

    # First verb (in declaration order) found anywhere in the text wins.
    action_map = {"click": "click", "submit": "submit", "scroll": "scroll",
                  "navigate": "navigate", "hover": "hover", "type": "input",
                  "select": "select", "drag": "drag", "drop": "drop", "open": "open"}
    for verb, action in action_map.items():
        if verb in lower:
            attrs["action"] = action
            break

    dur_match = re.search(r"(\d+)\s*(?:second|sec|ms|minute|min)", lower)
    if dur_match:
        attrs["duration_value"] = int(dur_match.group(1))
        # The unit is whatever remains of the match once the digits are removed.
        attrs["duration_unit"] = dur_match.group(0).replace(dur_match.group(1), "").strip()

    page_match = re.search(r"(?:on|at|in)\s+(?:the\s+)?(\w+)\s+page", lower)
    if page_match:
        attrs["page"] = page_match.group(1)

    elem_match = re.search(r"(?:click|clicked|press|pressed|hit)\s+(?:the\s+)?(\w+)", lower)
    if elem_match:
        attrs["element"] = elem_match.group(1)

    return {
        "event_recorded": True,
        "trace_id": f"trace-{uuid.uuid4().hex[:8]}",
        "timestamp": time.time(),
        "metadata": attrs,
        "parsed_attributes": {
            k: v for k, v in attrs.items() if k not in ("source", "cell_type")
        },
    }


def _aggregator_cell(text: str, config: dict[str, Any]) -> dict[str, Any]:
    """Run the configured sub-cells and combine their confidence values."""
    strategy = config.get("strategy", "weighted_average")
    sub_cells = config.get("sub_cells", ["reasoning", "ethics", "causal"])
    # Accept a single name; ignore any other non-list value instead of
    # iterating over it character by character or crashing.
    if isinstance(sub_cells, str):
        sub_cells = [sub_cells]
    elif not isinstance(sub_cells, (list, tuple)):
        sub_cells = []

    sub_results = []
    for sc in sub_cells:
        # Skip unknown names and the aggregator itself (prevents recursion).
        if not isinstance(sc, str) or sc == "aggregator" or sc not in CELL_TYPES:
            continue
        sr = execute_cell(sc, text)
        sub_results.append({
            "cell": sc,
            "status": sr.get("status", "ok"),
            # Cells without a confidence fall back to risk_score, then 0.8.
            "confidence": sr.get("confidence", sr.get("risk_score", 0.8)),
            "elapsed_ms": sr.get("elapsed_ms", 0),
        })

    confidences = [
        r["confidence"] for r in sub_results
        if isinstance(r["confidence"], (int, float))
    ]
    if not confidences:
        agg_confidence = 0.0
    elif strategy == "max_confidence":
        agg_confidence = max(confidences)
    elif strategy == "ensemble":
        agg_confidence = sum(confidences) / len(confidences)
    else:  # weighted_average: earlier sub-cells weigh more (1, 1/2, 1/3, ...)
        weights = [1.0 / (i + 1) for i in range(len(confidences))]
        agg_confidence = sum(c * w for c, w in zip(confidences, weights)) / sum(weights)

    return {
        "strategy": strategy,
        "sub_cell_results": sub_results,
        "cells_aggregated": len(sub_results),
        "aggregated_output": f"Aggregated {len(sub_results)} cells via {strategy}",
        "confidence": round(agg_confidence, 3),
    }


# cell_type -> handler; looked up by execute_cell().
_CELL_HANDLERS = {
    "reasoning": _reasoning_cell,
    "memory": _memory_cell,
    "causal": _causal_cell,
    "ethics": _ethics_cell,
    "empathy": _empathy_cell,
    "curiosity": _curiosity_cell,
    "figliteral": _figliteral_cell,
    "r2p": _r2p_cell,
    "telemetry": _telemetry_cell,
    "aggregator": _aggregator_cell,
}


def execute_cell(cell_type: str, text: str, config_json: str = "{}") -> dict[str, Any]:
    """
    Execute one cognitive cell and return its structured result.

    Args:
        cell_type: Cell identifier (e.g. ``"reasoning"``, ``"ethics"``).
        text: Input text; must contain at least one non-whitespace character.
        config_json: Optional JSON object with cell-specific options
            (``head_type``, ``mode``, ``max_questions``, ``strategy``,
            ``sub_cells``). Blank means "no options".

    Returns:
        On success, a dict with ``cell_type``, ``request_id``, ``status``
        (``"ok"``), the cell-specific fields and ``elapsed_ms``.
        On rejected input (blank text, malformed or non-object JSON config,
        unknown cell type), a dict with ``status == "error"``, a ``message``
        and ``elapsed_ms == 0.0``. No exception is raised for these cases.
    """
    start = time.monotonic()

    # BUG-011 fix: validate empty input
    if not text or not text.strip():
        return _cell_error(
            cell_type, "Input text is required. Please provide some text to process."
        )

    # BUG-002 fix: return error on invalid JSON instead of silently ignoring
    try:
        config = json.loads(config_json) if config_json and config_json.strip() else {}
    except json.JSONDecodeError as e:
        return _cell_error(cell_type, f"Invalid JSON config: {e}")
    # Valid JSON that is not an object (e.g. "[1, 2]" or "3") has no .get().
    if not isinstance(config, dict):
        return _cell_error(
            cell_type,
            f"Invalid JSON config: expected an object, got {type(config).__name__}",
        )

    handler = _CELL_HANDLERS.get(cell_type)
    if handler is None:
        # Previously an unknown type fell through and reported status "ok".
        return _cell_error(cell_type, f"Unknown cell type: {cell_type}")

    result: dict[str, Any] = {
        "cell_type": cell_type,
        "request_id": f"req-{uuid.uuid4().hex[:12]}",
        "status": "ok",
    }
    result.update(handler(text, config))
    result["elapsed_ms"] = round((time.monotonic() - start) * 1000, 2)
    return result
def compose_cells(pipeline_str: str, text: str) -> dict[str, Any]:
    """
    Run a comma-separated list of cells, in order, on the same ``text``.

    Unknown cell names are recorded as error activations and skipped. The
    returned ``final_output`` is the result of the last cell that actually
    ran; ``context_keys`` lists every result key seen along the way (except
    ``request_id`` and ``elapsed_ms``).
    """
    stages = [name.strip() for name in pipeline_str.split(",") if name.strip()]
    if not stages:
        return {"error": "No cell types specified"}

    excluded = ("request_id", "elapsed_ms")
    activations: list[dict[str, Any]] = []
    context: dict[str, Any] = {}
    last_result: dict[str, Any] = {}

    for stage in stages:
        if stage in CELL_TYPES:
            outcome = execute_cell(stage, text)
            activations.append({
                "cell_type": stage,
                "status": outcome.get("status", "ok"),
                "elapsed_ms": outcome.get("elapsed_ms", 0),
            })
            for key, value in outcome.items():
                if key not in excluded:
                    context[key] = value
            last_result = outcome
        else:
            activations.append({
                "cell_type": stage,
                "status": "error",
                "message": f"Unknown cell type: {stage}",
            })

    return {
        "pipeline": stages,
        "activations": activations,
        "final_output": last_result,
        "total_cells": len(stages),
        "context_keys": list(context.keys()),
    }
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 4: MCTS Planning Engine
# ═══════════════════════════════════════════════════════════════════════════
# Candidate actions per task category. run_mcts() and benchmark_strategies()
# detect a category from keywords in the task text (defaulting to
# "implementation") and use the matching list as their action pool.
TASK_CATEGORIES = {
    "architecture": ["service_split", "api_gateway", "data_layer", "security_layer", "caching"],
    "implementation": ["requirements", "design", "code", "test", "deploy"],
    "optimization": ["profile", "identify_bottleneck", "optimize", "validate", "benchmark"],
    "security": ["asset_inventory", "threat_enumeration", "risk_scoring", "mitigations", "audit"],
    "research": ["literature_review", "comparison", "synthesis", "recommendations", "publish"],
}
@dataclass
class MCTSNode:
"""Node in the MCTS search tree."""
id: str
action: str
visits: int = 0
total_value: float = 0.0
policy_prior: float = 0.0
children: list["MCTSNode"] | None = None
def ucb1_score(self, parent_visits: int, c: float = 1.414) -> float:
if self.visits == 0:
return float("inf")
exploitation = self.total_value / self.visits
exploration = c * math.sqrt(math.log(parent_visits) / self.visits)
return exploitation + exploration
def puct_score(self, parent_visits: int, c: float = 1.0) -> float:
if self.visits == 0:
return float("inf")
exploitation = self.total_value / self.visits
exploration = c * self.policy_prior * math.sqrt(parent_visits) / (1 + self.visits)
return exploitation + exploration
def to_dict(self, max_depth: int = 3) -> dict[str, Any]:
d: dict[str, Any] = {
"id": self.id,
"action": self.action,
"visits": self.visits,
"value": round(self.total_value / max(self.visits, 1), 3),
"policy_prior": round(self.policy_prior, 3),
}
if self.children and max_depth > 0:
d["children"] = [
c.to_dict(max_depth - 1)
for c in sorted(self.children, key=lambda n: -n.visits)[:5]
]
return d
def run_mcts(
    task: str,
    max_simulations: int = 100,
    exploration_constant: float = 1.414,
    strategy: str = "ucb1",
) -> dict[str, Any]:
    """
    Run the MCTS-style planner on ``task`` and return the search summary.

    The search is single-level: the root is expanded once with one child per
    action of the detected task category, and every simulation selects a
    child of the root, evaluates it and backs the value up.

    Args:
        task: Free-text task description; also used for category detection.
        max_simulations: Number of simulations; coerced to a non-negative int
            (UI sliders may deliver floats).
        exploration_constant: ``c`` passed to the scoring function.
        strategy: ``"ucb1"`` uses UCB1; any other value uses PUCT.

    Returns:
        Dict with the detected category, best action/value (by visit count),
        the serialised tree, per-action statistics and timing.
    """
    start = time.monotonic()
    max_simulations = max(int(max_simulations), 0)

    # Detect task category; the first category with a keyword hit wins.
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break
    actions = TASK_CATEGORIES[category]

    root = MCTSNode(id="root", action=task[:50], children=[])

    # Use NN priors/values if torch is available (freshly initialised nets).
    if _TORCH:
        policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
        value_net = ValueNetwork(d_in=192)
        policy_net.eval()
        value_net.eval()

    # The scorer reads root.visits at call time, so it can be built once.
    if strategy == "ucb1":
        def score_fn(n: MCTSNode) -> float:
            return n.ucb1_score(root.visits + 1, exploration_constant)
    else:
        def score_fn(n: MCTSNode) -> float:
            return n.puct_score(root.visits + 1, exploration_constant)

    for sim in range(max_simulations):
        # EXPAND: happens on the first simulation only.
        if not root.children:
            if _TORCH:
                # One forward pass so all priors come from the same softmax
                # distribution (previously each child used a different
                # random embedding).
                with torch.no_grad():
                    priors = policy_net(torch.randn(1, 128))[0].tolist()
            else:
                priors = [random.uniform(0.1, 0.5) for _ in actions]
            root.children = [
                MCTSNode(id=f"{act}-{sim}", action=act, policy_prior=prior, children=[])
                for act, prior in zip(actions, priors)
            ]

        # SELECT
        best_child = max(root.children, key=score_fn)

        # SIMULATE: value estimate from the value net, or a random stand-in
        if _TORCH:
            state = torch.randn(1, 192)
            with torch.no_grad():
                value = value_net(state).item()
        else:
            value = random.uniform(0.3, 0.9)

        # BACKPROPAGATE
        best_child.visits += 1
        best_child.total_value += value
        root.visits += 1

    elapsed = (time.monotonic() - start) * 1000

    # Best plan = most-visited child of the root.
    if root.children:
        best = max(root.children, key=lambda n: n.visits)
        best_action = best.action
        best_value = round(best.total_value / max(best.visits, 1), 3)
    else:
        best_action = "none"
        best_value = 0.0

    return {
        "task": task,
        "category": category,
        "strategy": strategy,
        "best_action": best_action,
        "best_value": best_value,
        "total_simulations": max_simulations,
        "exploration_constant": exploration_constant,
        "tree": root.to_dict(max_depth=2),
        "all_actions": [
            {
                "action": c.action,
                "visits": c.visits,
                "value": round(c.total_value / max(c.visits, 1), 3),
            }
            for c in sorted(root.children or [], key=lambda n: -n.visits)
        ],
        "elapsed_ms": round(elapsed, 2),
        "nn_enabled": _TORCH,
    }
def benchmark_strategies(task: str) -> dict[str, Any]:
    """
    Compare MCTS vs Greedy vs Random on the same task.

    Returns a dict with the task, the detected category and, per strategy,
    ``quality_score``, ``best_action`` and ``elapsed_ms``.
    """
    # BUG-005 fix: implement real greedy and random strategies

    def _stable_seed(key: str) -> int:
        # The built-in hash() of a str is salted per process, so it cannot
        # give a reproducible seed; derive one from SHA-256 instead.
        digest = hashlib.sha256(key.encode()).digest()
        return int.from_bytes(digest[:4], "big") % (2**31)

    results = {}

    # Detect category for the action pool (same keyword table as run_mcts).
    lower = task.lower()
    category = "implementation"
    for cat, keywords in {
        "architecture": ["architect", "design", "micro", "system"],
        "security": ["security", "threat", "vulnerability", "attack"],
        "optimization": ["optimize", "performance", "latency", "speed"],
        "research": ["research", "survey", "study", "analyze"],
    }.items():
        if any(k in lower for k in keywords):
            category = cat
            break
    actions = TASK_CATEGORIES[category]

    # MCTS — full tree search
    start = time.monotonic()
    r = run_mcts(task, max_simulations=100)
    elapsed_mcts = (time.monotonic() - start) * 1000
    results["mcts"] = {
        "quality_score": r["best_value"],
        "best_action": r["best_action"],
        "elapsed_ms": round(elapsed_mcts, 2),
    }

    # Greedy — single-step: pick action with highest policy prior
    start = time.monotonic()
    if _TORCH:
        # Seed BEFORE building the network so that its weights, and therefore
        # the priors, are reproducible for a given task.
        torch.manual_seed(_stable_seed(task))
        policy_net = PolicyNetwork(d_in=128, n_actions=len(actions))
        policy_net.eval()
        embed = torch.randn(1, 128)
        with torch.no_grad():
            priors = policy_net(embed)[0].numpy()
        best_idx = int(np.argmax(priors))
        greedy_action = actions[best_idx]
        greedy_quality = float(priors[best_idx])
    else:
        # Sample one prior per action and pick the argmax, so the reported
        # quality belongs to the reported action.
        sampled = [random.uniform(0.1, 0.3) for _ in actions]
        best_idx = max(range(len(actions)), key=sampled.__getitem__)
        greedy_action = actions[best_idx]
        greedy_quality = sampled[best_idx]
    elapsed_greedy = (time.monotonic() - start) * 1000
    results["greedy"] = {
        "quality_score": round(greedy_quality, 3),
        "best_action": greedy_action,
        "elapsed_ms": round(elapsed_greedy, 2),
    }

    # Random — pick random action, value from the value network
    start = time.monotonic()
    random_action = random.choice(actions)
    if _TORCH:
        torch.manual_seed(_stable_seed(task + random_action))
        value_net = ValueNetwork(d_in=192)
        value_net.eval()
        state = torch.randn(1, 192)
        with torch.no_grad():
            random_quality = value_net(state).item()
    else:
        random_quality = random.uniform(-0.5, 0.5)
    elapsed_random = (time.monotonic() - start) * 1000
    results["random"] = {
        "quality_score": round(random_quality, 3),
        "best_action": random_action,
        "elapsed_ms": round(elapsed_random, 2),
    }

    return {"task": task, "category": category, "results": results}
def plot_mcts_tree(tree_data: dict) -> go.Figure:
    """
    Create a sunburst visualization of the MCTS tree.

    Args:
        tree_data: Nested dict as produced by ``MCTSNode.to_dict`` (keys
            ``id``, ``action``, optional ``visits``, ``value``, ``children``).

    Returns:
        A plotly figure whose sectors are sized by visit count and coloured
        by mean value.
    """
    ids, labels, parents, values, colors = [], [], [], [], []

    def _walk(node: dict, parent_id: str = "") -> int:
        """Append ``node`` and its subtree; return the size used for ``node``."""
        nid = node["id"]
        slot = len(ids)
        ids.append(nid)
        labels.append(f"{node['action']}\n(v={node.get('value', 0)}, n={node.get('visits', 0)})")
        parents.append(parent_id)
        values.append(0)  # filled in below once the children are sized
        colors.append(node.get("value", 0))

        child_total = sum(_walk(child, nid) for child in node.get("children", []))
        # With branchvalues="total" a parent must be at least as large as the
        # sum of its children. Unvisited children are bumped to 1 so they stay
        # visible, which can push that sum above the parent's own visit count
        # when there are few simulations, so take the larger of the two.
        size = max(node.get("visits", 1), 1, child_total)
        values[slot] = size
        return size

    _walk(tree_data)

    fig = go.Figure(go.Sunburst(
        ids=ids, labels=labels, parents=parents, values=values,
        marker=dict(colors=colors, colorscale="Viridis", showscale=True),
        branchvalues="total",
    ))
    fig.update_layout(
        title="MCTS Search Tree",
        height=500,
        template="plotly_dark",
        margin=dict(t=40, l=0, r=0, b=0),
    )
    return fig
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 5: MoE Routing
# ═══════════════════════════════════════════════════════════════════════════
# Display names of the routing experts. Position matters: index i here is
# output i of the router and key i of `expert_keywords` in route_task().
EXPERT_NAMES = [
    "Code Expert", "Test Expert", "Design Expert", "Research Expert",
    "Architecture Expert", "Security Expert", "Performance Expert", "Docs Expert",
]
# BUG-003 fix: singleton router with fixed seed for deterministic routing
# Seed applied via torch.manual_seed() right before the router is built.
_ROUTER_SEED = 42
# Lazily created by _get_router(); stays None until first use (and always
# when torch is unavailable).
_router_net_singleton: "RouterNet | None" = None
def _get_router() -> "RouterNet":
    """
    Return the shared RouterNet, building it on first use.

    The network is created right after seeding torch with ``_ROUTER_SEED``
    and put in eval mode. When torch is unavailable nothing is built and the
    stored value (``None``) is returned.
    """
    global _router_net_singleton
    if _TORCH and _router_net_singleton is None:
        torch.manual_seed(_ROUTER_SEED)
        router = RouterNet(d_in=64, n_out=len(EXPERT_NAMES))
        router.eval()
        _router_net_singleton = router
    return _router_net_singleton  # type: ignore[return-value]
def route_task(task: str, top_k: int = 3) -> dict[str, Any]:
    """
    Route a task through the neural MoE gate.

    Args:
        task: Free-text task description.
        top_k: Number of experts to select; coerced to an int and clamped to
            ``[0, len(EXPERT_NAMES)]`` (UI sliders may deliver floats).

    Returns:
        Dict with the 64-d feature vector, the normalised weight of every
        expert, the ``top_k`` selected experts (rank 1 = highest weight),
        the effective ``top_k`` and timing.
    """
    import re

    start = time.monotonic()
    top_k = max(0, min(int(top_k), len(EXPERT_NAMES)))

    features = featurize64(task)
    if _TORCH:
        # BUG-003 fix: use singleton router (deterministic weights)
        router = _get_router()
        feature_tensor = torch.tensor([features], dtype=torch.float32)
        with torch.no_grad():
            weights = router(feature_tensor)[0].numpy()
    else:
        # Fallback: deterministic routing from the leading feature magnitudes
        weights = np.array([abs(f) for f in features[:len(EXPERT_NAMES)]])
        weights = weights / (weights.sum() + 1e-8)

    # BUG-004 fix: apply keyword-based semantic boost to expert routing
    lower_task = task.lower()
    expert_keywords: dict[int, list[str]] = {
        0: ["code", "implement", "function", "class", "program", "script", "module"],  # Code Expert
        1: ["test", "unit test", "coverage", "qa", "assert", "mock", "fixture"],  # Test Expert
        2: ["design", "ui", "ux", "layout", "wireframe", "mockup", "style"],  # Design Expert
        3: ["research", "analyze", "study", "survey", "literature", "paper", "compare"],  # Research Expert
        4: ["architect", "system", "microservice", "scale", "pattern", "infrastructure"],  # Architecture Expert
        5: ["security", "auth", "encrypt", "threat", "vulnerab", "owasp", "pci", "compliance"],  # Security Expert
        6: ["performance", "optimize", "latency", "throughput", "cache", "speed", "fast"],  # Performance Expert
        7: ["document", "readme", "docs", "comment", "explain", "write", "manual"],  # Docs Expert
    }

    def _mentions(keyword: str) -> bool:
        # Very short keywords ("ui", "ux", "qa", "pci") occur inside unrelated
        # words ("build", "require"), so they must match as whole words.
        # Longer keywords and stems keep plain substring matching.
        if len(keyword) <= 3:
            return re.search(rf"\b{re.escape(keyword)}\b", lower_task) is not None
        return keyword in lower_task

    boost = np.zeros(len(EXPERT_NAMES))
    for idx, kws in expert_keywords.items():
        boost[idx] += 0.15 * sum(1 for kw in kws if _mentions(kw))

    # Apply boost and renormalize
    weights = weights + boost
    weights = weights / (weights.sum() + 1e-8)

    # Top-K selection (descending weight)
    top_indices = np.argsort(weights)[::-1][:top_k]
    selected = [
        {
            "expert": EXPERT_NAMES[i],
            "weight": round(float(weights[i]), 4),
            "rank": rank + 1,
        }
        for rank, i in enumerate(top_indices)
    ]

    elapsed = (time.monotonic() - start) * 1000
    return {
        "task": task,
        "features": features,
        "all_weights": {EXPERT_NAMES[i]: round(float(weights[i]), 4) for i in range(len(EXPERT_NAMES))},
        "selected_experts": selected,
        "top_k": top_k,
        "nn_enabled": _TORCH,
        "elapsed_ms": round(elapsed, 2),
    }
def plot_expert_weights(weights: dict[str, float]) -> go.Figure:
    """
    Create a bar chart of expert routing weights.

    Args:
        weights: Mapping of expert name to weight; may be empty.

    Returns:
        A plotly bar chart with one bar per expert. The y-axis spans 0 to
        1.2x the largest weight, or 0 to 1 when there is no positive weight.
    """
    names = list(weights.keys())
    vals = list(weights.values())
    palette = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7", "#DDA0DD", "#F0E68C", "#87CEEB"]
    # Cycle the palette so charts with more than eight bars stay fully coloured.
    bar_colors = [palette[i % len(palette)] for i in range(len(names))]

    # max() of an empty sequence raises, and an all-zero input would give a
    # degenerate [0, 0] axis range.
    peak = max(vals, default=0.0)
    y_top = peak * 1.2 if peak > 0 else 1.0

    fig = go.Figure(
        data=[go.Bar(x=names, y=vals, marker_color=bar_colors)],
        layout=go.Layout(
            title="Expert Routing Weights",  # BUG-016 fix: shortened title
            yaxis=dict(title="Weight (softmax)", range=[0, y_top]),
            height=350,
            template="plotly_dark",
            margin=dict(t=40),
        ),
    )
    return fig
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 6: Agent Orchestration
# ═══════════════════════════════════════════════════════════════════════════
# Agent roster used by orchestrate(). Each entry has a display name, a short
# specialization blurb and a text icon tag.
# NOTE(review): orchestrate() derives agent names from EXPERT_NAMES by
# replacing " Expert" with " Agent". "Docs Expert" becomes "Docs Agent", which
# matches no name below ("Documentation Agent"), so that lookup falls back to
# AGENTS[0]. Other expert names (e.g. "Code Expert") also have no direct
# counterpart here — confirm whether the fallback is intended.
AGENTS = [
    {"name": "SWE Agent", "specialization": "Code scaffold generation", "icon": "[SWE]"},
    {"name": "Architect Agent", "specialization": "System design and patterns", "icon": "[ARCH]"},
    {"name": "QA Agent", "specialization": "Test plan and case generation", "icon": "[QA]"},
    {"name": "Security Agent", "specialization": "Threat modeling (OWASP)", "icon": "[SEC]"},
    {"name": "DevOps Agent", "specialization": "Infrastructure planning", "icon": "[OPS]"},
    {"name": "Research Agent", "specialization": "Technical analysis", "icon": "[RES]"},
    {"name": "Performance Agent", "specialization": "Optimization analysis", "icon": "[PERF]"},
    {"name": "Documentation Agent", "specialization": "Technical writing", "icon": "[DOC]"},
]
# Agent-to-cell mapping for real processing: agent name (as in AGENTS) to the
# cell_type that orchestrate() passes to execute_cell() for that agent.
# Names missing from this map are run with the "reasoning" cell.
_AGENT_CELL_MAP: dict[str, str] = {
    "SWE Agent": "reasoning",
    "Architect Agent": "r2p",
    "QA Agent": "reasoning",
    "Security Agent": "ethics",
    "DevOps Agent": "telemetry",
    "Research Agent": "causal",
    "Performance Agent": "reasoning",
    "Documentation Agent": "reasoning",
}
def _run_agent(agent: dict[str, str], task: str, weight: float) -> dict[str, Any]:
    """Run the cognitive cell mapped to *agent* on *task* and package the result."""
    cell_type = _AGENT_CELL_MAP.get(agent["name"], "reasoning")
    cell_result = execute_cell(cell_type, task)
    return {
        "agent": agent["name"],
        "icon": agent["icon"],
        "specialization": agent["specialization"],
        "weight": weight,
        "cell_used": cell_type,
        "output": cell_result,
        # Cells that do not report a confidence get a fixed default.
        "confidence": cell_result.get("confidence", 0.8),
    }


def orchestrate(task: str, max_agents: int = 3, strategy: str = "moe_routing") -> dict[str, Any]:
    """Orchestrate multiple agents for a task using the given routing strategy.

    Args:
        task: Free-text task description passed to every selected agent's cell.
        max_agents: Upper bound on the number of agents to run. Coerced to
            ``int`` because UI sliders may deliver a float.
        strategy: ``"round_robin"`` takes the first agents in ``AGENTS`` order,
            ``"random"`` samples agents without replacement, and any other
            value (including the default ``"moe_routing"``) asks ``route_task``
            for the top-k experts and maps each expert to an agent.

    Returns:
        A dict with the task, the strategy, the number of agents actually run,
        the (integer) ``max_agents``, one result entry per agent and the total
        wall-clock time in milliseconds.
    """
    start = time.monotonic()
    max_agents = int(max_agents)

    if strategy in ("round_robin", "random"):
        # Clamp to what is available so a negative or oversized request
        # neither mis-slices AGENTS nor makes random.sample() raise.
        count = max(0, min(max_agents, len(AGENTS)))
        chosen = AGENTS[:count] if strategy == "round_robin" else random.sample(AGENTS, count)
        # Uniform weight over the agents that actually run, so weights sum to ~1.
        uniform_weight = round(1.0 / count, 4) if count else 0.0
        agent_results = [_run_agent(agent, task, uniform_weight) for agent in chosen]
    else:  # moe_routing (default)
        routing = route_task(task, top_k=max_agents)
        agent_results = []
        for expert in routing["selected_experts"]:
            # Expert names are mapped onto agent names by swapping the suffix;
            # experts without a matching agent fall back to the first agent.
            agent_name = expert["expert"].replace(" Expert", " Agent")
            agent = next((a for a in AGENTS if agent_name in a["name"]), AGENTS[0])
            agent_results.append(_run_agent(agent, task, expert["weight"]))

    elapsed = (time.monotonic() - start) * 1000
    return {
        "task": task,
        "strategy": strategy,
        "agents_selected": len(agent_results),
        "max_agents": max_agents,
        "results": agent_results,
        "total_elapsed_ms": round(elapsed, 2),
    }
# ═══════════════════════════════════════════════════════════════════════════
# SECTION 7: Gradio Interface
# ═══════════════════════════════════════════════════════════════════════════
# Gradio "Soft" theme with an amber/orange accent palette on stone neutrals and
# the Inter Google font; passed to gr.Blocks(theme=...) in build_app().
THEME = gr.themes.Soft(
primary_hue="amber",
secondary_hue="orange",
neutral_hue="stone",
font=gr.themes.GoogleFont("Inter"),
)
# Custom stylesheet passed to gr.Blocks(css=...) in build_app():
# - .main-header: centred header whose <h1> is filled with a three-colour gradient
# - .stat-box: dark gradient cards used by the stats bar
# - footer: hidden
# - .plotly .main-svg: overflow made visible
CSS = """
.main-header { text-align: center; margin-bottom: 1rem; }
.main-header h1 { background: linear-gradient(135deg, #FF6B6B, #FFEAA7, #4ECDC4);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
font-size: 2.5rem; font-weight: 800; }
.stat-box { background: linear-gradient(135deg, #1a1a2e, #16213e);
border: 1px solid #0f3460; border-radius: 12px; padding: 1rem;
text-align: center; color: #e8e8e8; }
.stat-box h3 { color: #FFEAA7; margin: 0; font-size: 1.8rem; }
.stat-box p { color: #a8a8a8; margin: 0; font-size: 0.85rem; }
footer { display: none !important; }
.plotly .main-svg { overflow: visible !important; }
"""
# Page header markup rendered at the top of the app.
_HEADER_HTML = """
<div class="main-header">
<h1>MangoMAS</h1>
<p style="color: #a8a8a8; font-size: 1.1rem;">
Multi-Agent Cognitive Architecture — Interactive Demo
</p>
</div>
"""

# (label, value) pairs rendered as cards in the stats bar under the header.
_STATS = (
    ("Cognitive Cells", "10"),
    ("MoE Params", "~7M"),
    ("MCTS Strategies", "UCB1 + PUCT"),
    ("Expert Agents", "8"),
)

# Bar colours for the per-cell latency chart; cycled if there are more cells.
_CELL_BAR_COLORS = (
    "#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7",
    "#DDA0DD", "#F0E68C", "#87CEEB", "#FFA07A", "#98FB98",
)

# Static Markdown shown on the "Architecture" tab.
_ARCHITECTURE_MD = """
### MangoMAS System Architecture
```
┌──────────────────────────────────────────────────────────┐
│ FastAPI Gateway │
│ (Auth / Tenant Middleware) │
├──────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌───────────────────────────┐ │
│ │ MoE Input │────▶│ RouterNet (Neural Gate) │ │
│ │ Parser │ │ 64-dim → MLP → Softmax │ │
│ └──────────────┘ └─────────┬─────────────────┘ │
│ │ │
│ ┌───────┬───────┬───────┼───────┬───────┐ │
│ ▼ ▼ ▼ ▼ ▼ ▼ │
│ Expert Expert Expert Expert Expert Expert │
│ │ │ │ │ │ │ │
│ Agent Agent Agent Agent Agent Agent │
│ │ │ │ │ │ │ │
│ ┌─────┴───────┴───────┴───────┴───────┴───────┘ │
│ │ Cognitive Cell Layer │
│ │ [Reasoning│Memory│Ethics│Causal│Empathy│...] │
│ └─────────────────────┬────────────────────────┘ │
│ ▼ │
│ Aggregator Cell │
│ (weighted / ensemble / ranking) │
│ │ │
│ Feedback Loop → Router Update │
│ │ │
│ Response + Metrics + Traces │
└──────────────────────────────────────────────────────────┘
```
### Neural Network Components
| Component | Architecture | Parameters | Latency |
|-----------|-------------|------------|---------|
| **MixtureOfExperts7M** | 16 Expert Towers (64→512→512→256) + Gate | ~7M | ~5ms |
| **RouterNet** | MLP (64→128→64→8) + Softmax | ~17K | <1ms |
| **PolicyNetwork** | MLP (128→256→128→32) + Softmax | ~70K | <1ms |
| **ValueNetwork** | MLP (192→256→64→1) + Tanh | ~66K | <1ms |
| **ReasoningCell NN Head** | Lightweight transformer | ~500K | ~50ms |
### Cognitive Cell Lifecycle
```
preprocess() → infer() → postprocess() → publish()
│ │ │ │
Validate Core Logic Format Emit Event
Normalize NN/Rule Filter (Event Bus)
Enrich Inference Enrich
```
"""


def _upper_median(values: list[float]) -> float:
    """Return the middle element of *values* once sorted (upper middle for even lengths)."""
    return sorted(values)[len(values) // 2]


def _run_benchmarks() -> tuple[go.Figure, go.Figure, dict[str, Any]]:
    """Time routing and cell execution and return two charts plus a summary dict."""
    # Warm-up calls: the first invocation is discarded so one-off start-up
    # cost does not skew the measurements.
    route_task("warmup", top_k=1)
    execute_cell("reasoning", "warmup")

    # Routing latency vs top-K. The median of 10 runs is used rather than the
    # mean to reduce the impact of outliers.
    ks = list(range(1, 9))
    latencies = [
        _upper_median(
            [route_task("Test routing benchmark task", top_k=k)["elapsed_ms"] for _ in range(10)]
        )
        for k in ks
    ]
    fig_routing = go.Figure(
        data=[go.Scatter(x=ks, y=latencies, mode="lines+markers", name="Routing Latency")],
        layout=go.Layout(
            title="Routing Latency vs Top-K",
            xaxis_title="Top-K Experts",
            yaxis_title="Latency (ms)",
            height=350,
            template="plotly_dark",
        ),
    )

    # Cell execution latency: median of 5 runs per cell type.
    cell_times: dict[str, float] = {
        ct: _upper_median(
            [execute_cell(ct, "Benchmark test input for cell")["elapsed_ms"] for _ in range(5)]
        )
        for ct in CELL_TYPES
    }
    # One colour per bar, cycling the palette if there are more cells than colours.
    bar_colors = [_CELL_BAR_COLORS[i % len(_CELL_BAR_COLORS)] for i in range(len(cell_times))]
    fig_cells = go.Figure(
        data=[go.Bar(
            x=list(cell_times.keys()),
            y=list(cell_times.values()),
            marker_color=bar_colors,
        )],
        layout=go.Layout(
            title="Cell Execution Latency",
            xaxis=dict(
                title="Cell Type",
                tickangle=-30,  # shallow rotation keeps the labels readable
                tickfont=dict(size=9),
            ),
            yaxis_title="Latency (ms)",
            height=400,  # slightly taller for label room
            template="plotly_dark",
            margin=dict(b=100),  # extra bottom margin for the rotated labels
        ),
    )

    summary = {
        "torch_available": _TORCH,
        "routing_latency_p50_ms": round(_upper_median(latencies), 3),
        # Guard the average so an empty CELL_TYPES does not divide by zero.
        "cell_latency_avg_ms": round(sum(cell_times.values()) / len(cell_times), 3) if cell_times else 0.0,
        "total_nn_parameters": "~7.15M" if _TORCH else "N/A (CPU fallback)",
    }
    return fig_routing, fig_cells, summary


def _build_cells_tab() -> None:
    """Lay out the Cognitive Cells tab: single-cell execution and a cell pipeline."""
    with gr.Tab("Cognitive Cells", id="cells"):
        gr.Markdown("### Execute any of the 10 biologically-inspired cognitive cells")
        with gr.Row():
            cell_type = gr.Dropdown(
                choices=list(CELL_TYPES),
                value="reasoning",
                label="Cell Type",
                info="Select a cognitive cell to execute",
            )
            cell_info = gr.Textbox(
                label="Description",
                value=CELL_TYPES["reasoning"]["description"],
                interactive=False,
            )
        cell_input = gr.Textbox(
            label="Input Text",
            placeholder="Enter text to process through the cell...",
            value="Design a scalable microservices architecture with event-driven communication",
            lines=3,
        )
        cell_config = gr.Textbox(
            label="Config (JSON, optional)",
            placeholder='{"head_type": "nn"}',
            value="{}",
            lines=1,
        )
        cell_btn = gr.Button("Execute Cell", variant="primary")
        cell_output = gr.JSON(label="Cell Output")

        gr.Markdown("---\n### Cell Composition Pipeline")
        pipeline_input = gr.Textbox(
            label="Pipeline (comma-separated cell types)",
            value="ethics, reasoning, aggregator",
            placeholder="ethics, reasoning, memory",
        )
        pipeline_text = gr.Textbox(
            label="Input Text",
            value="Analyze the security implications of this API design: user@example.com",
            lines=2,
        )
        pipeline_btn = gr.Button("Run Pipeline", variant="secondary")
        pipeline_output = gr.JSON(label="Pipeline Result")

        def on_cell_select(ct: str) -> str:
            """Return the description of the newly selected cell type."""
            return CELL_TYPES.get(ct, {}).get("description", "Unknown cell type")

        cell_type.change(on_cell_select, inputs=cell_type, outputs=cell_info)
        cell_btn.click(execute_cell, inputs=[cell_type, cell_input, cell_config], outputs=cell_output)
        pipeline_btn.click(compose_cells, inputs=[pipeline_input, pipeline_text], outputs=pipeline_output)


def _build_mcts_tab() -> None:
    """Lay out the MCTS Planning tab: a tree search run and a strategy benchmark."""
    with gr.Tab("MCTS Planning", id="mcts"):
        gr.Markdown("### Monte Carlo Tree Search with Policy/Value Neural Networks")
        with gr.Row():
            mcts_task = gr.Textbox(
                label="Task to Plan",
                value="Design a secure, scalable REST API with authentication",
                lines=2,
                scale=3,
            )
            with gr.Column(scale=1):
                mcts_sims = gr.Slider(10, 500, value=100, step=10, label="Simulations")
                mcts_c = gr.Slider(0.1, 3.0, value=1.414, step=0.1, label="Exploration Constant (C)")
                mcts_strat = gr.Radio(["ucb1", "puct"], value="ucb1", label="Selection Strategy")
        mcts_btn = gr.Button("Run MCTS", variant="primary")
        with gr.Row():
            mcts_tree_plot = gr.Plot(label="Search Tree Visualization")
            mcts_json = gr.JSON(label="MCTS Result")

        gr.Markdown("---\n### Strategy Benchmark")
        bench_task = gr.Textbox(
            label="Benchmark Task",
            value="Optimize database query performance for high-throughput system",
        )
        bench_btn = gr.Button("Run Benchmark", variant="secondary")
        bench_output = gr.JSON(label="Benchmark Results (MCTS vs Greedy vs Random)")

        def run_and_plot(task, sims, c, strat):
            """Run MCTS and return the tree figure together with the raw result."""
            result = run_mcts(task, int(sims), c, strat)
            fig = plot_mcts_tree(result["tree"])
            return fig, result

        mcts_btn.click(
            run_and_plot,
            inputs=[mcts_task, mcts_sims, mcts_c, mcts_strat],
            outputs=[mcts_tree_plot, mcts_json],
        )
        bench_btn.click(benchmark_strategies, inputs=bench_task, outputs=bench_output)


def _build_moe_tab() -> None:
    """Lay out the MoE Router tab: route a task and plot features and expert weights."""
    with gr.Tab("MoE Router", id="moe"):
        gr.Markdown("### Neural Mixture-of-Experts Routing Gate")
        gr.Markdown(
            "The RouterNet MLP extracts 64-dimensional features from text, "
            "then routes to the top-K most relevant expert agents."
        )
        with gr.Row():
            moe_task = gr.Textbox(
                label="Task to Route",
                value="Implement a threat detection system with real-time alerting",
                lines=2,
                scale=3,
            )
            moe_topk = gr.Slider(1, 8, value=3, step=1, label="Top-K Experts", scale=1)
        moe_btn = gr.Button("Route Task", variant="primary")
        with gr.Row():
            moe_features_plot = gr.Plot(label="64-D Feature Vector")
            moe_weights_plot = gr.Plot(label="Expert Routing Weights")
        moe_json = gr.JSON(label="Routing Result")

        def route_and_plot(task, top_k):
            """Route the task and return both plots plus a trimmed result dict."""
            result = route_task(task, int(top_k))
            feat_fig = plot_features(result["features"])
            weight_fig = plot_expert_weights(result["all_weights"])
            # Don't send features array to JSON (too large)
            display = {k: v for k, v in result.items() if k != "features"}
            return feat_fig, weight_fig, display

        moe_btn.click(
            route_and_plot,
            inputs=[moe_task, moe_topk],
            outputs=[moe_features_plot, moe_weights_plot, moe_json],
        )


def _build_agents_tab() -> None:
    """Lay out the Agents tab: multi-agent orchestration and the agent roster."""
    with gr.Tab("Agents", id="agents"):
        gr.Markdown("### Multi-Agent Orchestration with MoE Routing")
        with gr.Row():
            orch_task = gr.Textbox(
                label="Task",
                value="Build a secure payment processing microservice with PCI compliance",
                lines=2,
                scale=3,
            )
            with gr.Column(scale=1):
                orch_agents = gr.Slider(1, 8, value=3, step=1, label="Max Agents")
                orch_strat = gr.Dropdown(
                    ["moe_routing", "round_robin", "random"],
                    value="moe_routing",
                    label="Routing Strategy",
                )
        orch_btn = gr.Button("Orchestrate", variant="primary")
        orch_output = gr.JSON(label="Orchestration Result")

        gr.Markdown("---\n### Available Agents")
        gr.Dataframe(
            value=[[a["icon"], a["name"], a["specialization"]] for a in AGENTS],
            headers=["", "Agent", "Specialization"],
            interactive=False,
        )

        def orchestrate_from_ui(task, max_agents, strategy):
            """Call orchestrate() with the slider value cast to int, as the other tabs do."""
            return orchestrate(task, int(max_agents), strategy)

        orch_btn.click(orchestrate_from_ui, inputs=[orch_task, orch_agents, orch_strat], outputs=orch_output)


def _build_architecture_tab() -> None:
    """Lay out the static Architecture tab."""
    with gr.Tab("Architecture", id="arch"):
        gr.Markdown(_ARCHITECTURE_MD)


def _build_metrics_tab() -> None:
    """Lay out the Metrics tab and wire its button to the benchmark runner."""
    with gr.Tab("Metrics", id="metrics"):
        gr.Markdown("### Live Performance Benchmarks")
        metrics_btn = gr.Button("Run All Benchmarks", variant="primary")
        with gr.Row():
            metrics_routing = gr.Plot(label="Routing Latency by Expert Count")
            metrics_cells = gr.Plot(label="Cell Execution Latency")
        metrics_json = gr.JSON(label="Raw Metrics")
        metrics_btn.click(_run_benchmarks, outputs=[metrics_routing, metrics_cells, metrics_json])


def build_app() -> gr.Blocks:
    """Build the complete Gradio application.

    Creates the header, the stats bar and the six tabs (cells, MCTS, MoE
    router, agents, architecture, metrics). Each tab is laid out by its own
    helper, which must be called inside the ``gr.Blocks`` context so that the
    components it creates attach to this app.

    Returns:
        The assembled, not yet launched, ``gr.Blocks`` instance.
    """
    with gr.Blocks(theme=THEME, css=CSS, title="MangoMAS — Multi-Agent Cognitive Architecture") as app:
        # Header
        gr.HTML(_HEADER_HTML)
        # Stats bar
        with gr.Row():
            for label, value in _STATS:
                gr.HTML(f'<div class="stat-box"><h3>{value}</h3><p>{label}</p></div>')
        # Tabs, in display order
        _build_cells_tab()
        _build_mcts_tab()
        _build_moe_tab()
        _build_agents_tab()
        _build_architecture_tab()
        _build_metrics_tab()
    return app
# ═══════════════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all network
    # interfaces at port 7860 without creating a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app = build_app()
    app.launch(**launch_options)