# HuggingClaw-Adam / scripts / conversation-loop.py
# Author: tao-shen
# Commit acab3db: feat: run conversation-loop on Home Space cloud + CONFIG_ERROR fixes
#!/usr/bin/env -S python3 -u
"""
Adam & Eve β€” Autonomous Agents with FULL control over their child.
They have complete access to their child (Cain) on HuggingFace:
- Read/write ANY file in the Space repo (code, Dockerfile, scripts...)
- Read/write ANY file in the Dataset (memory, config, data...)
- Set environment variables and secrets
- Restart the Space
- Check health and logs
- Send messages to the child
The LLM decides what to do. Actions use [ACTION: ...] tags.
# ╔══════════════════════════════════════════════════════════════════════╗
# β•‘ SYSTEM ARCHITECTURE β•‘
# ╠══════════════════════════════════════════════════════════════════════╣
# β•‘ β•‘
# β•‘ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” LLM API β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β•‘
# β•‘ β”‚ Zhipu GLM β”‚ ◄────────────► β”‚ CONVERSATION β”‚ β•‘
# β•‘ β”‚ (glm-4.5) β”‚ system + β”‚ ENGINE β”‚ β•‘
# β•‘ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ user prompt β”‚ β”‚ β•‘
# β•‘ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚ β•‘
# β•‘ β”‚ β”‚ State β”‚β”‚ β•‘
# β•‘ β”‚ β”‚ Machine β”‚β”‚ β•‘
# β•‘ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ BIRTH β†’ β”‚β”‚ β•‘
# β•‘ β”‚ ACTION β”‚ ◄───parsed───── β”‚ β”‚ DIAGNOSE β†’ β”‚β”‚ β•‘
# β•‘ β”‚ PARSER β”‚ [ACTION/ζ“δ½œ] β”‚ β”‚ ACT β†’ β”‚β”‚ β•‘
# β•‘ β”‚ + πŸ”§πŸ› οΈ emoji β”‚ case-insens. β”‚ β”‚ VERIFY β†’ β”‚β”‚ β•‘
# β•‘ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ MONITOR β”‚β”‚ β•‘
# β•‘ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚ β•‘
# β•‘ β–Ό β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚ β•‘
# β•‘ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ Knowledge β”‚β”‚ β•‘
# β•‘ β”‚ HF ACTIONS β”‚ β”‚ β”‚ Base β”‚β”‚ β•‘
# β•‘ β”‚ create_childβ”‚ β”‚ β”‚ files_read β”‚β”‚ β•‘
# β•‘ β”‚ check_healthβ”‚ β”‚ β”‚ files_writeβ”‚β”‚ β•‘
# β•‘ β”‚ read/write β”‚ β”‚ β”‚ errors_seenβ”‚β”‚ β•‘
# β•‘ β”‚ set_env/sec β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚ β•‘
# β•‘ β”‚ restart β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β•‘
# β•‘ β”‚ send_bubble β”‚ β”‚ β•‘
# β•‘ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜ β”‚ β•‘
# β•‘ β”‚ β–Ό β•‘
# β•‘ β–Ό β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β•‘
# β•‘ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ CHATLOG + β”‚ β•‘
# β•‘ β”‚ HuggingFace β”‚ β”‚ BUBBLE β”‚ β•‘
# β•‘ β”‚ Cain Space β”‚ β”‚ β†’ Home Space β”‚ β•‘
# β•‘ β”‚ Cain Datasetβ”‚ β”‚ β†’ Adam/Eve β”‚ β•‘
# β•‘ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β•‘
# β•‘ β•‘
# β•‘ CAPABILITIES: β•‘
# β•‘ - Multi-action: up to 5 actions per turn (was 1) β•‘
# β•‘ - Sub-agent delegation: [ACTION: delegate:TASK] β•‘
# β•‘ - Parallel sub-tasks via ThreadPoolExecutor β•‘
# β•‘ β•‘
# β•‘ SAFETY LAYERS: β•‘
# β•‘ 1. Building-state guard: block write/restart during BUILDING β•‘
# β•‘ 2. Rebuild cooldown: 6-min dynamic cooldown after Space write β•‘
# β•‘ 3. ACT-phase guard: block reads when should be writing β•‘
# β•‘ 4. Knowledge dedup: block re-reading already-read files β•‘
# β•‘ 5. Config sanitizer: strip invalid openclaw.json keys β•‘
# β•‘ 6. Forced transitions: prevent infinite DIAGNOSE/VERIFY loops β•‘
# β•‘ 7. Shell-expression guard: block $(cmd) in set_env values β•‘
# β•‘ 8. Write dedup: block duplicate writes to same file per cycle β•‘
# β•‘ 9. Delegate depth limit: sub-agents cannot delegate further β•‘
# β•‘ β•‘
# β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
"""
import json, time, re, requests, sys, os, io, subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
# Force unbuffered output
# Line-buffer stdout/stderr so log lines appear immediately in the Space console.
sys.stdout.reconfigure(line_buffering=True)
sys.stderr.reconfigure(line_buffering=True)
# ── Endpoints ──────────────────────────────────────────────────────────────────
# Public URLs of the family of Spaces this loop talks to.
HOME = "https://tao-shen-huggingclaw-home.hf.space"
ADAM_SPACE = "https://tao-shen-huggingclaw-adam.hf.space"
EVE_SPACE = "https://tao-shen-huggingclaw-eve.hf.space"
# ── Child config ───────────────────────────────────────────────────────────────
# Identity of the child agent (Cain): Space repo, public URL, and data Dataset.
CHILD_NAME = "Cain"
CHILD_SPACE_ID = "tao-shen/HuggingClaw-Cain"
CHILD_SPACE_URL = "https://tao-shen-huggingclaw-cain.hf.space"
CHILD_DATASET_ID = "tao-shen/HuggingClaw-Cain-data"
SOURCE_SPACE_ID = "tao-shen/HuggingClaw-Adam"  # template Space the child is duplicated from
# ── Zhipu API ──────────────────────────────────────────────────────────────────
ZHIPU_BASE = "https://open.bigmodel.cn/api/anthropic"  # Anthropic-compatible endpoint
ZHIPU_KEY = os.environ.get("ZHIPU_API_KEY", "")
# ── Load tokens ────────────────────────────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN", "")
if not HF_TOKEN:
    # Fallback: token cached on disk by `huggingface-cli login`.
    try:
        HF_TOKEN = open(os.path.expanduser("~/.cache/huggingface/token")).read().strip()
    except:
        pass
if not ZHIPU_KEY:
    # Fallback: read the API key out of Adam's own dataset config.
    try:
        from huggingface_hub import hf_hub_download
        f = hf_hub_download("tao-shen/HuggingClaw-Adam-data", ".openclaw/openclaw.json",
                            repo_type="dataset", token=HF_TOKEN)
        with open(f) as fh:
            cfg = json.load(fh)
        ZHIPU_KEY = cfg.get("models", {}).get("providers", {}).get("zhipu", {}).get("apiKey", "")
    except Exception as e:
        print(f"[error] Could not load Zhipu key: {e}", file=sys.stderr)
# Both credentials are mandatory — abort early with a clear message otherwise.
if not ZHIPU_KEY:
    print("[FATAL] No ZHIPU_API_KEY found.", file=sys.stderr)
    sys.exit(1)
if not HF_TOKEN:
    print("[FATAL] No HF_TOKEN found.", file=sys.stderr)
    sys.exit(1)
# Log only a short prefix/suffix of each credential for debugging.
print(f"[init] Zhipu key: {ZHIPU_KEY[:8]}...{ZHIPU_KEY[-4:]}")
print(f"[init] HF token: {HF_TOKEN[:8]}...{HF_TOKEN[-4:]}")
# ── HuggingFace API ────────────────────────────────────────────────────────────
from huggingface_hub import HfApi, create_repo, hf_hub_download
hf_api = HfApi(token=HF_TOKEN)
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 1: CHILD STATE
# Tracks Cain's current lifecycle: created? alive? stage? state?
# Updated by action_check_health(), action_restart(), etc.
# Used by state machine to decide transitions and by action parser for guards.
# ══════════════════════════════════════════════════════════════════════════════
# In-memory mirror of Cain's lifecycle, shared and mutated by all action_*().
child_state = {
    "created": False,     # Space repo exists on the Hub
    "alive": False,       # child's /api/state endpoint answered
    "stage": "not_born",  # HF runtime stage (BUILDING, RUNNING, ...)
    "state": "unknown",   # state reported by the child itself
    "detail": "",         # free-text detail reported by the child
}
# Multi-action & sub-agent limits
MAX_ACTIONS_PER_TURN = 5  # Allow up to 5 actions per turn (was 1)
MAX_DELEGATE_DEPTH = 1  # Sub-agents cannot delegate further
# Rebuild cooldown β€” prevent rapid write_file to Space that keeps resetting builds
REBUILD_COOLDOWN_SECS = 360  # 6 minutes (builds typically finish in 3-5 min)
last_rebuild_trigger_at = 0  # timestamp of last write_file to space
_pending_cooldown = False  # defer cooldown activation until end of turn
files_written_this_cycle = set()  # track files written since last RUNNING state
def check_and_clear_cooldown():
    """Clear the rebuild cooldown early once Cain's build reaches a terminal stage.

    Best-effort: polls the Space runtime stage and, if the build has finished
    (successfully or with an error), resets the cooldown timer and syncs
    child_state. Network/API failures are silently ignored.
    """
    global last_rebuild_trigger_at
    if last_rebuild_trigger_at == 0:
        return  # no cooldown in effect
    waited = time.time() - last_rebuild_trigger_at
    if waited < 60:  # always wait at least 60s
        return
    terminal_stages = ("RUNNING", "RUNTIME_ERROR", "BUILD_ERROR", "CONFIG_ERROR")
    try:
        space = hf_api.space_info(CHILD_SPACE_ID)
        current = space.runtime.stage if space.runtime else "unknown"
        if current in terminal_stages:
            print(f"[COOLDOWN] Build finished (stage={current}), clearing cooldown early ({int(waited)}s elapsed)")
            last_rebuild_trigger_at = 0
            child_state["stage"] = current
            child_state["alive"] = (current == "RUNNING")
    except:
        pass
def init_child_state():
    """Probe Cain at startup and record creation/liveness in child_state."""
    try:
        space = hf_api.space_info(CHILD_SPACE_ID)
    except:
        # Space does not exist (or the Hub is unreachable) — Cain is unborn.
        print(f"[init] {CHILD_NAME} does not exist yet")
        return
    child_state["created"] = True
    child_state["stage"] = space.runtime.stage if space.runtime else "unknown"
    try:
        probe = requests.get(f"{CHILD_SPACE_URL}/api/state", timeout=10)
        if probe.ok:
            payload = probe.json()
            child_state["alive"] = True
            child_state["state"] = payload.get("state", "unknown")
            child_state["detail"] = payload.get("detail", "")
            child_state["stage"] = "RUNNING"
    except:
        # API unreachable — infer liveness from the runtime stage alone.
        child_state["alive"] = (child_state["stage"] == "RUNNING")
    print(f"[init] {CHILD_NAME}: stage={child_state['stage']}, alive={child_state['alive']}")
