openher / agent /proactive.py
kellyxiaowei's picture
Deploy OpenHer Gradio Space โ€” gemma-4-E4B served on Modal
dff25f7 verified
Raw
History Blame Contribute Delete
9.11 kB
"""
ProactiveMixin โ€” Drive-driven autonomous messaging for ChatAgent.
Implements the proactive tick: when a drive exceeds its baseline threshold,
the persona can initiate conversation without user input.
"""
from __future__ import annotations
import time
import uuid
from typing import Optional
from providers.llm.client import ChatMessage
from engine.genome.genome_engine import DRIVES, DRIVE_LABELS
from engine.genome.critic import critic_sense
from agent.parser import extract_reply
# Config defaults (from memory_config.yaml if available)
try:
import yaml as _yaml
from pathlib import Path as _Path
_cfg_path = _Path(__file__).parent.parent / "providers" / "memory" / "evermemos" / "memory_config.yaml"
_cfg_data = _yaml.safe_load(_cfg_path.read_text()).get("evermemos", {}) if _cfg_path.exists() else {}
except Exception:
_cfg_data = {}
_DEFAULT_IMPULSE_THRESHOLD = _cfg_data.get("impulse_threshold", 0.8)
class ProactiveMixin:
"""Drive-driven autonomous messaging (proactive tick)."""
_IMPULSE_THRESHOLD = _DEFAULT_IMPULSE_THRESHOLD
def _has_impulse(self) -> Optional[tuple]:
"""
Drive self-check: is any drive significantly above its baseline?
Returns (drive_id, description) if impulse detected, else None.
Baseline is emergent (Step 3.5 evolves it each turn via Critic).
Score = (normalized_frustration - baseline) / baseline.
Score >= threshold means current desire is significantly above "normal".
"""
strongest = None
max_score = 0.0
for d in DRIVES:
norm_frust = self.metabolism.frustration[d] / 5.0 # 0~1
baseline = self.agent.drive_baseline[d] # 0~1
# Relative deviation from baseline
score = norm_frust * (1.0 + baseline)
if score > max_score:
max_score = score
strongest = d
if max_score >= self._IMPULSE_THRESHOLD and strongest:
desc = f"ๅ†…ๅฟƒ็š„{DRIVE_LABELS[strongest]}ๅ†ฒๅŠจๆญฃๅœจๅ˜ๅผบใ€‚"
return (strongest, desc)
return None
async def proactive_tick(self) -> Optional[dict]:
"""
Drive-driven autonomous tick. No user input required.
Flow:
1. Advance metabolism (Drive energy evolves with time)
2. Check impulse (Drive deviation from baseline)
3. If impulse โ†’ memory flashback + build stimulus
4. Critic/Actor pipeline (same as chat, frozen learning)
5. Actor decides: speak or stay silent
Returns:
{'reply': str, 'modality': str, 'monologue': str,
'proactive': True, 'drive_id': str, 'tick_id': str}
or None (no impulse / decided to stay silent)
"""
async with self._turn_lock:
return await self._proactive_tick_inner()
async def _proactive_tick_inner(self) -> Optional[dict]:
"""Inner proactive tick (called under lock)."""
start = time.time()
tick_id = str(uuid.uuid4())
# โ”€โ”€ Step 1: Advance metabolism โ”€โ”€
self.metabolism.time_metabolism(start)
# โ”€โ”€ Step 2: Drive self-check โ”€โ”€
impulse = self._has_impulse()
if not impulse:
return None # No impulse โ†’ zero cost (no LLM calls)
drive_id, impulse_desc = impulse
print(f" [proactive] ๐Ÿ’ญ impulse detected: {impulse_desc}")
# โ”€โ”€ Step 3: Memory flashback โ”€โ”€
# Search EverMemOS using impulse content โ€” simulates "a memory pops up"
flashback_parts = []
if self.evermemos and self.evermemos.available:
try:
facts, episodes, profile = await self.evermemos.search_relevant_memories(
query=impulse_desc,
user_id=self.evermemos_uid,
group_id=self._group_id,
)
if episodes:
flashback_parts.append(f"[่ฎฐๅฟ†้—ชๅ›ž] {episodes}")
if facts:
flashback_parts.append(f"[้—ชๅ›ž็ป†่Š‚] {facts}")
except Exception as e:
print(f" [proactive] flashback search failed: {e}")
# โ”€โ”€ Step 4: Build stimulus (data formatting, not decision logic) โ”€โ”€
name = self.user_name or "ไฝ "
hours = (start - self._last_active) / 3600 if self._last_active > 0 else 0
parts = [f"[ๅ†…ๅœจ็Šถๆ€] ๅทฒ{hours:.0f}ๅฐๆ—ถๆœชไธŽ{name}ไบ’ๅŠจใ€‚{impulse_desc}"]
parts.extend(flashback_parts)
if self._foresight_text:
parts.append(f"[้ข„ๆ„Ÿ] {self._foresight_text}")
stimulus = "\n".join(parts)
# โ”€โ”€ Step 5: Load session context (if not already cached) โ”€โ”€
relationship_prior = await self._evermemos_gather()
# โ”€โ”€ Step 6: Critic perception (same pipeline, stimulus instead of user_message) โ”€โ”€
frust_dict = {d: round(self.metabolism.frustration[d], 2) for d in DRIVES}
_p = self.persona
_mbti = getattr(_p, 'mbti', '') or 'ๆœช็Ÿฅ'
_tags = 'ใ€'.join(getattr(_p, 'tags', [])[:3])
_persona_hint = f"{_p.name} ({_mbti}) โ€” {_tags}" if _tags else f"{_p.name} ({_mbti})"
context, frustration_delta, rel_delta, drive_satisfaction = await critic_sense(
stimulus, self.llm, frust_dict,
user_profile=self._user_profile,
episode_summary=self._episode_summary,
persona_hint=_persona_hint,
)
# โ”€โ”€ R1: FROZEN โ€” Do NOT update relationship EMA (no user feedback) โ”€โ”€
# Read-only: use prior values without writing to EMA
relationship_4d = {
'relationship_depth': self._relationship_ema.get('relationship_depth', 0.0),
'trust_level': self._relationship_ema.get('trust_level', 0.0),
'emotional_valence': self._relationship_ema.get('emotional_valence', 0.0),
'pending_foresight': self._relationship_ema.get('pending_foresight', 0.0),
}
context.update(relationship_4d)
# โ”€โ”€ Step 7: Metabolism โ†’ reward (frustration release) โ”€โ”€
reward = self.metabolism.apply_llm_delta(frustration_delta)
self.metabolism.sync_to_agent(self.agent)
# โ”€โ”€ R1: FROZEN โ€” Do NOT evolve drive baselines (Step 3.5) โ”€โ”€
# โ”€โ”€ R1: FROZEN โ€” Do NOT do Hebbian learning (Step 10) โ”€โ”€
# โ”€โ”€ Step 8: Build single-pass prompt (matching ChatAgent pattern) โ”€โ”€
base_signals = self.agent.compute_signals(context)
noisy_signals = self.metabolism.apply_thermodynamic_noise(base_signals)
self.style_memory.set_clock(start)
few_shot = self.style_memory.build_few_shot_prompt(
context, top_k=3, monologue_only=False, lang=self.persona.lang,
)
single_prompt = self._build_single_prompt(few_shot, noisy_signals)
# โ”€โ”€ Step 8.5: Memory injection into prompt โ”€โ”€
if self._session_ctx and self._session_ctx.has_history:
if self.persona.lang == 'en':
if self._user_profile:
single_prompt += f"\n\n[{name}'s preferences] {self._user_profile[:300]}"
if self._episode_summary:
single_prompt += f"\n\n[Past interactions with {name}] {self._episode_summary[:300]}"
if self._foresight_text:
single_prompt += f"\n\n[Worth noting] {self._foresight_text}"
else:
if self._user_profile:
single_prompt += f"\n\n[ๅ…ณไบŽ{name}็š„ๅๅฅฝ] {self._user_profile[:300]}"
if self._episode_summary:
single_prompt += f"\n\n[ไธŽ{name}่ฟ‡ๅŽปๅ‘็”Ÿ็š„ไบ‹] {self._episode_summary[:300]}"
if self._foresight_text:
single_prompt += f"\n\n[่ฟ‘ๆœŸๅ€ผๅพ—ๅ…ณๅฟƒ] {self._foresight_text}"
# โ”€โ”€ Step 9: Single-pass LLM call โ”€โ”€
single_messages = [
ChatMessage(role="system", content=single_prompt),
ChatMessage(role="user", content=stimulus),
]
single_response = await self.llm.chat(single_messages)
monologue, reply, modality = extract_reply(single_response.content)
elapsed = start and (time.time() - start) or 0
if elapsed > 300:
print(f" [proactive] โš ๏ธ tick took {elapsed:.0f}s, approaching TTL")
# โ”€โ”€ Actor decided to stay silent โ”€โ”€
if modality == "้™้ป˜" or not reply.strip():
print(f" [proactive] ๐Ÿคซ decided to stay silent: {monologue[:60]}")
return None
# โ”€โ”€ Actor decided to speak โ”€โ”€
print(f" [proactive] ๐Ÿ’ฌ sending: {reply[:40]}...")
# Update last_active (proactive message counts as activity)
self._last_active = time.time()
return {
'reply': reply,
'modality': modality,
'monologue': monologue,
'proactive': True,
'drive_id': drive_id,
'tick_id': tick_id,
}