#!/usr/bin/env python3
"""
Swarm memory system — unified memory interface.

Three-tier memory architecture:
1. QA cache — exact match (0 ms)
2. Hippocampus — semantic retrieval (fuzzy match)
3. Long-term memory — persistent storage

Source: SwarmChat qa_cache + hippocampus + VectorMemoryStore
"""

import os
import json
import time
import pickle
import numpy as np
from typing import Dict, List, Optional, Tuple


def _mean_char_vector(text: str, vectors, index: Dict[str, int]) -> Optional[np.ndarray]:
    """Average the vectors of every character of *text* found in *index*.

    Returns None when no character of *text* has a vector. The result is
    NOT normalized; callers normalize when they need a unit vector.
    """
    rows = [vectors[index[ch]] for ch in text if ch in index]
    if not rows:
        return None
    return np.mean(rows, axis=0)


class QACache:
    """Exact-match question/answer cache backed by an optional JSON file."""

    def __init__(self, cache_path: Optional[str] = None):
        """Create the cache, loading *cache_path* if that file already exists.

        Args:
            cache_path: JSON file used for persistence; None keeps the cache
                in memory only.
        """
        self.cache: Dict[str, str] = {}
        self.cache_path = cache_path
        self._dirty = False
        # NOTE: not consulted anywhere in this class; every store() is
        # written to disk immediately. Kept for backward compatibility.
        self._save_interval = 5
        self._hits = 0
        if cache_path and os.path.exists(cache_path):
            with open(cache_path, 'r', encoding='utf-8') as f:
                self.cache = json.load(f)

    def lookup(self, question: str) -> Optional[str]:
        """Return the stored answer for exactly *question*, or None."""
        answer = self.cache.get(question)
        # `is not None` so that a stored empty answer still counts as a hit.
        if answer is not None:
            self._hits += 1
        return answer

    def store(self, question: str, answer: str):
        """Store a QA pair and flush the cache to disk if a path is set."""
        self.cache[question] = answer
        self._dirty = True
        self._auto_save()

    def _auto_save(self):
        """Write the whole cache to `cache_path` when there are unsaved changes."""
        if self._dirty and self.cache_path:
            with open(self.cache_path, 'w', encoding='utf-8') as f:
                json.dump(self.cache, f, ensure_ascii=False, indent=2)
            self._dirty = False

    def stats(self) -> Dict:
        """Return the number of cached pairs and the number of lookup hits."""
        return {'total': len(self.cache), 'hits': self._hits}