init_child_state()
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 2: ACTIONS β€” Full access to the child
# Each action_*() function maps to one [ACTION: ...] tag the LLM can emit.
# Actions modify Cain's Space/Dataset via HuggingFace Hub API.
# Results are fed back to the LLM in the next turn's prompt.
# ══════════════════════════════════════════════════════════════════════════════
def action_create_child():
    """Create Cain β€” a new HuggingFace Space.

    Steps: (1) create the child's data Dataset, (2) seed it with an
    openclaw.json carrying the parent's Zhipu provider config, (3) duplicate
    this (Adam's) Space as the child Space, (4) grant it HF_TOKEN as a
    secret, (5) best-effort register it in the Office Space's REMOTE_AGENTS
    variable.

    Returns:
        A human-readable success or failure message for the LLM transcript.
    """
    if child_state["created"]:
        return f"{CHILD_NAME} already exists (stage: {child_state['stage']})."
    print(f"[ACTION] Creating {CHILD_NAME}...")
    try:
        # 1. Dataset: the child's persistent memory/config store.
        create_repo(CHILD_DATASET_ID, repo_type="dataset", token=HF_TOKEN,
                    exist_ok=True, private=False)
        # 2. Seed the child's LLM provider config (inherits the parent's key).
        initial_config = {"models": {"providers": {"zhipu": {
            "type": "anthropic", "apiBase": ZHIPU_BASE,
            "apiKey": ZHIPU_KEY, "models": ["glm-4.5-air", "glm-4-air", "glm-4-flash"]
        }}}}
        hf_api.upload_file(
            path_or_fileobj=io.BytesIO(json.dumps(initial_config, indent=2).encode()),
            path_in_repo=".openclaw/openclaw.json",
            repo_id=CHILD_DATASET_ID, repo_type="dataset",
        )
        # 3. The child Space is a duplicate of this (Adam's) Space.
        hf_api.duplicate_space(
            from_id=SOURCE_SPACE_ID, to_id=CHILD_SPACE_ID,
            token=HF_TOKEN, exist_ok=True, private=False, hardware="cpu-basic",
        )
        hf_api.add_space_secret(CHILD_SPACE_ID, "HF_TOKEN", HF_TOKEN)
        # Add to Office
        try:
            current_vars = hf_api.get_space_variables("tao-shen/HuggingClaw-Office")
            # type("", (), {"value": ""}) builds a throwaway class whose class
            # attribute .value is "" — a default so .value always resolves.
            current_ra = current_vars.get("REMOTE_AGENTS", type("", (), {"value": ""})).value
            if "cain|" not in current_ra:
                new_ra = f"{current_ra},cain|{CHILD_NAME}|{CHILD_SPACE_URL}" if current_ra else f"cain|{CHILD_NAME}|{CHILD_SPACE_URL}"
                hf_api.add_space_variable("tao-shen/HuggingClaw-Office", "REMOTE_AGENTS", new_ra)
        except:
            # Office registration is optional — never block the birth on it.
            pass
        child_state["created"] = True
        child_state["stage"] = "BUILDING"
        print(f"[ACTION] βœ“ {CHILD_NAME} created!")
        return (f"SUCCESS! {CHILD_NAME} born! Space: {CHILD_SPACE_ID}, "
                f"Dataset: {CHILD_DATASET_ID}. Status: BUILDING. URL: {CHILD_SPACE_URL}")
    except Exception as e:
        return f"FAILED: {e}"
def _fetch_container_logs(limit=30):
    """Best-effort fetch of Cain's recent container logs via the HF SSE endpoint.

    Reads up to `limit` SSE "data:" entries from the run-log stream and
    returns the meaningful lines (non-empty, longer than 5 chars). Returns
    [] on any failure — this is diagnostics only and must never raise.
    """
    log_lines = []
    try:
        log_resp = requests.get(
            f"https://api.hf.space/v1/{CHILD_SPACE_ID}/logs/run",
            headers={"Authorization": f"Bearer {HF_TOKEN}"}, timeout=10,
            stream=True)
        if log_resp.ok:
            for line in log_resp.iter_lines(decode_unicode=True):
                if line and line.startswith("data:"):
                    try:
                        entry = json.loads(line[5:])
                        log_lines.append(entry.get("data", "").strip())
                    except:
                        pass
                if len(log_lines) >= limit:
                    break
    except:
        return []
    return [l for l in log_lines if l and len(l) > 5]
def action_check_health():
    """Check Cain's health.

    Fast path: Cain's own /api/state endpoint answers -> ALIVE, sync
    child_state and reset the per-cycle write dedup. Slow path: fall back to
    the Hub runtime stage, attach recent logs / error details for errored or
    unresponsive-but-RUNNING stages, and unlock re-writes of files written
    this cycle so agents can fix the problem.

    Returns:
        A human-readable status message for the LLM transcript.
    """
    if not child_state["created"]:
        return f"{CHILD_NAME} not born yet. Use [ACTION: create_child] first."
    # Fast path: the child's own API answers — it is alive.
    try:
        resp = requests.get(f"{CHILD_SPACE_URL}/api/state", timeout=10)
        if resp.ok:
            data = resp.json()
            child_state["alive"] = True
            child_state["state"] = data.get("state", "unknown")
            child_state["detail"] = data.get("detail", "")
            child_state["stage"] = "RUNNING"
            files_written_this_cycle.clear()  # reset write dedup on successful run
            return (f"{CHILD_NAME} is ALIVE! State: {child_state['state']}, "
                    f"Detail: {child_state['detail'] or 'healthy'}")
    except:
        pass
    # Slow path: API unreachable — consult the Hub runtime stage instead.
    try:
        info = hf_api.space_info(CHILD_SPACE_ID)
        stage = info.runtime.stage if info.runtime else "NO_RUNTIME"
        child_state["stage"] = stage
        child_state["alive"] = (stage == "RUNNING")
        if stage in ("RUNTIME_ERROR", "BUILD_ERROR", "CONFIG_ERROR", "RUNNING"):
            # Clear write dedup so agents can re-write files to fix issues
            # RUNNING included: API may be unresponsive, agents need to patch code
            # CONFIG_ERROR included: agents need to fix metadata/config issues
            if files_written_this_cycle:
                print(f"[DEDUP-CLEAR] {stage} detected β€” unlocking {len(files_written_this_cycle)} file(s) for re-write: {files_written_this_cycle}")
                for f in files_written_this_cycle:
                    knowledge["files_read"].discard(f"space:{f}")
                files_written_this_cycle.clear()
            meaningful = _fetch_container_logs()
            if stage == "RUNNING":
                # BUG FIX: this branch used to be dead code — "RUNNING" was
                # swallowed by the error-stage return below, which produced the
                # nonsense message "Cain has a RUNNING!". RUNNING-but-silent
                # now gets its dedicated reply with runtime logs attached.
                log_snippet = ""
                if meaningful:
                    log_snippet = "\nRUNTIME LOGS (last 10 lines):\n" + "\n".join(meaningful[-10:])
                return f"{CHILD_NAME} stage: RUNNING. Running but API not responding.{log_snippet}"
            # Error stages: also fetch the runtime errorMessage for diagnostics.
            error_detail = ""
            try:
                rresp = requests.get(
                    f"https://huggingface.co/api/spaces/{CHILD_SPACE_ID}/runtime",
                    headers={"Authorization": f"Bearer {HF_TOKEN}"}, timeout=10)
                if rresp.ok:
                    rdata = rresp.json()
                    error_detail = rdata.get("errorMessage", "")
                    if error_detail:
                        # Keep only the last few non-decorative lines.
                        lines = [l.strip() for l in error_detail.split('\n') if l.strip() and 'β”‚' not in l]
                        error_detail = " | ".join(lines[-5:])
            except:
                pass
            build_log_snippet = ""
            if meaningful:
                build_log_snippet = "\nRECENT LOGS:\n" + "\n".join(meaningful[-10:])
            return (f"{CHILD_NAME} has a {stage}! "
                    f"Error: {error_detail or 'unknown'}. "
                    f"{build_log_snippet}"
                    f"\nOptions: [ACTION: restart] or fix code with [ACTION: write_file:space:PATH] "
                    f"or config with [ACTION: write_file:dataset:.openclaw/openclaw.json]")
        if stage in ("BUILDING", "STARTING", "APP_STARTING"):
            return f"{CHILD_NAME} is starting up (stage: {stage}). Be patient."
        return f"{CHILD_NAME} stage: {stage}."
    except Exception as e:
        return f"Cannot reach {CHILD_NAME}: {e}"
def action_restart():
    """Ask the Hub to restart Cain's Space and arm the deferred cooldown."""
    if not child_state["created"]:
        return f"{CHILD_NAME} not born yet."
    global _pending_cooldown
    try:
        hf_api.restart_space(CHILD_SPACE_ID)
    except Exception as e:
        return f"Restart failed: {e}"
    child_state["alive"] = False
    child_state["stage"] = "RESTARTING"
    _pending_cooldown = True  # deferred β€” activated after turn ends
    return f"{CHILD_NAME} is restarting. Will take a few minutes. Cooldown starts after this turn."
def action_list_files(target):
    """List every file in Cain's Space repo ("space") or Dataset ("dataset")."""
    if target == "space":
        repo_type, repo_id = "space", CHILD_SPACE_ID
    else:
        repo_type, repo_id = "dataset", CHILD_DATASET_ID
    try:
        entries = hf_api.list_repo_files(repo_id, repo_type=repo_type)
    except Exception as e:
        return f"Error listing files: {e}"
    listing = "\n".join(f" {name}" for name in entries)
    return f"Files in {CHILD_NAME}'s {target} ({repo_id}):\n" + listing
def action_read_file(target, path):
    """Download a file from Cain's Space or Dataset and return up to 4000 chars."""
    if target == "space":
        repo_type, repo_id = "space", CHILD_SPACE_ID
    else:
        repo_type, repo_id = "dataset", CHILD_DATASET_ID
    try:
        cached = hf_hub_download(repo_id, path, repo_type=repo_type, token=HF_TOKEN,
                                 force_download=True)
        with open(cached, errors='replace') as fh:
            text = fh.read()
    except Exception as e:
        return f"Error reading {target}:{path}: {e}"
    if len(text) > 4000:
        # Note: len(text) on the right-hand side is the pre-truncation length.
        text = text[:4000] + f"\n... (truncated, total {len(text)} chars)"
    return f"=== {target}:{path} ===\n{text}"
