Spaces:
Sleeping
Sleeping
"""
Script to migrate JSON embeddings to ChromaDB with metadata.
"""
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict | |
| import chromadb | |
| from chromadb.config import Settings | |
| from tqdm import tqdm | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class EmbeddingMigrator:
    """Migrate JSON embedding files into a persistent ChromaDB collection.

    Each ``*.json`` file in ``embeddings_dir`` is expected to hold a payload of
    the form ``{"chunks": [{"text": ..., "embedding": [...], ...}, ...]}``.
    Product/version metadata is derived from the file stem, e.g.
    ``harmony_1_8.json`` -> product ``"harmony"``, version ``"1.8"``.
    """

    def __init__(self, embeddings_dir: Path, chroma_db_path: Path):
        """
        Args:
            embeddings_dir: Directory containing the embedding JSON files.
            chroma_db_path: Directory used for ChromaDB's persistent storage.
        """
        self.embeddings_dir = embeddings_dir
        self.chroma_db_path = chroma_db_path
        # Persistent client so migrated data survives process restarts.
        self.client = chromadb.PersistentClient(
            path=str(chroma_db_path),
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
            ),
        )

    def create_collection(self) -> None:
        """Create a fresh "documentation" collection, dropping any existing one."""
        # Delete any existing collection so the migration starts clean.
        # ChromaDB raises when the collection does not exist; that case is
        # expected here, so catch Exception narrowly around the delete call
        # (was a bare `except:` that also swallowed KeyboardInterrupt/SystemExit).
        try:
            self.client.delete_collection("documentation")
            logger.info("Deleted existing collection")
        except Exception:
            logger.debug("No existing collection to delete", exc_info=True)
        self.collection = self.client.create_collection(
            name="documentation",
            metadata={"description": "Technical documentation for Harmony and Chorus products"},
        )
        logger.info("Created new collection: documentation")

    @staticmethod
    def _parse_store_name(store_name: str) -> tuple:
        """Derive a ``(product, version)`` pair from a file stem like "harmony_1_8"."""
        if store_name == "general_faq":
            return "general", "all"
        parts = store_name.split("_", 1)
        if len(parts) == 2:
            # Underscores in the version segment encode dots: "1_8" -> "1.8".
            return parts[0], parts[1].replace("_", ".")
        return "unknown", "unknown"

    def migrate_embedding_file(self, file_path: Path) -> int:
        """Migrate a single embedding JSON file into the collection.

        Args:
            file_path: Path to a ``*.json`` file with a top-level "chunks" list.

        Returns:
            The number of chunks read from the file (all of which are added).
        """
        logger.info("Migrating %s...", file_path.name)
        # Explicit encoding: JSON interchange files are UTF-8 by specification.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        store_name = file_path.stem  # e.g., "harmony_1_8"
        product, version = self._parse_store_name(store_name)

        chunks = data.get("chunks", [])

        # Prepare parallel batch arrays for collection.add().
        ids = []
        embeddings = []
        metadatas = []
        documents = []
        for i, chunk in enumerate(chunks):
            # IDs must be unique collection-wide; the store name prefix
            # guarantees that across files.
            chunk_id = f"{store_name}_chunk_{i}"
            ids.append(chunk_id)
            documents.append(chunk.get("text", ""))
            embeddings.append(chunk.get("embedding", []))

            metadata = {
                "product": product,
                "version": version,
                "store_name": store_name,
                "chunk_index": i,
                "chunk_id": chunk_id,
            }
            # Carry over optional per-chunk metadata when present.
            if "metadata" in chunk:
                chunk_meta = chunk["metadata"]
                metadata.update({
                    "source": chunk_meta.get("source", ""),
                    "page": chunk_meta.get("page", -1),
                    "token_count": chunk_meta.get("token_count", 0),
                })
            # Preserve the source file's own chunk id under a distinct key.
            if "chunk_id" in chunk:
                metadata["original_chunk_id"] = chunk["chunk_id"]
            metadatas.append(metadata)

        # Insert in batches to keep individual requests small.
        batch_size = 100
        total_added = 0
        for start in range(0, len(ids), batch_size):
            end = min(start + batch_size, len(ids))
            self.collection.add(
                ids=ids[start:end],
                embeddings=embeddings[start:end],
                metadatas=metadatas[start:end],
                documents=documents[start:end],
            )
            total_added += end - start
            logger.info("  Added %d/%d chunks", total_added, len(ids))
        return len(ids)

    def migrate_all(self) -> None:
        """Recreate the collection and migrate every embedding file found."""
        self.create_collection()

        json_files = list(self.embeddings_dir.glob("*.json"))
        logger.info("Found %d embedding files to migrate", len(json_files))

        total_chunks = 0
        for file_path in json_files:
            total_chunks += self.migrate_embedding_file(file_path)

        logger.info("\nMigration complete!")
        logger.info("Total chunks migrated: %d", total_chunks)

        # Sanity check: the collection count should match what was inserted.
        count = self.collection.count()
        logger.info("ChromaDB collection count: %d", count)

        self.test_query()

    def test_query(self) -> None:
        """Smoke-test the migrated data with a few sample queries."""
        logger.info("\nTesting ChromaDB queries...")

        # NOTE(review): query_texts makes ChromaDB embed the query with the
        # collection's default embedding function; confirm its dimensionality
        # matches the precomputed embeddings migrated in above.
        # Test 1: query filtered by product AND version.
        results = self.collection.query(
            query_texts=["How to install Harmony?"],
            n_results=3,
            where={"$and": [{"product": "harmony"}, {"version": "1.8"}]},
        )
        logger.info("Test query 1 returned %d results", len(results["ids"][0]))
        if results["ids"][0]:
            logger.info("  First result metadata: %s", results["metadatas"][0][0])

        # Test 2: query one product across all versions.
        results = self.collection.query(
            query_texts=["system requirements"],
            n_results=3,
            where={"product": {"$eq": "harmony"}},
        )
        logger.info("Test query 2 returned %d results", len(results["ids"][0]))

        # Test 3: enumerate distinct (product, version) pairs in the data.
        all_data = self.collection.get()
        products_versions = {
            (metadata["product"], metadata["version"])
            for metadata in all_data["metadatas"]
        }
        logger.info("\nAvailable products and versions:")
        for product, version in sorted(products_versions):
            logger.info("  - %s %s", product, version)
def main():
    """Entry point: resolve data paths and run the full embedding migration."""
    # All paths are derived from this script's own location so the tool
    # behaves the same regardless of the current working directory.
    script_dir = Path(__file__).parent
    project_root = script_dir.parent
    embeddings_dir = project_root / "data" / "embeddings"
    chroma_db_path = project_root / "data" / "chroma_db"

    # ChromaDB needs its storage directory to exist before the client opens it.
    chroma_db_path.mkdir(parents=True, exist_ok=True)

    # Build the migrator and migrate everything in one pass.
    EmbeddingMigrator(embeddings_dir, chroma_db_path).migrate_all()


if __name__ == "__main__":
    main()