# govon-runtime / src/inference/session_context.py
# umyunsang — sync: main branch src/ with PR#561 + PR#563 (tool calling + E2E observability)
# commit 0b04246 (verified)
"""์„ธ์…˜ ์ปจํ…์ŠคํŠธ ๋ฐ SQLite ๊ธฐ๋ฐ˜ ์„ธ์…˜ ์ €์žฅ์†Œ.
GovOn Shell MVP์˜ ์„ธ์…˜ ๋ชจ๋ธ์€ ๋‹ค์Œ์„ ์ €์žฅํ•œ๋‹ค.
- ๋Œ€ํ™” ๊ธฐ๋ก
- tool ์‚ฌ์šฉ ๊ธฐ๋ก
- task loop ๋‹จ์œ„ ์‹คํ–‰ ๋กœ๊ทธ
์ดˆ์•ˆ ๋ฒ„์ „, ์„ ํƒ ๊ทผ๊ฑฐ ๋ชฉ๋ก ๊ฐ™์€ ๋ฌด๊ฑฐ์šด ์ƒํƒœ๋Š” ์ œํ’ˆ ๊ธฐ๋ณธ ์ €์žฅ ๋ฒ”์œ„์—์„œ ์ œ์™ธํ•œ๋‹ค.
Schema versioning
-----------------
_init_db()๋Š” schema_version ํ…Œ์ด๋ธ”์„ ํ†ตํ•ด ์ˆœ์ฐจ migration์„ ๊ด€๋ฆฌํ•œ๋‹ค.
ํ˜„์žฌ ์ตœ์‹  ๋ฒ„์ „: SCHEMA_VERSION = 2
Migration history:
v1 โ†’ v2: tool_runs์— graph_run_request_id ์ปฌ๋Ÿผ ์ถ”๊ฐ€ (์ด์ „์—๋Š” ad-hoc ALTER TABLE)
"""
from __future__ import annotations
import json
import os
import sqlite3
import time
import uuid
from contextlib import closing
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from loguru import logger
SCHEMA_VERSION = 2
"""ํ˜„์žฌ SessionStore SQLite ์Šคํ‚ค๋งˆ ๋ฒ„์ „."""
def _default_session_db_path() -> str:
base_dir = Path(os.getenv("GOVON_HOME", Path.home() / ".govon"))
base_dir.mkdir(parents=True, exist_ok=True)
return str(base_dir / "sessions.sqlite3")
@dataclass
class ConversationTurn:
    """One turn of conversation (a single message in the session history)."""

    role: str  # speaker role; "user" is rendered as the user, anything else as the system
    content: str  # raw message text
    timestamp: float = field(default_factory=time.time)  # unix epoch seconds at creation
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extra attributes
@dataclass
class ToolRunRecord:
    """Log entry for one tool execution."""

    tool: str  # tool name
    success: bool  # whether the execution succeeded
    graph_run_request_id: Optional[str] = None  # links this run to a GraphRunRecord.request_id (schema v2)
    latency_ms: float = 0.0  # execution latency in milliseconds
    error: Optional[str] = None  # error message on failure, None on success
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extra attributes
    timestamp: float = field(default_factory=time.time)  # unix epoch seconds at creation
@dataclass
class GraphRunRecord:
    """Log entry for one task-loop execution."""

    request_id: str  # unique id of the run; used as the upsert key within a session
    plan_summary: str = ""  # short summary of the planned work
    approval_status: Optional[str] = None  # approval outcome, if recorded
    executed_capabilities: List[str] = field(default_factory=list)  # capabilities/tools actually run
    status: str = "completed"  # terminal status of the run
    error: Optional[str] = None  # error message on failure, None on success
    total_latency_ms: float = 0.0  # end-to-end latency in milliseconds
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extra attributes
    started_at: float = field(default_factory=time.time)  # unix epoch seconds at start
    completed_at: float = field(default_factory=time.time)  # unix epoch seconds at completion