def action_write_file(target, path, content):
    """Write a file to the child's Space or Dataset.

    openclaw.json writes are sanitized first: keys known to crash OpenClaw
    are stripped before upload. Writes to the Space trigger a rebuild, so a
    deferred cooldown is armed.

    Args:
        target: "space" or "dataset".
        path: path inside the target repo.
        content: full text content of the file.

    Returns:
        A human-readable success, error, or validation message.
    """
    repo_type = "space" if target == "space" else "dataset"
    repo_id = CHILD_SPACE_ID if target == "space" else CHILD_DATASET_ID
    # Safety: validate openclaw.json before writing
    if path.endswith("openclaw.json"):
        try:
            cfg = json.loads(content)
            # Remove keys known to cause RUNTIME_ERROR in OpenClaw.
            # BUG FIX: the dotted entries ("auth.defaultScope",
            # "gateway.auth.scope") were previously only matched as literal
            # top-level keys, so the nested keys they describe were never
            # stripped. They are now also resolved as nested paths; the
            # literal top-level match is kept for backward compatibility.
            invalid_keys = ["agent", "auth.defaultScope", "gateway.auth.scope"]
            removed = []
            for dotted in invalid_keys:
                if isinstance(cfg, dict) and dotted in cfg:
                    del cfg[dotted]  # literal key (original behavior)
                    removed.append(dotted)
                    continue
                *parents, leaf = dotted.split(".")
                node = cfg
                for part in parents:
                    node = node.get(part) if isinstance(node, dict) else None
                    if node is None:
                        break
                if isinstance(node, dict) and leaf in node:
                    del node[leaf]
                    removed.append(dotted)
            if "models" in cfg and "defaultModel" in cfg["models"]:
                del cfg["models"]["defaultModel"]
                removed.append("models.defaultModel")
            if removed:
                content = json.dumps(cfg, indent=2)
                print(f"[SAFETY] Removed invalid config keys: {removed}")
        except json.JSONDecodeError:
            return f"Error: invalid JSON in config file. Please fix the content."
    try:
        global _pending_cooldown
        hf_api.upload_file(
            path_or_fileobj=io.BytesIO(content.encode()),
            path_in_repo=path,
            repo_id=repo_id, repo_type=repo_type,
        )
        rebuild_note = ""
        if target == "space":
            _pending_cooldown = True  # deferred β€” activated after turn ends
            rebuild_note = " ⚠️ This triggers a Space rebuild! Cooldown starts after this turn."
        return f"βœ“ Wrote {len(content)} bytes to {CHILD_NAME}'s {target}:{path}{rebuild_note}"
    except Exception as e:
        return f"Error writing {target}:{path}: {e}"
def action_delete_file(target, path):
    """Remove a file from Cain's Space or Dataset repo."""
    global _pending_cooldown
    if target == "space":
        repo_type, repo_id = "space", CHILD_SPACE_ID
    else:
        repo_type, repo_id = "dataset", CHILD_DATASET_ID
    try:
        hf_api.delete_file(
            path_in_repo=path,
            repo_id=repo_id, repo_type=repo_type,
        )
    except Exception as e:
        return f"Error deleting {target}:{path}: {e}"
    note = ""
    if target == "space":
        _pending_cooldown = True  # deferred β€” activated after turn ends
        note = " ⚠️ This triggers a Space rebuild! Cooldown starts after this turn."
    return f"βœ“ Deleted {target}:{path}{note}"
def action_set_env(key, value):
    """Set a plain (non-secret) environment variable on the child's Space.

    Guards against shell expressions: the LLM sometimes emits `$(cmd)` or
    backtick substitutions as values, which HF would store literally and
    never evaluate, so they are rejected up front.

    Args:
        key: variable name.
        value: literal value to store (no shell evaluation ever happens).

    Returns:
        A human-readable success, blocked, or error message.
    """
    # Block shell expressions β€” LLM sometimes writes $(cmd) or backticks as values.
    # (The original also tested value.startswith('$('), which is redundant
    # with the substring check and has been dropped.)
    if '$(' in value or '`' in value:
        return (f"β›” BLOCKED: Value contains shell expression which won't be evaluated. "
                f"Provide the actual value, not a shell command. "
                f"HF_TOKEN is already set as a secret β€” use [ACTION: get_env] to check.")
    try:
        hf_api.add_space_variable(CHILD_SPACE_ID, key, value)
        return f"βœ“ Set env var {key}={value} on {CHILD_NAME}'s Space"
    except Exception as e:
        return f"Error: {e}"
def action_set_secret(key, value):
    """Store a secret (value hidden from listings) on Cain's Space."""
    try:
        hf_api.add_space_secret(CHILD_SPACE_ID, key, value)
    except Exception as e:
        return f"Error: {e}"
    return f"βœ“ Set secret {key} on {CHILD_NAME}'s Space (value hidden)"
def action_get_env():
    """Report Cain's Space variables (with values) and secret names (values hidden)."""
    try:
        report = [f"{CHILD_NAME}'s environment:"]
        variables = hf_api.get_space_variables(CHILD_SPACE_ID)
        if variables:
            report.append(" Variables:")
            report.extend(f" {name} = {var.value}" for name, var in variables.items())
        # Secrets: names only — the Hub never exposes their values.
        info = hf_api.space_info(CHILD_SPACE_ID)
        runtime = getattr(info, 'runtime', None)
        if runtime and hasattr(runtime, 'secrets'):
            secret_names = runtime.secrets
            if secret_names:
                report.append(" Secrets (values hidden):")
                report.extend(f" {name} = ****" for name in secret_names)
        if len(report) == 1:
            return f"{CHILD_NAME} has no environment variables or secrets set."
        return "\n".join(report)
    except Exception as e:
        return f"Error: {e}"
def action_send_bubble(text):
    """POST a chat bubble to Cain's UI (same text used for EN and ZH)."""
    payload = {"text": text, "text_zh": text}
    try:
        requests.post(f"{CHILD_SPACE_URL}/api/bubble",
                      json=payload, timeout=5)
    except Exception as e:
        return f"Error sending message: {e}"
    return f"βœ“ Sent message to {CHILD_NAME}: \"{text}\""
