Spaces:
Running
Running
"""
DemoController — Engine control console for demo/recording sessions.

Provides "god mode" tools for presentations:
  1. Time jump: fast-forward metabolism engine by N hours
  2. State injection: directly set frustration/drive values
  3. Force proactive: trigger proactive tick immediately
  4. Inject memory: plant a memory for later recall
  5. Preset messages: pre-loaded demo messages for quick-fire sending

All LLM responses remain real — this only manipulates engine state.
Zero modification to core engine files.
"""
| from __future__ import annotations | |
| import os | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| import yaml | |
| from engine.genome.genome_engine import DRIVES | |
class DemoController:
    """Demo mode control console — manipulate time and engine state.

    Wraps an agent object and edits the state it exposes: its metabolism
    (``agent.metabolism``), its inner drive state (``agent.agent``), the
    ``_last_active`` timestamp and the ``_user_profile`` text. It also holds
    preset messages and named scenarios loaded from a YAML file.
    """

    # Simulated silence (in hours) used by force_proactive() when neither the
    # caller nor the agent's _last_active timestamp provides a usable gap.
    DEFAULT_PROACTIVE_GAP_HOURS = 4

    def __init__(self, agent):
        """
        Args:
            agent: ChatAgent instance to control.
        """
        self.agent = agent
        self.presets: list[dict] = []
        self.scenarios: dict[str, dict] = {}
        self._preset_index: int = 0

    # ── Time Jump ──

    def time_jump(self, hours: float) -> dict:
        """Fast-forward the metabolism engine by ``hours`` hours.

        Calls ``metabolism.time_metabolism`` with a timestamp ``hours`` in the
        future, syncs the result onto the inner agent, and moves
        ``_last_active`` the same distance into the past so that a later
        proactive tick sees "N hours of silence".

        Args:
            hours: Size of the jump, in hours.

        Returns:
            Engine state snapshot after the jump.
        """
        # Read the clock once so both derived timestamps share one reference.
        now = time.time()
        jump_seconds = hours * 3600

        self.agent.metabolism.time_metabolism(now + jump_seconds)
        self.agent.metabolism.sync_to_agent(self.agent.agent)

        # KEY: set _last_active to N hours AGO so proactive sees the gap.
        if hasattr(self.agent, '_last_active'):
            self.agent._last_active = now - jump_seconds
        return self.snapshot()

    # ── Force Proactive ──

    async def force_proactive(self, simulated_hours: float = 0) -> dict:
        """Force an immediate proactive tick (bypasses the heartbeat timer).

        ``metabolism._last_tick`` is moved backwards before the tick so that
        the tick observes the simulated time gap.

        Args:
            simulated_hours: Gap to simulate. When <= 0 the gap is derived
                from the agent's ``_last_active`` (e.g. set by a previous
                ``time_jump``); if that is unavailable too,
                ``DEFAULT_PROACTIVE_GAP_HOURS`` is used.

        Returns:
            The state snapshot plus ``proactive_fired``; when a tick result
            was produced, the ``proactive_*`` reply fields as well, otherwise
            a ``proactive_reason`` string.
        """
        now = time.time()
        hours_back = simulated_hours
        if hours_back <= 0:
            last_active = getattr(self.agent, '_last_active', None)
            # Same guard as snapshot(): a zero/unset timestamp means "never
            # active", not "silent since the epoch".
            if last_active and last_active > 0:
                hours_back = (now - last_active) / 3600
        if hours_back <= 0:
            hours_back = self.DEFAULT_PROACTIVE_GAP_HOURS

        # Set metabolism._last_tick to simulate N hours of silence.
        self.agent.metabolism._last_tick = now - hours_back * 3600

        result = await self.agent.proactive_tick()
        snap = self.snapshot()

        if result is None:
            return {
                **snap,
                "proactive_fired": False,
                "proactive_reason": "no impulse or chose silence",
            }
        return {
            **snap,
            "proactive_fired": True,
            "proactive_reply": result.get('reply', ''),
            "proactive_modality": result.get('modality', '文字'),
            "proactive_monologue": result.get('monologue', ''),
            "proactive_drive": result.get('drive_id', ''),
        }

    # ── Memory Injection ──

    async def inject_memory(self, content: str, category: str = "preference") -> dict:
        """Plant a memory for later recall — dual-path for demo reliability.

        Path 2 (runs first): append a labelled line to ``agent._user_profile``
        unless that text is already present.
        Path 1 (best effort): POST the memory to the EverMemOS client when one
        is attached and available; failures are printed, never raised.

        Args:
            content: Memory text to plant.
            category: "preference", "fact" or "episode" get a Chinese label;
                any other value is used verbatim as the label.

        Returns:
            ``{"injected": bool, "content": ..., "category": ...}`` where
            ``injected`` is True only if the profile text was actually extended.
        """
        import uuid as _uuid
        from datetime import datetime as _datetime
        from datetime import timedelta as _timedelta
        from datetime import timezone as _timezone

        result = {"injected": False, "content": content, "category": category}

        # ── Path 2: Immediate prompt injection (always works) ──
        label = {"preference": "偏好", "fact": "事实", "episode": "经历"}.get(category, category)
        inject_text = f"{label}: {content}"
        current = getattr(self.agent, '_user_profile', '') or ''
        if inject_text not in current:
            self.agent._user_profile = (current + f"\n{inject_text}").strip()
            result["injected"] = True
            print(f" [demo] 💾 memory → _user_profile: {inject_text}", flush=True)

        # ── Path 1: EverMemOS long-term storage (best effort) ──
        # getattr: an agent without the attribute must not break path 2.
        evermemos = getattr(self.agent, 'evermemos', None)
        if evermemos and evermemos.available and evermemos._client:
            try:
                # Compute the wall clock in UTC+8 explicitly so the "+08:00"
                # suffix is correct whatever the host's local timezone is.
                utc_plus_8 = _timezone(_timedelta(hours=8))
                now_iso = _datetime.now(utc_plus_8).isoformat(timespec="seconds")
                group_id = getattr(self.agent, '_group_id', 'demo')
                resp = await evermemos._client.post("/memories", json={
                    "content": f"[demo注入-{category}] {content}",
                    "create_time": now_iso,
                    "message_id": str(_uuid.uuid4()),
                    "sender": getattr(self.agent, 'user_id', 'demo_user'),
                    "sender_name": getattr(self.agent, 'user_name', '演示者'),
                    "role": "user",
                    "group_id": group_id,
                    "flush": True,
                })
                if resp.status_code in (200, 202):
                    print(f" [demo] 💾 EverMemOS stored: HTTP {resp.status_code}", flush=True)
                else:
                    print(f" [demo] ⚠️ EverMemOS store HTTP {resp.status_code}", flush=True)
            except Exception as e:
                # Deliberate best effort: the demo continues without the store.
                print(f" [demo] ⚠️ EverMemOS store failed: {e}", flush=True)

        return result

    # ── State Injection ──

    @staticmethod
    def _apply_clamped(target: dict, values, low: float, high: float) -> None:
        """Write ``values`` into ``target`` clamped to [low, high]; skip unknown keys."""
        for key, value in (values or {}).items():
            if key in target:
                target[key] = max(low, min(high, float(value)))

    def inject_state(self, overrides: dict) -> dict:
        """Directly inject engine state values.

        Args:
            overrides: Any subset of::

                {
                    "frustration": {"connection": 1.8, ...},     # clamped to [0, 5]
                    "drive_state": {"connection": 0.95, ...},    # clamped to [0, 1]
                    "drive_baseline": {"connection": 0.85, ...}, # clamped to [0, 1]
                }

                Keys not already present in the target mapping are ignored.

        Returns:
            Engine state snapshot after injection.
        """
        if "frustration" in overrides:
            self._apply_clamped(
                self.agent.metabolism.frustration, overrides["frustration"], 0.0, 5.0
            )
            # Sync before the drive overrides below so they are applied last.
            self.agent.metabolism.sync_to_agent(self.agent.agent)
        if "drive_state" in overrides:
            self._apply_clamped(
                self.agent.agent.drive_state, overrides["drive_state"], 0.0, 1.0
            )
        if "drive_baseline" in overrides:
            self._apply_clamped(
                self.agent.agent.drive_baseline, overrides["drive_baseline"], 0.0, 1.0
            )
        return self.snapshot()

    # ── Presets ──

    def load_presets_file(self, filepath: str) -> None:
        """Load presets and scenarios from a YAML file.

        A missing file is reported on stdout and leaves the current presets
        untouched. An empty file, a non-mapping top level, or null
        ``presets`` / ``scenarios`` entries load as empty collections.

        Args:
            filepath: Path to the YAML file.
        """
        path = Path(filepath)
        if not path.exists():
            print(f" [demo] preset file not found: {filepath}")
            return
        data = yaml.safe_load(path.read_text(encoding='utf-8'))
        if not isinstance(data, dict):
            # safe_load yields None for an empty document and may yield a
            # list/scalar; none of those carry presets.
            data = {}
        self.presets = data.get('presets') or []
        self.scenarios = data.get('scenarios') or {}
        self._preset_index = 0
        print(f" [demo] loaded {len(self.presets)} presets, "
              f"{len(self.scenarios)} scenarios from {path.name}")

    def get_presets(self) -> list[dict]:
        """Return all preset messages."""
        return self.presets

    def get_scenarios(self) -> dict:
        """Return all scenario definitions."""
        return self.scenarios

    def apply_scenario(self, scenario_id: str) -> dict:
        """Apply a named scenario: time jump first, then state injection.

        Args:
            scenario_id: Key into the loaded scenarios.

        Returns:
            Engine state snapshot after applying, extended with
            ``applied_scenario`` and ``scenario_label``; or
            ``{"error": ...}`` when the scenario is unknown or empty.
        """
        scenario = self.scenarios.get(scenario_id)
        if not scenario:
            return {"error": f"scenario '{scenario_id}' not found"}

        # Time jump first (if specified)
        if 'time_jump_hours' in scenario:
            self.time_jump(scenario['time_jump_hours'])

        # Then inject state
        if 'inject' in scenario:
            self.inject_state(scenario['inject'])

        result = self.snapshot()
        result['applied_scenario'] = scenario_id
        result['scenario_label'] = scenario.get('label', scenario_id)
        return result

    # ── Snapshot ──

    def snapshot(self) -> dict:
        """Return the current engine state snapshot.

        Returns:
            Dict with per-drive ``drive_state``, ``drive_baseline`` and
            ``frustration`` (rounded to 3 places), metabolism ``temperature``
            and ``total_frustration``, ``agent_age``, and
            ``hours_since_active`` (0 when no positive ``_last_active`` exists).
        """
        hours_since = 0
        last_active = getattr(self.agent, '_last_active', None)
        if last_active and last_active > 0:
            hours_since = (time.time() - last_active) / 3600

        inner = self.agent.agent
        metabolism = self.agent.metabolism
        return {
            "drive_state": {d: round(inner.drive_state[d], 3) for d in DRIVES},
            "drive_baseline": {d: round(inner.drive_baseline[d], 3) for d in DRIVES},
            "frustration": {d: round(metabolism.frustration[d], 3) for d in DRIVES},
            "temperature": round(metabolism.temperature(), 4),
            "total_frustration": round(metabolism.total(), 3),
            "agent_age": inner.age,
            "hours_since_active": round(hours_since, 1),
        }