# openher/agent/demo_controller.py
# kellyxiaowei's picture
# Deploy OpenHer Gradio Space — gemma-4-E4B served on Modal
# dff25f7 verified
# Raw
# History Blame Contribute Delete
# 9.98 kB
"""
DemoController — Engine control console for demo/recording sessions.
Provides "god mode" tools for presentations:
1. Time jump: fast-forward metabolism engine by N hours
2. State injection: directly set frustration/drive values
3. Force proactive: trigger proactive tick immediately
4. Inject memory: plant a memory for later recall
5. Preset messages: pre-loaded demo messages for quick-fire sending
All LLM responses remain real — this only manipulates engine state.
Zero modification to core engine files.
"""
from __future__ import annotations
import os
import time
from pathlib import Path
from typing import Optional
import yaml
from engine.genome.genome_engine import DRIVES
class DemoController:
    """Demo mode control console — manipulate time and engine state.

    Wraps a ChatAgent-like object and pokes at the state it already exposes
    (``agent.metabolism``, ``agent.agent.drive_state`` / ``drive_baseline``,
    ``agent._last_active``, ``agent._user_profile``, ``agent.evermemos``) so a
    presenter can stage situations on demand. Every mutating method returns
    the dict produced by :meth:`snapshot` (possibly with extra keys).
    """

    # Gap, in hours, that force_proactive() simulates when neither the caller
    # nor the agent's last-activity timestamp provides a positive one.
    _DEFAULT_PROACTIVE_GAP_HOURS = 4

    def __init__(self, agent):
        """
        Args:
            agent: ChatAgent instance to control.
        """
        self.agent = agent
        self.presets: list[dict] = []
        self.scenarios: dict[str, dict] = {}
        # Reset to 0 whenever a preset file is loaded; not read in this class.
        self._preset_index: int = 0

    # ── Time Jump ──
    def time_jump(self, hours: float) -> dict:
        """Fast-forward the metabolism engine by ``hours`` hours.

        Calls ``metabolism.time_metabolism()`` with a timestamp ``hours`` in
        the future and then syncs the result onto the inner agent. Per the
        original author's notes, the engine (not visible in this file) is
        expected to evolve as:
          - frustration *= e^(-λΔt)   (cooling)
          - connection  += k * Δt     (loneliness accumulates)
          - novelty     += k * Δt     (boredom accumulates)

        If the agent has a ``_last_active`` attribute it is moved ``hours``
        into the past, so later code that measures time since last activity
        (e.g. :meth:`force_proactive`, :meth:`snapshot`) sees the gap.

        Args:
            hours: Size of the simulated gap, in hours.

        Returns:
            Engine state snapshot after the jump.
        """
        offset_seconds = hours * 3600
        metabolism = self.agent.metabolism
        metabolism.time_metabolism(time.time() + offset_seconds)
        metabolism.sync_to_agent(self.agent.agent)

        # KEY: pretend the last activity happened `hours` ago.
        if hasattr(self.agent, '_last_active'):
            self.agent._last_active = time.time() - offset_seconds
        return self.snapshot()

    # ── Force Proactive ──
    async def force_proactive(self, simulated_hours: float = 0) -> dict:
        """Run ``agent.proactive_tick()`` right now with a simulated silence gap.

        ``metabolism._last_tick`` is moved backwards before the tick so the
        tick observes the gap (the original notes say proactive_tick derives
        its elapsed time from ``_last_tick``; that code is not visible here).

        The gap is chosen as follows:
          1. ``simulated_hours`` when it is positive;
          2. otherwise the time since ``agent._last_active``, but only when
             that timestamp is a positive value — ``0``/``None``/missing means
             "never active" (the same guard :meth:`snapshot` uses), not
             "active at the Unix epoch";
          3. otherwise a fixed 4-hour fallback.

        Args:
            simulated_hours: Explicit gap in hours; ``0`` (default) derives it.

        Returns:
            The snapshot dict plus ``proactive_fired``. When the tick returned
            a result, also ``proactive_reply`` / ``proactive_modality`` /
            ``proactive_monologue`` / ``proactive_drive``; otherwise
            ``proactive_reason``.
        """
        hours_back = simulated_hours
        if hours_back <= 0:
            last_active = getattr(self.agent, '_last_active', None) or 0
            if last_active > 0:
                hours_back = (time.time() - last_active) / 3600
        if hours_back <= 0:
            hours_back = self._DEFAULT_PROACTIVE_GAP_HOURS

        # Rewind the metabolism clock so the tick sees `hours_back` of silence.
        self.agent.metabolism._last_tick = time.time() - hours_back * 3600

        result = await self.agent.proactive_tick()
        snap = self.snapshot()
        if result is None:
            return {
                **snap,
                "proactive_fired": False,
                "proactive_reason": "no impulse or chose silence",
            }
        return {
            **snap,
            "proactive_fired": True,
            "proactive_reply": result.get('reply', ''),
            "proactive_modality": result.get('modality', '文字'),
            "proactive_monologue": result.get('monologue', ''),
            "proactive_drive": result.get('drive_id', ''),
        }

    # ── Memory Injection ──
    async def inject_memory(self, content: str, category: str = "preference") -> dict:
        """Plant a memory for later recall — dual-path for demo reliability.

        Path 2 (always runs): append ``"<label>: <content>"`` to
        ``agent._user_profile`` unless that exact text is already present.
        Path 1 (best effort): POST the memory to EverMemOS when a client is
        available; HTTP errors and exceptions are only logged.

        Args:
            content: Text of the memory.
            category: ``"preference"``, ``"fact"`` or ``"episode"``; any other
                value is used verbatim as the label.

        Returns:
            ``{"injected": True, "content": ..., "category": ...}``.
        """
        import uuid as _uuid
        from datetime import datetime as _datetime
        from datetime import timedelta as _timedelta
        from datetime import timezone as _timezone

        result = {"injected": False, "content": content, "category": category}

        # ── Path 2: Immediate prompt injection (always works) ──
        label = {"preference": "偏好", "fact": "事实", "episode": "经历"}.get(category, category)
        inject_text = f"{label}: {content}"
        current = getattr(self.agent, '_user_profile', '') or ''
        if inject_text not in current:
            self.agent._user_profile = (current + f"\n{inject_text}").strip()
        # Reported as injected either way: the text is in the profile now.
        result["injected"] = True
        print(f" [demo] 💾 memory → _user_profile: {inject_text}", flush=True)

        # ── Path 1: EverMemOS long-term storage (best effort) ──
        evermemos = self.agent.evermemos
        if evermemos and evermemos.available and evermemos._client:
            try:
                # The payload advertises a +08:00 offset, so the wall-clock
                # part must be taken in UTC+8 rather than in the host's local
                # zone (which is frequently UTC on cloud hosts).
                utc_plus_8 = _timezone(_timedelta(hours=8))
                now_iso = _datetime.now(utc_plus_8).strftime("%Y-%m-%dT%H:%M:%S+08:00")
                group_id = getattr(self.agent, '_group_id', 'demo')
                resp = await evermemos._client.post("/memories", json={
                    "content": f"[demo注入-{category}] {content}",
                    "create_time": now_iso,
                    "message_id": str(_uuid.uuid4()),
                    "sender": getattr(self.agent, 'user_id', 'demo_user'),
                    "sender_name": getattr(self.agent, 'user_name', '演示者'),
                    "role": "user",
                    "group_id": group_id,
                    "flush": True,
                })
                if resp.status_code in (200, 202):
                    print(f" [demo] 💾 EverMemOS stored: HTTP {resp.status_code}", flush=True)
                else:
                    print(f" [demo] ⚠️ EverMemOS store HTTP {resp.status_code}", flush=True)
            except Exception as e:
                # Deliberately broad: long-term storage must never break a demo.
                print(f" [demo] ⚠️ EverMemOS store failed: {e}", flush=True)
        return result

    # ── State Injection ──
    @staticmethod
    def _apply_clamped(target: dict, values, upper: float) -> None:
        """Copy ``values`` into ``target`` for keys already present, clamped to [0, upper]."""
        for name, raw in (values or {}).items():
            if name in target:
                target[name] = max(0.0, min(upper, float(raw)))

    def inject_state(self, overrides: dict) -> dict:
        """Directly inject engine state values.

        ``overrides`` may contain any of::

            {
                "frustration":    {"connection": 1.8, ...},   # clamped to [0, 5]
                "drive_state":    {"connection": 0.95, ...},  # clamped to [0, 1]
                "drive_baseline": {"connection": 0.85, ...},  # clamped to [0, 1]
            }

        Keys that do not already exist in the corresponding engine dict are
        ignored; a section whose value is ``None`` is treated as empty.
        Frustration is applied first and synced to the agent, so explicit
        ``drive_state`` / ``drive_baseline`` values are written after the sync.

        Returns:
            Engine state snapshot after injection.
        """
        metabolism = self.agent.metabolism
        inner = self.agent.agent

        if "frustration" in overrides:
            self._apply_clamped(metabolism.frustration, overrides["frustration"], 5.0)
            metabolism.sync_to_agent(inner)
        if "drive_state" in overrides:
            self._apply_clamped(inner.drive_state, overrides["drive_state"], 1.0)
        if "drive_baseline" in overrides:
            self._apply_clamped(inner.drive_baseline, overrides["drive_baseline"], 1.0)
        return self.snapshot()

    # ── Presets ──
    def load_presets_file(self, filepath: str) -> None:
        """Load presets and scenarios from a YAML file.

        Reads the top-level ``presets`` (list) and ``scenarios`` (mapping)
        keys. A missing file, or a document whose top level is not a mapping,
        is reported on stdout and leaves the current presets untouched. An
        empty file or null sections load as empty collections.

        Args:
            filepath: Path to the YAML file.
        """
        path = Path(filepath)
        if not path.exists():
            print(f" [demo] preset file not found: {filepath}")
            return

        # safe_load returns None for an empty document.
        data = yaml.safe_load(path.read_text(encoding='utf-8')) or {}
        if not isinstance(data, dict):
            print(f" [demo] preset file has no top-level mapping: {filepath}")
            return

        # `or` also covers keys that are present but null in the YAML.
        self.presets = data.get('presets') or []
        self.scenarios = data.get('scenarios') or {}
        self._preset_index = 0
        print(f" [demo] loaded {len(self.presets)} presets, "
              f"{len(self.scenarios)} scenarios from {path.name}")

    def get_presets(self) -> list[dict]:
        """Return all preset messages (the controller's own list, not a copy)."""
        return self.presets

    def get_scenarios(self) -> dict:
        """Return all scenario definitions (the controller's own dict, not a copy)."""
        return self.scenarios

    def apply_scenario(self, scenario_id: str) -> dict:
        """Apply a named scenario: time jump first, then state injection.

        Args:
            scenario_id: Key into the loaded ``scenarios`` mapping.

        Returns:
            Engine state snapshot after applying, plus ``applied_scenario``
            and ``scenario_label``; or ``{"error": ...}`` when the scenario is
            unknown (or empty).
        """
        scenario = self.scenarios.get(scenario_id)
        if not scenario:
            return {"error": f"scenario '{scenario_id}' not found"}

        # Time jump first so injected values are not altered by the jump.
        if 'time_jump_hours' in scenario:
            self.time_jump(scenario['time_jump_hours'])
        if 'inject' in scenario:
            self.inject_state(scenario['inject'])

        result = self.snapshot()
        result['applied_scenario'] = scenario_id
        result['scenario_label'] = scenario.get('label', scenario_id)
        return result

    # ── Snapshot ──
    def snapshot(self) -> dict:
        """Return the current engine state as a plain dict.

        Keys: ``drive_state``, ``drive_baseline``, ``frustration`` (each a
        per-drive dict over ``DRIVES``, rounded to 3 decimals),
        ``temperature``, ``total_frustration``, ``agent_age`` and
        ``hours_since_active`` (0 when the agent has no positive
        ``_last_active`` timestamp).
        """
        inner = self.agent.agent
        metabolism = self.agent.metabolism

        hours_since = 0
        last_active = getattr(self.agent, '_last_active', None) or 0
        if last_active > 0:
            hours_since = (time.time() - last_active) / 3600

        return {
            "drive_state": {d: round(inner.drive_state[d], 3) for d in DRIVES},
            "drive_baseline": {d: round(inner.drive_baseline[d], 3) for d in DRIVES},
            "frustration": {d: round(metabolism.frustration[d], 3) for d in DRIVES},
            "temperature": round(metabolism.temperature(), 4),
            "total_frustration": round(metabolism.total(), 3),
            "agent_age": inner.age,
            "hours_since_active": round(hours_since, 1),
        }