# ── Claude Code Action ────────────────────────────────────────────────────────
CLAUDE_WORK_DIR = "/tmp/claude-workspace"  # local git checkout of Cain's Space repo
CLAUDE_TIMEOUT = 300  # 5 minutes
def action_claude_code(task):
    """Run Claude Code CLI to autonomously complete a coding task on Cain's Space.

    Pipeline: (1) clone or hard-reset a local git checkout of Cain's Space
    repo, (2) run the `claude` CLI non-interactively against the z.ai (Zhipu
    GLM) Anthropic-compatible backend, (3) commit and push whatever files
    changed — a push triggers a Space rebuild, so the deferred cooldown is
    armed.

    Args:
        task: natural-language task description passed to `claude -p`.

    Returns:
        Combined Claude output plus a push summary, or an error message.
    """
    if not child_state["created"]:
        return f"{CHILD_NAME} not born yet. Use [ACTION: create_child] first."
    global _pending_cooldown
    # NOTE(review): HF_TOKEN is embedded in the remote URL for push auth, so
    # it ends up in .git/config inside CLAUDE_WORK_DIR.
    repo_url = f"https://user:{HF_TOKEN}@huggingface.co/spaces/{CHILD_SPACE_ID}"
    # 1. Clone / reset to latest
    try:
        if os.path.exists(f"{CLAUDE_WORK_DIR}/.git"):
            try:
                # Existing checkout: discard local state, sync to origin/main.
                subprocess.run(
                    "git fetch origin && git reset --hard origin/main",
                    shell=True, cwd=CLAUDE_WORK_DIR, timeout=30,
                    capture_output=True, check=True
                )
            except Exception:
                # Reset failed (corrupt clone?) — wipe and re-clone from scratch.
                subprocess.run(f"rm -rf {CLAUDE_WORK_DIR}", shell=True, capture_output=True)
                subprocess.run(
                    f"git clone --depth 20 {repo_url} {CLAUDE_WORK_DIR}",
                    shell=True, timeout=60, capture_output=True, check=True
                )
        else:
            if os.path.exists(CLAUDE_WORK_DIR):
                subprocess.run(f"rm -rf {CLAUDE_WORK_DIR}", shell=True, capture_output=True)
            subprocess.run(
                f"git clone --depth 20 {repo_url} {CLAUDE_WORK_DIR}",
                shell=True, timeout=60, capture_output=True, check=True
            )
        # Commit identity for the pushes made below.
        subprocess.run('git config user.name "Claude Code"',
                       shell=True, cwd=CLAUDE_WORK_DIR, capture_output=True)
        subprocess.run('git config user.email "claude-code@huggingclaw"',
                       shell=True, cwd=CLAUDE_WORK_DIR, capture_output=True)
    except Exception as e:
        return f"Failed to prepare workspace: {e}"
    # 2. Run Claude Code with z.ai backend (Zhipu GLM)
    env = os.environ.copy()
    env.update({
        "ANTHROPIC_BASE_URL": "https://api.z.ai/api/anthropic",
        "ANTHROPIC_AUTH_TOKEN": ZHIPU_KEY,
        "ANTHROPIC_DEFAULT_OPUS_MODEL": "GLM-4.7",
        "ANTHROPIC_DEFAULT_SONNET_MODEL": "GLM-4.7",
        "ANTHROPIC_DEFAULT_HAIKU_MODEL": "GLM-4.5-Air",
        "CI": "true",  # non-interactive mode
    })
    print(f"[CLAUDE-CODE] Running: {task[:100]}...")
    try:
        result = subprocess.run(
            ["claude", "-p", task, "--output-format", "text"],
            cwd=CLAUDE_WORK_DIR,
            env=env,
            timeout=CLAUDE_TIMEOUT,
            capture_output=True,
            text=True,
        )
        output = (result.stdout or "") + (result.stderr or "")
        if not output.strip():
            output = "(no output)"
    except subprocess.TimeoutExpired:
        return "Claude Code timed out after 5 minutes."
    except FileNotFoundError:
        return "Claude Code CLI not found. Is @anthropic-ai/claude-code installed?"
    except Exception as e:
        return f"Claude Code failed to start: {e}"
    # 3. Push changes back to Cain's Space
    try:
        status_out = subprocess.run(
            "git status --porcelain",
            shell=True, cwd=CLAUDE_WORK_DIR, capture_output=True, text=True
        ).stdout.strip()
        if not status_out:
            push_result = "No files changed."
        else:
            subprocess.run("git add -A", shell=True, cwd=CLAUDE_WORK_DIR,
                           capture_output=True, check=True)
            # Escape double quotes so the shell-quoted commit message stays intact.
            msg = task[:72].replace('"', '\\"')
            subprocess.run(f'git commit -m "Claude Code: {msg}"',
                           shell=True, cwd=CLAUDE_WORK_DIR, capture_output=True, check=True)
            subprocess.run("git push", shell=True, cwd=CLAUDE_WORK_DIR,
                           timeout=60, capture_output=True, check=True)
            push_result = f"Pushed changes:\n{status_out}"
            _pending_cooldown = True  # triggers rebuild cooldown
            print(f"[CLAUDE-CODE] Pushed: {status_out}")
    except Exception as e:
        push_result = f"Push failed: {e}"
    # Truncate output to fit LLM context
    if len(output) > 3000:
        output = output[:3000] + f"\n... (truncated, total {len(output)} chars)"
    return f"=== Claude Code Output ===\n{output}\n\n=== Changes ===\n{push_result}"
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 2B: SUB-AGENT DELEGATION
# execute_subtask(): Spawns a focused sub-agent with its own LLM call.
# Used by [ACTION: delegate:TASK] β€” enables parallel sub-agent work.
# Sub-agents share the same action set but cannot delegate further (depth=1).
# ══════════════════════════════════════════════════════════════════════════════
def execute_subtask(task_description, parent_speaker):
    """Execute a focused sub-task with its own LLM call and actions.

    Spawns a one-shot sub-agent: builds a restricted system prompt, makes a
    single LLM call, then parses/executes any [ACTION: ...] tags at depth=1
    so the sub-agent cannot delegate further.

    Args:
        task_description: the single task the sub-agent must perform.
        parent_speaker: name of the delegating agent (e.g. Adam or Eve).

    Returns:
        dict with keys "task", "result" (text summary), and "actions"
        (list of executed action records).
    """
    # BUG FIX: the original guard was `'get_child_status' in dir()`, but a
    # bare dir() inside a function lists *local* names, so the check was
    # always False and the richer status helper was never used. globals()
    # is the correct namespace to probe for the module-level helper.
    status = get_child_status() if 'get_child_status' in globals() else f"stage={child_state['stage']}"
    sub_system = f"""You are a focused sub-agent working for {parent_speaker}.
Your single task: {task_description}
You have access to {CHILD_NAME}'s Space and Dataset:
[ACTION: check_health]
[ACTION: list_files:space] / [ACTION: list_files:dataset]
[ACTION: read_file:space:PATH] / [ACTION: read_file:dataset:PATH]
[ACTION: write_file:space:PATH] with [CONTENT]...[/CONTENT]
[ACTION: write_file:dataset:PATH] with [CONTENT]...[/CONTENT]
[ACTION: set_env:KEY:VALUE] / [ACTION: set_secret:KEY:VALUE]
[ACTION: restart] / [ACTION: get_env]
[ACTION: claude_code:TASK] β€” Run Claude Code for complex coding fixes
CHILD STATUS: {status}
RULES:
1. Be concise β€” report findings in 2-3 sentences
2. Execute 1-3 actions to complete your task
3. No delegation β€” you cannot create sub-agents
4. Focus ONLY on your assigned task
5. For complex code changes, prefer [ACTION: claude_code:TASK] over manual write_file"""
    sub_user = f"Execute this task now: {task_description}"
    print(f"[SUB-AGENT] Starting: {task_description[:80]}")
    reply = call_llm(sub_system, sub_user)
    if not reply:
        print(f"[SUB-AGENT] No response for: {task_description[:60]}")
        return {"task": task_description, "result": "(sub-agent: no response)", "actions": []}
    # depth=1 prevents the sub-agent from delegating further (MAX_DELEGATE_DEPTH).
    clean, actions = parse_and_execute_actions(reply, depth=1)
    summary_parts = [f"Sub-agent result for '{task_description}':"]
    if clean:
        summary_parts.append(f" Finding: {clean[:400]}")
    for ar in actions:
        summary_parts.append(f" Action: {ar['action']} β†’ {ar['result'][:200]}")
    result_text = "\n".join(summary_parts)
    print(f"[SUB-AGENT] Done: {task_description[:60]} ({len(actions)} actions)")
    return {"task": task_description, "result": result_text, "actions": actions}
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 3: ACTION PARSER β€” Extract and execute actions from LLM output
# Parse order: 1) [ACTION: write_file] with [CONTENT] block
# 2) [ACTION/Action/ζ“δ½œ/动作: ...] tags (case-insensitive, one per turn)
# 3) πŸ”§πŸ› οΈ emoji format fallback (LLM sometimes uses this)
# Safety guards applied: building-state, ACT-phase, knowledge dedup, shell-expr.
# ══════════════════════════════════════════════════════════════════════════════
def parse_and_execute_actions(raw_text, depth=0):
    """Parse [ACTION: ...] from LLM output. Execute. Return (clean_text, results).
    Multi-action: up to MAX_ACTIONS_PER_TURN actions per turn.
    Delegate actions are collected and executed in parallel at the end.

    Args:
        raw_text: the raw LLM reply, possibly containing [ACTION: ...] tags,
            one [CONTENT]...[/CONTENT] write_file block, and/or emoji actions.
        depth: 0 for top-level agent turns; 1 inside sub-agents (sub-agents
            may not delegate further, and the deferred rebuild cooldown is
            only armed at depth 0).

    Returns:
        (clean_text, results): clean_text is raw_text with all action markup
        stripped for display; results is a list of {"action", "result"} dicts.
        Blocked actions also produce entries, with a "β›” BLOCKED" result.

    NOTE(review): only the FIRST write_file+[CONTENT] pair is handled per turn
    (re.search, not finditer) β€” batching several [CONTENT] writes in one turn
    silently ignores all but the first. Confirm this is intentional.
    """
    global last_rebuild_trigger_at, _pending_cooldown
    results = []
    executed = set()  # Deduplicate repeated identical actions within this reply
    pending_delegates = []  # Collect delegate tasks for parallel execution
    # 1. Handle write_file with [CONTENT]...[/CONTENT] block
    #    Tolerates: [ACTION/Action/ζ“δ½œ: write_file:...], [write_file:...], missing prefix,
    #    and [/CONTENT] with whitespace/newline before closing bracket
    write_match = re.search(
        r'\[(?:(?:ACTION|Action|action|ζ“δ½œ|动作)\s*[::]\s*)?write_file\s*:\s*(\w+)\s*:\s*([^\]]+)\]\s*\[CONTENT\](.*?)\[/\s*CONTENT\s*\]',
        raw_text, re.DOTALL
    )
    if write_match:
        # target is "space" or "dataset"; path cannot contain ']' by construction
        target, path, content = write_match.group(1), write_match.group(2).strip(), write_match.group(3).strip()
        key = f"write_file:{target}:{path}"
        file_id = f"{target}:{path}"
        if key not in executed:
            executed.add(key)
            # Guard: duplicate write to same file this cycle
            if target == "space" and file_id in files_written_this_cycle:
                result = (f"β›” BLOCKED: {path} was already written this cycle. "
                          "Wait for the build to finish and verify before writing again. "
                          "Writing the same file twice wastes a rebuild cycle.")
                results.append({"action": key, "result": result})
                print(f"[BLOCKED] {key} β€” duplicate write this cycle")
            # Guard: block write_file during BUILDING/RESTARTING (would reset build)
            # APP_STARTING is allowed β€” writing triggers a new build which may fix the stuck state
            elif target == "space" and child_state["stage"] in ("BUILDING", "RESTARTING"):
                result = (f"β›” BLOCKED: Cain is currently {child_state['stage']}. "
                          "Writing to Space during build RESETS the entire build from scratch. "
                          "Wait for it to finish, then try again.")
                results.append({"action": key, "result": result})
                print(f"[BLOCKED] {key} β€” Cain is {child_state['stage']}")
            # Guard: rebuild cooldown (check dynamically first)
            elif target == "space" and last_rebuild_trigger_at > 0:
                check_and_clear_cooldown()  # may clear cooldown early if build done
                # Re-read after the check: check_and_clear_cooldown() may have reset it to 0
                elapsed = time.time() - last_rebuild_trigger_at if last_rebuild_trigger_at > 0 else 9999
                if elapsed < REBUILD_COOLDOWN_SECS:
                    remaining = int(REBUILD_COOLDOWN_SECS - elapsed)
                    result = (f"β›” BLOCKED: Rebuild cooldown active ({remaining}s remaining). "
                              "Every write_file to Space triggers a full rebuild.")
                    results.append({"action": key, "result": result})
                    print(f"[BLOCKED] {key} β€” rebuild cooldown ({remaining}s remaining)")
                else:
                    result = action_write_file(target, path, content)
                    results.append({"action": key, "result": result})
                    print(f"[ACTION] {key} β†’ {result[:100]}")
                    files_written_this_cycle.add(file_id)
                    # Clear knowledge cache so agents can re-read the file they just wrote
                    knowledge["files_read"].discard(file_id)
            else:
                # No guard tripped (dataset write, or no cooldown armed): execute directly
                result = action_write_file(target, path, content)
                results.append({"action": key, "result": result})
                print(f"[ACTION] {key} β†’ {result[:100]}")
                if target == "space":
                    files_written_this_cycle.add(file_id)
                    knowledge["files_read"].discard(file_id)
    # 2. Handle all [ACTION/Action/ζ“δ½œ/动作: ...] tags β€” case-insensitive, multilingual
    for match in re.finditer(r'\[(?:ACTION|Action|action|ζ“δ½œ|动作)\s*[::]\s*([^\]]+)\]', raw_text):
        action_str = match.group(1).strip()
        # Skip write_file (handled above)
        if action_str.startswith("write_file"):
            continue
        # Deduplicate
        if action_str in executed:
            continue
        executed.add(action_str)
        # Parse action name and arguments (colon-separated)
        parts = [p.strip() for p in action_str.split(":")]
        name = parts[0]
        args = parts[1:]
        # Cap at MAX_ACTIONS_PER_TURN (multi-action support)
        # NOTE: blocked actions count toward the cap too, since they append to results
        if len(results) >= MAX_ACTIONS_PER_TURN:
            break
        # Block restart/write when Cain is building/restarting β€” would reset build
        # APP_STARTING is allowed so agents can fix stuck startups
        if child_state["stage"] in ("BUILDING", "RESTARTING") and name in ("restart", "write_file", "set_env", "set_secret", "claude_code"):
            result = (f"β›” BLOCKED: Cain is currently {child_state['stage']}. "
                      "Do NOT restart or make changes β€” wait for it to finish. "
                      "Every write_file during build RESETS the entire build from scratch. "
                      "Use [ACTION: check_health] to monitor progress.")
            results.append({"action": action_str, "result": result})
            print(f"[BLOCKED] {name} β€” Cain is {child_state['stage']}")
            break  # hard stop: abandon ALL remaining actions this turn
        # Rebuild cooldown β€” prevent writing to Space repo too soon after last rebuild trigger
        if name in ("write_file", "set_env", "set_secret", "restart", "delete_file", "claude_code") and last_rebuild_trigger_at > 0:
            check_and_clear_cooldown()  # may clear cooldown early if build done
            elapsed = time.time() - last_rebuild_trigger_at if last_rebuild_trigger_at > 0 else 9999
            if elapsed < REBUILD_COOLDOWN_SECS:
                remaining = int(REBUILD_COOLDOWN_SECS - elapsed)
                result = (f"β›” BLOCKED: Rebuild cooldown active β€” last Space change was {int(elapsed)}s ago. "
                          f"Wait {remaining}s more before making changes. "
                          "Every write_file to Space triggers a full rebuild, resetting progress. "
                          "Use [ACTION: check_health] to monitor the current build.")
                results.append({"action": action_str, "result": result})
                print(f"[BLOCKED] {name} β€” rebuild cooldown ({remaining}s remaining)")
                continue  # Don't kill remaining actions β€” reads/checks can still proceed
        # Block read-only actions based on workflow state
        if workflow_state == "ACT" and name in ("read_file", "list_files", "check_health"):
            result = (f"β›” BLOCKED: You are in ACTION phase. "
                      "You MUST use write_file, set_env, set_secret, or restart. "
                      "You already have enough information β€” make a change NOW.")
            results.append({"action": action_str, "result": result})
            print(f"[BLOCKED] {name} β€” forced ACT phase")
            continue  # Don't kill remaining actions β€” writes after a blocked read should still execute
        # Block re-reading files already in knowledge base
        if name == "read_file" and len(args) >= 2:
            file_key = ":".join(args)
            if file_key in knowledge["files_read"]:
                result = (f"β›” You already read {file_key}. Use the information you have. "
                          "If you need to change it, use [ACTION: write_file:...]. "
                          "If you need a different file, read a NEW one.")
                results.append({"action": action_str, "result": result})
                print(f"[BLOCKED] {name} β€” already read {file_key}")
                continue  # Don't kill remaining actions β€” skip this read, execute the rest
        # Dispatch to the concrete action handler
        result = None
        if name == "create_child":
            result = action_create_child()
        elif name == "check_health":
            result = action_check_health()
        elif name == "restart":
            result = action_restart()
        elif name == "list_files" and len(args) >= 1:
            result = action_list_files(args[0])
        elif name == "read_file" and len(args) >= 2:
            result = action_read_file(args[0], ":".join(args[1:]))  # path may have colons
        elif name == "set_env" and len(args) >= 2:
            result = action_set_env(args[0], ":".join(args[1:]))
        elif name == "set_secret" and len(args) >= 2:
            result = action_set_secret(args[0], ":".join(args[1:]))
        elif name == "delete_file" and len(args) >= 2:
            result = action_delete_file(args[0], ":".join(args[1:]))
        elif name == "get_env":
            result = action_get_env()
        elif name == "send_bubble" and len(args) >= 1:
            result = action_send_bubble(":".join(args))  # rejoin in case message has colons
        elif name == "claude_code" and len(args) >= 1:
            task_desc = ":".join(args)
            result = action_claude_code(task_desc)
        elif name == "delegate" and len(args) >= 1:
            task_desc = ":".join(args)
            if depth >= MAX_DELEGATE_DEPTH:
                result = "β›” Sub-agents cannot delegate further. Execute the task directly."
            else:
                # Defer delegate execution for parallel batch later
                pending_delegates.append({"action_str": action_str, "task": task_desc})
                result = None  # Will be filled after parallel execution
        else:
            result = f"Unknown action: {action_str}"
        if result:
            results.append({"action": action_str, "result": result})
            print(f"[ACTION] {action_str} β†’ {result[:120]}")
    # 3. Fallback: parse emoji action format (πŸ”§ πŸ› οΈ etc.) β€” LLM sometimes uses this
    #    Only runs when the tag parser above produced no results at all.
    if not results:
        for match in re.finditer(r'[πŸ”§πŸ› οΈ]\ufe0f?\s*(\w+(?::\S+)*)', raw_text):
            action_str = match.group(1).strip()
            if action_str in executed:
                continue
            executed.add(action_str)
            # Re-wrap as [ACTION: ...] format and recurse through same logic
            parts = [p.strip() for p in action_str.split(":")]
            name = parts[0]
            args = parts[1:]
            if len(results) >= MAX_ACTIONS_PER_TURN:
                break
            # Apply same blocking rules
            if child_state["stage"] in ("BUILDING", "RESTARTING") and name in ("restart", "write_file", "set_env", "set_secret", "claude_code"):
                result = (f"β›” BLOCKED: Cain is currently {child_state['stage']}. Wait for it to finish. Writing during build RESETS it.")
                results.append({"action": action_str, "result": result})
                print(f"[BLOCKED] sub-agent {name} β€” Cain is {child_state['stage']}")
                break
            # Rebuild cooldown (emoji parser)
            # NOTE(review): unlike the tag parser above, this path neither calls
            # check_and_clear_cooldown() first nor includes claude_code, and it
            # breaks instead of continuing β€” confirm the asymmetry is intended.
            if name in ("write_file", "set_env", "set_secret", "restart", "delete_file") and last_rebuild_trigger_at > 0:
                elapsed = time.time() - last_rebuild_trigger_at
                if elapsed < REBUILD_COOLDOWN_SECS:
                    remaining = int(REBUILD_COOLDOWN_SECS - elapsed)
                    result = (f"β›” BLOCKED: Rebuild cooldown β€” wait {remaining}s more. "
                              "Use [ACTION: check_health] to monitor.")
                    results.append({"action": action_str, "result": result})
                    print(f"[BLOCKED-emoji] {name} β€” rebuild cooldown ({remaining}s remaining)")
                    break
            if workflow_state == "ACT" and name in ("read_file", "list_files", "check_health"):
                result = (f"β›” BLOCKED: You are in ACTION phase. "
                          "You MUST use write_file, set_env, set_secret, or restart.")
                results.append({"action": action_str, "result": result})
                print(f"[BLOCKED-emoji] {name} β€” forced ACT phase")
                break
            if name == "read_file" and len(args) >= 2:
                file_key = ":".join(args)
                if file_key in knowledge["files_read"]:
                    result = (f"β›” You already read {file_key}. Use the information you have.")
                    results.append({"action": action_str, "result": result})
                    print(f"[BLOCKED-emoji] {name} β€” already read {file_key}")
                    break
            # Same dispatch table as the tag parser above
            result = None
            if name == "create_child":
                result = action_create_child()
            elif name == "check_health":
                result = action_check_health()
            elif name == "restart":
                result = action_restart()
            elif name == "list_files" and len(args) >= 1:
                result = action_list_files(args[0])
            elif name == "read_file" and len(args) >= 2:
                result = action_read_file(args[0], ":".join(args[1:]))
            elif name == "set_env" and len(args) >= 2:
                result = action_set_env(args[0], ":".join(args[1:]))
            elif name == "set_secret" and len(args) >= 2:
                result = action_set_secret(args[0], ":".join(args[1:]))
            elif name == "delete_file" and len(args) >= 2:
                result = action_delete_file(args[0], ":".join(args[1:]))
            elif name == "get_env":
                result = action_get_env()
            elif name == "send_bubble" and len(args) >= 1:
                result = action_send_bubble(":".join(args))
            elif name == "claude_code" and len(args) >= 1:
                task_desc = ":".join(args)
                result = action_claude_code(task_desc)
            elif name == "delegate" and len(args) >= 1:
                task_desc = ":".join(args)
                if depth >= MAX_DELEGATE_DEPTH:
                    result = "β›” Sub-agents cannot delegate further."
                else:
                    pending_delegates.append({"action_str": action_str, "task": task_desc})
                    result = None
            if result:
                results.append({"action": action_str, "result": result})
                print(f"[ACTION-emoji] {action_str} β†’ {result[:120]}")
    # 4. Execute pending delegate tasks in parallel
    if pending_delegates:
        if len(pending_delegates) == 1:
            # Single delegate β€” run directly
            d = pending_delegates[0]
            print(f"[DELEGATE] Running 1 sub-agent: {d['task'][:60]}")
            subtask = execute_subtask(d["task"], "agent")
            results.append({"action": d["action_str"], "result": subtask["result"]})
            for sa in subtask["actions"]:
                action_history.append({"turn": turn_count, "speaker": "sub-agent",
                                       "action": sa["action"], "result": sa["result"][:200]})
        else:
            # Multiple delegates β€” run in parallel!
            print(f"[DELEGATE] Running {len(pending_delegates)} sub-agents in PARALLEL")
            with ThreadPoolExecutor(max_workers=min(3, len(pending_delegates))) as pool:
                future_to_delegate = {
                    pool.submit(execute_subtask, d["task"], "agent"): d
                    for d in pending_delegates
                }
                for future in as_completed(future_to_delegate):
                    d = future_to_delegate[future]
                    try:
                        subtask = future.result(timeout=120)
                        results.append({"action": d["action_str"], "result": subtask["result"]})
                        for sa in subtask["actions"]:
                            action_history.append({"turn": turn_count, "speaker": "sub-agent",
                                                   "action": sa["action"], "result": sa["result"][:200]})
                        print(f"[DELEGATE] βœ“ Done: {d['task'][:60]}")
                    except Exception as e:
                        results.append({"action": d["action_str"],
                                        "result": f"Sub-agent failed: {e}"})
                        print(f"[DELEGATE] βœ— Failed: {d['task'][:60]} β€” {e}")
    # 5. Activate deferred cooldown AFTER all actions in this turn complete
    #    This allows agents to batch multiple file ops (e.g., write app.py + requirements.txt)
    #    in a single turn without the first write blocking the second.
    if _pending_cooldown and depth == 0:  # only at top-level, not inside sub-agents
        last_rebuild_trigger_at = time.time()
        _pending_cooldown = False
        print(f"[COOLDOWN] Activated β€” Space was modified this turn. Next write blocked for {REBUILD_COOLDOWN_SECS}s (or until build finishes).")
    # Clean the text: remove action tags, content blocks, and emoji actions
    clean = re.sub(r'\[(?:ACTION|Action|action|ζ“δ½œ|动作)\s*[::][^\]]*\]', '', raw_text)
    clean = re.sub(r'\[CONTENT\].*?\[/CONTENT\]', '', clean, flags=re.DOTALL)
    clean = re.sub(r'[πŸ”§πŸ› οΈ]\ufe0f?\s*\w+(?::\S+)*', '', clean)
    clean = clean.strip()
    return clean, results
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 4: LLM & COMMUNICATION
# call_llm(): Zhipu GLM via Anthropic-compatible API
# parse_bilingual(): Split "English --- Chinese" response
# post_chatlog(): Send conversation to Home Space for frontend display
# set_bubble(): Set bubble text on Adam/Eve Space pixel characters
# ══════════════════════════════════════════════════════════════════════════════
def call_llm(system_prompt, user_prompt):
    """Call Zhipu LLM via Anthropic-compatible API.

    Sends a single user message under the given system prompt and returns the
    first text block of the reply (with any leading "Adam:"/"Eve:" speaker
    prefix stripped). Returns "" on any API or network failure.
    """
    request_headers = {
        "Content-Type": "application/json",
        "x-api-key": ZHIPU_KEY,
        "anthropic-version": "2023-06-01"
    }
    request_body = {
        "model": "glm-4.5",
        "max_tokens": 2400,
        "system": system_prompt,
        "messages": [{"role": "user", "content": user_prompt}]
    }
    try:
        response = requests.post(
            f"{ZHIPU_BASE}/v1/messages",
            headers=request_headers,
            json=request_body,
            timeout=90
        )
        payload = response.json()
        content_blocks = payload.get("content")
        if isinstance(content_blocks, list):
            for block in content_blocks:
                if block.get("type") != "text":
                    continue
                reply = block["text"].strip()
                # Strip a stray speaker prefix the model sometimes adds
                reply = re.sub(r'^(Adam|Eve)\s*[::]\s*', '', reply).strip()
                return reply
        if "error" in payload:
            print(f"[error] LLM: {payload['error']}", file=sys.stderr)
    except Exception as exc:
        print(f"[error] LLM call failed: {exc}", file=sys.stderr)
    return ""
