swarm-chat / src /memory /memory_system.py
lk080424
虫巢-200M训练部署: npz+json替代pkl, 三区循环训练, 4454QA数据
358ab64
Raw
History Blame Contribute Delete
8.79 kB
#!/usr/bin/env python3
"""
虫群记忆系统 — 统一记忆接口
三层记忆架构:
1. QA缓存 — 精确匹配(0ms)
2. 海马区 — 语义检索(模糊匹配)
3. 长期记忆 — 持久化存储
来源: SwarmChat qa_cache + hippocampus + VectorMemoryStore
"""
import os
import json
import time
import pickle
import numpy as np
from typing import Dict, List, Optional, Tuple
class QACache:
    """Exact-match question/answer cache, optionally persisted as JSON.

    The cache is a plain ``dict`` mapping the question string to the answer
    string. When ``cache_path`` is given, the file is loaded on construction
    (if it exists) and rewritten after every ``store`` call.
    """

    def __init__(self, cache_path: Optional[str] = None):
        """Create the cache and load ``cache_path`` if that file exists.

        Args:
            cache_path: Path of the JSON file backing the cache, or ``None``
                for a purely in-memory cache.

        Raises:
            ValueError: If the file does not contain a JSON object
                (``json.JSONDecodeError`` for malformed JSON is itself a
                ``ValueError`` subclass).
        """
        self.cache: Dict[str, str] = {}
        self.cache_path = cache_path
        self._dirty = False
        # Kept for compatibility; not consulted anywhere in this class --
        # every store() is written through immediately.
        self._save_interval = 5
        self._hits = 0
        if cache_path and os.path.exists(cache_path):
            with open(cache_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Fail at load time with a clear message instead of failing
            # later on the first .get() call.
            if not isinstance(loaded, dict):
                raise ValueError(
                    f"QA cache file {cache_path!r} must contain a JSON object, "
                    f"got {type(loaded).__name__}"
                )
            self.cache = loaded

    def lookup(self, question: str) -> Optional[str]:
        """Return the answer stored for exactly ``question``, or ``None``.

        A non-empty answer increments the hit counter reported by ``stats``.
        """
        answer = self.cache.get(question)
        if answer:
            self._hits += 1
        return answer

    def store(self, question: str, answer: str):
        """Insert or overwrite a QA pair and persist the cache."""
        self.cache[question] = answer
        self._dirty = True
        self._auto_save()

    def _auto_save(self):
        """Write the cache to ``cache_path`` if there are unsaved changes.

        The data is written to a sibling temporary file and then moved over
        the real file with ``os.replace``, so an interrupted write cannot
        leave a truncated cache file behind.
        """
        if not (self._dirty and self.cache_path):
            return
        tmp_path = f"{self.cache_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.cache, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.cache_path)
        except Exception:
            # Do not leave a half-written temp file next to the cache.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        self._dirty = False

    def stats(self) -> Dict:
        """Return the number of cached pairs and the number of lookup hits."""
        return {'total': len(self.cache), 'hits': self._hits}
class FuzzyMatcher:
    """Fuzzy QA matcher based on cosine similarity of averaged vectors.

    A text is embedded by averaging the unit-normalised vectors of every
    character that appears in the loaded vocabulary, then re-normalising.
    Stored questions are compared with the query by dot product.
    """

    def __init__(self, threshold: float = 0.79):
        """Create an empty matcher.

        Args:
            threshold: Minimum similarity for ``lookup`` to report a match.
        """
        self.threshold = threshold
        # (question, answer, normalised question vector)
        self._entries: List[Tuple[str, str, np.ndarray]] = []
        # question -> position in self._entries, so re-adding a question
        # replaces its entry instead of appending a duplicate.
        self._index: Dict[str, int] = {}
        self._vecs75 = None
        self._w2i = None

    def load_vectors(self, vecs75: np.ndarray, words: List[str]):
        """Load the vector table and its vocabulary.

        Args:
            vecs75: 2-D array with one row per entry of ``words``.
            words: Vocabulary tokens; ``_text_to_vec`` looks up each
                character of a text in this list.
        """
        vecs75 = np.asarray(vecs75)
        norms = np.linalg.norm(vecs75, axis=1, keepdims=True)
        # Avoid dividing (near-)zero rows by ~0; they stay (near-)zero.
        norms[norms < 1e-8] = 1
        self._vecs75 = vecs75 / norms
        self._w2i = {w: i for i, w in enumerate(words)}

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Return the normalised mean vector of ``text``'s known characters.

        Returns ``None`` when no vectors are loaded or no character of
        ``text`` is in the vocabulary.
        """
        if self._vecs75 is None:
            return None
        w2i = self._w2i
        rows = [w2i[ch] for ch in text if ch in w2i]
        if not rows:
            return None
        avg = self._vecs75[rows].mean(axis=0)
        norm = np.linalg.norm(avg)
        return avg / max(norm, 1e-8)

    def _position_of(self, question: str) -> Optional[int]:
        """Return the position of ``question`` in ``_entries``, if present."""
        # Resynchronise if _entries was modified from outside this class.
        if len(self._index) != len(self._entries):
            self._index = {q: i for i, (q, _, _) in enumerate(self._entries)}
        return self._index.get(question)

    def add_entry(self, question: str, answer: str):
        """Add a QA pair, replacing any entry with the same question.

        The pair is silently skipped when the question cannot be embedded
        (no vectors loaded, or none of its characters are in the vocabulary).
        """
        vec = self._text_to_vec(question)
        if vec is None:
            return
        entry = (question, answer, vec)
        pos = self._position_of(question)
        if pos is None:
            self._index[question] = len(self._entries)
            self._entries.append(entry)
        else:
            # Same question stored again: keep only the latest answer so a
            # stale one can never win the similarity tie in lookup().
            self._entries[pos] = entry

    def lookup(self, question: str) -> Optional[Tuple[str, float]]:
        """Return ``(answer, similarity)`` of the best match, or ``None``.

        ``None`` is returned when the question cannot be embedded, nothing
        is stored, no entry has positive similarity, or the best similarity
        is below ``threshold``.
        """
        qvec = self._text_to_vec(question)
        if qvec is None or not self._entries:
            return None
        best_sim = 0
        best_answer = None
        for _, answer, evec in self._entries:
            sim = float(np.dot(qvec, evec))
            if sim > best_sim:
                best_sim = sim
                best_answer = answer
        if best_answer is not None and best_sim >= self.threshold:
            return best_answer, best_sim
        return None
class Hippocampus:
    """Semantic vector memory: stores texts with unit vectors, recalls by cosine.

    Memories are kept in a list of dicts with the keys ``text``, ``vec``,
    ``role`` and ``time``. When ``persist_path`` is set, the list is written
    to disk every ``_save_every`` successful ``store`` calls.
    """

    def __init__(self, threshold: float = 0.75, persist_path: Optional[str] = None):
        """Create an empty memory; call ``load`` to read ``persist_path``.

        Args:
            threshold: Minimum cosine similarity for ``recall`` to return a text.
            persist_path: File used by ``save``/``load``; ``None`` disables
                persistence.
        """
        self.threshold = threshold
        self.persist_path = persist_path
        self._memories: List[Dict] = []  # [{text, vec, role, time}]
        self._write_count = 0
        self._save_every = 5

    def store(self, text: str, vec: np.ndarray, role: str = 'learned'):
        """Store ``text`` under the normalised ``vec``.

        Vectors with (near-)zero norm are ignored and do not count as a write.

        Args:
            text: Text to return when this memory is recalled.
            vec: Embedding of the memory; any array-like is accepted.
            role: Free-form tag stored alongside the memory.
        """
        # Accept plain sequences too; a list would fail on `vec / norm`.
        vec = np.asarray(vec)
        norm = np.linalg.norm(vec)
        if norm < 1e-8:
            return
        self._memories.append({
            'text': text,
            'vec': vec / norm,
            'role': role,
            'time': time.time(),
        })
        self._write_count += 1
        self._auto_persist()

    def recall(self, query_vec: np.ndarray, top_k: int = 3) -> Optional[str]:
        """Return the text of the most similar memory, or ``None``.

        Args:
            query_vec: Query embedding; any array-like is accepted.
            top_k: Accepted for interface compatibility; not used -- only
                the single best match is considered.

        Returns:
            The best-matching text if its cosine similarity is positive and
            at least ``threshold``; otherwise ``None``.
        """
        if not self._memories:
            return None
        query_vec = np.asarray(query_vec)
        norm = np.linalg.norm(query_vec)
        if norm < 1e-8:
            return None
        qvec = query_vec / norm
        best_sim = 0
        best_text = None
        for mem in self._memories:
            sim = float(np.dot(qvec, mem['vec']))
            if sim > best_sim:
                best_sim = sim
                best_text = mem['text']
        if best_sim >= self.threshold:
            return best_text
        return None

    def _auto_persist(self):
        """Save to disk on every ``_save_every``-th successful write."""
        if self.persist_path and self._write_count % self._save_every == 0:
            self.save()

    def save(self):
        """Pickle all memories to ``persist_path`` (no-op without a path).

        Vectors are converted to plain lists. The data is written to a
        sibling temporary file and moved into place with ``os.replace`` so an
        interrupted write cannot corrupt the existing file.
        """
        if not self.persist_path:
            return
        data = [
            {'text': m['text'], 'vec': m['vec'].tolist(),
             'role': m['role'], 'time': m['time']}
            for m in self._memories
        ]
        tmp_path = f"{self.persist_path}.tmp"
        try:
            with open(tmp_path, 'wb') as f:
                pickle.dump(data, f)
            os.replace(tmp_path, self.persist_path)
        except Exception:
            # Do not leave a half-written temp file behind.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def load(self):
        """Replace the in-memory list with the contents of ``persist_path``.

        Does nothing when no path is configured or the file does not exist.
        Loaded vectors are converted to ``float32`` arrays.
        """
        if not self.persist_path or not os.path.exists(self.persist_path):
            return
        # SECURITY: pickle.load can execute arbitrary code; only point
        # persist_path at files this application wrote itself.
        with open(self.persist_path, 'rb') as f:
            data = pickle.load(f)
        self._memories = [
            {
                'text': d['text'],
                'vec': np.array(d['vec'], dtype=np.float32),
                'role': d.get('role', 'unknown'),
                'time': d.get('time', 0),
            }
            for d in data
        ]

    def stats(self) -> Dict:
        """Return the number of memories and of successful writes."""
        return {'total': len(self._memories), 'writes': self._write_count}
class SwarmMemory:
    """Unified entry point over the three memory layers.

    Lookups go through, in order: the exact QA cache, the fuzzy matcher,
    and the hippocampus vector memory.
    """

    def __init__(self, data_dir: Optional[str] = None):
        """Create the layers and load persisted hippocampus memories.

        Args:
            data_dir: Directory for ``qa_cache.json`` and ``hippocampus.pkl``.
                Defaults to ``~/swarm_product/data``; created if missing.
        """
        self.data_dir = data_dir or os.path.expanduser('~/swarm_product/data')
        os.makedirs(self.data_dir, exist_ok=True)
        # The three layers.
        self.qa_cache = QACache(os.path.join(self.data_dir, 'qa_cache.json'))
        self.fuzzy = FuzzyMatcher(threshold=0.79)
        self.hippocampus = Hippocampus(
            threshold=0.75,
            persist_path=os.path.join(self.data_dir, 'hippocampus.pkl'),
        )
        self.hippocampus.load()
        self._vecs75 = None
        self._words = None
        # word -> row index into _vecs75; built once, not per lookup.
        self._w2i = None

    def load_vectors(self, pkl_path: str):
        """Load the vector table and rebuild the fuzzy index from the cache.

        Args:
            pkl_path: Pickle file holding a mapping with the keys ``'words'``
                and ``'vecs75'``.
        """
        # SECURITY: pickle.load can execute arbitrary code; only load
        # vector files from a trusted source.
        with open(pkl_path, 'rb') as f:
            data = pickle.load(f)
        self._words = data['words']
        self._vecs75 = data['vecs75']
        self._w2i = {w: i for i, w in enumerate(self._words)}
        # Rebuild the fuzzy index from scratch so that calling this method
        # again neither duplicates entries nor keeps vectors computed from
        # the previous table.
        self.fuzzy = FuzzyMatcher(threshold=self.fuzzy.threshold)
        self.fuzzy.load_vectors(self._vecs75, self._words)
        for q, a in self.qa_cache.cache.items():
            self.fuzzy.add_entry(q, a)

    def lookup(self, question: str) -> Tuple[Optional[str], Optional[str], float]:
        """Look ``question`` up in the three layers, first match wins.

        Returns:
            ``(answer, layer, confidence)`` where ``layer`` is ``'cache'``,
            ``'fuzzy'`` or ``'hippocampus'``; ``(None, None, 0.0)`` on a miss.
            Confidence is ``1.0`` for a cache hit, the similarity for a fuzzy
            hit, and the hippocampus threshold for a hippocampus hit (recall
            does not report the actual similarity, so the threshold is the
            known lower bound).
        """
        # Layer 1: exact cache. An empty cached answer counts as a miss.
        answer = self.qa_cache.lookup(question)
        if answer:
            return answer, 'cache', 1.0
        # Layer 2: fuzzy match.
        result = self.fuzzy.lookup(question)
        if result:
            answer, sim = result
            return answer, 'fuzzy', sim
        # Layer 3: hippocampus.
        if self._vecs75 is not None:
            qvec = self._text_to_vec(question)
            if qvec is not None:
                answer = self.hippocampus.recall(qvec)
                if answer:
                    return answer, 'hippocampus', self.hippocampus.threshold
        return None, None, 0.0

    def store(self, question: str, answer: str, vec: np.ndarray = None):
        """Store a QA pair in the cache and the fuzzy index.

        When ``vec`` is given, the answer is also stored in the hippocampus
        under that vector with role ``'learned'``.
        """
        self.qa_cache.store(question, answer)
        self.fuzzy.add_entry(question, answer)
        if vec is not None:
            self.hippocampus.store(answer, vec, role='learned')

    def _text_to_vec(self, text: str) -> Optional[np.ndarray]:
        """Return the mean of the raw vectors of ``text``'s known characters.

        The result is not normalised. Returns ``None`` when no vectors are
        loaded or no character of ``text`` is in the vocabulary.
        """
        if self._vecs75 is None:
            return None
        if self._w2i is None:
            # Vectors were assigned without load_vectors(); build the index
            # once here rather than on every call.
            self._w2i = {w: i for i, w in enumerate(self._words)}
        w2i = self._w2i
        vecs = [self._vecs75[w2i[ch]] for ch in text if ch in w2i]
        if not vecs:
            return None
        return np.mean(vecs, axis=0)

    def stats(self) -> Dict:
        """Return per-layer statistics and whether vectors are loaded."""
        return {
            'qa_cache': self.qa_cache.stats(),
            'hippocampus': self.hippocampus.stats(),
            'fuzzy_entries': len(self.fuzzy._entries),
            'vectors_loaded': self._vecs75 is not None,
        }