import datetime
import os
import uuid
|
|
|
class CoraMemory:
    """Persistent image archive backed by a ChromaDB collection.

    Each entry stores an embedding, the prompt as its document, and a
    metadata dict with ``path``, ``prompt``, ``tags`` (comma-joined) and
    ``timestamp``.

    If ``chromadb`` cannot be imported or the database cannot be opened,
    the instance stays usable but disabled: ``collection`` is ``None``,
    ``save`` returns ``None`` and the search methods return empty results.
    """

    def __init__(self, db_path=None, collection_name="cora_archives"):
        """Open (or create) the on-disk archive.

        Args:
            db_path: Directory of the persistent database. When omitted, the
                ``CHROMA_DB_PATH`` environment variable is used, falling
                back to ``./archive_db``.
            collection_name: Name of the collection to open or create.
        """
        print("Initializing CoraMemory (The Archive)...")
        self.client = None
        self.collection = None

        try:
            # Imported lazily so a missing chromadb install only disables
            # the archive instead of breaking the import of this module.
            import chromadb

            if db_path is None:
                db_path = os.environ.get("CHROMA_DB_PATH", "./archive_db")
            self.client = chromadb.PersistentClient(path=db_path)
            self.collection = self.client.get_or_create_collection(
                name=collection_name
            )
            print("Memory database connected successfully.")
        except ImportError as e:
            print(f"Warning: Memory dependencies not found ({e}). Archive features will be disabled.")
        except Exception as e:
            # Deliberate best-effort boundary: any backend failure leaves the
            # archive disabled rather than crashing the caller.
            print(f"Warning: Memory initialization failed ({e}). Archive features will be disabled.")

    @staticmethod
    def _empty_result():
        """Return a fresh result dict describing "no matches"."""
        return {'ids': [], 'metadatas': [], 'distances': []}

    @staticmethod
    def _is_blank(vector):
        """Return True when *vector* is None or has no elements.

        Uses ``len`` rather than truthiness so that array-like embeddings
        whose ``bool()`` is ambiguous are handled without raising.
        """
        if vector is None:
            return True
        try:
            return len(vector) == 0
        except TypeError:
            # Not a sized object; fall back to plain truthiness.
            return not vector

    @staticmethod
    def _as_tag_list(tags):
        """Normalize *tags* (None, a single string, or an iterable) to a list of str."""
        if tags is None:
            return []
        if isinstance(tags, str):
            # A bare string is one tag, not an iterable of characters.
            return [tags]
        return [str(tag) for tag in tags]

    def save(self, image_path, embedding, prompt, tags):
        """Save an archive entry.

        Args:
            image_path: Stored under the ``path`` metadata key.
            embedding: Embedding vector stored for the entry.
            prompt: Stored both as the document and under ``prompt``.
            tags: Iterable of tags, a single tag string, or None. Stored as
                one comma-joined string under ``tags``.

        Returns:
            The generated entry id (a UUID4 string), or None when the
            archive is disabled.
        """
        if self.collection is None:
            return None

        entry_id = str(uuid.uuid4())
        metadata = {
            "path": image_path,
            "prompt": prompt,
            "tags": ",".join(self._as_tag_list(tags)),
            "timestamp": str(datetime.datetime.now()),
        }

        self.collection.add(
            embeddings=[embedding],
            documents=[prompt],
            metadatas=[metadata],
            ids=[entry_id],
        )
        print(f"Saved to archive: {entry_id}")
        return entry_id

    def search_by_vector(self, vector, k=5):
        """Find the k nearest entries to the query vector.

        Args:
            vector: Query embedding.
            k: Number of results requested from the collection.

        Returns:
            The collection's query result unchanged, or a dict with empty
            ``ids``/``metadatas``/``distances`` lists when the archive is
            disabled, the vector is empty, or ``k`` is not positive.
        """
        if self.collection is None or self._is_blank(vector) or k < 1:
            return self._empty_result()

        return self.collection.query(query_embeddings=[vector], n_results=k)

    def search_hybrid(self, vector, k=5, tag_filter=None, source_filter=None):
        """Hybrid search: semantic similarity + metadata filtering.

        Retrieves ``k * 3`` nearest candidates, then keeps (in ranking order)
        the first ``k`` whose tags satisfy the filters. All comparisons are
        case-insensitive.

        Args:
            vector: Embedding vector for semantic search.
            k: Maximum number of results to return.
            tag_filter: Tags that must all be present (e.g.
                ["rome", "armor"]); a single string is treated as one tag.
                A filter tag matches when it is a substring of any entry tag.
            source_filter: Source name (e.g. "met_museum_open_access"). It is
                matched against the entry's tags and must equal one exactly.

        Returns:
            A dict with ``ids``, ``metadatas`` and ``distances``. Each value
            is a one-element list wrapping the list of matches, or an empty
            list when nothing matched.
        """
        if self.collection is None or self._is_blank(vector) or k < 1:
            return self._empty_result()

        # Over-fetch so that enough candidates survive the filtering below.
        candidates = self.collection.query(
            query_embeddings=[vector],
            n_results=k * 3,
        )

        if not candidates['ids'] or not candidates['ids'][0]:
            return self._empty_result()

        # Filters are loop-invariant: normalize them once.
        wanted_source = source_filter.lower() if source_filter else None
        wanted_tags = [tag.lower() for tag in self._as_tag_list(tag_filter)]

        kept_ids = []
        kept_metadatas = []
        kept_distances = []

        rows = zip(
            candidates['ids'][0],
            candidates['metadatas'][0],
            candidates['distances'][0],
        )
        for uid, metadata, distance in rows:
            # An entry may carry no metadata or no tags; treat as untagged.
            tags_str = (metadata or {}).get('tags') or ''
            tags = [tag.strip().lower() for tag in tags_str.split(',')]

            if wanted_source and wanted_source not in tags:
                continue

            if wanted_tags and not all(
                any(wanted in tag for tag in tags) for wanted in wanted_tags
            ):
                continue

            kept_ids.append(uid)
            kept_metadatas.append(metadata)
            kept_distances.append(distance)

            if len(kept_ids) >= k:
                break

        return {
            'ids': [kept_ids] if kept_ids else [],
            'metadatas': [kept_metadatas] if kept_metadatas else [],
            'distances': [kept_distances] if kept_distances else [],
        }
|
|
|
if __name__ == "__main__":
    # Smoke check: construct the archive and report whether it is usable.
    # CoraMemory swallows initialization failures and leaves `collection`
    # as None, so "ready" is only reported when a collection was opened.
    mem = CoraMemory()
    if mem.collection is not None:
        print("Memory system ready.")
    else:
        print("Memory system unavailable: archive features are disabled.")
|
|
|