Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| 虫群记忆系统 — 统一记忆接口 | |
| 三层记忆架构: | |
| 1. QA缓存 — 精确匹配(0ms) | |
| 2. 海马区 — 语义检索(模糊匹配) | |
| 3. 长期记忆 — 持久化存储 | |
| 来源: SwarmChat qa_cache + hippocampus + VectorMemoryStore | |
| """ | |
| import os | |
| import json | |
| import time | |
| import pickle | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple | |
class QACache:
    """Exact-match question/answer cache, optionally persisted as JSON.

    Lookups are plain dictionary reads keyed by the exact question string.
    When ``cache_path`` is set, the whole cache is rewritten to that file
    after every ``store`` call.
    """

    def __init__(self, cache_path: Optional[str] = None):
        """Create the cache and load existing entries if the file exists.

        Args:
            cache_path: JSON file backing the cache. ``None`` (or an empty
                string) keeps the cache in memory only.
        """
        self.cache: Dict[str, str] = {}
        self.cache_path = cache_path
        self._dirty = False
        # NOTE(review): this value is not consulted anywhere in the class;
        # every store() is flushed immediately. Kept so that anything
        # reading the attribute keeps working.
        self._save_interval = 5
        self._hits = 0
        if cache_path and os.path.exists(cache_path):
            with open(cache_path, 'r', encoding='utf-8') as f:
                self.cache = json.load(f)

    def lookup(self, question: str) -> Optional[str]:
        """Return the cached answer for ``question``, or ``None`` if absent.

        A hit is counted whenever an entry exists, including an entry whose
        answer is the empty string.
        """
        answer = self.cache.get(question)
        if answer is not None:
            self._hits += 1
        return answer

    def store(self, question: str, answer: str):
        """Insert or overwrite one QA pair and flush the cache to disk."""
        self.cache[question] = answer
        self._dirty = True
        self._auto_save()

    def _auto_save(self):
        """Write the cache to ``cache_path`` if there are unsaved changes.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the real file with ``os.replace``, so an interrupted write
        cannot leave a truncated JSON file behind.
        """
        if not (self._dirty and self.cache_path):
            return
        tmp_path = self.cache_path + '.tmp'
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.cache, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, self.cache_path)
        self._dirty = False

    def stats(self) -> Dict:
        """Return the number of cached entries and the hit counter."""
        return {'total': len(self.cache), 'hits': self._hits}
class FuzzyMatcher:
    """Fuzzy question matcher based on averaged per-character vectors.

    A text is embedded by averaging the (row-normalised) vectors of each of
    its characters that appears in the loaded vocabulary. Two texts are
    compared by the dot product of their normalised embeddings.
    """

    def __init__(self, threshold: float = 0.79):
        """Create an empty matcher.

        Args:
            threshold: Minimum similarity for ``lookup`` to report a match.
        """
        self.threshold = threshold
        # (question, answer, normalised embedding)
        self._entries: List[Tuple[str, str, np.ndarray]] = []
        # question -> position in ``_entries``; lets a re-added question
        # replace its previous entry instead of piling up duplicates.
        self._index: Dict[str, int] = {}
        self._vecs75 = None
        self._w2i = None

    def load_vectors(self, vecs75: np.ndarray, words: List[str]):
        """Load the vocabulary vectors and the matching word list.

        Args:
            vecs75: 2-D array with one vector per row.
            words: Vocabulary tokens; ``words[i]`` labels row ``i``.
        """
        norms = np.linalg.norm(vecs75, axis=1, keepdims=True)
        # Leave (near-)zero rows untouched instead of dividing by ~0.
        norms[norms < 1e-8] = 1
        self._vecs75 = vecs75 / norms
        self._w2i = {w: i for i, w in enumerate(words)}

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Embed ``text`` as the normalised mean of its character vectors.

        Returns ``None`` when no vectors are loaded or when none of the
        characters of ``text`` is in the vocabulary.
        """
        if self._vecs75 is None:
            return None
        rows = [self._w2i[ch] for ch in text if ch in self._w2i]
        if not rows:
            return None
        avg = np.mean(self._vecs75[rows], axis=0)
        norm = np.linalg.norm(avg)
        return avg / max(norm, 1e-8)

    def add_entry(self, question: str, answer: str):
        """Register a QA pair for fuzzy lookup.

        The pair is silently skipped if ``question`` cannot be embedded.
        Adding a question that is already registered replaces its answer
        and embedding, so the newest answer is the one ``lookup`` returns.
        """
        vec = self._text_to_vec(question)
        if vec is None:
            return
        entry = (question, answer, vec)
        pos = self._index.get(question)
        # Guard against ``_entries`` having been reset or edited directly.
        known = (
            pos is not None
            and pos < len(self._entries)
            and self._entries[pos][0] == question
        )
        if known:
            self._entries[pos] = entry
        else:
            self._index[question] = len(self._entries)
            self._entries.append(entry)

    def lookup(self, question: str) -> Optional[Tuple[str, float]]:
        """Find the registered answer whose question is most similar.

        Returns:
            ``(answer, similarity)`` for the best entry if its similarity
            is at least ``threshold``; otherwise ``None``.
        """
        qvec = self._text_to_vec(question)
        if qvec is None or not self._entries:
            return None
        best_sim = 0.0
        best_answer = None
        for _, answer, evec in self._entries:
            sim = float(np.dot(qvec, evec))
            if sim > best_sim:
                best_sim = sim
                best_answer = answer
        if best_sim >= self.threshold:
            return best_answer, best_sim
        return None