@dataclass
class SessionContext:
    """Session-scoped context holding conversation, tool, and task-loop records."""

    session_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    max_history: int = 20
    conversations: List[ConversationTurn] = field(default_factory=list)
    tool_runs: List[ToolRunRecord] = field(default_factory=list)
    graph_runs: List[GraphRunRecord] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    # Persistence hooks injected by SessionStore; when None the context is in-memory only.
    _persist_turn: Optional[Callable[[ConversationTurn], None]] = field(default=None, repr=False)
    _persist_tool_run: Optional[Callable[[ToolRunRecord], None]] = field(default=None, repr=False)
    _persist_graph_run: Optional[Callable[[GraphRunRecord], None]] = field(default=None, repr=False)
    _persist_metadata: Optional[Callable[[str, Any], None]] = field(default=None, repr=False)

    @property
    def recent_history(self) -> List[ConversationTurn]:
        """Shallow copy of the stored conversation turns."""
        return self.conversations[:]

    @property
    def recent_tool_runs(self) -> List[ToolRunRecord]:
        """Shallow copy of the stored tool-run records."""
        return self.tool_runs[:]

    @property
    def recent_graph_runs(self) -> List[GraphRunRecord]:
        """Shallow copy of the stored task-loop run records."""
        return self.graph_runs[:]

    def add_turn(self, role: str, content: str, **kwargs: Any) -> None:
        """Append one conversation turn, trim history, and persist it when hooked."""
        turn = ConversationTurn(role=role, content=content, metadata=kwargs)
        self.conversations.append(turn)
        removed = len(self.conversations) - self.max_history
        if removed > 0:
            # Keep only the newest max_history turns.
            self.conversations = self.conversations[removed:]
            logger.debug(f"์„ธ์…˜ {self.session_id}: ์˜ค๋ž˜๋œ ๋Œ€ํ™” {removed}ํ„ด ์ œ๊ฑฐ")
        if self._persist_turn is not None:
            self._persist_turn(turn)

    def add_tool_run(
        self,
        tool: str,
        success: bool,
        graph_run_request_id: Optional[str] = None,
        latency_ms: float = 0.0,
        error: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Append one tool execution record and persist it when hooked."""
        entry = ToolRunRecord(
            tool=tool,
            success=success,
            graph_run_request_id=graph_run_request_id,
            latency_ms=latency_ms,
            error=error,
            metadata=metadata or {},
        )
        self.tool_runs.append(entry)
        if self._persist_tool_run is not None:
            self._persist_tool_run(entry)

    def add_graph_run(
        self,
        request_id: str,
        plan_summary: str = "",
        approval_status: Optional[str] = None,
        executed_capabilities: Optional[List[str]] = None,
        status: str = "completed",
        error: Optional[str] = None,
        total_latency_ms: float = 0.0,
        metadata: Optional[Dict[str, Any]] = None,
        started_at: Optional[float] = None,
        completed_at: Optional[float] = None,
    ) -> None:
        """Upsert one task-loop run record (keyed by request_id) and persist it when hooked."""
        entry = GraphRunRecord(
            request_id=request_id,
            plan_summary=plan_summary,
            approval_status=approval_status,
            executed_capabilities=list(executed_capabilities or []),
            status=status,
            error=error,
            total_latency_ms=total_latency_ms,
            metadata=metadata or {},
            started_at=started_at or time.time(),
            completed_at=completed_at or time.time(),
        )
        # Replace an existing record with the same request_id, otherwise append.
        slot = next(
            (i for i, run in enumerate(self.graph_runs) if run.request_id == request_id),
            None,
        )
        if slot is None:
            self.graph_runs.append(entry)
        else:
            self.graph_runs[slot] = entry
        if self._persist_graph_run is not None:
            self._persist_graph_run(entry)

    def set_metadata(self, key: str, value: Any) -> None:
        """Set one session metadata entry and persist it when hooked."""
        self.metadata[key] = value
        if self._persist_metadata is not None:
            self._persist_metadata(key, value)

    def build_context_summary(self) -> str:
        """Render recent conversation / tool / task-loop activity as a markdown summary."""
        sections: List[str] = []
        if self.conversations:
            lines = []
            for turn in self.conversations[-5:]:
                speaker = "์‚ฌ์šฉ์ž" if turn.role == "user" else "์‹œ์Šคํ…œ"
                lines.append(f"[{speaker}] {turn.content}")
            sections.append("### ์ตœ๊ทผ ๋Œ€ํ™”\n" + "\n".join(lines))
        if self.tool_runs:
            lines = []
            for run in self.tool_runs[-5:]:
                outcome = "์„ฑ๊ณต" if run.success else "์‹คํŒจ"
                suffix = f" ({run.error})" if run.error else ""
                lines.append(f"- {run.tool}: {outcome}{suffix}")
            sections.append("### ์ตœ๊ทผ ๋„๊ตฌ ์‹คํ–‰\n" + "\n".join(lines))
        if self.graph_runs:
            lines = []
            for run in self.graph_runs[-3:]:
                approval = run.approval_status or "๋ฏธ๊ธฐ๋ก"
                tools = ", ".join(run.executed_capabilities) or "๋„๊ตฌ ์—†์Œ"
                suffix = f" ({run.error})" if run.error else ""
                lines.append(f"- {run.status} / ์Šน์ธ={approval} / tools={tools}{suffix}")
            sections.append("### ์ตœ๊ทผ ์ž‘์—… ์‹คํ–‰\n" + "\n".join(lines))
        return "\n\n".join(sections)
class SessionStore:
    """SQLite-backed session store.

    Persists SessionContext data (conversation turns, tool runs, task-loop
    runs, metadata) and rebuilds contexts with persistence hooks wired back
    to this store, so in-memory mutations are mirrored to the database.
    """

    def __init__(self, db_path: Optional[str] = None, max_history: int = 20) -> None:
        """Open (or create) the store and run pending schema migrations.

        DB path resolution order: explicit argument > GOVON_SESSION_DB
        environment variable > default path under GOVON_HOME.
        """
        self._db_path = db_path or os.getenv("GOVON_SESSION_DB") or _default_session_db_path()
        self._max_history = max_history
        self._init_db()

    @property
    def db_path(self) -> str:
        """Filesystem path of the backing SQLite database."""
        return self._db_path

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection with name-based row access, FK enforcement,
        WAL journaling, and a 5s busy timeout for concurrent writers."""
        conn = sqlite3.connect(self._db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
        return conn

    def _init_db(self) -> None:
        """Initialize the database and run sequential schema migrations.

        The schema_version table tracks the current version; migration
        functions are applied in version-number order. Each migration runs
        atomically (inside its own connection/transaction).
        """
        Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
        with closing(self._connect()) as conn, conn:
            # schema_version table: single source of truth for version tracking
            conn.execute("""
                CREATE TABLE IF NOT EXISTS schema_version (
                    version INTEGER PRIMARY KEY
                )
            """)
            row = conn.execute("SELECT MAX(version) AS v FROM schema_version").fetchone()
            current_version = row["v"] if row and row["v"] is not None else 0
        # Version 1: create the base schema
        if current_version < 1:
            self._migrate_v1()
        # Version 2: add the tool_runs.graph_run_request_id column
        if current_version < 2:
            self._migrate_v2()

    def _migrate_v1(self) -> None:
        """v1: create the base schema (sessions, messages, tool_runs, graph_runs, metadata)."""
        with closing(self._connect()) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sessions (
                    session_id TEXT PRIMARY KEY,
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '{}'
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp REAL NOT NULL,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    FOREIGN KEY(session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tool_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    tool TEXT NOT NULL,
                    success INTEGER NOT NULL,
                    latency_ms REAL NOT NULL DEFAULT 0,
                    error TEXT,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    timestamp REAL NOT NULL,
                    FOREIGN KEY(session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_tool_runs_session_id
                ON tool_runs(session_id)
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS graph_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    request_id TEXT NOT NULL,
                    plan_summary TEXT NOT NULL DEFAULT '',
                    approval_status TEXT,
                    executed_capabilities_json TEXT NOT NULL DEFAULT '[]',
                    status TEXT NOT NULL,
                    error TEXT,
                    total_latency_ms REAL NOT NULL DEFAULT 0,
                    metadata_json TEXT NOT NULL DEFAULT '{}',
                    started_at REAL NOT NULL,
                    completed_at REAL NOT NULL,
                    FOREIGN KEY(session_id) REFERENCES sessions(session_id) ON DELETE CASCADE
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_graph_runs_session_id
                ON graph_runs(session_id)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_graph_runs_session_request
                ON graph_runs(session_id, request_id)
            """)
            # Generic key/value metadata; no FK so it can outlive schema changes,
            # cleaned up explicitly in delete()/cleanup_old_sessions().
            conn.execute("""
                CREATE TABLE IF NOT EXISTS metadata (
                    owner_type TEXT NOT NULL,
                    owner_id TEXT NOT NULL,
                    key TEXT NOT NULL,
                    value_json TEXT NOT NULL,
                    updated_at REAL NOT NULL,
                    PRIMARY KEY (owner_type, owner_id, key)
                )
            """)
            conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (1)")
        logger.debug("SessionStore schema migration v1 ์™„๋ฃŒ")

    def _migrate_v2(self) -> None:
        """v2: add the tool_runs.graph_run_request_id column and a composite index.

        Column presence is checked via PRAGMA table_info so the migration is
        idempotent for databases that already received an ad-hoc ALTER TABLE.
        """
        with closing(self._connect()) as conn, conn:
            existing_columns = {
                row["name"] for row in conn.execute("PRAGMA table_info(tool_runs)").fetchall()
            }
            if "graph_run_request_id" not in existing_columns:
                conn.execute("ALTER TABLE tool_runs ADD COLUMN graph_run_request_id TEXT")
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_tool_runs_session_graph_run
                ON tool_runs(session_id, graph_run_request_id)
            """)
            conn.execute("INSERT OR IGNORE INTO schema_version(version) VALUES (2)")
        logger.debug("SessionStore schema migration v2 ์™„๋ฃŒ")

    def _ensure_session(self, session_id: str, created_at: Optional[float] = None) -> None:
        """Insert the session row if missing; otherwise only bump updated_at."""
        now = time.time()
        created = created_at or now
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT INTO sessions(session_id, created_at, updated_at, metadata_json)
                VALUES (?, ?, ?, '{}')
                ON CONFLICT(session_id) DO UPDATE SET updated_at=excluded.updated_at
                """,
                (session_id, created, now),
            )

    def _load_session_metadata_json(self, session_id: str) -> Dict[str, Any]:
        """Return the session row's metadata_json blob as a dict ({} if absent)."""
        row = self._load_session_metadata(session_id)
        if row is None:
            return {}
        return json.loads(row["metadata_json"] or "{}")

    def _upsert_session_metadata_json(self, session_id: str, metadata: Dict[str, Any]) -> None:
        """Replace the session row's metadata_json blob with the given dict."""
        self._ensure_session(session_id)
        with closing(self._connect()) as conn, conn:
            conn.execute(
                "UPDATE sessions SET metadata_json=?, updated_at=? WHERE session_id=?",
                (json.dumps(metadata, ensure_ascii=False), time.time(), session_id),
            )

    def _append_turn(self, session_id: str, turn: ConversationTurn) -> None:
        """Persist one conversation turn and touch the session's updated_at."""
        self._ensure_session(session_id)
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT INTO messages(session_id, role, content, timestamp, metadata_json)
                VALUES (?, ?, ?, ?, ?)
                """,
                (
                    session_id,
                    turn.role,
                    turn.content,
                    turn.timestamp,
                    json.dumps(turn.metadata, ensure_ascii=False),
                ),
            )
            conn.execute(
                "UPDATE sessions SET updated_at=? WHERE session_id=?",
                (time.time(), session_id),
            )

    def _append_tool_run(self, session_id: str, record: ToolRunRecord) -> None:
        """Persist one tool-run record and touch the session's updated_at."""
        self._ensure_session(session_id)
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT INTO tool_runs(
                    session_id,
                    graph_run_request_id,
                    tool,
                    success,
                    latency_ms,
                    error,
                    metadata_json,
                    timestamp
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    session_id,
                    record.graph_run_request_id,
                    record.tool,
                    1 if record.success else 0,  # success stored as INTEGER 0/1
                    record.latency_ms,
                    record.error,
                    json.dumps(record.metadata, ensure_ascii=False),
                    record.timestamp,
                ),
            )
            conn.execute(
                "UPDATE sessions SET updated_at=? WHERE session_id=?",
                (time.time(), session_id),
            )

    def _append_graph_run(self, session_id: str, record: GraphRunRecord) -> None:
        """Upsert one graph-run record keyed by (session_id, request_id)."""
        self._ensure_session(session_id)
        with closing(self._connect()) as conn, conn:
            existing = conn.execute(
                """
                SELECT id
                FROM graph_runs
                WHERE session_id=? AND request_id=?
                """,
                (session_id, record.request_id),
            ).fetchone()
            # Payload order matches the UPDATE's SET list + WHERE clause below.
            payload = (
                record.plan_summary,
                record.approval_status,
                json.dumps(record.executed_capabilities, ensure_ascii=False),
                record.status,
                record.error,
                record.total_latency_ms,
                json.dumps(record.metadata, ensure_ascii=False),
                record.started_at,
                record.completed_at,
                session_id,
                record.request_id,
            )
            if existing:
                conn.execute(
                    """
                    UPDATE graph_runs
                    SET
                        plan_summary=?,
                        approval_status=?,
                        executed_capabilities_json=?,
                        status=?,
                        error=?,
                        total_latency_ms=?,
                        metadata_json=?,
                        started_at=?,
                        completed_at=?
                    WHERE session_id=? AND request_id=?
                    """,
                    payload,
                )
            else:
                conn.execute(
                    """
                    INSERT INTO graph_runs(
                        session_id,
                        request_id,
                        plan_summary,
                        approval_status,
                        executed_capabilities_json,
                        status,
                        error,
                        total_latency_ms,
                        metadata_json,
                        started_at,
                        completed_at
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        session_id,
                        record.request_id,
                        record.plan_summary,
                        record.approval_status,
                        json.dumps(record.executed_capabilities, ensure_ascii=False),
                        record.status,
                        record.error,
                        record.total_latency_ms,
                        json.dumps(record.metadata, ensure_ascii=False),
                        record.started_at,
                        record.completed_at,
                    ),
                )
            conn.execute(
                "UPDATE sessions SET updated_at=? WHERE session_id=?",
                (time.time(), session_id),
            )

    def _upsert_metadata(self, session_id: str, key: str, value: Any) -> None:
        """Persist one metadata entry.

        Writes to both the session row's metadata_json blob and the generic
        metadata table (owner_type='session').
        """
        metadata = self._load_session_metadata_json(session_id)
        metadata[key] = value
        self._upsert_session_metadata_json(session_id, metadata)
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                INSERT INTO metadata(owner_type, owner_id, key, value_json, updated_at)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(owner_type, owner_id, key) DO UPDATE SET
                    value_json=excluded.value_json,
                    updated_at=excluded.updated_at
                """,
                (
                    "session",
                    session_id,
                    key,
                    json.dumps(value, ensure_ascii=False),
                    time.time(),
                ),
            )

    def _load_messages(self, session_id: str, max_history: int) -> List[ConversationTurn]:
        """Load this session's turns in insertion order, keeping the newest max_history."""
        with closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT role, content, timestamp, metadata_json
                FROM messages
                WHERE session_id=?
                ORDER BY id ASC
                """,
                (session_id,),
            ).fetchall()
        turns = [
            ConversationTurn(
                role=row["role"],
                content=row["content"],
                timestamp=row["timestamp"],
                metadata=json.loads(row["metadata_json"] or "{}"),
            )
            for row in rows
        ]
        return turns[-max_history:]

    def _load_tool_runs(self, session_id: str) -> List[ToolRunRecord]:
        """Load all tool-run records for this session in insertion order."""
        with closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT tool, success, latency_ms, error, metadata_json, timestamp
                , graph_run_request_id
                FROM tool_runs
                WHERE session_id=?
                ORDER BY id ASC
                """,
                (session_id,),
            ).fetchall()
        return [
            ToolRunRecord(
                tool=row["tool"],
                graph_run_request_id=row["graph_run_request_id"],
                success=bool(row["success"]),
                latency_ms=row["latency_ms"],
                error=row["error"],
                metadata=json.loads(row["metadata_json"] or "{}"),
                timestamp=row["timestamp"],
            )
            for row in rows
        ]

    def _load_graph_runs(self, session_id: str) -> List[GraphRunRecord]:
        """Load all graph-run records for this session in insertion order."""
        with closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT
                    request_id,
                    plan_summary,
                    approval_status,
                    executed_capabilities_json,
                    status,
                    error,
                    total_latency_ms,
                    metadata_json,
                    started_at,
                    completed_at
                FROM graph_runs
                WHERE session_id=?
                ORDER BY id ASC
                """,
                (session_id,),
            ).fetchall()
        return [
            GraphRunRecord(
                request_id=row["request_id"],
                plan_summary=row["plan_summary"],
                approval_status=row["approval_status"],
                executed_capabilities=json.loads(row["executed_capabilities_json"] or "[]"),
                status=row["status"],
                error=row["error"],
                total_latency_ms=row["total_latency_ms"],
                metadata=json.loads(row["metadata_json"] or "{}"),
                started_at=row["started_at"],
                completed_at=row["completed_at"],
            )
            for row in rows
        ]

    def _load_session_metadata(self, session_id: str) -> Optional[sqlite3.Row]:
        """Return the session row (id, created_at, metadata_json) or None if absent."""
        with closing(self._connect()) as conn, conn:
            return conn.execute(
                "SELECT session_id, created_at, metadata_json FROM sessions WHERE session_id=?",
                (session_id,),
            ).fetchone()

    def _load_metadata_entries(self, owner_type: str, owner_id: str) -> Dict[str, Any]:
        """Load all metadata-table entries for one owner as a key -> value dict."""
        with closing(self._connect()) as conn, conn:
            rows = conn.execute(
                """
                SELECT key, value_json
                FROM metadata
                WHERE owner_type=? AND owner_id=?
                ORDER BY key ASC
                """,
                (owner_type, owner_id),
            ).fetchall()
            return {row["key"]: json.loads(row["value_json"] or "null") for row in rows}

    def _build_context(self, session_id: str, max_history: int) -> Optional[SessionContext]:
        """Rehydrate a SessionContext from the DB, or None if the session is unknown.

        Metadata from the metadata table overrides keys from the session row's
        JSON blob. Persistence hooks are bound back to this store so further
        mutations are written through.
        """
        row = self._load_session_metadata(session_id)
        if row is None:
            return None
        metadata = json.loads(row["metadata_json"] or "{}")
        metadata.update(self._load_metadata_entries("session", session_id))
        return SessionContext(
            session_id=session_id,
            max_history=max_history,
            conversations=self._load_messages(session_id, max_history),
            tool_runs=self._load_tool_runs(session_id),
            graph_runs=self._load_graph_runs(session_id),
            metadata=metadata,
            created_at=row["created_at"],
            _persist_turn=lambda turn: self._append_turn(session_id, turn),
            _persist_tool_run=lambda record: self._append_tool_run(session_id, record),
            _persist_graph_run=lambda record: self._append_graph_run(session_id, record),
            _persist_metadata=lambda key, value: self._upsert_metadata(session_id, key, value),
        )

    def get_or_create(
        self,
        session_id: Optional[str] = None,
        max_history: Optional[int] = None,
    ) -> SessionContext:
        """Return the existing session context, or create a new persisted one.

        A missing/unknown session_id yields a fresh session (new UUID when no
        id was supplied) whose row is inserted immediately.
        """
        history_limit = max_history or self._max_history
        if session_id:
            existing = self._build_context(session_id, history_limit)
            if existing is not None:
                return existing
        sid = session_id or str(uuid.uuid4())
        created_at = time.time()
        self._ensure_session(sid, created_at=created_at)
        logger.info(f"์ƒˆ ์„ธ์…˜ ์ƒ์„ฑ: {sid}")
        return SessionContext(
            session_id=sid,
            max_history=history_limit,
            created_at=created_at,
            _persist_turn=lambda turn: self._append_turn(sid, turn),
            _persist_tool_run=lambda record: self._append_tool_run(sid, record),
            _persist_graph_run=lambda record: self._append_graph_run(sid, record),
            _persist_metadata=lambda key, value: self._upsert_metadata(sid, key, value),
        )

    def get(self, session_id: str) -> Optional[SessionContext]:
        """Return the session context, or None if the session does not exist."""
        return self._build_context(session_id, self._max_history)

    def delete(self, session_id: str) -> bool:
        """Delete one session and its dependent rows; return True if it existed.

        NOTE(review): the explicit child deletes look redundant given the
        ON DELETE CASCADE constraints plus PRAGMA foreign_keys=ON in
        _connect(); presumably kept as a safety net for pre-FK databases —
        confirm before removing.
        """
        with closing(self._connect()) as conn, conn:
            deleted = conn.execute(
                "DELETE FROM sessions WHERE session_id=?", (session_id,)
            ).rowcount
            conn.execute("DELETE FROM messages WHERE session_id=?", (session_id,))
            conn.execute("DELETE FROM tool_runs WHERE session_id=?", (session_id,))
            conn.execute("DELETE FROM graph_runs WHERE session_id=?", (session_id,))
            conn.execute(
                "DELETE FROM metadata WHERE owner_type='session' AND owner_id=?",
                (session_id,),
            )
            return bool(deleted)

    @property
    def count(self) -> int:
        """Number of sessions currently stored."""
        with closing(self._connect()) as conn, conn:
            row = conn.execute("SELECT COUNT(*) AS cnt FROM sessions").fetchone()
            return int(row["cnt"] if row else 0)

    def cleanup_old_sessions(self, max_age_days: int) -> int:
        """Delete sessions whose sessions.updated_at is older than the cutoff.

        Not invoked automatically; an operator must call it explicitly.
        Tables with ON DELETE CASCADE (messages, tool_runs, graph_runs) are
        cleaned up with the session; the metadata table has no FK and is
        deleted explicitly here.

        Parameters
        ----------
        max_age_days : int
            Delete sessions older (by updated_at) than this many days.

        Returns
        -------
        int
            Number of sessions deleted.
        """
        cutoff = time.time() - max_age_days * 86400
        with closing(self._connect()) as conn, conn:
            # Fetch the doomed session_ids first (metadata table has no FK CASCADE)
            cursor = conn.execute(
                "SELECT session_id FROM sessions WHERE updated_at < ?",
                (cutoff,),
            )
            expired_ids = [row["session_id"] for row in cursor.fetchall()]
            # Delete metadata in chunks (guards against SQLite's bound-variable limit)
            _CHUNK_SIZE = 500
            for i in range(0, len(expired_ids), _CHUNK_SIZE):
                chunk = expired_ids[i : i + _CHUNK_SIZE]
                placeholders = ",".join("?" * len(chunk))
                conn.execute(
                    f"DELETE FROM metadata WHERE owner_type='session' AND owner_id IN ({placeholders})",  # noqa: S608
                    chunk,
                )
            deleted = conn.execute("DELETE FROM sessions WHERE updated_at < ?", (cutoff,)).rowcount
        if deleted:
            logger.info(f"SessionStore: {deleted}๊ฐœ ์„ธ์…˜ ์ •๋ฆฌ (max_age_days={max_age_days})")
        return deleted