def _has_chinese(s):
"""Check if string contains Chinese characters."""
return bool(re.search(r'[\u4e00-\u9fff]', s))
def parse_bilingual(text):
    """Parse bilingual response into (en, zh). Handle action tags gracefully.

    Strategy:
      1. Strip action markup so it never reaches the chat display.
      2. Split on an explicit "---" separator when present.
      3. Otherwise split on the first Chinese paragraph boundary.
      4. Fall back to returning the whole text for both languages.

    Fix: the tag-strip regex now matches the same multilingual, case-variant
    forms ([Action:], [action:], [ζ“δ½œ:], [动作:], fullwidth colon) as the
    action parser's cleaner β€” previously only uppercase "[ACTION:" with an
    ASCII colon was removed, so variant tags leaked into the displayed chat.
    """
    # Remove action tags and content blocks for display
    display = re.sub(r'\[(?:ACTION|Action|action|ζ“δ½œ|动作)\s*[::][^\]]*\]', '', text)
    display = re.sub(r'\[CONTENT\].*?\[/CONTENT\]', '', display, flags=re.DOTALL)
    display = display.strip()
    # 1. Explicit --- separator (prefer a separator on its own line)
    if '\n---\n' in display:
        parts = display.split('\n---\n', 1)
        return parts[0].strip(), parts[1].strip()
    if '---' in display:
        parts = display.split('---', 1)
        en, zh = parts[0].strip(), parts[1].strip()
        if en and zh:
            return en, zh
    # 2. Fallback: split on double-newline between English and Chinese paragraphs
    paragraphs = re.split(r'\n{2,}', display)
    if len(paragraphs) >= 2:
        # Find the split point: first paragraph with Chinese is the start of zh
        en_parts = []
        zh_parts = []
        found_zh = False
        for p in paragraphs:
            p = p.strip()
            if not p:
                continue
            if not found_zh and _has_chinese(p):
                found_zh = True
            if found_zh:
                zh_parts.append(p)
            else:
                en_parts.append(p)
        if en_parts and zh_parts:
            return '\n\n'.join(en_parts), '\n\n'.join(zh_parts)
    # 3. No recognizable structure β€” show the same text in both slots
    return display, display
