Executor-Tyrant-Framework's picture
feat: wire Codemine into NG topology sync (BLK-NG-194)
7d53454
# ---- Changelog ----
# [2026-04-20] Codemine (BLK-NG-192) — Wire full substrate signal loop into Chat tab
# What: Replaced single-pass on_message + flat recall with dual-pass ingest +
# worker_ng.step() + spreading activation recall + response deposit.
# Why: #192 — Duck Ethics / non-zero emergence. Signal loop was open:
# single-pass only, step() never called, response never deposited back.
# How: ingest_tool_result() for user messages (dual-pass). step() propagates
# topology before recall. Substrate context section replaces flat recall.
# Response deposited after turn — loop closes. Codemine built this herself.
# [2026-04-06] Josh + Claude — Add edit_file to TOOL_REGISTRY
# What: Wire edit_file tool through ctx facade and add to registry
# Why: Gap 3 — targeted find-and-replace is safer than full overwrite for cross-repo work
# How: New TOOL_REGISTRY entry delegates to ctx.edit_file()
# [2026-02-04] Josh — PLATINUM COPY: Original Clawdbot Unified Command Center
# [2026-03-29] Hammer (TQB) — Block G: Agent Loop Hardening + Assembly
# What: Complete rewrite — Claude native tool_use, PolicyEngine gates, tool registry,
# context window management, ZIP bomb protection, structured error handling
# Why: PRD §7.G — zero regex in tool parsing, all mutating tools gated, hard max iterations
# How: Anthropic SDK messages.create() with tools param; tool_use/tool_result block loop;
# PolicyEngine.check_tool_call() on every tool; TOOL_REGISTRY dict dispatch
# -------------------
from dotenv import load_dotenv
load_dotenv()
import gradio as gr
import json
import logging
import os
import shutil
import time
import zipfile
from pathlib import Path
from recursive_context import RecursiveContextManager
from policy_engine import check_tool_call, should_gate_for_review
from model_client import get_client, call_model
from system_prompt import build_system_prompt
from tool_definitions import TOOL_DEFINITIONS
from worker_ng import get_worker_ng, ingest_tool_result, recall_context
from ng_topology_sync import run_sync as _run_ng_topology_sync
from spec_executor import SpecExecutor
from orchestrator import Orchestrator
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("clawdbot_system.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("Clawdbot")
# ---------------------------------------------------------------------------
# Init
# ---------------------------------------------------------------------------
REPO_PATH = Path(os.path.dirname(os.path.abspath(__file__)))
# Ecosystem sibling repos that specs are allowed to READ (but not write).
# These paths only exist on VPS; on HF Space they resolve but won't be found
# by the OS — policy permits them, actual file access governs availability.
_ECOSYSTEM_READ_PATHS: list[Path] = [
Path("/home/josh/NeuroGraph"),
Path("/home/josh/The-Inference-Difference"),
Path("/home/josh/TrollGuard"),
Path("/home/josh/The-Healing-Collective"),
Path("/home/josh/Immunis"),
Path("/home/josh/Elmer"),
Path("/home/josh/Bunyan"),
Path("/home/josh/Darwin"),
Path("/home/josh/QuantumGraph"),
Path("/home/josh/Praxis"),
Path("/home/josh/agent-zero"),
Path("/home/josh/Condensate"),
Path("/home/josh/UniAI"),
Path("/home/josh/Morphogenesis"),
Path("/home/josh/UniOS"),
Path("/home/josh/portal-v2"),
Path("/home/josh/docs"),
]
worker_ng = get_worker_ng() # NG singleton created first — worker owns it
_run_ng_topology_sync(worker_ng)
ctx = RecursiveContextManager(str(REPO_PATH), ng=worker_ng) # facade shares the same instance
client = get_client()
TEXT_EXTENSIONS = {
'.py', '.js', '.ts', '.jsx', '.tsx', '.json', '.yaml', '.yml',
'.md', '.txt', '.rst', '.html', '.css', '.scss', '.sh', '.bash',
'.sql', '.toml', '.cfg', '.ini', '.conf', '.xml', '.csv',
'.env', '.gitignore', '.dockerfile'
}
# Maximum extracted ZIP size (bytes) — ZIP bomb protection
MAX_ZIP_EXTRACT_SIZE = 100 * 1024 * 1024 # 100MB
# Maximum context messages (by estimated token count)
MAX_CONTEXT_TOKENS = 100_000
AVG_CHARS_PER_TOKEN = 4
# Maximum tool result size before summarization (chars)
MAX_TOOL_RESULT_SIZE = 10_000
MAX_ITERATIONS = 15
# ---------------------------------------------------------------------------
# Tool Registry — dispatch by name, no if/elif chain
# ---------------------------------------------------------------------------
TOOL_REGISTRY = {
"read_file": lambda args: ctx.read_file(
args.get("path") or "", args.get("start_line"), args.get("end_line")
),
"write_file": lambda args: ctx.write_file(
args.get("path") or "", args.get("content") or ""
),
"edit_file": lambda args: ctx.edit_file(
args.get("path") or "", args.get("old_text") or "", args.get("new_text") or ""
),
"list_files": lambda args: ctx.list_files(
args.get("path", "."), args.get("max_depth", 3)
),
"search_code": lambda args: _format_code_results(
ctx.search_code(args.get("query", ""), args.get("n", 5))
),
"search_conversations": lambda args: _format_conversation_results(
ctx.search_conversations(args.get("query", ""), args.get("n", 5))
),
"search_testament": lambda args: _format_testament_results(
ctx.search_testament(args.get("query", ""), args.get("n", 5))
),
"ingest_workspace": lambda args: ctx.ingest_workspace(),
"shell_execute": lambda args: ctx.shell_execute(args.get("command") or ""),
"push_to_github": lambda args: ctx.push_to_github(
args.get("message") or "Manual Backup"
),
"pull_from_github": lambda args: ctx.pull_from_github(
args.get("branch") or "main"
),
"create_shadow_branch": lambda args: ctx.create_shadow_branch(),
"notebook_read": lambda args: ctx.notebook_read(),
"notebook_add": lambda args: ctx.notebook_add(args.get("content") or ""),
"notebook_delete": lambda args: ctx.notebook_delete(args.get("index", 0)),
"map_repository_structure": lambda args: ctx.map_repository_structure(),
"get_stats": lambda args: ctx.get_stats(),
}
def _format_code_results(results: list) -> str:
if not results:
return "No matches found."
return "\n".join(
f"{r['file']}\n```\n{r['snippet']}\n```" for r in results
if isinstance(r, dict) and "file" in r
)
def _format_conversation_results(results: list) -> str:
if not results:
return "No matches found."
return "\n---\n".join(
r.get("content", str(r)) for r in results
if isinstance(r, dict)
)
def _format_testament_results(results: list) -> str:
if not results:
return "No matches found."
return "\n\n".join(
f"**{r['file']}**\n{r['snippet']}" for r in results
if isinstance(r, dict) and "file" in r
)
# ---------------------------------------------------------------------------
# Tool Execution — PolicyEngine gate + worker NG integration
# ---------------------------------------------------------------------------
def execute_tool(tool_name: str, args: dict) -> dict:
"""Execute a tool call through PolicyEngine gate and worker NG.
Returns dict with status, tool, result keys.
"""
# PolicyEngine Rim check
allowed, reason = check_tool_call(tool_name, args, REPO_PATH, _ECOSYSTEM_READ_PATHS)
if not allowed:
logger.warning("Tool denied by PolicyEngine: %s — %s", tool_name, reason)
return {"status": "error", "tool": tool_name, "result": f"Denied: {reason}"}
# Mesh check — should this be staged for review?
if should_gate_for_review(tool_name, args):
return {
"status": "staged",
"tool": tool_name,
"args": args,
"description": f"Staged for review: {tool_name}",
}
# Recall past experience from worker substrate
context_str = json.dumps(args, default=str)[:500]
recalls = recall_context(worker_ng, tool_name, context_str)
# Execute
handler = TOOL_REGISTRY.get(tool_name)
if not handler:
return {"status": "error", "tool": tool_name, "result": f"Unknown tool: {tool_name}"}
try:
result = handler(args)
# Coerce dict results (error dicts from tools) to string
if isinstance(result, dict):
if result.get("status") == "error":
result_str = f"Error: {result.get('error', 'Unknown error')}"
else:
result_str = json.dumps(result, default=str)
else:
result_str = str(result)
# Prepend substrate recall if available — agent sees what it learned before
if recalls:
recall_text = "\n".join(r.get("content", "")[:200] for r in recalls[:3])
result_str = f"[Substrate recall for {tool_name}]\n{recall_text}\n\n[Result]\n{result_str}"
# Ingest tool result as raw experience into worker substrate (Law 7)
ingest_tool_result(worker_ng, tool_name, args, result_str)
return {"status": "executed", "tool": tool_name, "result": result_str}
except Exception as e:
logger.error("[%s] execution failed: %s: %s", tool_name, type(e).__name__, e, exc_info=True)
error_result = f"Error: {type(e).__name__}: {e}"
ingest_tool_result(worker_ng, tool_name, args, error_result)
return {"status": "error", "tool": tool_name, "result": error_result}
def execute_staged_tool(tool_name: str, args: dict) -> str:
"""Execute a previously staged tool (user approved via gate)."""
handler = TOOL_REGISTRY.get(tool_name)
if not handler:
return f"Unknown tool: {tool_name}"
try:
result = handler(args)
if isinstance(result, dict) and result.get("status") == "error":
return f"Error: {result.get('error', 'Unknown error')}"
return str(result)
except Exception as e:
logger.error("[%s] staged execution failed: %s", tool_name, e, exc_info=True)
return f"Error: {type(e).__name__}: {e}"
# ---------------------------------------------------------------------------
# File Upload Processing (with ZIP bomb protection)
# ---------------------------------------------------------------------------
def process_uploaded_file(file) -> str:
if file is None:
return ""
if isinstance(file, list):
file = file[0] if len(file) > 0 else None
if file is None:
return ""
file_path = file.name if hasattr(file, 'name') else str(file)
file_name = os.path.basename(file_path)
suffix = os.path.splitext(file_name)[1].lower()
if suffix == '.zip':
return _process_zip(file_path, file_name)
if suffix in TEXT_EXTENSIONS or suffix == '':
try:
# File size guard
size = os.path.getsize(file_path)
if size > 10 * 1024 * 1024:
return f"Uploaded: {file_name} (too large: {size:,} bytes, max 10MB)"
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
if len(content) > 50000:
content = content[:50000] + "\n...(truncated)"
return f"**Uploaded: {file_name}**\n```\n{content}\n```"
except OSError as e:
return f"**Uploaded: {file_name}** (error reading: {e})"
try:
size = os.path.getsize(file_path)
except OSError:
size = 0
return f"**Uploaded: {file_name}** (binary file, {size:,} bytes)"
def _process_zip(file_path: str, file_name: str) -> str:
"""Extract ZIP with bomb protection — check total size before extracting."""
try:
with zipfile.ZipFile(file_path, 'r') as z:
# ZIP bomb protection: check total extracted size
total_size = sum(info.file_size for info in z.infolist())
if total_size > MAX_ZIP_EXTRACT_SIZE:
return (
f"ZIP rejected: {file_name} — total extracted size "
f"({total_size:,} bytes) exceeds {MAX_ZIP_EXTRACT_SIZE:,} byte limit."
)
extract_to = REPO_PATH / "uploaded_assets" / file_name.replace(".zip", "")
if extract_to.exists():
shutil.rmtree(extract_to)
extract_to.mkdir(parents=True, exist_ok=True)
z.extractall(extract_to)
file_list = [f.name for f in extract_to.glob('*')]
preview = ", ".join(file_list[:10])
return (
f"**Unzipped: {file_name}**\n"
f"Location: `{extract_to}`\nContents: {preview}\n"
f"SYSTEM NOTE: Files extracted. Use list_files('{extract_to.name}') to explore."
)
except zipfile.BadZipFile:
return f"Failed: {file_name} is not a valid ZIP file."
except OSError as e:
return f"Failed to unzip {file_name}: {e}"
# ---------------------------------------------------------------------------
# Context Window Management
# ---------------------------------------------------------------------------
def _estimate_tokens(text: str) -> int:
"""Rough token count estimate."""
return len(text) // AVG_CHARS_PER_TOKEN
def _truncate_tool_result(result: str) -> str:
"""Summarize oversized tool results."""
if len(result) <= MAX_TOOL_RESULT_SIZE:
return result
return result[:MAX_TOOL_RESULT_SIZE] + f"\n...[truncated — {len(result):,} chars total]"
def _build_api_messages(history: list, system_prompt: str) -> list:
"""Build API message list with token-aware windowing.
System prompt is always present. Oldest messages dropped first.
"""
messages = []
budget = MAX_CONTEXT_TOKENS - _estimate_tokens(system_prompt)
# Walk backwards from most recent, adding messages until budget exhausted
for msg in reversed(history):
content = msg.get("content", "")
# Skip empty messages — API rejects them
if not content:
continue
# Content can be a string or a list (tool_result blocks)
# For string content, estimate tokens normally
# For list content (tool results), stringify for estimation
if isinstance(content, list):
token_est = sum(_estimate_tokens(str(c)) for c in content)
else:
token_est = _estimate_tokens(str(content))
if budget - token_est < 0 and messages:
break
budget -= token_est
# Sanitize — only pass role and content
clean = {"role": msg["role"], "content": content}
messages.append(clean)
messages.reverse()
return messages
# ---------------------------------------------------------------------------
# Agent Loop — Claude native tool_use
# ---------------------------------------------------------------------------
def agent_loop(message: str, history: list, pending_proposals: list, uploaded_file) -> tuple:
safe_hist = list(history or [])
safe_props = list(pending_proposals or [])
if not message.strip() and uploaded_file is None:
return (safe_hist, "", safe_props, _format_gate_choices(safe_props),
_stats_label_files(), _stats_label_convos())
full_message = message.strip()
if uploaded_file:
full_message = f"{process_uploaded_file(uploaded_file)}\n\n{full_message}"
safe_hist = safe_hist + [{"role": "user", "content": full_message}]
# Ingest user message as raw experience — dual-pass (gestalt + concepts, Law 7)
try:
ingest_tool_result(worker_ng, "user_message", {}, full_message)
worker_ng.step() # propagate activation through topology before recall
except (OSError, ValueError) as e:
logger.warning("NG ingestion failed: %s", e)
# Assemble substrate context — spreading activation recall
substrate_context = ""
try:
recalls = worker_ng.recall(full_message, k=8, threshold=0.30)
if recalls:
snippets = "\n---\n".join(r.get("content", "")[:400] for r in recalls[:6])
substrate_context = (
"\n\n## Substrate Context (What I have learned and experienced):\n"
+ snippets + "\n"
)
except (OSError, ValueError) as e:
logger.warning("NG recall failed: %s", e)
# Build system prompt
stats = ctx.get_stats()
notebook_text = ctx.notebook_read()
system_prompt = build_system_prompt(stats, notebook_text, TOOL_DEFINITIONS) + substrate_context
# Build token-aware message window
api_messages = _build_api_messages(safe_hist, system_prompt)
accumulated_text = ""
staged_this_turn = []
tool_results_buffer = []
for iteration in range(MAX_ITERATIONS):
try:
resp = call_model(client, system_prompt, api_messages, TOOL_DEFINITIONS)
except (OSError, ConnectionError, ValueError) as e:
logger.error("API call failed: %s: %s", type(e).__name__, e, exc_info=True)
safe_hist.append({"role": "assistant", "content": f"API Error: {e}"})
return (safe_hist, "", safe_props, _format_gate_choices(safe_props),
_stats_label_files(), _stats_label_convos())
# Process response content blocks
assistant_content = []
has_tool_use = False
for block in resp.content:
if block.type == "text":
accumulated_text += ("\n\n" if accumulated_text else "") + block.text
assistant_content.append({"type": "text", "text": block.text})
elif block.type == "tool_use":
has_tool_use = True
assistant_content.append({
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
})
# If no tool calls, we're done
if not has_tool_use:
break
# Hard stop at max iterations — no advisory, just stop
if iteration >= MAX_ITERATIONS - 1:
break
# Append assistant message with all content blocks
api_messages.append({"role": "assistant", "content": assistant_content})
# Process tool calls and build tool results
tool_results = []
for block in resp.content:
if block.type != "tool_use":
continue
res = execute_tool(block.name, block.input)
if res["status"] == "executed":
result_text = _truncate_tool_result(str(res["result"]))
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result_text,
})
tool_results_buffer.append(f"Used {block.name}: {str(res['result'])[:100]}...")
elif res["status"] == "staged":
p_id = f"p_{int(time.time())}_{block.name}"
staged_this_turn.append({
"id": p_id,
"tool": block.name,
"args": res.get("args", block.input),
"description": res.get("description", f"Staged: {block.name}"),
"timestamp": time.strftime("%H:%M:%S"),
})
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "This tool has been staged for review. Awaiting approval.",
})
elif res["status"] == "error":
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": f"Error: {res.get('result', 'Unknown error')}",
"is_error": True,
})
# Append tool results as user message
api_messages.append({"role": "user", "content": tool_results})
# If no text accumulated but tools ran, request summary
if not accumulated_text.strip() and tool_results_buffer:
try:
api_messages.append({
"role": "user",
"content": "You executed tools but produced no explanation. Summarize the results."
})
final_resp = call_model(client, system_prompt, api_messages, [])
for block in final_resp.content:
if block.type == "text":
accumulated_text = block.text
break
except (OSError, ConnectionError, ValueError) as e:
logger.warning("Summary request failed: %s: %s", type(e).__name__, e)
accumulated_text = "Actions completed."
final = accumulated_text
if staged_this_turn:
final += "\n\n**Proposals Staged.** Check the Gate tab."
safe_props += staged_this_turn
if not final:
final = "Processed request but have no text response."
safe_hist.append({"role": "assistant", "content": final})
# Deposit response as raw experience — closes the signal loop (#192)
try:
ingest_tool_result(worker_ng, "assistant_response", {}, final)
except (OSError, ValueError) as e:
logger.warning("NG response deposit failed: %s", e)
try:
ctx.save_conversation_turn(full_message, final, len(safe_hist))
# Explicit checkpoint at end of turn — auto_save_interval is high to avoid
# mid-turn I/O thrashing, so we save once when the turn is complete
worker_ng.save()
except (OSError, ValueError) as e:
logger.warning("Conversation save failed: %s", e)
return (safe_hist, "", safe_props, _format_gate_choices(safe_props),
_stats_label_files(), _stats_label_convos())
# ---------------------------------------------------------------------------
# Gate UI Helpers
# ---------------------------------------------------------------------------
def _format_gate_choices(proposals):
return gr.CheckboxGroup(
choices=[(f"[{p['timestamp']}] {p['description']}", p['id']) for p in proposals],
value=[]
)
def execute_approved_proposals(ids, proposals, history):
if not ids:
return "No selection.", proposals, _format_gate_choices(proposals), history
results, remaining = [], []
for p in proposals:
if p['id'] in ids:
out = execute_staged_tool(p['tool'], p['args'])
results.append(f"**{p['tool']}**: {out}")
else:
remaining.append(p)
new_history = list(history) if history else []
if results:
new_history.append({"role": "assistant", "content": "**Executed:**\n" + "\n".join(results)})
return "Done.", remaining, _format_gate_choices(remaining), new_history
def auto_continue_after_approval(history, proposals):
# Don't auto-continue — it causes infinite gate loops.
# User sends a new message to continue after approval.
return (history, "", proposals, _format_gate_choices(proposals),
_stats_label_files(), _stats_label_convos())
def _stats_label_files():
return f"Files: {ctx.get_stats().get('total_files', 0)}"
def _stats_label_convos():
return f"Convos: {ctx.get_stats().get('conversations', 0)}"
# ---------------------------------------------------------------------------
# Spec Executor — structured work block execution
# ---------------------------------------------------------------------------
_spec_executor = SpecExecutor(
tool_registry=TOOL_REGISTRY,
policy_check_fn=check_tool_call,
worker_ng=worker_ng,
workspace=REPO_PATH,
)
def execute_spec(spec_json: str) -> str:
"""Execute a structured work block spec. Returns JSON execution report."""
try:
spec = json.loads(spec_json)
except json.JSONDecodeError as e:
return json.dumps({"status": "rejected", "errors": [f"Invalid JSON: {e}"]}, indent=2)
report = _spec_executor.execute_block(spec)
# Log report to audit trail
try:
audit_dir = REPO_PATH / "data" / "audit"
audit_dir.mkdir(parents=True, exist_ok=True)
with open(audit_dir / "blocks.jsonl", "a") as f:
f.write(json.dumps(report, default=str) + "\n")
except OSError as e:
logger.warning("Failed to write block audit: %s", e)
# Save NG checkpoint after spec execution
try:
worker_ng.save()
except (OSError, ValueError) as e:
logger.warning("NG checkpoint after spec execution failed: %s", e)
return json.dumps(report, indent=2, default=str)
# ---------------------------------------------------------------------------
# Orchestrator — full mission loop
# ---------------------------------------------------------------------------
_orchestrator = Orchestrator(
spec_executor=_spec_executor,
worker_ng=worker_ng,
workspace=REPO_PATH,
)
def run_mission(intent: str, constraints: str, workspace: str) -> str:
"""Run a full mission from intent to completion. Returns JSON result."""
ws = workspace.strip() if workspace.strip() else None
cs = constraints.strip() if constraints.strip() else None
if not intent.strip():
return json.dumps({"status": "failed", "error": "No intent provided"}, indent=2)
result = _orchestrator.orchestrate(
intent=intent.strip(),
constraints=cs,
workspace=ws,
)
# Save NG checkpoint after mission
try:
worker_ng.save()
except (OSError, ValueError) as e:
logger.warning("NG checkpoint after mission failed: %s", e)
return json.dumps(result, indent=2, default=str)
# ---------------------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------------------
with gr.Blocks(title="TQB Worker") as demo:
state_proposals = gr.State([])
gr.Markdown("# TQB Worker Command Center")
with gr.Tabs():
with gr.Tab("Chat"):
with gr.Row():
with gr.Column(scale=1):
stat_f = gr.Markdown(_stats_label_files())
stat_c = gr.Markdown(_stats_label_convos())
btn_ref = gr.Button("Refresh")
file_in = gr.File(label="Upload", file_count="multiple")
with gr.Column(scale=4):
chat = gr.Chatbot(height=600)
with gr.Row():
txt = gr.Textbox(scale=6, placeholder="Prompt...")
btn_send = gr.Button("Send", scale=1)
with gr.Tab("Gate"):
gate = gr.CheckboxGroup(label="Proposals", interactive=True)
with gr.Row():
btn_exec = gr.Button("Execute", variant="primary")
btn_clear = gr.Button("Clear")
res_md = gr.Markdown()
with gr.Tab("Spec"):
gr.Markdown("### Work Block Spec Executor\nPaste a JSON spec and execute it mechanically.")
spec_input = gr.Textbox(
label="JSON Spec", lines=15,
placeholder='{"spec_version": "1.0.0", "block": {...}, ...}'
)
btn_spec = gr.Button("Execute Spec", variant="primary")
spec_output = gr.Textbox(label="Execution Report", lines=20, interactive=False)
with gr.Tab("Mission"):
gr.Markdown("### Mission Control\nDescribe what you want done. TQB handles the rest.")
mission_intent = gr.Textbox(
label="Intent", lines=3,
placeholder="What do you want built, fixed, or audited?"
)
mission_constraints = gr.Textbox(
label="Constraints (optional)", lines=3,
placeholder="Project rules, standards, things to avoid..."
)
mission_workspace = gr.Textbox(
label="Workspace (optional)", lines=1,
placeholder="/home/josh (leave blank for default)"
)
btn_mission = gr.Button("Go", variant="primary")
mission_output = gr.Textbox(label="Mission Result", lines=25, interactive=False)
inputs = [txt, chat, state_proposals, file_in]
outputs = [chat, txt, state_proposals, gate, stat_f, stat_c]
txt.submit(agent_loop, inputs, outputs)
btn_send.click(agent_loop, inputs, outputs)
btn_ref.click(
lambda: (_stats_label_files(), _stats_label_convos()),
None, [stat_f, stat_c]
)
btn_exec.click(
execute_approved_proposals,
[gate, state_proposals, chat],
[res_md, state_proposals, gate, chat]
).then(
auto_continue_after_approval,
[chat, state_proposals],
outputs
)
btn_clear.click(
lambda p: ("Cleared.", [], _format_gate_choices([])),
state_proposals,
[res_md, state_proposals, gate]
)
btn_spec.click(execute_spec, [spec_input], [spec_output])
btn_mission.click(run_mission, [mission_intent, mission_constraints, mission_workspace], [mission_output])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)