class FuzzyMatcher:
    """Fuzzy matcher based on cosine similarity of averaged character vectors."""

    def __init__(self, threshold: float = 0.79):
        """Create an empty matcher.

        Args:
            threshold: minimum cosine similarity for lookup() to report a match.
        """
        self.threshold = threshold
        # (question, answer, unit-length question vector)
        self._entries: List[Tuple[str, str, np.ndarray]] = []
        # question -> position in _entries, so re-adding a question replaces
        # its entry instead of leaving a stale duplicate behind.
        self._slots: Dict[str, int] = {}
        self._vecs75 = None
        self._w2i = None

    def load_vectors(self, vecs75: np.ndarray, words: List[str]):
        """Load the vector table; row i of *vecs75* belongs to words[i].

        Rows are normalized to unit length (near-zero rows are left as is).
        """
        norms = np.linalg.norm(vecs75, axis=1, keepdims=True)
        norms[norms < 1e-8] = 1  # avoid dividing near-zero rows by ~0
        self._vecs75 = vecs75 / norms
        self._w2i = {w: i for i, w in enumerate(words)}

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Map *text* to the unit-length mean of its character vectors.

        Returns None when no vectors are loaded or no character is known.
        """
        if self._vecs75 is None:
            return None
        avg = _mean_char_vector(text, self._vecs75, self._w2i)
        if avg is None:
            return None
        return avg / max(np.linalg.norm(avg), 1e-8)

    def add_entry(self, question: str, answer: str):
        """Add a fuzzy-match entry, replacing any earlier entry for *question*.

        The entry is silently skipped when *question* cannot be vectorized
        (no vectors loaded, or none of its characters are known).
        """
        vec = self._text_to_vec(question)
        if vec is None:
            return
        entry = (question, answer, vec)
        slot = self._slots.get(question)
        if slot is None:
            self._slots[question] = len(self._entries)
            self._entries.append(entry)
        else:
            self._entries[slot] = entry

    def lookup(self, question: str) -> Optional[Tuple[str, float]]:
        """Return (answer, similarity) of the best entry, or None.

        None is returned when *question* cannot be vectorized, there are no
        entries, or the best similarity is below `threshold`.
        """
        qvec = self._text_to_vec(question)
        if qvec is None or not self._entries:
            return None
        best_sim = 0.0
        best_answer = None
        for _, answer, evec in self._entries:
            sim = float(np.dot(qvec, evec))
            if sim > best_sim:
                best_sim, best_answer = sim, answer
        # best_answer stays None when no entry had positive similarity.
        if best_answer is not None and best_sim >= self.threshold:
            return best_answer, best_sim
        return None


class Hippocampus:
    """Semantic vector memory with nearest-neighbour recall."""

    def __init__(self, threshold: float = 0.75, persist_path: Optional[str] = None):
        """Create an empty store.

        Args:
            threshold: minimum cosine similarity for a recall to succeed.
            persist_path: pickle file used by save()/load(); None disables
                persistence.
        """
        self.threshold = threshold
        self.persist_path = persist_path
        self._memories: List[Dict] = []  # [{text, vec, role, time}]
        self._write_count = 0
        self._save_every = 5

    def store(self, text: str, vec: np.ndarray, role: str = 'learned'):
        """Store *text* under the normalized *vec*; near-zero vectors are ignored."""
        norm = np.linalg.norm(vec)
        if norm < 1e-8:
            return
        self._memories.append({
            'text': text,
            'vec': vec / norm,
            'role': role,
            'time': time.time(),
        })
        self._write_count += 1
        self._auto_persist()

    def recall_with_score(self, query_vec: np.ndarray) -> Optional[Tuple[str, float]]:
        """Return (text, similarity) of the closest memory, or None.

        None is returned when the store is empty, *query_vec* is near zero,
        or the best cosine similarity is below `threshold`.
        """
        if not self._memories:
            return None
        norm = np.linalg.norm(query_vec)
        if norm < 1e-8:
            return None
        qvec = query_vec / norm
        best_sim = 0.0
        best_text = None
        for mem in self._memories:
            sim = float(np.dot(qvec, mem['vec']))
            if sim > best_sim:
                best_sim, best_text = sim, mem['text']
        if best_text is not None and best_sim >= self.threshold:
            return best_text, best_sim
        return None

    def recall(self, query_vec: np.ndarray, top_k: int = 3) -> Optional[str]:
        """Return the text of the closest memory, or None.

        Args:
            query_vec: query vector (normalized internally).
            top_k: accepted for backward compatibility; currently unused —
                only the single best match is returned.
        """
        hit = self.recall_with_score(query_vec)
        return None if hit is None else hit[0]

    def _auto_persist(self):
        """Save to disk on every `_save_every`-th write when a path is set."""
        if self.persist_path and self._write_count % self._save_every == 0:
            self.save()

    def save(self):
        """Pickle all memories to `persist_path` (no-op without a path)."""
        if not self.persist_path:
            return
        data = [
            {'text': m['text'], 'vec': m['vec'].tolist(),
             'role': m['role'], 'time': m['time']}
            for m in self._memories
        ]
        with open(self.persist_path, 'wb') as f:
            pickle.dump(data, f)

    def load(self):
        """Replace the in-memory store with the contents of `persist_path`.

        Does nothing when no path is set or the file does not exist.
        """
        if not self.persist_path or not os.path.exists(self.persist_path):
            return
        # SECURITY: pickle.load can execute arbitrary code; only point
        # persist_path at files this application wrote itself.
        with open(self.persist_path, 'rb') as f:
            data = pickle.load(f)
        self._memories = [
            {
                'text': d['text'],
                'vec': np.array(d['vec'], dtype=np.float32),
                'role': d.get('role', 'unknown'),
                'time': d.get('time', 0),
            }
            for d in data
        ]

    def stats(self) -> Dict:
        """Return the number of memories and writes since construction."""
        return {'total': len(self._memories), 'writes': self._write_count}


class SwarmMemory:
    """Unified memory system — entry point for the three lookup layers."""

    def __init__(self, data_dir: Optional[str] = None):
        """Create the layers under *data_dir* (created if missing).

        Args:
            data_dir: directory for 'qa_cache.json' and 'hippocampus.pkl';
                defaults to '~/swarm_product/data'.
        """
        self.data_dir = data_dir or os.path.expanduser('~/swarm_product/data')
        os.makedirs(self.data_dir, exist_ok=True)

        # The three lookup layers
        self.qa_cache = QACache(os.path.join(self.data_dir, 'qa_cache.json'))
        self.fuzzy = FuzzyMatcher(threshold=0.79)
        self.hippocampus = Hippocampus(
            threshold=0.75,
            persist_path=os.path.join(self.data_dir, 'hippocampus.pkl'),
        )
        self.hippocampus.load()

        self._vecs75 = None
        self._words = None
        self._w2i = None  # word -> row index, built once in load_vectors()

    def load_vectors(self, pkl_path: str):
        """Load the vector table from a pickle with 'words' and 'vecs75' keys.

        Also rebuilds the fuzzy index from the current QA cache.
        """
        # SECURITY: pickle.load can execute arbitrary code; only load
        # vector files from a trusted source.
        with open(pkl_path, 'rb') as f:
            data = pickle.load(f)
        self._words = data['words']
        self._vecs75 = data['vecs75']
        self._w2i = {w: i for i, w in enumerate(self._words)}
        self.fuzzy.load_vectors(self._vecs75, self._words)
        # Rebuild the fuzzy index; add_entry replaces entries for questions
        # that are already indexed, so calling this twice does not duplicate.
        for q, a in self.qa_cache.cache.items():
            self.fuzzy.add_entry(q, a)

    def lookup(self, question: str) -> Tuple[Optional[str], Optional[str], float]:
        """Look *question* up layer by layer.

        Returns:
            (answer, layer, confidence) where layer is one of
            'cache' / 'fuzzy' / 'hippocampus' / None. Confidence is 1.0 for an
            exact cache hit, the cosine similarity for the fuzzy and
            hippocampus layers, and 0.0 on a miss.
        """
        # Layer 1: exact cache
        answer = self.qa_cache.lookup(question)
        if answer is not None:
            return answer, 'cache', 1.0

        # Layer 2: fuzzy match
        result = self.fuzzy.lookup(question)
        if result:
            answer, sim = result
            return answer, 'fuzzy', sim

        # Layer 3: hippocampus
        if self._vecs75 is not None:
            qvec = self._text_to_vec(question)
            if qvec is not None:
                hit = self.hippocampus.recall_with_score(qvec)
                if hit is not None:
                    text, sim = hit
                    return text, 'hippocampus', sim

        return None, None, 0.0

    def store(self, question: str, answer: str, vec: Optional[np.ndarray] = None):
        """Store the pair in the cache and fuzzy index, and in the hippocampus
        (keyed by *vec*) when *vec* is given."""
        self.qa_cache.store(question, answer)
        self.fuzzy.add_entry(question, answer)
        if vec is not None:
            self.hippocampus.store(answer, vec, role='learned')

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Return the un-normalized mean character vector of *text*, or None."""
        if self._vecs75 is None:
            return None
        if self._w2i is None:
            # Vectors were assigned without load_vectors(); build the index once.
            self._w2i = {w: i for i, w in enumerate(self._words)}
        return _mean_char_vector(text, self._vecs75, self._w2i)

    def stats(self) -> Dict:
        """Return per-layer statistics and whether vectors are loaded."""
        return {
            'qa_cache': self.qa_cache.stats(),
            'hippocampus': self.hippocampus.stats(),
            'fuzzy_entries': len(self.fuzzy._entries),
            'vectors_loaded': self._vecs75 is not None,
        }