import datetime
import os
import uuid


class CoraMemory:
    """Archive of image entries stored in a persistent ChromaDB collection.

    The ChromaDB dependency is optional: when it cannot be imported or the
    database cannot be opened, ``self.collection`` stays ``None``, ``save``
    returns ``None`` and the search methods return an empty result.
    """

    def __init__(self):
        """Open (or create) the ``cora_archives`` collection.

        The database directory is read from the ``CHROMA_DB_PATH`` environment
        variable and defaults to ``./archive_db``. Failures are reported on
        stdout and leave the archive disabled rather than raising.
        """
        print("Initializing CoraMemory (The Archive)...")
        self.client = None
        self.collection = None

        try:
            # Imported lazily so the class stays usable without chromadb.
            import chromadb

            db_path = os.environ.get("CHROMA_DB_PATH", "./archive_db")
            self.client = chromadb.PersistentClient(path=db_path)

            # Get or create collection
            self.collection = self.client.get_or_create_collection(name="cora_archives")
            print("Memory database connected successfully.")
        except ImportError as e:
            print(f"Warning: Memory dependencies not found ({e}). Archive features will be disabled.")
        except Exception as e:
            # Deliberate best-effort boundary: the archive is optional.
            print(f"Warning: Memory initialization failed ({e}). Archive features will be disabled.")

    @staticmethod
    def _empty_result():
        """Return a fresh empty result dict (new lists on every call)."""
        return {'ids': [], 'metadatas': [], 'distances': []}

    @staticmethod
    def _is_empty_vector(vector):
        """Return True when *vector* is None or has zero length.

        Uses ``len`` instead of truthiness because array-like embeddings
        (e.g. NumPy arrays) raise on ``bool(vector)``.
        """
        if vector is None:
            return True
        try:
            return len(vector) == 0
        except TypeError:
            # Object without a length: let the backend decide what to do.
            return False

    @staticmethod
    def _as_tag_list(tags):
        """Normalize *tags* to a list of strings.

        ``None`` becomes ``[]`` and a bare string becomes a one-element list
        (instead of being iterated character by character).
        """
        if tags is None:
            return []
        if isinstance(tags, str):
            return [tags]
        return [str(tag) for tag in tags]

    def save(self, image_path, embedding, prompt, tags):
        """Save an archive entry and return its generated id.

        Args:
            image_path: Stored in the metadata under ``"path"``.
            embedding: Embedding vector stored for the entry.
            prompt: Stored both as the document and in the metadata.
            tags: Iterable of tags, a single tag string, or ``None``; stored
                as one comma-joined string.

        Returns:
            The new entry id (a UUID4 string), or ``None`` when the archive
            is disabled.
        """
        if self.collection is None:
            return None

        entry_id = str(uuid.uuid4())

        # Metadata must be simple types
        metadata = {
            "path": image_path,
            "prompt": prompt,
            "tags": ",".join(self._as_tag_list(tags)),
            "timestamp": str(datetime.datetime.now()),
        }

        self.collection.add(
            embeddings=[embedding],
            documents=[prompt],
            metadatas=[metadata],
            ids=[entry_id],
        )
        print(f"Saved to archive: {entry_id}")
        return entry_id

    def search_by_vector(self, vector, k=5):
        """Find the k nearest entries to the query vector.

        Returns:
            The raw result of ``collection.query``, or an empty result dict
            when the archive is disabled or *vector* is missing/empty.
        """
        if self.collection is None or self._is_empty_vector(vector):
            return self._empty_result()

        return self.collection.query(
            query_embeddings=[vector],
            n_results=k,
        )

    def search_hybrid(self, vector, k=5, tag_filter=None, source_filter=None):
        """Hybrid search: semantic similarity + metadata filtering.

        Args:
            vector: Embedding vector for semantic search.
            k: Maximum number of results (3x as many candidates are
                retrieved, then filtered).
            tag_filter: Tags that must all be present (e.g. ["rome", "armor"]);
                a single string is treated as one tag. Matching is
                case-insensitive and by substring: a requested tag matches
                when it is contained in any stored tag.
            source_filter: Source name (e.g. "met_museum_open_access") that
                must equal one of the stored tags, case-insensitively.

        Returns:
            A dict with ``ids``, ``metadatas`` and ``distances``. Each value
            is a one-element list wrapping the kept items, or an empty list
            when nothing matched.
        """
        if self.collection is None or self._is_empty_vector(vector):
            return self._empty_result()

        # First, get top 3x candidates via semantic search
        candidates = self.collection.query(
            query_embeddings=[vector],
            n_results=k * 3,  # Over-retrieve to allow filtering
        )

        if not candidates['ids'] or not candidates['ids'][0]:
            return self._empty_result()

        # Loop-invariant filter values, normalized once.
        wanted_source = source_filter.lower() if source_filter else None
        wanted_tags = [tag.lower() for tag in self._as_tag_list(tag_filter)]

        kept_ids = []
        kept_metadatas = []
        kept_distances = []

        rows = zip(
            candidates['ids'][0],
            candidates['metadatas'][0],
            candidates['distances'][0],
        )
        for uid, metadata, distance in rows:
            # Entries may have no metadata, or no tags value, at all.
            tags_str = (metadata or {}).get('tags') or ''
            tags = [t.strip().lower() for t in tags_str.split(',')]

            # Check source filter
            if wanted_source is not None and wanted_source not in tags:
                continue

            # Check tag filter (all tags must be present)
            if wanted_tags and not all(
                any(wanted in tag for tag in tags) for wanted in wanted_tags
            ):
                continue

            kept_ids.append(uid)
            kept_metadatas.append(metadata)
            kept_distances.append(distance)

            if len(kept_ids) >= k:
                break

        if not kept_ids:
            return self._empty_result()

        return {
            'ids': [kept_ids],
            'metadatas': [kept_metadatas],
            'distances': [kept_distances],
        }


if __name__ == "__main__":
    mem = CoraMemory()
    print("Memory system ready.")