| | from __future__ import annotations |
| |
|
| | import json |
| | import os |
| | import threading |
| | import uuid |
| | from datetime import datetime |
| | from typing import Annotated, Dict, List, Literal, Optional |
| |
|
| | import gradio as gr |
| | from ._docstrings import autodoc |
| |
|
| | _MODULE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| | MEMORY_FILE = os.path.join(os.path.dirname(_MODULE_DIR), "memories.json") |
| | _MEMORY_LOCK = threading.RLock() |
| | _MAX_MEMORIES = 10_000 |
| |
|
| |
|
| | def _now_iso() -> str: |
| | return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") |
| |
|
| |
|
| | def _load_memories() -> List[Dict[str, str]]: |
| | if not os.path.exists(MEMORY_FILE): |
| | return [] |
| | try: |
| | with open(MEMORY_FILE, "r", encoding="utf-8") as file: |
| | data = json.load(file) |
| | if isinstance(data, list): |
| | cleaned: List[Dict[str, str]] = [] |
| | for item in data: |
| | if isinstance(item, dict) and "id" in item and "text" in item: |
| | cleaned.append(item) |
| | return cleaned |
| | return [] |
| | except Exception: |
| | try: |
| | backup = MEMORY_FILE + ".corrupt" |
| | if not os.path.exists(backup): |
| | os.replace(MEMORY_FILE, backup) |
| | except Exception: |
| | pass |
| | return [] |
| |
|
| |
|
| | def _save_memories(memories: List[Dict[str, str]]) -> None: |
| | tmp_path = MEMORY_FILE + ".tmp" |
| | with open(tmp_path, "w", encoding="utf-8") as file: |
| | json.dump(memories, file, ensure_ascii=False, indent=2) |
| | os.replace(tmp_path, MEMORY_FILE) |
| |
|
| |
|
| | def _mem_save(text: str, tags: str) -> str: |
| | text_clean = (text or "").strip() |
| | if not text_clean: |
| | return "Error: memory text is empty." |
| | with _MEMORY_LOCK: |
| | memories = _load_memories() |
| | if memories and memories[-1].get("text") == text_clean: |
| | return "Skipped: identical to last stored memory." |
| | mem_id = str(uuid.uuid4()) |
| | entry = { |
| | "id": mem_id, |
| | "text": text_clean, |
| | "timestamp": _now_iso(), |
| | "tags": tags.strip(), |
| | } |
| | memories.append(entry) |
| | if len(memories) > _MAX_MEMORIES: |
| | overflow = len(memories) - _MAX_MEMORIES |
| | memories = memories[overflow:] |
| | _save_memories(memories) |
| | return f"Memory saved: {mem_id}" |
| |
|
| |
|
| | def _mem_list(limit: int, include_tags: bool) -> str: |
| | limit = max(1, min(200, limit)) |
| | with _MEMORY_LOCK: |
| | memories = _load_memories() |
| | if not memories: |
| | return "No memories stored yet." |
| | chosen = memories[-limit:][::-1] |
| | lines: List[str] = [] |
| | for memory in chosen: |
| | base = f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}" |
| | if include_tags and memory.get("tags"): |
| | base += f" | tags: {memory['tags']}" |
| | lines.append(base) |
| | omitted = len(memories) - len(chosen) |
| | if omitted > 0: |
| | lines.append(f"… ({omitted} older memorie{'s' if omitted!=1 else ''} omitted; total={len(memories)})") |
| | return "\n".join(lines) |
| |
|
| |
|
| | def _parse_search_query(query: str) -> Dict[str, List[str]]: |
| | import re |
| |
|
| | result = {"tag_terms": [], "text_terms": [], "operator": "and"} |
| | if not query or not query.strip(): |
| | return result |
| | query = re.sub(r"\s+", " ", query.strip()) |
| | if re.search(r"\bOR\b", query, re.IGNORECASE): |
| | result["operator"] = "or" |
| | parts = re.split(r"\s+OR\s+", query, flags=re.IGNORECASE) |
| | else: |
| | parts = re.split(r"\s+(?:AND\s+)?", query, flags=re.IGNORECASE) |
| | parts = [p for p in parts if p.strip() and p.strip().upper() != "AND"] |
| | for part in parts: |
| | part = part.strip() |
| | if not part: |
| | continue |
| | tag_match = re.match(r"^tag:(.+)$", part, re.IGNORECASE) |
| | if tag_match: |
| | tag_name = tag_match.group(1).strip() |
| | if tag_name: |
| | result["tag_terms"].append(tag_name.lower()) |
| | else: |
| | result["text_terms"].append(part.lower()) |
| | return result |
| |
|
| |
|
| | def _match_memory_with_query(memory: Dict[str, str], parsed_query: Dict[str, List[str]]) -> bool: |
| | tag_terms = parsed_query["tag_terms"] |
| | text_terms = parsed_query["text_terms"] |
| | operator = parsed_query["operator"] |
| | if not tag_terms and not text_terms: |
| | return False |
| | memory_text = memory.get("text", "").lower() |
| | memory_tags = memory.get("tags", "").lower() |
| | memory_tag_list = [tag.strip() for tag in memory_tags.split(",") if tag.strip()] |
| | tag_matches = [any(tag_term in tag for tag in memory_tag_list) for tag_term in tag_terms] |
| | combined_text = memory_text + " " + memory_tags |
| | text_matches = [text_term in combined_text for text_term in text_terms] |
| | all_matches = tag_matches + text_matches |
| | if not all_matches: |
| | return False |
| | if operator == "or": |
| | return any(all_matches) |
| | return all(all_matches) |
| |
|
| |
|
| | def _mem_search(query: str, limit: int) -> str: |
| | q = (query or "").strip() |
| | if not q: |
| | return "Error: empty query." |
| | parsed_query = _parse_search_query(q) |
| | if not parsed_query["tag_terms"] and not parsed_query["text_terms"]: |
| | return "Error: no valid search terms found." |
| | limit = max(1, min(200, limit)) |
| | with _MEMORY_LOCK: |
| | memories = _load_memories() |
| | matches: List[Dict[str, str]] = [] |
| | total_matches = 0 |
| | for memory in reversed(memories): |
| | if _match_memory_with_query(memory, parsed_query): |
| | total_matches += 1 |
| | if len(matches) < limit: |
| | matches.append(memory) |
| | if not matches: |
| | return f"No matches for: {query}" |
| | lines = [ |
| | f"{memory['id'][:8]} [{memory.get('timestamp','?')}] {memory.get('text','')}" + (f" | tags: {memory['tags']}" if memory.get('tags') else "") |
| | for memory in matches |
| | ] |
| | omitted = total_matches - len(matches) |
| | if omitted > 0: |
| | lines.append(f"… ({omitted} additional match{'es' if omitted!=1 else ''} omitted; total_matches={total_matches})") |
| | return "\n".join(lines) |
| |
|
| |
|
| | def _mem_delete(memory_id: str) -> str: |
| | key = (memory_id or "").strip().lower() |
| | if len(key) < 4: |
| | return "Error: supply at least 4 characters of the id." |
| | with _MEMORY_LOCK: |
| | memories = _load_memories() |
| | matched = [memory for memory in memories if memory["id"].lower().startswith(key)] |
| | if not matched: |
| | return "Memory not found." |
| | if len(matched) > 1 and key != matched[0]["id"].lower(): |
| | sample = ", ".join(memory["id"][:8] for memory in matched[:5]) |
| | more = "…" if len(matched) > 5 else "" |
| | return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters." |
| | target_id = matched[0]["id"] |
| | memories = [memory for memory in memories if memory["id"] != target_id] |
| | _save_memories(memories) |
| | return f"Deleted memory: {target_id}" |
| |
|
| |
|
| | |
| | TOOL_SUMMARY = ( |
| | "Manage short text memories (save, list, search, delete) in a local JSON store with tags and simple query language; " |
| | "returns a result string (confirmation, listing, matches, or error)." |
| | ) |
| |
|
| |
|
| | @autodoc( |
| | summary=TOOL_SUMMARY, |
| | ) |
| | def Memory_Manager( |
| | action: Annotated[Literal["save", "list", "search", "delete"], "Action to perform: save | list | search | delete"], |
| | text: Annotated[Optional[str], "Text content (Save only)"] = None, |
| | tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None, |
| | query: Annotated[Optional[str], "Enhanced search with tag:name syntax, AND/OR operators (Search only)"] = None, |
| | limit: Annotated[int, "Max results (List/Search only)"] = 20, |
| | memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None, |
| | include_tags: Annotated[bool, "Include tags (List/Search only)"] = True, |
| | ) -> str: |
| | act = (action or "").lower().strip() |
| | text = text or "" |
| | tags = tags or "" |
| | query = query or "" |
| | memory_id = memory_id or "" |
| | if act == "save": |
| | if not text.strip(): |
| | return "Error: 'text' is required when action=save." |
| | return _mem_save(text=text, tags=tags) |
| | if act == "list": |
| | return _mem_list(limit=limit, include_tags=include_tags) |
| | if act == "search": |
| | if not query.strip(): |
| | return "Error: 'query' is required when action=search." |
| | return _mem_search(query=query, limit=limit) |
| | if act == "delete": |
| | if not memory_id.strip(): |
| | return "Error: 'memory_id' is required when action=delete." |
| | return _mem_delete(memory_id=memory_id) |
| | return "Error: invalid action (use save|list|search|delete)." |
| |
|
| |
|
| | def build_interface() -> gr.Interface: |
| | return gr.Interface( |
| | fn=Memory_Manager, |
| | inputs=[ |
| | gr.Radio(label="Action", choices=["save", "list", "search", "delete"], value="list"), |
| | gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"), |
| | gr.Textbox(label="Tags", placeholder="tag1, tag2", max_lines=1), |
| | gr.Textbox(label="Query", placeholder="tag:work AND tag:project OR meeting", max_lines=1), |
| | gr.Slider(1, 200, value=20, step=1, label="Limit"), |
| | gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)", max_lines=1), |
| | gr.Checkbox(value=True, label="Include Tags"), |
| | ], |
| | outputs=gr.Textbox(label="Result", lines=14), |
| | title="Memory Manager", |
| | description=( |
| | "<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>" |
| | ), |
| | api_description=TOOL_SUMMARY, |
| | flagging_mode="never", |
| | ) |
| |
|
| |
|
| | __all__ = ["Memory_Manager", "build_interface", "_load_memories", "_save_memories"] |
| |
|