"""Episodic SQLite storage."""
import json
import logging
import math
import sqlite3
import time
from contextlib import closing
from typing import Any, Dict, List

from api.deps import load_config, get_logger

logger = get_logger("kapo.memory.episodic")


def _cosine_similarity(a, b):
    """Return the cosine similarity of two equal-length numeric sequences.

    A small epsilon is added to the denominator so that an all-zero vector
    yields 0 instead of raising ZeroDivisionError.
    """
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b + 1e-9)


class EpisodicDB:
    """Store and query task "experiences" in a SQLite database.

    The database path is read from the ``DB_PATH`` config key and falls back
    to ``./episodic.db``. The ``experiences`` table is created on
    construction if it does not exist yet.
    """

    def __init__(self):
        cfg = load_config()
        self.db_path = cfg.get("DB_PATH") or "./episodic.db"
        # (model_name, embedder) pair reused across search_similar() calls.
        self._embedder_cache = None
        self._init_db()

    def _connect(self):
        """Open a connection wrapped so that leaving a ``with`` block closes it.

        ``sqlite3.Connection`` used directly as a context manager only manages
        the transaction and leaves the connection open, hence ``closing``.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Create the ``experiences`` table if it is missing."""
        with self._connect() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS experiences (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    task TEXT,
                    plan TEXT,
                    tools_used TEXT,
                    result TEXT,
                    success INTEGER,
                    timestamp TEXT
                )
                """
            )
            conn.commit()

    def insert_experience(
        self,
        task: str,
        plan: Dict[str, Any],
        tools_used: Dict[str, Any],
        result: Dict[str, Any],
        success: int,
    ):
        """Persist one experience row.

        ``plan``, ``tools_used`` and ``result`` are stored as JSON text; the
        timestamp is the current local time formatted as
        ``YYYY-MM-DDTHH:MM:SS``.

        Raises:
            TypeError: if one of the JSON fields is not JSON-serializable.
            sqlite3.Error: if the insert fails.
        """
        row = (
            task,
            json.dumps(plan),
            json.dumps(tools_used),
            json.dumps(result),
            success,
            time.strftime("%Y-%m-%dT%H:%M:%S"),
        )
        with self._connect() as conn:
            conn.execute(
                """INSERT INTO experiences(task, plan, tools_used, result, success, timestamp)
                VALUES(?,?,?,?,?,?)""",
                row,
            )
            conn.commit()

    def list_recent(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Return up to ``limit`` experiences, newest (highest id) first.

        The JSON columns are decoded back into Python objects.
        """
        with self._connect() as conn:
            rows = conn.execute(
                "SELECT task, plan, tools_used, result, success, timestamp "
                "FROM experiences ORDER BY id DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [
            {
                "task": task,
                "plan": json.loads(plan),
                "tools_used": json.loads(tools_used),
                "result": json.loads(result),
                "success": success,
                "timestamp": timestamp,
            }
            for task, plan, tools_used, result, success, timestamp in rows
        ]

    def _get_embedder(self):
        """Return a SentenceTransformer for the configured model, cached per instance.

        The model name comes from the ``EMBED_MODEL`` config key (default
        ``sentence-transformers/all-MiniLM-L6-v2``). The model is reloaded
        only when the configured name changes.
        """
        # Imported lazily so this module can be loaded without the package.
        from sentence_transformers import SentenceTransformer

        model_name = (
            load_config().get("EMBED_MODEL")
            or "sentence-transformers/all-MiniLM-L6-v2"
        )
        # getattr keeps this safe for instances created without __init__.
        cached = getattr(self, "_embedder_cache", None)
        if cached is not None and cached[0] == model_name:
            return cached[1]
        embedder = SentenceTransformer(model_name)
        self._embedder_cache = (model_name, embedder)
        return embedder

    def search_similar(self, text: str, top_k: int = 3) -> List[Dict[str, Any]]:
        """Find stored experiences whose task is most similar to ``text``.

        The 200 most recent experiences are embedded locally with
        sentence-transformers and ranked by cosine similarity to ``text``.

        Args:
            text: Query text to compare against stored task descriptions.
            top_k: Maximum number of records to return; non-positive values
                yield an empty list.

        Returns:
            Up to ``top_k`` experience dicts (same shape as ``list_recent``),
            most similar first. This is best-effort: any failure (missing
            package, model load error, database error) is logged and an
            empty list is returned.
        """
        if top_k <= 0:
            return []
        try:
            # Check for records first so an empty store never loads the model.
            records = self.list_recent(limit=200)
            if not records:
                return []
            embedder = self._get_embedder()
            # A NULL task comes back as None; encode it as an empty string.
            texts = [record.get("task") or "" for record in records]
            vectors = embedder.encode(texts, show_progress_bar=False)
            query_vector = embedder.encode([text])[0]
            scored = [
                (_cosine_similarity(query_vector, vector), record)
                for vector, record in zip(vectors, records)
            ]
            scored.sort(key=lambda pair: pair[0], reverse=True)
            return [record for _, record in scored[:top_k]]
        except Exception:
            logger.exception("Similarity search failed")
            return []