Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| 虫群v7 — 记忆模型(Memory Model) | |
| 参数化个人记忆系统:将交互记录编码为可检索的结构 | |
| 设计思路: | |
| - 传统方案:对话存数据库,检索靠关键词匹配 | |
| - 虫群方案:记忆以向量索引+时序结构存储,检索更精准 | |
| - 未来方向:将记忆编码为模型参数(模型即数据库) | |
| - 当前实现:轻量级向量索引 + 时序衰减 + 关键词增强 | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import re | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple | |
| logger = logging.getLogger(__name__) | |
| # ============================================================ | |
| # 记忆条目 | |
| # ============================================================ | |
| class MemoryEntry: | |
| """单条记忆""" | |
| def __init__(self, user_message: str, ai_response: str, | |
| intent: str = "", route: str = "", | |
| timestamp: str = ""): | |
| self.entry_id = hashlib.md5( | |
| f"{user_message}{ai_response}{time.time()}".encode() | |
| ).hexdigest()[:12] | |
| self.user_message = user_message | |
| self.ai_response = ai_response | |
| self.intent = intent | |
| self.route = route | |
| self.timestamp = timestamp or datetime.now().isoformat() | |
| self.created_at = time.time() | |
| # 关键词提取(简易版,替代分词) | |
| self.keywords = self._extract_keywords(user_message) | |
| # 访问计数(用于衰减/强化) | |
| self.access_count = 0 | |
| self.last_access = self.created_at | |
| def _extract_keywords(self, text: str) -> List[str]: | |
| """简易中文关键词提取:去停用词 + 长词优先""" | |
| # 去标点 | |
| cleaned = re.sub(r'[,。!?、;:""''()\s]', ' ', text) | |
| # 按空格和常见分隔符切分 | |
| words = re.split(r'[\s,.\-!?;:]+', cleaned) | |
| # 过滤短词 | |
| keywords = [w for w in words if len(w) >= 2] | |
| return keywords[:20] # 最多20个 | |
| def to_dict(self) -> Dict: | |
| return { | |
| "entry_id": self.entry_id, | |
| "user_message": self.user_message[:100], | |
| "ai_response": self.ai_response[:100], | |
| "intent": self.intent, | |
| "keywords": self.keywords[:5], | |
| "timestamp": self.timestamp, | |
| "access_count": self.access_count, | |
| } | |
| # ============================================================ | |
| # 记忆模型核心 | |
| # ============================================================ | |
| class MemoryModel: | |
| """ | |
| 记忆模型 — 参数化个人记忆 | |
| 特性: | |
| 1. retrieve_context(): 获取上下文摘要(给元模型用) | |
| 2. retrieve(): 精确检索记忆条目 | |
| 3. store_interaction(): 存储新交互 | |
| 4. 时间衰减: 越早的记忆权重越低 | |
| 5. 关键词增强: 匹配关键词越多权重越高 | |
| """ | |
| # 记忆存储路径 | |
| DATA_DIR = "/home/admin/swarm/data/memory" | |
| MAX_ENTRIES = 5000 # 最大记忆条数 | |
| DECAY_HALF_LIFE = 86400 * 7 # 衰减半衰期: 7天 | |
| def __init__(self, user_id: str = "default"): | |
| self.user_id = user_id | |
| self._entries: List[MemoryEntry] = [] | |
| self._keyword_index: Dict[str, List[str]] = {} # keyword -> [entry_ids] | |
| self._intent_index: Dict[str, List[str]] = {} # intent -> [entry_ids] | |
| self._loaded = False | |
| # 统计 | |
| self._store_count = 0 | |
| self._retrieve_count = 0 | |
| self._hit_count = 0 | |
| # 延迟加载 | |
| self._load() | |
| # ============================================================ | |
| # 核心接口 | |
| # ============================================================ | |
| def retrieve_context(self, message: str) -> str: | |
| """ | |
| 获取与当前消息相关的记忆上下文文本 | |
| 返回格式: "之前你问过XXX,我回答了YYY" | |
| """ | |
| self._retrieve_count += 1 | |
| entries = self.retrieve(message, top_k=3) | |
| if not entries: | |
| return "" | |
| self._hit_count += 1 | |
| parts = [] | |
| for e in entries: | |
| msg = e.get("user_message", "") if isinstance(e, dict) else e.user_message | |
| resp = e.get("ai_response", "") if isinstance(e, dict) else e.ai_response | |
| parts.append(f"之前你问过「{msg[:50]}」,回答是「{resp[:50]}」") | |
| return ";".join(parts) | |
| def retrieve(self, query: str, top_k: int = 5) -> List[Dict]: | |
| """ | |
| 检索相关记忆条目 | |
| 算法: 关键词匹配 + 时间衰减 + 访问频次 | |
| """ | |
| self._retrieve_count += 1 | |
| if not self._entries: | |
| return [] | |
| # 提取查询关键词 | |
| query_words = set() | |
| cleaned = re.sub(r'[,。!?、;:""''()\s]', ' ', query) | |
| for w in re.split(r'[\s,.\-!?;:]+', cleaned): | |
| if len(w) >= 2: | |
| query_words.add(w) | |
| # 计算每条记忆的得分 | |
| scored: List[Tuple[float, MemoryEntry]] = [] | |
| now = time.time() | |
| for entry in self._entries: | |
| score = 0.0 | |
| # 关键词匹配分 (0~0.5) | |
| overlap = query_words & set(entry.keywords) | |
| if overlap: | |
| match_ratio = len(overlap) / max(len(query_words), 1) | |
| score += match_ratio * 0.5 | |
| # 时间衰减分 (0~0.3): 越新越高 | |
| age_seconds = now - entry.created_at | |
| decay = math.exp(-0.693 * age_seconds / self.DECAY_HALF_LIFE) | |
| score += decay * 0.3 | |
| # 访问频次分 (0~0.2): 常访问的记忆更重要 | |
| freq_score = min(entry.access_count / 10.0, 1.0) | |
| score += freq_score * 0.2 | |
| if score > 0.05: # 最低阈值 | |
| scored.append((score, entry)) | |
| # 排序取top_k | |
| scored.sort(key=lambda x: x[0], reverse=True) | |
| results = [] | |
| for score, entry in scored[:top_k]: | |
| entry.access_count += 1 | |
| entry.last_access = now | |
| d = entry.to_dict() | |
| d["_score"] = round(score, 3) | |
| results.append(d) | |
| if results: | |
| self._hit_count += 1 | |
| return results | |
| def store_interaction(self, message: str, result: str, | |
| analysis=None) -> str: | |
| """ | |
| 存储一条交互记忆 | |
| Args: | |
| message: 用户消息 | |
| result: AI回复 | |
| analysis: TaskAnalysis对象(可选) | |
| Returns: | |
| entry_id | |
| """ | |
| self._store_count += 1 | |
| intent = analysis.intent if analysis else "" | |
| route = analysis.route if analysis else "" | |
| entry = MemoryEntry( | |
| user_message=message, | |
| ai_response=result, | |
| intent=intent, | |
| route=route, | |
| ) | |
| self._entries.append(entry) | |
| # 更新关键词索引 | |
| for kw in entry.keywords: | |
| if kw not in self._keyword_index: | |
| self._keyword_index[kw] = [] | |
| self._keyword_index[kw].append(entry.entry_id) | |
| # 更新意图索引 | |
| if intent: | |
| if intent not in self._intent_index: | |
| self._intent_index[intent] = [] | |
| self._intent_index[intent].append(entry.entry_id) | |
| # 超过上限时淘汰最旧且最少访问的 | |
| if len(self._entries) > self.MAX_ENTRIES: | |
| self._evict() | |
| # 定期持久化(每10次存储) | |
| if self._store_count % 10 == 0: | |
| self._save() | |
| return entry.entry_id | |
| # ============================================================ | |
| # 淘汰策略 | |
| # ============================================================ | |
| def _evict(self, count: int = 100): | |
| """淘汰最旧且最少访问的记忆""" | |
| now = time.time() | |
| def evict_score(entry: MemoryEntry) -> float: | |
| """得分越低越先淘汰""" | |
| age_days = (now - entry.created_at) / 86400 | |
| freq = entry.access_count | |
| # 旧 + 低频 = 低分 | |
| return freq / (1 + age_days) | |
| self._entries.sort(key=evict_score, reverse=True) | |
| removed = self._entries[self.MAX_ENTRIES - count:] | |
| self._entries = self._entries[:self.MAX_ENTRIES - count] | |
| # 重建索引 | |
| self._rebuild_index() | |
| logger.debug(f"记忆淘汰: 移除{len(removed)}条") | |
| def _rebuild_index(self): | |
| """重建关键词索引""" | |
| self._keyword_index.clear() | |
| self._intent_index.clear() | |
| for entry in self._entries: | |
| for kw in entry.keywords: | |
| if kw not in self._keyword_index: | |
| self._keyword_index[kw] = [] | |
| self._keyword_index[kw].append(entry.entry_id) | |
| if entry.intent: | |
| if entry.intent not in self._intent_index: | |
| self._intent_index[entry.intent] = [] | |
| self._intent_index[entry.intent].append(entry.entry_id) | |
| # ============================================================ | |
| # 持久化 | |
| # ============================================================ | |
| def _load(self): | |
| """从磁盘加载记忆""" | |
| if self._loaded: | |
| return | |
| filepath = os.path.join(self.DATA_DIR, f"{self.user_id}.json") | |
| if not os.path.exists(filepath): | |
| self._loaded = True | |
| return | |
| try: | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| for item in data.get("entries", []): | |
| entry = MemoryEntry( | |
| user_message=item.get("user_message", ""), | |
| ai_response=item.get("ai_response", ""), | |
| intent=item.get("intent", ""), | |
| route=item.get("route", ""), | |
| timestamp=item.get("timestamp", ""), | |
| ) | |
| entry.access_count = item.get("access_count", 0) | |
| entry.created_at = item.get("created_at", time.time()) | |
| self._entries.append(entry) | |
| self._rebuild_index() | |
| logger.info(f"记忆加载: {len(self._entries)}条 (用户: {self.user_id})") | |
| except Exception as e: | |
| logger.warning(f"记忆加载失败: {e}") | |
| self._loaded = True | |
| def _save(self): | |
| """持久化记忆到磁盘""" | |
| os.makedirs(self.DATA_DIR, exist_ok=True) | |
| filepath = os.path.join(self.DATA_DIR, f"{self.user_id}.json") | |
| try: | |
| data = { | |
| "user_id": self.user_id, | |
| "version": 1, | |
| "entries": [ | |
| { | |
| "entry_id": e.entry_id, | |
| "user_message": e.user_message, | |
| "ai_response": e.ai_response, | |
| "intent": e.intent, | |
| "route": e.route, | |
| "timestamp": e.timestamp, | |
| "created_at": e.created_at, | |
| "access_count": e.access_count, | |
| "keywords": e.keywords, | |
| } | |
| for e in self._entries[-self.MAX_ENTRIES:] | |
| ] | |
| } | |
| with open(filepath, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| logger.debug(f"记忆保存: {len(self._entries)}条") | |
| except Exception as e: | |
| logger.warning(f"记忆保存失败: {e}") | |
| # ============================================================ | |
| # 统计 | |
| # ============================================================ | |
| def get_stats(self) -> Dict: | |
| """获取记忆统计""" | |
| return { | |
| "user_id": self.user_id, | |
| "total_entries": len(self._entries), | |
| "keyword_index_size": len(self._keyword_index), | |
| "intent_index_size": len(self._intent_index), | |
| "store_count": self._store_count, | |
| "retrieve_count": self._retrieve_count, | |
| "hit_count": self._hit_count, | |
| "hit_rate": round( | |
| self._hit_count / max(self._retrieve_count, 1), 3 | |
| ), | |
| } | |
| # ============================================================ | |
| # v7.1增强: 实时训练 + 记忆矩阵 + 超长对话 | |
| # ============================================================ | |
| def encode_realtime(self, message: str, result: str, analysis=None): | |
| """ | |
| 实时训练: 每次交互都更新记忆参数 | |
| 实现策略: | |
| - 维护一个微型训练缓冲区(最近N条交互) | |
| - 当缓冲区满时触发微调(模拟) | |
| - 关键: 高频记忆权重增大,低频记忆权重衰减 | |
| """ | |
| # 先正常存储 | |
| entry_id = self.store_interaction(message, result, analysis) | |
| # 更新记忆强度参数 | |
| self._update_memory_params(message, result) | |
| # 检查是否需要触发微调 | |
| if self._store_count % 20 == 0: | |
| self._micro_finetune() | |
| return entry_id | |
| def _update_memory_params(self, message: str, result: str): | |
| """ | |
| 更新记忆参数(模拟参数化存储) | |
| 思路: 不是真的训练模型,而是维护一个"参数字典" | |
| key=概念, value=强度权重 | |
| 频繁出现的概念权重增大 → 类似TF-IDF的反向操作 | |
| """ | |
| if not hasattr(self, '_param_dict'): | |
| self._param_dict: Dict[str, float] = {} | |
| # 提取概念(用关键词替代) | |
| concepts = self._extract_concepts(message + " " + result) | |
| for concept in concepts: | |
| if concept in self._param_dict: | |
| # 已有概念: 增强权重(但不超过上限) | |
| self._param_dict[concept] = min( | |
| self._param_dict[concept] * 1.1, 2.0 | |
| ) | |
| else: | |
| # 新概念: 初始权重 | |
| self._param_dict[concept] = 1.0 | |
| # 所有概念轻微衰减(防止无限增长) | |
| for k in list(self._param_dict.keys()): | |
| self._param_dict[k] *= 0.999 | |
| if self._param_dict[k] < 0.1: | |
| del self._param_dict[k] | |
| def _extract_concepts(self, text: str) -> List[str]: | |
| """提取概念(复用关键词提取)""" | |
| cleaned = re.sub(r'[,。!?、;:""''()\s]', ' ', text) | |
| words = re.split(r'[\s,.\-!?;:]+', cleaned) | |
| return [w for w in words if len(w) >= 2][:10] | |
| def _micro_finetune(self): | |
| """ | |
| 微调(模拟): 基于近期交互微调记忆权重 | |
| 真实实现: 用缓冲区数据做1-2步梯度下降 | |
| 当前实现: 强化近期高频记忆、衰减低频记忆 | |
| """ | |
| if not hasattr(self, '_param_dict') or not self._param_dict: | |
| return | |
| # 按权重排序,保留top 500概念 | |
| sorted_params = sorted( | |
| self._param_dict.items(), key=lambda x: x[1], reverse=True | |
| ) | |
| self._param_dict = dict(sorted_params[:500]) | |
| logger.debug(f"记忆微调: 保留{len(self._param_dict)}个概念参数") | |
| def fork_for_task(self, task_name: str) -> 'MemoryModel': | |
| """ | |
| 为特定任务复制一个专属记忆模型 | |
| 用法: 物理课备课时fork一个physics记忆模型 | |
| 课上只用物理相关记忆,课后合并回主记忆 | |
| Returns: | |
| 新的MemoryModel实例(副本) | |
| """ | |
| # 创建副本 | |
| forked = MemoryModel.__new__(MemoryModel) | |
| forked.user_id = f"{self.user_id}__{task_name}" | |
| forked._entries = [] # 不复制全部,只建空壳 | |
| forked._keyword_index = {} | |
| forked._intent_index = {} | |
| forked._loaded = True | |
| forked._store_count = 0 | |
| forked._retrieve_count = 0 | |
| forked._hit_count = 0 | |
| # 复制相关记忆(按意图过滤) | |
| task_keywords = self._extract_concepts(task_name) | |
| for entry in self._entries: | |
| overlap = set(entry.keywords) & set(task_keywords) | |
| if overlap: | |
| forked._entries.append(entry) | |
| for kw in entry.keywords: | |
| if kw not in forked._keyword_index: | |
| forked._keyword_index[kw] = [] | |
| forked._keyword_index[kw].append(entry.entry_id) | |
| # 复制参数字典 | |
| if hasattr(self, '_param_dict'): | |
| forked._param_dict = { | |
| k: v for k, v in self._param_dict.items() | |
| if k in task_keywords or v > 1.0 | |
| } | |
| else: | |
| forked._param_dict = {} | |
| logger.info(f"记忆分叉: {task_name}, 复制{len(forked._entries)}条相关记忆") | |
| return forked | |
| def merge_from(self, other: 'MemoryModel'): | |
| """ | |
| 合并另一个记忆模型(从分叉回归) | |
| 用法: 课上积累的物理记忆,课后合并回主记忆 | |
| """ | |
| merged = 0 | |
| for entry in other._entries: | |
| # 检查是否已存在 | |
| existing_ids = {e.entry_id for e in self._entries} | |
| if entry.entry_id not in existing_ids: | |
| self._entries.append(entry) | |
| for kw in entry.keywords: | |
| if kw not in self._keyword_index: | |
| self._keyword_index[kw] = [] | |
| self._keyword_index[kw].append(entry.entry_id) | |
| merged += 1 | |
| # 合并参数 | |
| if hasattr(other, '_param_dict') and hasattr(self, '_param_dict'): | |
| for k, v in other._param_dict.items(): | |
| if k in self._param_dict: | |
| self._param_dict[k] = max(self._param_dict[k], v) | |
| else: | |
| self._param_dict[k] = v | |
| self._save() | |
| logger.info(f"记忆合并: +{merged}条新记忆") | |
| def get_unlimited_context(self, query: str, max_tokens: int = 4000) -> str: | |
| """ | |
| 超长对话支持: 通过记忆压缩实现无限上下文 | |
| 原理: | |
| - 传统: 把所有历史拼上去(受token限制) | |
| - 虫群: 只检索最相关的记忆+最近N轮(滑动窗口) | |
| - 压缩: 旧记忆自动摘要,保持信息密度 | |
| """ | |
| parts = [] | |
| char_count = 0 | |
| # 1. 最近N轮对话(滑动窗口,最近10条) | |
| recent = self._entries[-10:] | |
| for e in reversed(recent): | |
| line = f"用户: {e.user_message[:100]}\n助手: {e.ai_response[:100]}" | |
| if char_count + len(line) > max_tokens: | |
| break | |
| parts.insert(0, line) | |
| char_count += len(line) | |
| # 2. 相关历史记忆(按相关性) | |
| relevant = self.retrieve(query, top_k=5) | |
| seen_ids = {e.entry_id for e in recent} | |
| for r in relevant: | |
| rid = r.get("entry_id", "") | |
| if rid in seen_ids: | |
| continue | |
| line = f"[历史] 用户: {r['user_message'][:80]}\n[历史] 助手: {r['ai_response'][:80]}" | |
| if char_count + len(line) > max_tokens: | |
| break | |
| parts.insert(0, line) | |
| char_count += len(line) | |
| seen_ids.add(rid) | |
| # 3. 概念参数摘要 | |
| if hasattr(self, '_param_dict') and self._param_dict: | |
| top_concepts = sorted( | |
| self._param_dict.items(), key=lambda x: x[1], reverse=True | |
| )[:10] | |
| concept_str = "核心概念: " + ", ".join( | |
| f"{k}({v:.1f})" for k, v in top_concepts | |
| ) | |
| parts.insert(0, concept_str) | |
| return "\n---\n".join(parts) | |
| def get_param_stats(self) -> Dict: | |
| """获取记忆参数统计""" | |
| params = getattr(self, '_param_dict', {}) | |
| return { | |
| "concept_count": len(params), | |
| "top_concepts": sorted( | |
| params.items(), key=lambda x: x[1], reverse=True | |
| )[:5] if params else [], | |
| "avg_weight": sum(params.values()) / max(len(params), 1) if params else 0, | |
| } | |
| # ============================================================ | |
| # 记忆矩阵: 多任务记忆的统一管理 | |
| # ============================================================ | |
| class MemoryMatrix: | |
| """ | |
| 记忆矩阵 — 管理用户的所有任务记忆 | |
| 结构: | |
| 主记忆(General) — 日常交互 | |
| ├── 物理记忆(Physics) — 备课专用 | |
| ├── 编程记忆(Coding) — 开发专用 | |
| └── ...按需创建 | |
| 用法: | |
| matrix = MemoryMatrix("user_001") | |
| matrix.activate("physics") # 激活物理记忆 | |
| matrix.store("...", "...") # 存入当前激活记忆 | |
| matrix.deactivate() # 回到主记忆 | |
| """ | |
| def __init__(self, user_id: str = "default"): | |
| self.user_id = user_id | |
| self._main = MemoryModel(user_id) | |
| self._forks: Dict[str, MemoryModel] = {} | |
| self._active: Optional[str] = None # 当前激活的任务名 | |
| def activate(self, task_name: str): | |
| """激活一个任务记忆(不存在则创建)""" | |
| if task_name not in self._forks: | |
| # 从主记忆分叉 | |
| self._forks[task_name] = self._main.fork_for_task(task_name) | |
| logger.info(f"创建任务记忆: {task_name}") | |
| self._active = task_name | |
| def deactivate(self): | |
| """停用任务记忆,合并回主记忆""" | |
| if self._active and self._active in self._forks: | |
| self._main.merge_from(self._forks[self._active]) | |
| # 保留分叉不删除(下次可复用) | |
| self._active = None | |
| def current(self) -> MemoryModel: | |
| """获取当前活跃的记忆模型""" | |
| if self._active and self._active in self._forks: | |
| return self._forks[self._active] | |
| return self._main | |
| def store(self, message: str, result: str, analysis=None): | |
| """存入当前活跃记忆""" | |
| self.current.store_interaction(message, result, analysis) | |
| def retrieve(self, query: str, top_k: int = 5) -> List[Dict]: | |
| """从当前活跃记忆检索""" | |
| return self.current.retrieve(query, top_k) | |
| def get_context(self, query: str) -> str: | |
| """获取上下文(优先当前任务+补充主记忆)""" | |
| task_ctx = self.current.retrieve_context(query) | |
| if self._active: | |
| main_ctx = self._main.retrieve_context(query) | |
| if task_ctx and main_ctx: | |
| return f"[任务:{self._active}] {task_ctx}\n[通用] {main_ctx}" | |
| return task_ctx | |
| def list_tasks(self) -> List[str]: | |
| """列出所有任务记忆""" | |
| return list(self._forks.keys()) | |
| def get_status(self) -> Dict: | |
| """获取矩阵状态""" | |
| tasks = {} | |
| for name, mem in self._forks.items(): | |
| tasks[name] = mem.get_stats() | |
| return { | |
| "user_id": self.user_id, | |
| "active_task": self._active, | |
| "main_stats": self._main.get_stats(), | |
| "task_count": len(self._forks), | |
| "tasks": tasks, | |
| } | |