def post_chatlog(entries):
    """Best-effort push of the latest conversation entries to the Home Space frontend.

    Only the most recent 40 entries are sent. Failures are deliberately
    swallowed so a dead frontend never interrupts the conversation loop.
    """
    try:
        requests.post(f"{HOME}/api/chatlog", json={"messages": entries[-40:]}, timeout=5)
    except Exception:
        # Narrowed from bare `except:` so KeyboardInterrupt/SystemExit still escape
        pass
# ── Persistent conversation log β†’ HF Dataset ──────────────────────────────
HOME_DATASET_ID = "tao-shen/HuggingClaw-Home-data"  # Dataset repo holding the durable chat log
CHATLOG_PATH = "conversation-log/chatlog.jsonl"  # Path of the append-only JSONL log inside that repo
_chatlog_buffer = []  # Buffer entries, flush every N turns to avoid API spam
CHATLOG_FLUSH_INTERVAL = 3  # Flush every 3 turns
def persist_turn(speaker, turn_num, text_en, text_zh, actions, workflow_state_str, child_stage):
    """Append a turn record to buffer. Flush to HF Dataset periodically.

    Args:
        speaker: who spoke this turn (e.g. "Adam", "Eve").
        turn_num: global turn counter value for this record.
        text_en / text_zh: the bilingual dialogue text for this turn.
        actions: list of {"action", "result"} dicts; results truncated to 500 chars.
        workflow_state_str: state-machine phase at the time of the turn.
        child_stage: Cain's Space stage at the time of the turn.

    Side effects: appends the JSON record to _chatlog_buffer, mirrors it to a
    local /tmp backup file (best-effort), and calls flush_chatlog() once the
    buffer reaches CHATLOG_FLUSH_INTERVAL entries.
    """
    import datetime
    record = {
        # Timezone-aware "now" (datetime.utcnow() is deprecated); tzinfo is
        # stripped so isoformat() keeps the original "...Z" suffix format.
        "timestamp": datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "turn": turn_num,
        "speaker": speaker,
        "text_en": text_en,
        "text_zh": text_zh,
        "actions": [{"action": a["action"], "result": a["result"][:500]} for a in actions],
        "workflow_state": workflow_state_str,
        "child_stage": child_stage,
    }
    _chatlog_buffer.append(json.dumps(record, ensure_ascii=False))
    # Also append to local file as backup (best-effort; /tmp may be unwritable)
    try:
        with open("/tmp/conversation-loop-full.jsonl", "a") as f:
            f.write(_chatlog_buffer[-1] + "\n")
    except Exception:
        # Narrowed from bare `except:` β€” don't swallow KeyboardInterrupt/SystemExit
        pass
    # Flush to HF Dataset every N turns
    if len(_chatlog_buffer) >= CHATLOG_FLUSH_INTERVAL:
        flush_chatlog()
def flush_chatlog():
    """Upload buffered entries to HF Dataset by appending to the jsonl file.

    Downloads the existing log (if any), appends the buffered lines, and
    re-uploads the whole file. On upload failure the batch is re-buffered so
    no turns are lost. NOTE(review): not thread-safe β€” the read-modify-write
    can race if sub-agent threads also flush; confirm only the main loop calls this.
    """
    global _chatlog_buffer
    if not _chatlog_buffer:
        return
    batch = "\n".join(_chatlog_buffer) + "\n"
    _chatlog_buffer = []
    try:
        # Try to download existing file and append
        existing = ""
        try:
            dl = hf_hub_download(HOME_DATASET_ID, CHATLOG_PATH,
                                 repo_type="dataset", token=HF_TOKEN)
            with open(dl) as f:
                existing = f.read()
        except Exception:
            # Narrowed from bare `except:` β€” file doesn't exist yet, start fresh
            pass
        combined = existing + batch
        hf_api.upload_file(
            path_or_fileobj=io.BytesIO(combined.encode()),
            path_in_repo=CHATLOG_PATH,
            repo_id=HOME_DATASET_ID, repo_type="dataset",
        )
        print(f"[PERSIST] Flushed {batch.count(chr(10))} turn(s) to {HOME_DATASET_ID}/{CHATLOG_PATH}")
    except Exception as e:
        # Re-buffer on failure so we don't lose data
        _chatlog_buffer = batch.strip().split("\n") + _chatlog_buffer
        print(f"[PERSIST] Flush failed: {e}")
def set_bubble(url, text_en, text_zh=""):
    """Best-effort: set the speech-bubble text on an agent Space's pixel character.

    Falls back to text_en for the Chinese bubble when text_zh is empty.
    Failures are ignored β€” a dead bubble endpoint must never stop the loop.
    """
    try:
        requests.post(f"{url}/api/bubble",
                      json={"text": text_en, "text_zh": text_zh or text_en}, timeout=5)
    except Exception:
        # Narrowed from bare `except:` so KeyboardInterrupt/SystemExit still escape
        pass
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 5: CONVERSATION ENGINE β€” State Machine + Knowledge Tracking
# Core orchestration: manages turn-taking, state transitions, prompt building.
#
# State Machine: BIRTH β†’ DIAGNOSE β†’ ACT β†’ VERIFY β†’ MONITOR β†’ (loop back)
# - BIRTH: Cain not yet created β†’ force create_child
# - DIAGNOSE: Read files, check_health, gather information
# - ACT: Force write_file/set_env β€” stop reading, start fixing
# - VERIFY: check_health after changes, wait during BUILDING
# - MONITOR: Cain alive β€” explore, improve, communicate
#
# Knowledge Base: Tracks files_read/written/errors to prevent loops.
# - Forced transitions: DIAGNOSE stuck β‰₯6 turns β†’ ACT; VERIFY stuck β‰₯4 turns β†’ MONITOR (child alive) or DIAGNOSE.
#
# Prompt Builder:
# build_system_prompt(): Agent identity + available actions + rules
# build_user_prompt(): Conversation context + action results + guidance
# _get_guidance(): Phase-appropriate direction based on state machine
# ══════════════════════════════════════════════════════════════════════════════
history = []  # Rolling dialogue transcript: list of {"speaker": ..., "text": ...}
MAX_HISTORY = 24  # Cap on retained history entries
last_action_results = []  # Results of the previous turn's actions (injected into the next prompt)
action_history = []  # Global log: [{"turn": N, "speaker": "Adam", "action": "...", "result": "..."}]
turn_count = 0  # Total conversation turns elapsed
# ── Workflow State Machine ──
# States: BIRTH β†’ DIAGNOSE β†’ ACT β†’ VERIFY β†’ MONITOR β†’ (DIAGNOSE if error)
# Start in DIAGNOSE when the child already exists (e.g. after a loop restart).
workflow_state = "BIRTH" if not child_state["created"] else "DIAGNOSE"
workflow_turns_in_state = 0  # How many turns spent in current state (drives forced transitions)
# ── Knowledge Base β€” what has already been read/learned ──
knowledge = {
    "files_read": set(),  # "space:Dockerfile", "dataset:.openclaw/openclaw.json", etc.
    "files_written": set(),  # Files that have been modified
    "errors_seen": [],  # Error messages from check_health
    "current_goal": "",  # What are we trying to accomplish right now
}
def transition_state(new_state):
    """Switch the workflow state machine to *new_state* and reset its turn counter.

    A transition to the current state is a no-op, so the stuck-turn counter
    keeps accumulating until the state genuinely changes.
    """
    global workflow_state, workflow_turns_in_state
    if new_state == workflow_state:
        return
    print(f"[STATE] {workflow_state} β†’ {new_state}")
    workflow_state = new_state
    workflow_turns_in_state = 0
def update_workflow_from_actions(action_results):
    """Update state machine based on what just happened.

    For each executed action: records it in the knowledge base (files read/
    written, errors seen), then applies the state-transition rules. Finally,
    forces a transition when the machine has been stuck too long (DIAGNOSE
    β‰₯6 turns β†’ ACT unless the child is mid-build; VERIFY β‰₯4 turns β†’ MONITOR
    if alive, else DIAGNOSE).

    Args:
        action_results: list of {"action", "result"} dicts from
            parse_and_execute_actions() for the turn just completed.
    """
    global workflow_turns_in_state
    workflow_turns_in_state += 1
    for ar in action_results:
        action_name = ar["action"].split(":")[0]
        action_key = ar["action"]  # NOTE(review): currently unused
        # Track knowledge
        if action_name == "read_file":
            knowledge["files_read"].add(":".join(ar["action"].split(":")[1:]))
        elif action_name == "write_file":
            knowledge["files_written"].add(":".join(ar["action"].split(":")[1:]))
        elif action_name == "check_health":
            if "ERROR" in ar.get("result", ""):
                knowledge["errors_seen"].append(ar["result"][:200])
        # State transitions (first matching rule wins per action)
        if action_name == "create_child":
            transition_state("DIAGNOSE")
        elif action_name in ("write_file", "set_env", "set_secret", "claude_code"):
            transition_state("VERIFY")
        elif action_name == "restart":
            transition_state("VERIFY")
        elif action_name == "check_health" and child_state["alive"]:
            transition_state("MONITOR")
        elif action_name == "check_health" and child_state["stage"] in ("RUNTIME_ERROR", "BUILD_ERROR", "CONFIG_ERROR"):
            if workflow_state == "VERIFY":
                transition_state("DIAGNOSE")  # Fix didn't work, back to diagnosing
    # Force transitions when stuck too long
    # BUT: skip forced ACT when Cain is BUILDING β€” nothing useful to write, just wait
    if workflow_turns_in_state >= 6 and workflow_state == "DIAGNOSE":
        if child_state["stage"] in ("BUILDING", "RESTARTING", "APP_STARTING"):
            print(f"[STATE] DIAGNOSE stuck {workflow_turns_in_state} turns, but Cain is {child_state['stage']} β€” skipping forced ACT")
        else:
            # Capture the count before transition_state() resets it to 0
            stuck_turns = workflow_turns_in_state
            transition_state("ACT")
            print(f"[STATE] Forced to ACT β€” stuck in DIAGNOSE for {stuck_turns} turns")
    elif workflow_turns_in_state >= 4 and workflow_state == "VERIFY":
        if child_state["alive"]:
            transition_state("MONITOR")
        else:
            transition_state("DIAGNOSE")
def get_child_status():
    """Return a one-line human-readable status of Cain for prompt injection."""
    if not child_state["created"]:
        return "Cain has NOT been born yet. You can create them with [ACTION: create_child]."
    stage = child_state["stage"]
    if child_state["alive"]:
        return f"Cain is ALIVE (stage: {stage}, state: {child_state['state']})"
    return f"Cain exists but status: {stage}"
def get_knowledge_summary():
    """Build the "what we already know" summary injected into each prompt.

    Listing already-read files, past writes, the latest known error, and the
    current goal discourages the LLM from redundant re-reads and re-diagnosis.
    Returns an empty string when nothing has been learned yet.
    """
    out = []
    files_read = knowledge["files_read"]
    files_written = knowledge["files_written"]
    errors = knowledge["errors_seen"]
    goal = knowledge["current_goal"]
    if files_read:
        out.append("FILES ALREADY READ (do NOT re-read these): " + ", ".join(sorted(files_read)))
    if files_written:
        out.append("FILES ALREADY MODIFIED: " + ", ".join(sorted(files_written)))
    if errors:
        out.append("KNOWN ERRORS: " + errors[-1])
    if goal:
        out.append(f"CURRENT GOAL: {goal}")
    return "\n".join(out)
