# Orbit / app / services / memory.py
# Orbit Automations
# Auto-sync from Julius-606/Orbit monorepo
# 4a693cf
################################################################################
# FILE: backend/app/services/memory.py
# VERSION: 1.0.0 | SYSTEM: Orbit Memory Protocol
# IDENTITY: The Long-term Memory (Vector DB / ChromaDB)
################################################################################
import hashlib
import logging
import os

import chromadb
from chromadb.utils import embedding_functions
# Module-level logger for this file, registered under the name "Orbit-Memory".
logger = logging.getLogger("Orbit-Memory")
class MemoryService:
    """Long-term memory for Orbit, backed by a persistent ChromaDB collection.

    Facts about the user are stored as documents in the
    ``orbit_user_preferences`` collection (configured for cosine distance)
    and are retrieved by similarity to a query text.
    """

    def __init__(self):
        """Open (or create) the on-disk store and the preferences collection."""
        # Memory lives in ``orbit_memory`` under the current working directory.
        persist_directory = os.path.join(os.getcwd(), "orbit_memory")
        self.client = chromadb.PersistentClient(path=persist_directory)
        # No embedding function is passed, so the collection falls back to
        # whatever Chroma uses by default.
        self.collection = self.client.get_or_create_collection(
            name="orbit_user_preferences",
            metadata={"hnsw:space": "cosine"},
        )

    def learn(self, fact: str) -> None:
        """Store a fact about the user.

        The document ID is the SHA-256 hex digest of the fact text, so the
        same fact always maps to the same ID and learning it again replaces
        the existing entry instead of adding a duplicate.

        Args:
            fact: The text to remember. Blank text is ignored.

        Errors are logged and swallowed; this method never raises.
        """
        try:
            if not fact.strip():
                logger.debug("Ignoring blank fact")
                return
            # The builtin hash() is salted per process for str, so it cannot
            # serve as a stable key for a store that outlives the process.
            fact_id = hashlib.sha256(fact.encode("utf-8")).hexdigest()
            # upsert keeps re-learning a known fact idempotent.
            self.collection.upsert(
                documents=[fact],
                ids=[fact_id],
            )
            logger.info("Orbit learned: %s", fact)
        except Exception as e:
            logger.error("Failed to learn: %s", e)

    def query(self, user_query: str, n_results: int = 3) -> str:
        """Return stored facts relevant to ``user_query``.

        Args:
            user_query: Text to search the collection with.
            n_results: Maximum number of facts to request.

        Returns:
            The matching documents joined by newlines, or ``""`` when nothing
            matches or the lookup fails (failures are logged, not raised).
        """
        try:
            results = self.collection.query(
                query_texts=[user_query],
                n_results=n_results,
            )
            # One list of documents per query text; the key may be missing,
            # None, or empty, all of which mean "no matches".
            batches = results.get("documents") or [[]]
            # Drop empty/None entries so the join cannot fail on them.
            documents = [doc for doc in batches[0] if doc]
            return "\n".join(documents)
        except Exception as e:
            logger.error("Memory retrieval failed: %s", e)
            return ""
# Shared instance created when this module is imported; constructing it runs
# MemoryService.__init__, which opens the persistent Chroma client.
memory_service = MemoryService()