# cora / cora_memory.py
# tokgae's picture
# Upload folder using huggingface_hub
# 38ab39c verified
import uuid
import datetime
class CoraMemory:
    """Archive of image embeddings stored in a persistent ChromaDB collection.

    The constructor tries to open (or create) the ``cora_archives`` collection
    in the directory named by the ``CHROMA_DB_PATH`` environment variable
    (default ``./archive_db``). If that fails for any reason, ``collection``
    stays ``None`` and every public method degrades to a no-op / empty result
    instead of raising.
    """

    def __init__(self):
        """Connect to the archive database, disabling the archive on failure."""
        print("Initializing CoraMemory (The Archive)...")
        self.client = None
        self.collection = None
        try:
            # Imported here so a missing chromadb install only disables the
            # archive instead of breaking the import of this module.
            import chromadb
            import os

            db_path = os.environ.get("CHROMA_DB_PATH", "./archive_db")
            self.client = chromadb.PersistentClient(path=db_path)
            # Get or create collection
            self.collection = self.client.get_or_create_collection(name="cora_archives")
            print("Memory database connected successfully.")
        except ImportError as e:
            print(f"Warning: Memory dependencies not found ({e}). Archive features will be disabled.")
        except Exception as e:
            # Deliberate best-effort: any backend failure leaves the archive off.
            print(f"Warning: Memory initialization failed ({e}). Archive features will be disabled.")

    @staticmethod
    def _empty_result():
        """Return a fresh result dict used whenever a search has nothing to report."""
        return {'ids': [], 'metadatas': [], 'distances': []}

    @staticmethod
    def _has_vector(vector):
        """Tell whether ``vector`` is a usable, non-empty embedding.

        Plain truthiness (``not vector``) is avoided because array types such
        as ``numpy.ndarray`` raise ``ValueError`` when coerced to ``bool``.
        """
        if vector is None:
            return False
        try:
            return len(vector) > 0
        except TypeError:
            # Unsized objects: let the backend decide whether they are valid.
            return True

    @staticmethod
    def _as_tag_list(tags):
        """Coerce ``None``, a single string, or an iterable of tags to a list of str."""
        if tags is None:
            return []
        if isinstance(tags, str):
            # A bare string is one tag, not an iterable of characters.
            return [tags]
        return [str(tag) for tag in tags]

    def save(self, image_path, embedding, prompt, tags):
        """Save an archive entry and return its generated id.

        Args:
            image_path: Stored under the ``path`` metadata key.
            embedding: Embedding vector stored for the entry.
            prompt: Stored both as the document and under the ``prompt`` key.
            tags: Iterable of tags, a single tag string, or ``None``; stored
                as one comma-joined string under the ``tags`` key.

        Returns:
            The new entry id (a UUID4 string), or ``None`` when the archive
            is disabled.
        """
        if self.collection is None:
            return None

        entry_id = str(uuid.uuid4())
        # Metadata must be simple types, hence the joined tag string and the
        # stringified timestamp.
        metadata = {
            "path": image_path,
            "prompt": prompt,
            "tags": ",".join(self._as_tag_list(tags)),
            "timestamp": str(datetime.datetime.now()),
        }
        self.collection.add(
            embeddings=[embedding],
            documents=[prompt],
            metadatas=[metadata],
            ids=[entry_id],
        )
        print(f"Saved to archive: {entry_id}")
        return entry_id

    def search_by_vector(self, vector, k=5):
        """Find the ``k`` nearest entries to the query vector.

        Returns:
            The collection's query result unchanged, or a dict with empty
            ``ids`` / ``metadatas`` / ``distances`` lists when the archive is
            disabled, the vector is missing/empty, or ``k`` is not positive.
        """
        if self.collection is None or not self._has_vector(vector) or k < 1:
            return self._empty_result()
        return self.collection.query(query_embeddings=[vector], n_results=k)

    def search_hybrid(self, vector, k=5, tag_filter=None, source_filter=None):
        """Hybrid search: semantic similarity followed by tag filtering.

        ``k * 3`` candidates are retrieved by similarity and then filtered in
        ranked order until ``k`` matches are collected, so fewer than ``k``
        results may be returned.

        Args:
            vector: Embedding vector for the semantic search.
            k: Maximum number of results to return.
            tag_filter: Tags that must all be present (e.g. ["rome", "armor"]).
                Matching is case-insensitive and by substring: a filter tag
                matches if it occurs inside any stored tag. A single string is
                treated as one tag.
            source_filter: Source name (e.g. "met_museum_open_access"). It is
                compared case-insensitively against the entry's stored tags
                and must equal one of them exactly.

        Returns:
            A dict with ``ids``, ``metadatas`` and ``distances``. Each value is
            a one-element list wrapping the list of matches, or an empty list
            when nothing matched.
        """
        if self.collection is None or not self._has_vector(vector) or k < 1:
            return self._empty_result()

        # First, get top 3x candidates via semantic search.
        candidates = self.collection.query(
            query_embeddings=[vector],
            n_results=k * 3,  # Over-retrieve to allow filtering
        )
        if not candidates['ids'] or not candidates['ids'][0]:
            return self._empty_result()

        # Normalise the filters once instead of once per candidate.
        wanted_source = source_filter.lower() if source_filter else None
        wanted_tags = [tag.lower() for tag in self._as_tag_list(tag_filter)]

        kept_ids = []
        kept_metadatas = []
        kept_distances = []
        rows = zip(
            candidates['ids'][0],
            candidates['metadatas'][0],
            candidates['distances'][0],
        )
        for uid, metadata, distance in rows:
            # An entry stored without metadata simply has no tags.
            tags_str = (metadata or {}).get('tags') or ''
            tags = [t.strip().lower() for t in tags_str.split(',')]

            # Source filter: must equal one of the stored tags.
            if wanted_source and wanted_source not in tags:
                continue
            # Tag filter: every requested tag must occur inside some stored tag.
            if not all(any(want in tag for tag in tags) for want in wanted_tags):
                continue

            kept_ids.append(uid)
            kept_metadatas.append(metadata)
            kept_distances.append(distance)
            if len(kept_ids) >= k:
                break

        return {
            'ids': [kept_ids] if kept_ids else [],
            'metadatas': [kept_metadatas] if kept_metadatas else [],
            'distances': [kept_distances] if kept_distances else [],
        }
if __name__ == "__main__":
    # Smoke test when run as a script: constructing the archive prints its
    # own connection status, then we confirm the process got this far.
    mem = CoraMemory()
    print("Memory system ready.")