class Hippocampus:
    """Vector memory: stores texts with unit vectors and recalls by similarity."""

    def __init__(self, threshold: float = 0.75, persist_path: Optional[str] = None):
        """Create an empty store.

        Args:
            threshold: Minimum dot-product similarity for ``recall`` to
                return a text.
            persist_path: Pickle file used by ``save``/``load``. ``None``
                disables persistence. Existing data is NOT loaded here;
                call ``load`` explicitly.
        """
        self.threshold = threshold
        self.persist_path = persist_path
        self._memories: List[Dict] = []  # [{text, vec, role, time}]
        self._write_count = 0
        self._save_every = 5

    def store(self, text: str, vec: np.ndarray, role: str = 'learned'):
        """Append one memory, keeping a unit-length copy of ``vec``.

        Vectors whose norm is below 1e-8 are ignored. Every
        ``_save_every``-th successful write triggers ``save``.

        Args:
            text: Text returned later by ``recall``.
            vec: Embedding of the memory (array or plain sequence).
            role: Free-form tag stored alongside the memory.
        """
        # Coerce so plain lists/tuples can be divided like arrays.
        vec = np.asarray(vec)
        norm = np.linalg.norm(vec)
        if norm < 1e-8:
            return
        self._memories.append({
            'text': text,
            'vec': vec / norm,
            'role': role,
            'time': time.time(),
        })
        self._write_count += 1
        self._auto_persist()

    def recall(self, query_vec: np.ndarray, top_k: int = 3) -> Optional[str]:
        """Return the text of the most similar memory, if similar enough.

        Args:
            query_vec: Query embedding (array or plain sequence).
            top_k: Currently unused; only the single best match is
                considered. Kept for interface compatibility.

        Returns:
            The best-matching text when its similarity is at least
            ``threshold``; ``None`` otherwise, including when the store is
            empty or the query has (near-)zero norm.
        """
        if not self._memories:
            return None
        query_vec = np.asarray(query_vec)
        norm = np.linalg.norm(query_vec)
        if norm < 1e-8:
            return None
        qvec = query_vec / norm
        best_sim = 0.0
        best_text = None
        for mem in self._memories:
            sim = float(np.dot(qvec, mem['vec']))
            if sim > best_sim:
                best_sim = sim
                best_text = mem['text']
        if best_sim >= self.threshold:
            return best_text
        return None

    def _auto_persist(self):
        """Save every ``_save_every`` writes when persistence is enabled."""
        if self.persist_path and self._write_count % self._save_every == 0:
            self.save()

    def save(self):
        """Pickle all memories to ``persist_path`` (no-op without a path).

        The data is written to a sibling ``.tmp`` file and then moved into
        place with ``os.replace``, so an interrupted write cannot leave a
        partial pickle that would break the next ``load``.
        """
        if not self.persist_path:
            return
        data = [
            {'text': m['text'], 'vec': m['vec'].tolist(),
             'role': m['role'], 'time': m['time']}
            for m in self._memories
        ]
        tmp_path = self.persist_path + '.tmp'
        with open(tmp_path, 'wb') as f:
            pickle.dump(data, f)
        os.replace(tmp_path, self.persist_path)

    def load(self):
        """Replace the in-memory store with the contents of ``persist_path``.

        Does nothing when no path is configured or the file is missing.
        Vectors are restored as ``float32`` arrays.
        """
        if not self.persist_path or not os.path.exists(self.persist_path):
            return
        # SECURITY: pickle.load can execute arbitrary code. Only point
        # persist_path at files written by save(); never at untrusted data.
        with open(self.persist_path, 'rb') as f:
            data = pickle.load(f)
        self._memories = [
            {
                'text': d['text'],
                'vec': np.array(d['vec'], dtype=np.float32),
                'role': d.get('role', 'unknown'),
                'time': d.get('time', 0),
            }
            for d in data
        ]

    def stats(self) -> Dict:
        """Return the number of stored memories and the write counter."""
        return {'total': len(self._memories), 'writes': self._write_count}
