customercore / src /agent /memory.py
Saibalaji Namburi
feat: initial commit — CustomerCore Phases 1-9 complete
3cfefe7
Raw
History Blame Contribute Delete
4.36 kB
import os
import structlog
from typing import Optional, List
log = structlog.get_logger()
SUPABASE_DB_URL = os.environ.get("SUPABASE_DB_URL", "")
SUPABASE_DB_HOST = os.environ.get("SUPABASE_DB_HOST", "")
_mem0_client: Optional[any] = None
# In-memory storage for local dev / fallback when Supabase is down
_local_memory_store: dict[str, list[str]] = {}
def get_mem0_client():
"""Get the Mem0 client, or raise Exception if not configured/available."""
global _mem0_client
if _mem0_client is None:
if not SUPABASE_DB_HOST:
raise ValueError("SUPABASE_DB_HOST is not configured. Falling back to local in-memory store.")
from mem0 import Memory
config = {
"vector_store": {
"provider": "pgvector",
"config": {
"dbname": "postgres",
"user": "postgres",
"password": os.environ.get("SUPABASE_DB_PASSWORD", ""),
"host": SUPABASE_DB_HOST,
"port": 5432,
"collection_name": "memories",
},
},
"embedder": {
"provider": "ollama",
"config": {
"model": "bge-m3",
"ollama_base_url": os.environ.get("OLLAMA_URL", "http://localhost:11434"),
},
},
"llm": {
"provider": "litellm",
"config": {
"model": "gemma-3-1b",
"api_base": os.environ.get("LITELLM_URL", "http://localhost:4000"),
"api_key": os.environ.get("LITELLM_MASTER_KEY", "sk-test"),
},
},
}
_mem0_client = Memory.from_config(config)
log.info("mem0_initialized")
return _mem0_client
def store_memory(customer_id: str, tenant_id: str, content: str, agent_id: str = "triage_agent"):
"""Store customer memory either in Mem0/Supabase or local in-memory store."""
user_id = f"{tenant_id}:{customer_id}"
try:
client = get_mem0_client()
client.add(
messages=[{"role": "user", "content": content}],
user_id=user_id,
agent_id=agent_id,
)
log.info("memory_stored", user_id=user_id, agent_id=agent_id)
except Exception as e:
# Fallback to local dict store
log.debug("mem0_store_failed_using_local_fallback", user_id=user_id, error=str(e))
if user_id not in _local_memory_store:
_local_memory_store[user_id] = []
_local_memory_store[user_id].append(content)
log.info("memory_stored_locally", user_id=user_id)
def recall_memories(customer_id: str, tenant_id: str, query: str, limit: int = 5) -> List[str]:
"""Recall customer memories from Mem0 or local in-memory store."""
user_id = f"{tenant_id}:{customer_id}"
try:
client = get_mem0_client()
results = client.search(query=query, user_id=user_id, limit=limit)
memories = [r.get("memory", "") for r in results]
log.info("memories_recalled", user_id=user_id, count=len(memories))
return memories
except Exception as e:
# Fallback to local dict search
log.debug("mem0_recall_failed_using_local_fallback", user_id=user_id, error=str(e))
local_mems = _local_memory_store.get(user_id, [])
# Very simple query match fallback: return matches or recent ones up to limit
matches = []
for mem in reversed(local_mems):
if any(word in mem.lower() for word in query.lower().split()):
matches.append(mem)
if len(matches) >= limit:
break
if not matches:
matches = local_mems[-limit:]
log.info("memories_recalled_locally", user_id=user_id, count=len(matches))
return matches
def get_all_memories(customer_id: str, tenant_id: str) -> List[str]:
"""Get all memories for a customer."""
user_id = f"{tenant_id}:{customer_id}"
try:
client = get_mem0_client()
results = client.get_all(user_id=user_id)
return [r.get("memory", "") for r in results]
except Exception as e:
log.debug("mem0_get_all_failed_using_local_fallback", error=str(e))
return _local_memory_store.get(user_id, [])