Spaces:
Running
Running
| """ | |
| StateStore — SQLite persistence for Genome v8 agent state. | |
| Stores per-user-per-persona state so that restarting the server | |
| doesn't lose personality evolution (Agent weights + DriveMetabolism). | |
| Extended with proactive tick infrastructure: | |
| - proactive_lock: cross-instance lease-based locking (R7/R13/R23/R28/R30) | |
| - proactive_outbox: message queue with state machine (R4/R10/R15/R21/R31) | |
| - CAS save_state: version-guarded writes (R24/R29) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import sqlite3 | |
| import time | |
| from typing import Optional | |
| from engine.genome.genome_engine import Agent | |
| from engine.genome.drive_metabolism import DriveMetabolism | |
class StateStore:
    """
    SQLite-backed state persistence for Genome v8 agents.

    One connection is opened per store and shared by every method. Four
    tables are managed:

    - ``genome_state``: serialized agent/metabolism JSON plus the proactive
      metadata (``last_active_at``, ``interaction_cadence``) and a
      ``state_version`` counter used for compare-and-swap writes.
    - ``chat_summary``: one summary row per (user, persona).
    - ``proactive_lock``: lease rows with an owner and an expiry time.
    - ``proactive_outbox``: queued messages with a ``status`` column that
      moves between 'pending', 'sending' and 'delivered'.

    Every write runs inside ``with self._conn:`` so it is committed on
    success and rolled back if the statement raises.

    Usage:
        store = StateStore("/path/to/openher.db")
        store.save_session("user123", "persona_a", agent, metabolism)
        agent, metabolism = store.load_session("user123", "persona_a")
    """

    # Columns added to genome_state after the table first shipped, as
    # (name, SQL type, SQL default). Databases created before these existed
    # get them via ALTER TABLE in _create_tables().
    _GENOME_STATE_MIGRATIONS = (
        ("state_version", "INTEGER", "0"),
        ("last_active_at", "REAL", "0"),
        ("interaction_cadence", "REAL", "0"),
    )

    def __init__(self, db_path: str):
        """Open (creating if needed) the database at ``db_path``.

        The parent directory is created when missing, and the schema is
        created or migrated before the constructor returns.
        """
        self.db_path = db_path
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        # check_same_thread=False lets other threads use this connection;
        # no additional locking is performed by this class.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        # Rows support access by column name (row["col"]).
        self._conn.row_factory = sqlite3.Row
        self._create_tables()
        # NOTE(review): the leading glyph was mis-encoded in the reviewed
        # copy; a check mark is presumed — confirm against upstream.
        print(f"✓ 状态存储: {db_path}")

    def _create_tables(self):
        """Create all tables if absent, then add any missing migrated columns.

        Raises:
            sqlite3.OperationalError: if a migration fails for any reason
                other than the column already existing.
        """
        # NOTE(review): the modality default below ('文字') was restored from
        # mis-encoded text in the reviewed copy — confirm against upstream.
        self._conn.executescript("""
            CREATE TABLE IF NOT EXISTS genome_state (
                user_id TEXT NOT NULL,
                persona_id TEXT NOT NULL,
                agent_data TEXT DEFAULT '{}',
                metabolism_data TEXT DEFAULT '{}',
                state_version INTEGER DEFAULT 0,
                last_active_at REAL DEFAULT 0,
                interaction_cadence REAL DEFAULT 0,
                updated_at REAL DEFAULT 0,
                PRIMARY KEY (user_id, persona_id)
            );
            CREATE TABLE IF NOT EXISTS chat_summary (
                user_id TEXT NOT NULL,
                persona_id TEXT NOT NULL,
                summary TEXT DEFAULT '',
                message_count INTEGER DEFAULT 0,
                updated_at REAL DEFAULT 0,
                PRIMARY KEY (user_id, persona_id)
            );
            CREATE TABLE IF NOT EXISTS proactive_lock (
                user_id TEXT NOT NULL,
                persona_id TEXT NOT NULL,
                owner_id TEXT NOT NULL,
                acquired_at REAL NOT NULL,
                expires_at REAL NOT NULL,
                PRIMARY KEY (user_id, persona_id)
            );
            CREATE TABLE IF NOT EXISTS proactive_outbox (
                user_id TEXT NOT NULL,
                persona_id TEXT NOT NULL,
                tick_id TEXT NOT NULL,
                reply TEXT NOT NULL,
                modality TEXT NOT NULL DEFAULT '文字',
                monologue TEXT DEFAULT '',
                drive_id TEXT DEFAULT '',
                dedup_key TEXT DEFAULT '',
                created_at REAL NOT NULL,
                status TEXT DEFAULT 'pending',
                delivered_at REAL,
                PRIMARY KEY (user_id, persona_id, tick_id)
            );
        """)

        # Migrate: add columns if missing (for existing DBs). The existing
        # columns are inspected first so that only genuinely missing ones are
        # altered, instead of swallowing every OperationalError.
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt, pk).
        existing = {
            info[1]
            for info in self._conn.execute("PRAGMA table_info(genome_state)")
        }
        for col, typ, default in self._GENOME_STATE_MIGRATIONS:
            if col in existing:
                continue
            try:
                self._conn.execute(
                    f"ALTER TABLE genome_state ADD COLUMN {col} {typ} DEFAULT {default}")
            except sqlite3.OperationalError as exc:
                # Another instance may have added the column between the
                # PRAGMA and the ALTER; anything else is a real failure.
                if "duplicate column" not in str(exc).lower():
                    raise
        self._conn.commit()

    # ─────────────────────────────────────────────
    # Session state (original + CAS)
    # ─────────────────────────────────────────────

    def save_session(
        self,
        user_id: str,
        persona_id: str,
        agent: "Agent",
        metabolism: "DriveMetabolism",
    ) -> None:
        """Persist Agent + DriveMetabolism state to SQLite (legacy, non-CAS).

        Both objects are serialized with ``to_dict()`` and stored as JSON.
        This path does not read or change ``state_version``.
        """
        payload = (
            user_id, persona_id,
            json.dumps(agent.to_dict(), ensure_ascii=False),
            json.dumps(metabolism.to_dict(), ensure_ascii=False),
            time.time(),
        )
        with self._conn:
            self._conn.execute(
                """
                INSERT INTO genome_state (user_id, persona_id, agent_data, metabolism_data, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(user_id, persona_id) DO UPDATE SET
                    agent_data = excluded.agent_data,
                    metabolism_data = excluded.metabolism_data,
                    updated_at = excluded.updated_at
                """,
                payload,
            )

    def save_state(
        self,
        user_id: str,
        persona_id: str,
        *,
        agent_data: Optional[str] = None,
        metabolism_data: Optional[str] = None,
        last_active_at: Optional[float] = None,
        interaction_cadence: Optional[float] = None,
        expected_version: Optional[int] = None,
    ) -> bool:
        """
        Unified CAS write entry (R24/R29).

        Fields passed as ``None`` are left unchanged on an existing row; on a
        newly created row they fall back to ``'{}'`` (JSON columns) or ``0``
        (numeric columns).

        Args:
            agent_data: serialized agent JSON, or None to keep the stored value.
            metabolism_data: serialized metabolism JSON, or None to keep it.
            last_active_at: timestamp to store, or None to keep it.
            interaction_cadence: cadence to store, or None to keep it.
            expected_version: when given, the write only happens if the row's
                ``state_version`` equals this value. ``0`` also matches a row
                that does not exist yet (the value ``get_state_version``
                reports for it); the row is then created at version 1.
                When omitted, the write is an unconditional upsert.

        Returns:
            True if the write succeeded, False on version mismatch.
        """
        params = {
            "user_id": user_id,
            "persona_id": persona_id,
            "agent_data": agent_data,
            "metabolism_data": metabolism_data,
            "last_active_at": last_active_at,
            "interaction_cadence": interaction_cadence,
            "expected_version": expected_version,
            "now": time.time(),
        }

        if expected_version is not None:
            with self._conn:
                cur = self._conn.execute("""
                    UPDATE genome_state SET
                        agent_data = COALESCE(:agent_data, agent_data),
                        metabolism_data = COALESCE(:metabolism_data, metabolism_data),
                        last_active_at = COALESCE(:last_active_at, last_active_at),
                        interaction_cadence = COALESCE(:interaction_cadence, interaction_cadence),
                        state_version = state_version + 1,
                        updated_at = :now
                    WHERE user_id = :user_id AND persona_id = :persona_id
                      AND state_version = :expected_version
                """, params)
                if cur.rowcount > 0:
                    return True
                if expected_version != 0:
                    return False
                # Version 0 is what readers see for a missing row, so create
                # it. DO NOTHING makes this lose cleanly to a concurrent
                # creator, and starting at version 1 makes any other writer
                # still holding version 0 fail its own CAS.
                cur = self._conn.execute("""
                    INSERT INTO genome_state
                        (user_id, persona_id, agent_data, metabolism_data,
                         last_active_at, interaction_cadence, state_version, updated_at)
                    VALUES (:user_id, :persona_id,
                            COALESCE(:agent_data, '{}'), COALESCE(:metabolism_data, '{}'),
                            COALESCE(:last_active_at, 0), COALESCE(:interaction_cadence, 0),
                            1, :now)
                    ON CONFLICT(user_id, persona_id) DO NOTHING
                """, params)
                return cur.rowcount > 0

        with self._conn:
            # The UPDATE arm binds the raw parameters rather than reading
            # excluded.*: the VALUES clause has already replaced NULLs with
            # defaults, so excluded.* is never NULL and would overwrite the
            # stored values on every partial write.
            self._conn.execute("""
                INSERT INTO genome_state
                    (user_id, persona_id, agent_data, metabolism_data,
                     last_active_at, interaction_cadence, updated_at)
                VALUES (:user_id, :persona_id,
                        COALESCE(:agent_data, '{}'), COALESCE(:metabolism_data, '{}'),
                        COALESCE(:last_active_at, 0), COALESCE(:interaction_cadence, 0),
                        :now)
                ON CONFLICT(user_id, persona_id) DO UPDATE SET
                    agent_data = COALESCE(:agent_data, genome_state.agent_data),
                    metabolism_data = COALESCE(:metabolism_data, genome_state.metabolism_data),
                    last_active_at = COALESCE(:last_active_at, genome_state.last_active_at),
                    interaction_cadence = COALESCE(:interaction_cadence, genome_state.interaction_cadence),
                    state_version = genome_state.state_version + 1,
                    updated_at = excluded.updated_at
            """, params)
        return True

    def get_state_version(self, user_id: str, persona_id: str) -> int:
        """Get current state_version for CAS (0 when no row exists)."""
        row = self._conn.execute(
            "SELECT state_version FROM genome_state WHERE user_id=? AND persona_id=?",
            (user_id, persona_id)).fetchone()
        return row["state_version"] if row else 0

    def load_session(
        self,
        user_id: str,
        persona_id: str,
    ) -> "tuple[Optional[Agent], Optional[DriveMetabolism]]":
        """Load persisted state. Returns (None, None) if no prior session.

        Also returns (None, None), after printing a diagnostic, when the
        stored data cannot be decoded or rebuilt via ``from_dict``.
        """
        row = self._conn.execute(
            "SELECT agent_data, metabolism_data FROM genome_state WHERE user_id = ? AND persona_id = ?",
            (user_id, persona_id)).fetchone()
        if not row:
            return None, None
        try:
            agent = Agent.from_dict(json.loads(row["agent_data"]))
            metabolism = DriveMetabolism.from_dict(json.loads(row["metabolism_data"]))
        except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
            # TypeError covers a NULL column: json.loads(None) raises it.
            print(f"[state] 加载状态失败 ({user_id}/{persona_id}): {e}")
            return None, None
        return agent, metabolism

    def load_proactive_meta(
        self, user_id: str, persona_id: str
    ) -> tuple[float, float, int]:
        """Load last_active_at, interaction_cadence, state_version.

        Returns (0.0, 0.0, 0) when no row exists.
        """
        row = self._conn.execute(
            "SELECT last_active_at, interaction_cadence, state_version FROM genome_state WHERE user_id=? AND persona_id=?",
            (user_id, persona_id)).fetchone()
        if row:
            return row["last_active_at"], row["interaction_cadence"], row["state_version"]
        return 0.0, 0.0, 0

    # ─────────────────────────────────────────────
    # Proactive Lock (R7/R13/R23/R28/R30)
    # ─────────────────────────────────────────────

    def try_acquire_lock(
        self, user_id: str, persona_id: str, owner_id: str, ttl: float = 600
    ) -> bool:
        """
        Atomic lease-based lock via single UPSERT (R23/R28).

        Only acquires if no lock exists or the existing lock has expired.
        The takeover condition looks at the expiry only, so an unexpired
        lock is not re-acquired even by its current owner (use
        ``renew_lock`` for that).

        Args:
            ttl: lease length in seconds, counted from now.

        Returns:
            True if this call took the lock, False otherwise.
        """
        now = time.time()
        with self._conn:
            cur = self._conn.execute("""
                INSERT INTO proactive_lock (user_id, persona_id, owner_id, acquired_at, expires_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(user_id, persona_id) DO UPDATE SET
                    owner_id = excluded.owner_id,
                    acquired_at = excluded.acquired_at,
                    expires_at = excluded.expires_at
                WHERE proactive_lock.expires_at < ?
            """, (user_id, persona_id, owner_id, now, now + ttl, now))
        # rowcount is 0 when the conflicting row failed the WHERE filter.
        return cur.rowcount > 0

    def release_lock(self, user_id: str, persona_id: str, owner_id: str):
        """Release lock only if we own it (R30)."""
        with self._conn:
            self._conn.execute(
                "DELETE FROM proactive_lock WHERE user_id=? AND persona_id=? AND owner_id=?",
                (user_id, persona_id, owner_id))

    def renew_lock(
        self, user_id: str, persona_id: str, owner_id: str, ttl: float = 600
    ) -> bool:
        """Renew lock TTL only if we own it (R27/R30).

        The new expiry is ``now + ttl``. The stored expiry is not checked,
        only ownership.

        Returns:
            True if a lock row owned by ``owner_id`` was extended, False if
            no such row exists (for example another owner took it over).
        """
        now = time.time()
        with self._conn:
            cur = self._conn.execute("""
                UPDATE proactive_lock SET expires_at = ?
                WHERE user_id=? AND persona_id=? AND owner_id=?
            """, (now + ttl, user_id, persona_id, owner_id))
        return cur.rowcount > 0

    # ─────────────────────────────────────────────
    # Proactive Outbox (R4/R10/R15/R21/R31)
    # ─────────────────────────────────────────────

    def outbox_insert(
        self, user_id: str, persona_id: str, tick_id: str,
        reply: str, modality: str, monologue: str,
        drive_id: str, dedup_key: str,
    ) -> bool:
        """Insert a 'pending' message into the outbox.

        Returns False if tick_id already exists (idempotent) or the row
        violates another table constraint; the failed insert is rolled back.
        """
        try:
            with self._conn:
                self._conn.execute("""
                    INSERT INTO proactive_outbox
                        (user_id, persona_id, tick_id, reply, modality, monologue,
                         drive_id, dedup_key, created_at, status)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')
                """, (user_id, persona_id, tick_id, reply, modality,
                      monologue, drive_id, dedup_key, time.time()))
        except sqlite3.IntegrityError:
            return False
        return True

    def outbox_has_recent(self, user_id: str, persona_id: str, hours: float = 4) -> bool:
        """Check if any message (in any status) was created within the cooldown window."""
        cutoff = time.time() - hours * 3600
        row = self._conn.execute("""
            SELECT 1 FROM proactive_outbox
            WHERE user_id=? AND persona_id=? AND created_at > ? LIMIT 1
        """, (user_id, persona_id, cutoff)).fetchone()
        return row is not None

    def outbox_pending_count(self, user_id: str, persona_id: str) -> int:
        """Return the number of messages currently in status 'pending'."""
        row = self._conn.execute("""
            SELECT COUNT(*) as cnt FROM proactive_outbox
            WHERE user_id=? AND persona_id=? AND status='pending'
        """, (user_id, persona_id)).fetchone()
        return row["cnt"] if row else 0

    def outbox_has_dedup(
        self, user_id: str, persona_id: str, dedup_key: str, hours: float = 4
    ) -> bool:
        """Check if a message with this dedup_key was created within the last ``hours``."""
        cutoff = time.time() - hours * 3600
        row = self._conn.execute("""
            SELECT 1 FROM proactive_outbox
            WHERE user_id=? AND persona_id=? AND dedup_key=? AND created_at > ? LIMIT 1
        """, (user_id, persona_id, dedup_key, cutoff)).fetchone()
        return row is not None

    def outbox_try_send(self, user_id: str, persona_id: str, tick_id: str) -> Optional[dict]:
        """Atomically take pending → sending (R31).

        Returns:
            The message row as a dict (with status 'sending') if this call
            claimed it, or None if the message was not pending.
        """
        key = (user_id, persona_id, tick_id)
        with self._conn:
            cur = self._conn.execute("""
                UPDATE proactive_outbox SET status='sending'
                WHERE user_id=? AND persona_id=? AND tick_id=? AND status='pending'
            """, key)
        if cur.rowcount == 0:
            return None
        row = self._conn.execute("""
            SELECT * FROM proactive_outbox WHERE user_id=? AND persona_id=? AND tick_id=?
        """, key).fetchone()
        return dict(row) if row else None

    def outbox_mark_delivered(self, user_id: str, persona_id: str, tick_id: str):
        """Set status 'delivered' and record the delivery time (any prior status)."""
        with self._conn:
            self._conn.execute("""
                UPDATE proactive_outbox SET status='delivered', delivered_at=?
                WHERE user_id=? AND persona_id=? AND tick_id=?
            """, (time.time(), user_id, persona_id, tick_id))

    def outbox_mark_failed(self, user_id: str, persona_id: str, tick_id: str):
        """Return a 'sending' message to 'pending' so it can be claimed again."""
        with self._conn:
            self._conn.execute("""
                UPDATE proactive_outbox SET status='pending'
                WHERE user_id=? AND persona_id=? AND tick_id=? AND status='sending'
            """, (user_id, persona_id, tick_id))

    def outbox_get_pending(self, user_id: str, persona_id: str, limit: int = 3) -> list[dict]:
        """Return up to ``limit`` pending messages as dicts, oldest first."""
        rows = self._conn.execute("""
            SELECT * FROM proactive_outbox
            WHERE user_id=? AND persona_id=? AND status='pending'
            ORDER BY created_at ASC LIMIT ?
        """, (user_id, persona_id, limit)).fetchall()
        return [dict(r) for r in rows]

    def outbox_can_enqueue(
        self, user_id: str, persona_id: str, dedup_key: str,
        cooldown_hours: float = 4, max_pending: int = 3,
    ) -> bool:
        """3-layer guard: cooldown + pending cap + dedup (R15).

        The three checks are separate queries, evaluated in that order;
        returns True only if all of them pass.
        """
        if self.outbox_has_recent(user_id, persona_id, cooldown_hours):
            return False
        if self.outbox_pending_count(user_id, persona_id) >= max_pending:
            return False
        return not self.outbox_has_dedup(user_id, persona_id, dedup_key, cooldown_hours)

    # ─────────────────────────────────────────────
    # Chat summary (unchanged)
    # ─────────────────────────────────────────────

    def save_chat_summary(
        self, user_id: str, persona_id: str,
        summary: str, message_count: int,
    ) -> None:
        """Save a chat summary for future context loading.

        Replaces any summary already stored for the same (user, persona).
        """
        with self._conn:
            self._conn.execute("""
                INSERT INTO chat_summary (user_id, persona_id, summary, message_count, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(user_id, persona_id) DO UPDATE SET
                    summary = excluded.summary,
                    message_count = excluded.message_count,
                    updated_at = excluded.updated_at
            """, (user_id, persona_id, summary, message_count, time.time()))

    def close(self):
        """Close the underlying SQLite connection."""
        self._conn.close()