class SwarmMemory:
    """Unified entry point over the three memory layers.

    ``lookup`` consults the layers in this order:
      1. ``QACache``      -- exact string match.
      2. ``FuzzyMatcher`` -- similarity of averaged character vectors.
      3. ``Hippocampus``  -- recall against caller-supplied vectors.
    """

    def __init__(self, data_dir: Optional[str] = None):
        """Create the layers, creating ``data_dir`` if necessary.

        Args:
            data_dir: Directory for ``qa_cache.json`` and
                ``hippocampus.pkl``. Defaults to ``~/swarm_product/data``.
        """
        self.data_dir = data_dir or os.path.expanduser('~/swarm_product/data')
        os.makedirs(self.data_dir, exist_ok=True)
        # The three memory layers.
        self.qa_cache = QACache(os.path.join(self.data_dir, 'qa_cache.json'))
        self.fuzzy = FuzzyMatcher(threshold=0.79)
        self.hippocampus = Hippocampus(
            threshold=0.75,
            persist_path=os.path.join(self.data_dir, 'hippocampus.pkl'),
        )
        self.hippocampus.load()
        self._vecs75 = None
        self._words = None
        # word -> row index, built once when vectors are loaded.
        self._w2i = None

    def load_vectors(self, pkl_path: str):
        """Load word vectors from a pickle and rebuild the fuzzy index.

        The pickle must contain a mapping with the keys ``'words'`` and
        ``'vecs75'``. Every pair currently in the QA cache is then
        registered with the fuzzy matcher.
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load
        # vector files from a trusted source.
        with open(pkl_path, 'rb') as f:
            data = pickle.load(f)
        self._words = data['words']
        self._vecs75 = data['vecs75']
        self._w2i = {w: i for i, w in enumerate(self._words)}
        self.fuzzy.load_vectors(self._vecs75, self._words)
        # Rebuild the fuzzy index from the exact cache.
        for q, a in self.qa_cache.cache.items():
            self.fuzzy.add_entry(q, a)

    def lookup(self, question: str) -> Tuple[Optional[str], Optional[str], float]:
        """Look ``question`` up in each layer in turn.

        Returns:
            ``(answer, layer, confidence)`` where ``layer`` is ``'cache'``,
            ``'fuzzy'``, ``'hippocampus'`` or ``None``. Confidence is 1.0
            for an exact hit and the measured similarity for a fuzzy hit.
            For a hippocampus hit it is the hippocampus threshold, i.e. a
            lower bound, because ``recall`` does not expose the actual
            similarity. A miss returns ``(None, None, 0.0)``.
        """
        # Layer 1: exact cache.
        answer = self.qa_cache.lookup(question)
        if answer is not None:
            return answer, 'cache', 1.0
        # Layer 2: fuzzy match.
        result = self.fuzzy.lookup(question)
        if result is not None:
            answer, sim = result
            return answer, 'fuzzy', sim
        # Layer 3: hippocampus (only reachable once vectors are loaded,
        # since _text_to_vec returns None without them).
        qvec = self._text_to_vec(question)
        if qvec is not None:
            answer = self.hippocampus.recall(qvec)
            if answer is not None:
                return answer, 'hippocampus', self.hippocampus.threshold
        return None, None, 0.0

    def store(self, question: str, answer: str, vec: np.ndarray = None):
        """Store a QA pair in the cache and the fuzzy index.

        When ``vec`` is given, ``answer`` is additionally stored in the
        hippocampus under that vector with role ``'learned'``.
        """
        self.qa_cache.store(question, answer)
        self.fuzzy.add_entry(question, answer)
        if vec is not None:
            self.hippocampus.store(answer, vec, role='learned')

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Average the raw vectors of the characters of ``text``.

        The result is not normalised. Returns ``None`` when no vectors are
        loaded or no character of ``text`` is in the vocabulary.
        """
        if self._vecs75 is None:
            return None
        w2i = getattr(self, '_w2i', None)
        if w2i is None:
            # Vectors were assigned without load_vectors(); build the
            # index once here instead of on every call.
            w2i = self._w2i = {w: i for i, w in enumerate(self._words)}
        rows = [w2i[ch] for ch in text if ch in w2i]
        if not rows:
            return None
        return np.mean([self._vecs75[i] for i in rows], axis=0)

    def stats(self) -> Dict:
        """Return per-layer statistics and whether vectors are loaded."""
        return {
            'qa_cache': self.qa_cache.stats(),
            'hippocampus': self.hippocampus.stats(),
            'fuzzy_entries': len(self.fuzzy._entries),
            'vectors_loaded': self._vecs75 is not None,
        }