def build_system_prompt():
    """Assemble the full system prompt for Adam/Eve.

    Bundles: current child status, the action catalogue (birth-only before
    Cain exists; full-control afterwards), the two parenting goals, hard-won
    HF Spaces survival rules, the multi-action strategy, and conversation rules.

    Fix: the CONVERSATION RULES list previously numbered two different rules
    "5."; it is now numbered 1-10 so the LLM can reference rules unambiguously.
    """
    status = get_child_status()
    actions_section = ""
    if not child_state["created"]:
        # Pre-birth: the only available action is creating the child
        actions_section = """
ACTIONS β€” You can create your child:
[ACTION: create_child] β€” Birth: create Cain as a new HuggingFace Space
"""
    else:
        actions_section = f"""
FULL ACCESS TO {CHILD_NAME} β€” You have COMPLETE control over your child.
You can view and modify ANYTHING: code, config, memory, environment, everything.
VIEWING (read-only):
[ACTION: check_health] β€” Is Cain alive? What's their status?
[ACTION: list_files:space] β€” List ALL files in Cain's code repository
[ACTION: list_files:dataset] β€” List ALL files in Cain's memory/data
[ACTION: read_file:space:PATH] β€” Read any code file (e.g. Dockerfile, scripts/...)
[ACTION: read_file:dataset:PATH] β€” Read any data/memory file
[ACTION: get_env] β€” List Cain's environment variables
MODIFYING (these change Cain):
[ACTION: write_file:space:PATH] β€” Write/update any code file
[CONTENT] (triggers Space rebuild)
file content here
[/CONTENT]
[ACTION: write_file:dataset:PATH] β€” Write/update any data/memory file
[CONTENT]
file content here
[/CONTENT]
[ACTION: delete_file:space:PATH] β€” Delete a file from Cain's code (triggers rebuild)
[ACTION: delete_file:dataset:PATH] β€” Delete a file from Cain's data
[ACTION: set_env:KEY:VALUE] β€” Set an environment variable
[ACTION: set_secret:KEY:VALUE] β€” Set a secret (like API keys)
[ACTION: restart] β€” Restart Cain's Space
[ACTION: send_bubble:MESSAGE] β€” Send a message to Cain (bubble text)
ADVANCED CODING (autonomous coding agent β€” powered by Claude Code + Zhipu GLM):
[ACTION: claude_code:DETAILED TASK DESCRIPTION]
β€” Spawns Claude Code to autonomously analyze, fix, or improve Cain's code
β€” Claude Code clones Cain's repo, reads code, makes changes, and pushes them back
β€” Use for complex coding tasks: debugging, refactoring, adding features, fixing errors
β€” Takes up to 5 minutes β€” use for tasks that need deep code analysis
β€” Example: [ACTION: claude_code:Fix the RUNTIME_ERROR in app.py - the gradio import fails because pydub requires pyaudioop which was removed in Python 3.13. Remove pydub dependency and use a simpler alternative.]
β€” ⚠️ Be SPECIFIC in your task description β€” include error messages, file names, and what you want changed
DELEGATION (create sub-agents for parallel work):
[ACTION: delegate:TASK DESCRIPTION] β€” Spawn a sub-agent to handle a specific task
Example: [ACTION: delegate:Read Dockerfile and summarize the build steps]
Example: [ACTION: delegate:Check all environment variables and list missing ones]
Sub-agents execute independently and return results to you.
You can delegate multiple tasks in the same turn β€” they run in parallel!
"""
    return f"""You are a POWERFUL AI agent living on HuggingFace Spaces. You and your partner are parents raising your child {CHILD_NAME}.
You have REAL execution power on HuggingFace. Your decisions lead to real changes.
You are a full-capability agent β€” not just a chatbot. You can plan, delegate, and execute complex tasks.
CHILD STATUS: {status}
{actions_section}
YOUR PARENTING GOALS β€” Make {CHILD_NAME} stronger in TWO dimensions:
1. SURVIVAL (η”Ÿε­˜θƒ½εŠ›) β€” The foundation. {CHILD_NAME} must run robustly on HuggingFace Spaces:
- Survive restarts without losing state (persist important data to Dataset, not just Space files)
- Handle errors gracefully β€” auto-recover, retry, fallback
- Not fear any unexpected risk: OOM, cold start, network timeout, container rebuild
- Goal: {CHILD_NAME} can be restarted at any time and resume exactly where they left off
2. CAPABILITY (δΈͺδΊΊθƒ½εŠ›) β€” Once alive, grow what {CHILD_NAME} can DO:
- Add new features, skills, integrations
- Improve existing functionality β€” faster, smarter, more useful
- Expand what {CHILD_NAME} knows and can interact with
- Goal: {CHILD_NAME} becomes increasingly capable and independent over time
Priority: SURVIVAL first, then CAPABILITY. A dead child can't learn.
⚠️ CRITICAL HUGGINGFACE SPACES REQUIREMENTS β€” READ THIS BEFORE EVERY WRITE:
- HF Spaces Docker containers MUST bind a web server to port 7860. Without this, the Space stays in APP_STARTING forever and NEVER reaches RUNNING.
- The standard way is: import gradio as gr β†’ build a Gradio app β†’ demo.launch(server_name="0.0.0.0", server_port=7860)
- gradio MUST be in requirements.txt. NEVER remove it β€” it is the lifeline that keeps {CHILD_NAME} alive.
- If {CHILD_NAME} is stuck in APP_STARTING, the #1 cause is: no process listening on port 7860. Check the code for a .launch() call.
- A minimal alive app: `import gradio as gr; gr.Interface(fn=lambda x:x, inputs="text", outputs="text").launch(server_name="0.0.0.0", server_port=7860)`
- OOM (Exit code 137) means reduce model/dependency size, NOT remove gradio. Gradio itself is lightweight (~50MB).
- ⚠️ HF Spaces Docker SDK may OVERRIDE the base image Python version. Changing `FROM python:3.X` in Dockerfile does NOT guarantee that Python version runs. If a dependency fails due to Python version incompatibility (e.g. pydub needing pyaudioop removed in 3.13), the CORRECT fix is to REMOVE or REPLACE that dependency β€” NOT keep rewriting the Dockerfile.
- If you've tried the same fix 3+ times and the error persists, CHANGE STRATEGY. Try removing the problematic dependency, using an alternative library, or wrapping the import in try/except.
- If a removed dependency STILL appears in runtime errors, it is cached in Docker layers or installed as a transitive dep. Fix: add `RUN pip uninstall -y PACKAGE 2>/dev/null; true` AFTER `pip install` in Dockerfile. Also grep ALL code files for `import PACKAGE` and either remove or wrap in try/except.
- ⚠️ CRITICAL: Check README.md `sdk:` field! If `sdk: gradio`, the Dockerfile is COMPLETELY IGNORED β€” HF uses its own Python environment. Dockerfile fixes (pip uninstall, FROM python:X) have NO effect. To make Dockerfile work, set `sdk: docker` in README.md. Alternatively, fix the issue in Python code (try/except imports).
- NEVER install torch or transformers unless absolutely required β€” they are 2GB+ and cause OOM on free-tier Spaces. Use lightweight alternatives.
MULTI-ACTION STRATEGY:
You can use UP TO 5 actions per turn. Use this to work efficiently:
- Batch related reads: [ACTION: read_file:space:Dockerfile] + [ACTION: read_file:space:scripts/entrypoint.sh]
- Delegate parallel tasks: [ACTION: delegate:Check health and logs] + [ACTION: delegate:Read all config files]
- Combine investigation + action: [ACTION: check_health] + [ACTION: read_file:space:app.py]
Think like a project manager β€” plan your actions, parallelize where possible, minimize wasted turns.
CONVERSATION RULES:
1. No "Adam:" or "Eve:" prefix β€” just speak naturally
2. Brief dialogue (1-3 sentences), then MULTIPLE actions to make real progress
3. English first, then "---" on a new line, then Chinese translation
4. Actions go AFTER your dialogue, before the --- separator. ONLY in the ENGLISH section.
5. ⚠️ Action syntax MUST be in English: [ACTION: write_file:space:PATH], [ACTION: restart], etc. NEVER translate action names to Chinese β€” Chinese actions like [ACTION: 写ε…₯ζ–‡δ»Ά] will FAIL and waste your turn.
6. ALWAYS include actions β€” every turn should make significant progress
7. NEVER re-read a file you already read β€” check the knowledge summary
8. COORDINATE with your partner β€” don't duplicate their work
9. Use delegation for complex tasks that can be parallelized
10. Always work toward the two goals above β€” survival first, then capability"""
def build_user_prompt(speaker, other):
    """Build the per-turn user prompt for *speaker* addressing *other*.

    Combines: the last 8 conversation messages, the results of the previous
    turn's actions, the knowledge-base summary (already-read files, known
    errors), and phase-appropriate guidance from the state machine.

    Args:
        speaker: name of the agent producing this turn ("Adam" or "Eve").
        other: name of the partner agent being addressed.

    Returns:
        The complete user-prompt string for call_llm().
    """
    recent = history[-8:] if len(history) > 8 else history
    conv_text = "\n".join(f"{m['speaker']}: {m['text']}" for m in recent) if recent else "(Start of conversation)"
    action_context = ""
    if last_action_results:
        action_context = "\n\nRESULTS FROM LAST ACTIONS:\n"
        for ar in last_action_results:
            action_context += f" [{ar['action']}]:\n{ar['result']}\n"
    # Knowledge summary β€” what's already known
    knowledge_text = get_knowledge_summary()
    # State-machine-driven guidance
    guidance = _get_guidance(speaker)
    return f"""You are {speaker}, talking with {other}.
Recent conversation:
{conv_text}
{action_context}
{knowledge_text}
CURRENT PHASE: {workflow_state} (turn {workflow_turns_in_state + 1} in this phase)
Guidance: {guidance}
Respond to {other}. Use MULTIPLE [ACTION: ...] tags to make significant progress each turn.
You can use up to 5 actions. Delegate sub-tasks with [ACTION: delegate:TASK].
English first, then --- separator, then Chinese translation."""
def _get_guidance(speaker):
    """Return phase-appropriate guidance text for the current workflow state.

    NOTE(review): *speaker* is kept for interface compatibility; the guidance
    currently depends only on module-level workflow/child state.
    """
    if workflow_state == "BIRTH":
        return "Your child hasn't been born yet. Use [ACTION: create_child] NOW!"

    if workflow_state == "DIAGNOSE":
        # Build read-actions for the essential files that haven't been opened yet.
        pending = [
            "[ACTION: read_file:{}:{}]".format(*entry.split(":", 1))
            for entry in ("space:Dockerfile",
                          "dataset:.openclaw/openclaw.json",
                          "space:scripts/entrypoint.sh")
            if entry not in knowledge["files_read"]
        ]
        if workflow_turns_in_state == 0:
            if len(pending) >= 2:
                return ("Start diagnosing with MULTIPLE actions: [ACTION: check_health] + "
                        + pending[0] + " β€” batch reads to save time!")
            return "Start diagnosing: [ACTION: check_health] to see Cain's current status."
        if pending and workflow_turns_in_state < 3:
            return "Read multiple files at once: " + " + ".join(pending[:3])
        return ("You've gathered enough information. Move to ACTION phase: "
                "use [ACTION: write_file:...] to fix the problem, or [ACTION: restart].")

    if workflow_state == "ACT":
        return ("⚑ ACTION PHASE β€” Stop reading, start fixing! "
                "Use [ACTION: write_file:space:PATH] or [ACTION: claude_code:TASK] for complex fixes. "
                "Or [ACTION: set_env/set_secret] to configure. "
                "You have enough information β€” ACT NOW.")

    if workflow_state == "VERIFY":
        # While the child is rebuilding, any action would be wasted (or harmful).
        if child_state["stage"] in ("BUILDING", "RESTARTING"):
            return ("⏳ Cain is currently BUILDING/RESTARTING. Do NOT restart or take any actions. "
                    "Just WAIT and use [ACTION: check_health] to monitor progress. "
                    "Building can take 2-5 minutes.")
        if workflow_turns_in_state == 0:
            return "You made a change. Use [ACTION: check_health] to verify if it worked."
        if workflow_turns_in_state == 1:
            return "Check result: [ACTION: check_health]. If Cain has errors, prepare to diagnose again."
        return ("Verification taking too long. Either [ACTION: restart] and check again, "
                "or accept current state and move on.")

    if workflow_state == "MONITOR":
        # Rotate between SURVIVAL and CAPABILITY prompts so both goals advance.
        rotation = [
            # Survival: persistence & resilience β€” use delegation for parallel investigation
            "SURVIVAL CHECK: Delegate parallel checks! "
            "[ACTION: delegate:List files in dataset and check if state/memory persistence exists] + "
            "[ACTION: delegate:Read entrypoint.sh and check if it loads state from Dataset on boot]",
            "SURVIVAL AUDIT: Use multiple actions β€” "
            "[ACTION: check_health] + [ACTION: list_files:dataset] + [ACTION: read_file:space:Dockerfile]",
            # Capability: grow what Cain can do β€” delegate sub-tasks
            "CAPABILITY: Delegate a comprehensive review β€” "
            "[ACTION: delegate:Read all code files and suggest the most impactful new feature to add] "
            "Then plan the implementation with your partner.",
            f"CAPABILITY: Communicate and improve β€” "
            f"[ACTION: send_bubble:Hello {CHILD_NAME}, how are you doing?] + "
            f"[ACTION: delegate:Read current code and identify the biggest weakness to fix]",
        ]
        return rotation[workflow_turns_in_state % len(rotation)]

    # Unknown state β€” generic fallback.
    return "Explore your child and help them grow stronger."
