| import json | |
| import re | |
| from pathlib import Path | |
| from typing import Dict, List | |
| MEMORY_DIR = Path("data") / "memory" | |
| MEMORY_DIR.mkdir(parents=True, exist_ok=True) | |
| def _safe_user_id(user_id: str) -> str: | |
| safe = re.sub(r"[^a-zA-Z0-9_-]", "_", user_id.strip()) | |
| return safe or "default_user" | |
| def _memory_path(user_id: str) -> Path: | |
| return MEMORY_DIR / f"{_safe_user_id(user_id)}.json" | |
| def load_history(user_id: str) -> List[Dict[str, str]]: | |
| path = _memory_path(user_id) | |
| if not path.exists(): | |
| return [] | |
| try: | |
| with path.open("r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| if isinstance(data, list): | |
| return data | |
| return [] | |
| except Exception: | |
| return [] | |
| def save_history(user_id: str, history: List[Dict[str, str]]) -> None: | |
| path = _memory_path(user_id) | |
| with path.open("w", encoding="utf-8") as f: | |
| json.dump(history, f, ensure_ascii=True, indent=2) | |
| def append_message(user_id: str, role: str, content: str, max_items: int = 30) -> None: | |
| history = load_history(user_id) | |
| history.append({"role": role, "content": content}) | |
| history = history[-max_items:] | |
| save_history(user_id, history) | |
| def save_interaction(user_id: str, user_message: str, assistant_response: str) -> None: | |
| append_message(user_id, "user", user_message) | |
| append_message(user_id, "assistant", assistant_response) | |
| def get_relevant_context(user_id: str, message: str, max_items: int = 8) -> str: | |
| history = load_history(user_id) | |
| if not history: | |
| return "" | |
| query_terms = {t for t in re.findall(r"\w+", message.lower()) if len(t) > 2} | |
| scored = [] | |
| total = len(history) | |
| for idx, item in enumerate(history): | |
| content = item.get("content", "") | |
| role = item.get("role", "") | |
| terms = {t for t in re.findall(r"\w+", content.lower()) if len(t) > 2} | |
| overlap = len(query_terms & terms) | |
| recency_bonus = (idx + 1) / max(total, 1) | |
| score = overlap + (0.35 * recency_bonus) | |
| if overlap > 0 or idx >= total - 6: | |
| scored.append((score, idx, role, content)) | |
| if not scored: | |
| trimmed = history[-max_items:] | |
| else: | |
| scored.sort(key=lambda row: row[0], reverse=True) | |
| top = sorted(scored[:max_items], key=lambda row: row[1]) | |
| trimmed = [{"role": role, "content": content} for _, _, role, content in top] | |
| lines = [] | |
| for item in trimmed: | |
| role = item.get("role", "assistant").capitalize() | |
| content = item.get("content", "") | |
| lines.append(f"{role}: {content}") | |
| return "\n".join(lines) | |