""" AGENT MEMORY — Multi-tier Memory MCP Server Docker SDK on HF Spaces — no Gradio, no CSP issues. Memory tiers: episodic — events, experiences, conversations semantic — facts, knowledge, concepts procedural — skills, how-tos, workflows working — short-term scratch pad (TTL default 1h) NOTE: 'trace' tier is REMOVED — all telemetry now goes to agent-trace. Writes to tier='trace' are forwarded to agent-trace automatically. MCP tools: memory_store — write a memory memory_search — full-text + tag search memory_recall — get by id memory_update — update content/tags memory_forget — delete memory_list — list by tier/tag memory_stats — counts per tier """ import os, uuid, json, asyncio, time, re, urllib.request from pathlib import Path from datetime import datetime, timezone, timedelta from typing import Optional from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse, HTMLResponse, StreamingResponse BASE = Path(__file__).parent MEM_DIR = BASE / "memories" MEM_DIR.mkdir(exist_ok=True) # ── FORGE infrastructure ────────────────────────────────────────── TRACE_URL = os.environ.get("TRACE_URL", "https://chris4k-agent-trace.hf.space") def emit_trace(agent: str, event_type: str, payload: dict, status: str = "ok"): """Fire-and-forget event to agent-trace. Replaces internal trace tier.""" try: body = json.dumps({"agent": agent or "memory", "event_type": event_type, "status": status, "payload": payload}).encode() req = urllib.request.Request( f"{TRACE_URL}/api/trace", data=body, headers={"Content-Type": "application/json"}, method="POST") urllib.request.urlopen(req, timeout=2) except Exception: pass # never block memory operations TIERS = ["episodic", "semantic", "procedural", "working"] # "trace" tier is intentionally NOT in TIERS — forwarded to agent-trace # ── Memory utils ────────────────────────────────────────────────── def now_ts(): return int(time.time()) def now_iso(): return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M") def mem_path(mid): return MEM_DIR / f"{mid}.json" def read_mem(mid): p = mem_path(mid) return json.loads(p.read_text()) if p.exists() else None def write_mem(m): m["updated_at"] = now_ts() mem_path(m["id"]).write_text(json.dumps(m, indent=2, ensure_ascii=False)) def new_memory(data: dict) -> dict: mid = uuid.uuid4().hex[:10] tier = data.get("tier", "episodic") # ── Intercept legacy "trace" tier ───────────────────────────── if tier == "trace": agent = str(data.get("agent", "memory")).strip() emit_trace(agent, "custom", { "content": str(data.get("content", ""))[:500], "tags": data.get("tags", []), "importance": data.get("importance", 3), "source": "memory_legacy_trace", }) # Return a stub so callers don't break return {"id": f"trace_fwd_{mid}", "tier": "trace", "content": data.get("content",""), "forwarded_to": "agent-trace"} if tier not in TIERS: tier = "episodic" ttl = int(data.get("ttl", 3600)) if tier == "working" else None m = { "id": mid, "tier": tier, "content": (data.get("content") or "").strip(), "summary": (data.get("summary") or "").strip(), "tags": [t.strip().lower() for t in data.get("tags", []) if str(t).strip()], "agent": (data.get("agent") or "").strip(), "importance": max(0, min(10, int(data.get("importance", 5)))), "source": (data.get("source") or "").strip(), "expires_at": now_ts() + ttl if ttl else None, "created_at": now_ts(), "updated_at": now_ts(), "access_count": 0, "last_accessed": None, } write_mem(m) # Emit a trace event for every write (light telemetry) emit_trace(m["agent"] or "memory", "custom", { "op": "memory_store", "tier": tier, "id": mid, "importance": m["importance"], "tags": m["tags"] }) return m def all_memories(include_expired=False): now = now_ts() out = [] for p in sorted(MEM_DIR.glob("*.json"), reverse=True): try: m = json.loads(p.read_text()) if not include_expired and m.get("expires_at") and m["expires_at"] < now: p.unlink() # auto-expire continue out.append(m) except: pass return out def search_memories(query: str, tier: str = "all", tag: str = "", agent: str = "", limit: int = 20): mems = all_memories() q = query.lower().strip() results = [] for m in mems: if tier != "all" and m["tier"] != tier: continue if tag and tag.lower() not in m["tags"]: continue if agent and m["agent"].lower() != agent.lower(): continue # score: content match + tag match + importance score = 0 text = (m["content"] + " " + m["summary"] + " " + " ".join(m["tags"])).lower() if q: words = q.split() for w in words: if w in text: score += 2 if q in text: score += 5 score += m.get("importance", 5) * 0.2 if not q or score > 0: results.append((score, m)) results.sort(key=lambda x: (-x[0], -x[1].get("created_at", 0))) return [m for _, m in results[:limit]] def bump_access(mid): m = read_mem(mid) if m: m["access_count"] = m.get("access_count", 0) + 1 m["last_accessed"] = now_ts() write_mem(m) return m # ── Seed ────────────────────────────────────────────────────────── def seed(): if list(MEM_DIR.glob("*.json")): return samples = [ {"tier":"semantic","content":"JARVIS TheCore uses a five-layer memory architecture: ShortTermMemory, MidTermMemory, LongTermMemory, ProceduralMemory, and EpisodicMemory. The MemoryQualityGate filters before persistence.","tags":["jarvis","architecture","memory"],"agent":"christof","importance":9}, {"tier":"episodic","content":"Resolved SSL certificate error on ki-fusion-labs.de by regenerating Let's Encrypt cert via certbot. Root cause: cert expired after 90 days without auto-renewal configured.","tags":["ssl","ki-fusion-labs","devops","resolved"],"agent":"christof","importance":7}, {"tier":"procedural","content":"To deploy a HF Space with Docker SDK: (1) Set sdk: docker in README.md, (2) Add Dockerfile with uvicorn CMD on port 7860, (3) Use non-root user for security, (4) No StaticFiles — embed HTML as string in FastAPI route.","tags":["hf-spaces","docker","deployment","howto"],"agent":"christof","importance":8}, {"tier":"semantic","content":"bofrost* GDPR deletion architecture uses decentralized pull-based design across 14+ systems and 6 countries. Proof of Deletion (PoD) certificates required. Architecture Board approval needed.","tags":["gdpr","bofrost","architecture","deletion"],"agent":"christof","importance":9}, {"tier":"episodic","content":"BitNet 1.58-bit trainer achieved stable training after fixing: NaN losses (gradient clipping + LR warmup), dead layers (initialization scale), FlipRate issues (STE tuning), dataset distribution mismatch (balanced sampling).","tags":["bitnet","training","rtx5090","resolved"],"agent":"christof","importance":8}, {"tier":"working","content":"Current focus: Fix agent-kanban-board HF Space. Issue was sdk:gradio CSP blocking all JS. Fix: switch to sdk:docker, pure FastAPI.","tags":["kanban","hf-spaces","active"],"agent":"christof","importance":6,"ttl":86400}, {"tier":"procedural","content":"ki-fusion-labs.de LLM API uses GPU worker polling architecture. Workers poll /api/queue every 2s, pick up jobs, POST results back. Firewall-friendly — no inbound connections to workers needed.","tags":["ki-fusion-labs","llm-api","architecture","gpu"],"agent":"christof","importance":8}, ] for s in samples: new_memory(s) seed() # ── FastAPI ─────────────────────────────────────────────────────── app = FastAPI(title="Agent Memory MCP") def jresp(data, status=200): return JSONResponse(content=data, status_code=status) # REST API @app.get("/api/memories") async def list_memories(tier: str = "all", tag: str = "", agent: str = "", limit: int = 50): mems = all_memories() if tier != "all": mems = [m for m in mems if m["tier"] == tier] if tag: mems = [m for m in mems if tag.lower() in m["tags"]] if agent: mems = [m for m in mems if m["agent"].lower() == agent.lower()] mems.sort(key=lambda m: (-m.get("importance",5), -m.get("created_at",0))) return jresp(mems[:limit]) @app.get("/api/memories/search") async def search(q: str = "", tier: str = "all", tag: str = "", agent: str = "", limit: int = 20): return jresp(search_memories(q, tier, tag, agent, limit)) @app.get("/api/memories/{mid}") async def get_memory(mid: str): m = bump_access(mid) if not m: raise HTTPException(404, "not found") return jresp(m) @app.post("/api/memories") async def store_memory(request: Request): data = await request.json() if not data.get("content","").strip(): raise HTTPException(400, "content required") m = new_memory(data) return jresp({"status":"stored","id":m["id"],"memory":m}, 201) @app.patch("/api/memories/{mid}") async def update_memory(mid: str, request: Request): data = await request.json() m = read_mem(mid) if not m: raise HTTPException(404) for k in ("content","summary","tags","importance","tier","source","agent"): if k in data: m[k] = data[k] write_mem(m) return jresp({"status":"updated","memory":m}) @app.delete("/api/memories/{mid}") async def delete_memory(mid: str): p = mem_path(mid) if not p.exists(): raise HTTPException(404) p.unlink() return jresp({"status":"forgotten"}) @app.get("/api/stats") async def stats(): mems = all_memories() by_tier = {t: 0 for t in TIERS} by_agent: dict = {} total_importance = 0 for m in mems: by_tier[m["tier"]] = by_tier.get(m["tier"], 0) + 1 a = m.get("agent") or "unknown" by_agent[a] = by_agent.get(a, 0) + 1 total_importance += m.get("importance", 5) return jresp({ "total": len(mems), "by_tier": by_tier, "by_agent": by_agent, "avg_importance": round(total_importance / len(mems), 1) if mems else 0, }) # ── MCP ─────────────────────────────────────────────────────────── MCP_TOOLS = [ {"name":"memory_store", "description":"Store a new memory. Tier: episodic|semantic|procedural|working. Importance 0-10.", "inputSchema":{"type":"object","required":["content"],"properties":{ "content": {"type":"string","description":"Memory content"}, "summary": {"type":"string","description":"Short summary"}, "tier": {"type":"string","enum":["episodic","semantic","procedural","working"]}, "tags": {"type":"array","items":{"type":"string"}}, "agent": {"type":"string","description":"Agent or user ID"}, "importance":{"type":"integer","minimum":0,"maximum":10}, "source": {"type":"string"}, "ttl": {"type":"integer","description":"TTL in seconds (working tier only)"}, }}}, {"name":"memory_search", "description":"Search memories by content, tier, or tag. Returns ranked results.", "inputSchema":{"type":"object","properties":{ "query": {"type":"string"}, "tier": {"type":"string","enum":["all","episodic","semantic","procedural","working"]}, "tag": {"type":"string"}, "agent": {"type":"string"}, "limit": {"type":"integer","default":10}, }}}, {"name":"memory_recall", "description":"Retrieve a specific memory by ID.", "inputSchema":{"type":"object","required":["id"],"properties":{"id":{"type":"string"}}}}, {"name":"memory_update", "description":"Update an existing memory's content, tags, or importance.", "inputSchema":{"type":"object","required":["id"],"properties":{ "id": {"type":"string"}, "content": {"type":"string"}, "summary": {"type":"string"}, "tags": {"type":"array","items":{"type":"string"}}, "importance":{"type":"integer"}, }}}, {"name":"memory_forget", "description":"Delete a memory permanently.", "inputSchema":{"type":"object","required":["id"],"properties":{"id":{"type":"string"}}}}, {"name":"memory_list", "description":"List memories by tier or tag.", "inputSchema":{"type":"object","properties":{ "tier": {"type":"string"}, "tag": {"type":"string"}, "agent": {"type":"string"}, "limit": {"type":"integer","default":20}, }}}, {"name":"memory_stats", "description":"Get memory statistics (counts per tier, agents, avg importance).", "inputSchema":{"type":"object","properties":{}}}, ] async def mcp_call(name, args): if name == "memory_store": if not args.get("content","").strip(): return json.dumps({"error":"content required"}) m = new_memory(args) return json.dumps({"stored":m["id"],"tier":m["tier"],"memory":m}) if name == "memory_search": results = search_memories( args.get("query",""), args.get("tier","all"), args.get("tag",""), args.get("agent",""), args.get("limit",10)) return json.dumps({"count":len(results),"results":results}) if name == "memory_recall": m = bump_access(args["id"]) return json.dumps(m or {"error":"not found"}) if name == "memory_update": m = read_mem(args["id"]) if not m: return json.dumps({"error":"not found"}) for k in ("content","summary","tags","importance","tier"): if k in args: m[k] = args[k] write_mem(m); return json.dumps({"updated":m["id"],"memory":m}) if name == "memory_forget": p = mem_path(args["id"]) if not p.exists(): return json.dumps({"error":"not found"}) p.unlink(); return json.dumps({"forgotten":args["id"]}) if name == "memory_list": mems = all_memories() t = args.get("tier","all"); tg = args.get("tag",""); ag = args.get("agent","") if t != "all": mems = [m for m in mems if m["tier"]==t] if tg: mems = [m for m in mems if tg.lower() in m["tags"]] if ag: mems = [m for m in mems if m["agent"].lower()==ag.lower()] mems.sort(key=lambda m: (-m.get("importance",5), -m.get("created_at",0))) return json.dumps({"count":len(mems),"memories":mems[:args.get("limit",20)]}) if name == "memory_stats": mems = all_memories() by_tier = {t:0 for t in TIERS} for m in mems: by_tier[m["tier"]] = by_tier.get(m["tier"],0)+1 return json.dumps({"total":len(mems),"by_tier":by_tier}) return json.dumps({"error":f"unknown tool: {name}"}) @app.get("/mcp/sse") async def mcp_sse(): async def stream(): init = {"jsonrpc":"2.0","method":"notifications/initialized", "params":{"serverInfo":{"name":"agent-memory","version":"1.0"},"capabilities":{"tools":{}}}} yield f"data: {json.dumps(init)}\n\n" await asyncio.sleep(0.1) yield f"data: {json.dumps({'jsonrpc':'2.0','method':'notifications/tools/list_changed','params':{}})}\n\n" while True: await asyncio.sleep(25) yield f"data: {json.dumps({'jsonrpc':'2.0','method':'ping'})}\n\n" return StreamingResponse(stream(), media_type="text/event-stream", headers={"Cache-Control":"no-cache","X-Accel-Buffering":"no"}) @app.post("/mcp") async def mcp_rpc(request: Request): body = await request.json() method = body.get("method",""); rid = body.get("id",1) if method == "initialize": return jresp({"jsonrpc":"2.0","id":rid,"result":{ "serverInfo":{"name":"agent-memory","version":"1.0"},"capabilities":{"tools":{}}}}) if method == "tools/list": return jresp({"jsonrpc":"2.0","id":rid,"result":{"tools":MCP_TOOLS}}) if method == "tools/call": p = body.get("params",{}) res = await mcp_call(p.get("name",""), p.get("arguments",{})) return jresp({"jsonrpc":"2.0","id":rid,"result":{"content":[{"type":"text","text":res}]}}) return jresp({"jsonrpc":"2.0","id":rid,"error":{"code":-32601,"message":"Method not found"}}) # ── SPA ─────────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def serve_ui(): return HTMLResponse(content=SPA, media_type='text/html; charset=utf-8') SPA = """
GET /mcp/sse | memory_store memory_search