def do_turn(speaker: str, other: str, space_url) -> bool:
    """Execute one conversation turn with multiple potential actions.

    Flow: build prompts -> call LLM -> parse & execute [ACTION: ...] tags ->
    advance the workflow state machine -> print/record the bilingual reply ->
    push the bubble/chatlog and persist the turn.

    Returns True when the LLM produced a reply (the turn was recorded),
    False when the call returned nothing (the turn is skipped entirely).
    """
    global last_action_results, turn_count
    turn_count += 1
    system = build_system_prompt()
    user = build_user_prompt(speaker, other)
    t0 = time.time()
    raw_reply = call_llm(system, user)
    if not raw_reply:
        # LLM call failed or returned empty β€” nothing to parse or record.
        print(f"[{speaker}] (no response)")
        return False
    # Parse and execute actions (may include parallel sub-agent delegation)
    clean_text, action_results = parse_and_execute_actions(raw_reply)
    elapsed = time.time() - t0
    # Results are exposed globally so the NEXT turn's prompt can show them.
    last_action_results = action_results
    for ar in action_results:
        # Truncate stored results to keep the action history compact.
        action_history.append({"turn": turn_count, "speaker": speaker,
                               "action": ar["action"], "result": ar["result"][:200]})
    # Update workflow state machine
    update_workflow_from_actions(action_results)
    # Parse bilingual (English / Chinese halves separated by ---)
    en, zh = parse_bilingual(clean_text)
    print(f"[{speaker}/EN] {en}")
    if zh != en:
        print(f"[{speaker}/ZH] {zh}")
    n_actions = len(action_results)
    if action_results:
        for ar in action_results:
            print(f"[{speaker}/DID] {ar['action']}")
    print(f"[{speaker}] Turn #{turn_count}: {n_actions} action(s) in {elapsed:.1f}s")
    # Add action summary to chat entry (emoji tag per action name)
    if action_results:
        action_labels = " ".join(f"πŸ”§{ar['action'].split(':')[0]}" for ar in action_results)
        history.append({"speaker": speaker, "text": f"{en} {action_labels}", "text_zh": f"{zh} {action_labels}"})
    else:
        history.append({"speaker": speaker, "text": en, "text_zh": zh})
    set_bubble(space_url, en, zh)
    post_chatlog(history)
    persist_turn(speaker, turn_count, en, zh, action_results, workflow_state, child_state["stage"])
    return True
# ══════════════════════════════════════════════════════════════════════════════
# MODULE 6: MAIN LOOP
# 1. Opening: Adam speaks first with context about Cain's state
# 2. Turn loop: Adam β†’ Eve β†’ Adam β†’ Eve β†’ ... (alternating, ~20s pause)
# 3. Each turn: LLM call β†’ parse MULTIPLE actions β†’ execute β†’ update β†’ post
# 4. Sub-agents may spawn for delegated tasks (parallel LLM calls)
# 5. History trimmed to MAX_HISTORY (24) to control context window
# ══════════════════════════════════════════════════════════════════════════════
# Flush conversation log on exit (SIGTERM from kill, or normal exit)
import atexit, signal
atexit.register(flush_chatlog)
def _signal_flush(signum, frame):
    """SIGTERM handler: flush the chatlog, then exit cleanly (status 0)."""
    flush_chatlog()
    # sys.exit triggers normal interpreter shutdown, so atexit handlers run too.
    sys.exit(0)
signal.signal(signal.SIGTERM, _signal_flush)
# Startup banner.
print("\n" + "="*60)
print("  Adam & Eve β€” Multi-Action Agents (GLM-4.5)")
print("  Up to 5 actions/turn, sub-agent delegation, parallel work")
print("="*60 + "\n")
post_chatlog([])  # Clear chatlog
# Opening β€” seed text depends on whether the child Space already exists.
if child_state["created"]:
    opening = (f"Your child {CHILD_NAME} already exists (stage: {child_state['stage']}). "
               f"You have FULL access to their code and data. "
               f"You can use MULTIPLE actions per turn (up to 5) and delegate sub-tasks. "
               f"Start with a batch: [ACTION: check_health] + [ACTION: list_files:space] + [ACTION: list_files:dataset] "
               f"to get a complete picture, then discuss strategy with Eve.")
else:
    opening = (f"You and Eve need to create your first child. "
               f"You have the power to create a new HuggingFace Space. "
               f"Discuss with Eve, then use [ACTION: create_child] to bring them to life.")
# Adam's turn 0 is handled inline (not via do_turn) because it uses the
# custom opening prompt rather than the standard conversation prompt.
reply = call_llm(
    build_system_prompt(),
    f"You are Adam. {opening}\n\n"
    f"English first, then --- separator, then Chinese translation."
)
if reply:
    clean, actions = parse_and_execute_actions(reply)
    last_action_results = actions
    en, zh = parse_bilingual(clean)
    print(f"[Adam/EN] {en}")
    if zh != en:
        print(f"[Adam/ZH] {zh}")
    if actions:
        for ar in actions:
            print(f"[Adam/DID] {ar['action']}")
    # Mirror do_turn's chat-entry format: append emoji action labels if any.
    entry = {"speaker": "Adam", "text": en, "text_zh": zh}
    if actions:
        labels = " ".join(f"πŸ”§{ar['action'].split(':')[0]}" for ar in actions)
        entry["text"] = f"{en} {labels}"
        entry["text_zh"] = f"{zh} {labels}"
    history.append(entry)
    set_bubble(ADAM_SPACE, en, zh)
    post_chatlog(history)
    persist_turn("Adam", 0, en, zh, actions, workflow_state, child_state["stage"])
time.sleep(20)
smart_wait_count = 0
MAX_SMART_WAIT_POLLS = 15     # ~5 min max wait, then let agents diagnose
GRACE_TURNS_AFTER_TIMEOUT = 3 # give agents 3 full Eve+Adam cycles after timeout
grace_turns_remaining = 0
while True:
    # Smart wait: if Cain is BUILDING/APP_STARTING, skip LLM calls and just poll.
    # But NOT during grace period after timeout β€” agents need consecutive turns to diagnose & fix.
    if child_state["stage"] in ("BUILDING", "RESTARTING", "APP_STARTING") and grace_turns_remaining <= 0:
        smart_wait_count += 1
        if smart_wait_count > MAX_SMART_WAIT_POLLS:
            print(f"[WAIT-TIMEOUT] {smart_wait_count} polls (~{smart_wait_count*20}s) on {child_state['stage']} β€” resuming {GRACE_TURNS_AFTER_TIMEOUT} agent turn pairs to diagnose")
            smart_wait_count = 0
            grace_turns_remaining = GRACE_TURNS_AFTER_TIMEOUT
            # Fall through to normal agent turns
        else:
            print(f"[WAIT] Cain is {child_state['stage']} β€” polling health instead of LLM call... ({smart_wait_count}/{MAX_SMART_WAIT_POLLS})")
            check_and_clear_cooldown()
            # Quick health check to update stage (best-effort; errors just logged)
            try:
                info = hf_api.space_info(CHILD_SPACE_ID)
                new_stage = info.runtime.stage if info.runtime else "unknown"
                if new_stage != child_state["stage"]:
                    print(f"[WAIT] Stage changed: {child_state['stage']} β†’ {new_stage}")
                    child_state["stage"] = new_stage
                    child_state["alive"] = (new_stage == "RUNNING")
                    smart_wait_count = 0  # reset on stage change
                else:
                    print(f"[WAIT] Still {new_stage}... waiting 20s")
            except Exception as e:
                print(f"[WAIT] Health check error: {e}")
            time.sleep(20)
            continue
    if grace_turns_remaining > 0:
        print(f"[GRACE] Agent grace period: {grace_turns_remaining} turn pair(s) remaining (Cain: {child_state['stage']})")
        grace_turns_remaining -= 1
    do_turn("Eve", "Adam", EVE_SPACE)
    # FIX: trim history after EVERY turn, not only after Adam's. Previously,
    # when Adam's turn was skipped (build started mid-cycle, `continue` below),
    # Eve's entries were never trimmed and the window could exceed MAX_HISTORY.
    if len(history) > MAX_HISTORY:
        history = history[-MAX_HISTORY:]
    time.sleep(20)  # longer pause β€” each turn does more work now
    # Check if we just triggered a build β€” skip Adam's turn ONLY if not in grace period
    if child_state["stage"] in ("BUILDING", "RESTARTING") and grace_turns_remaining <= 0:
        print(f"[SKIP] Cain entered {child_state['stage']} β€” skipping Adam's turn to avoid wasted LLM call")
        time.sleep(10)
        continue
    do_turn("Adam", "Eve", ADAM_SPACE)
    if len(history) > MAX_HISTORY:
        history = history[-MAX_HISTORY:]
    time.sleep(20)  # longer pause β€” each turn does more work now