# v3_ai_assistant/py/scripts/migrate_to_chromadb.py
# Julian Vanecek
# Initial commit: AI Assistant Multi-Agent System for HuggingFace Spaces
# bb80caa
"""
Script to migrate JSON embeddings to ChromaDB with metadata
"""
import json
import logging
from pathlib import Path
from typing import List, Dict
import chromadb
from chromadb.config import Settings
from tqdm import tqdm
# Configure logging at INFO level so the progress messages emitted through
# logger.info() during the migration are displayed.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EmbeddingMigrator:
    """Migrate JSON embedding files into a persistent ChromaDB collection.

    Every ``*.json`` file found directly inside ``embeddings_dir`` is read as
    an object with a ``"chunks"`` list.  Each chunk's text, embedding vector
    and metadata are stored in a collection named ``documentation``.  The
    product and version attached to each chunk are derived from the file name
    (for example ``harmony_1_8.json`` -> product ``harmony``, version ``1.8``).
    """

    def __init__(self, embeddings_dir: Path, chroma_db_path: Path):
        """Initialize ChromaDB with persistent storage.

        Args:
            embeddings_dir: Directory containing the ``*.json`` embedding files.
            chroma_db_path: Directory handed to ChromaDB as its storage path.
        """
        self.embeddings_dir = embeddings_dir
        self.chroma_db_path = chroma_db_path
        # Assigned by create_collection(); None until then.
        self.collection = None
        self.client = chromadb.PersistentClient(
            path=str(chroma_db_path),
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
            ),
        )

    def create_collection(self):
        """Drop any existing ``documentation`` collection and create a new one.

        Deleting first gives a clean migration: chunks left over from an
        earlier run cannot linger in the collection.
        """
        try:
            self.client.delete_collection("documentation")
        except Exception as exc:
            # Best-effort delete: presumably the collection just does not
            # exist yet.  Unlike a bare ``except``, this lets
            # KeyboardInterrupt/SystemExit propagate and records the reason.
            # NOTE(review): narrow the exception type once the targeted
            # chromadb version is pinned.
            logger.debug("No existing collection deleted: %s", exc)
        else:
            logger.info("Deleted existing collection")

        self.collection = self.client.create_collection(
            name="documentation",
            metadata={"description": "Technical documentation for Harmony and Chorus products"},
        )
        logger.info("Created new collection: documentation")

    @staticmethod
    def _parse_store_name(store_name: str) -> tuple:
        """Derive ``(product, version)`` from an embedding file's stem.

        ``"general_faq"`` maps to ``("general", "all")``.  Otherwise the text
        before the first underscore is the product and the remainder, with
        underscores turned into dots, is the version.  A stem without any
        underscore yields ``("unknown", "unknown")``.
        """
        if store_name == "general_faq":
            return "general", "all"
        product, separator, raw_version = store_name.partition("_")
        if not separator:
            return "unknown", "unknown"
        return product, raw_version.replace("_", ".")

    @staticmethod
    def _build_metadata(chunk: Dict, product: str, version: str,
                        store_name: str, index: int, chunk_id: str) -> Dict:
        """Assemble the metadata record stored alongside one chunk.

        JSON ``null`` values in the optional fields are replaced by the same
        defaults used for absent keys, so every stored value is a plain
        scalar.
        """
        metadata = {
            "product": product,
            "version": version,
            "store_name": store_name,
            "chunk_index": index,
            "chunk_id": chunk_id,
        }

        # Optional per-chunk metadata; ignored when missing or not an object.
        chunk_meta = chunk.get("metadata")
        if isinstance(chunk_meta, dict):
            for key, default in (("source", ""), ("page", -1), ("token_count", 0)):
                value = chunk_meta.get(key)
                metadata[key] = default if value is None else value

        # Keep the identifier the chunk carried in the source file, if any.
        original_id = chunk.get("chunk_id")
        if original_id is not None:
            metadata["original_chunk_id"] = original_id
        return metadata

    def migrate_embedding_file(self, file_path: Path, batch_size: int = 100) -> int:
        """Migrate a single embedding JSON file to ChromaDB.

        Args:
            file_path: JSON file whose ``"chunks"`` entries are migrated.
            batch_size: Maximum number of chunks sent per ``collection.add``
                call.

        Returns:
            The number of chunks written.  Chunks without an embedding are
            skipped (with a warning) and not counted.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        logger.info(f"Migrating {file_path.name}...")
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(file_path, 'r', encoding="utf-8") as f:
            data = json.load(f)

        # The file stem (e.g. "harmony_1_8") names the store and encodes the
        # product and version.
        store_name = file_path.stem
        product, version = self._parse_store_name(store_name)

        ids = []
        embeddings = []
        metadatas = []
        documents = []
        for i, chunk in enumerate(data.get("chunks", [])):
            # The ID uses the chunk's position in the file, so IDs stay stable
            # even when other chunks are skipped.
            chunk_id = f"{store_name}_chunk_{i}"
            embedding = chunk.get("embedding")
            if not embedding:
                # An empty vector cannot be stored and would make the whole
                # batch fail, so drop just this chunk.
                logger.warning(f"  Skipping {chunk_id}: no embedding")
                continue

            ids.append(chunk_id)
            embeddings.append(embedding)
            documents.append(chunk.get("text") or "")
            metadatas.append(
                self._build_metadata(chunk, product, version, store_name, i, chunk_id)
            )

        # Add to ChromaDB in batches.
        total_added = 0
        for start in range(0, len(ids), batch_size):
            stop = min(start + batch_size, len(ids))
            self.collection.add(
                ids=ids[start:stop],
                embeddings=embeddings[start:stop],
                metadatas=metadatas[start:stop],
                documents=documents[start:stop],
            )
            total_added += stop - start
            logger.info(f"  Added {total_added}/{len(ids)} chunks")
        return len(ids)

    def migrate_all(self):
        """Migrate all embedding files to ChromaDB, then verify the result.

        Recreates the collection, migrates every ``*.json`` file in
        ``embeddings_dir`` (in sorted order, so runs are reproducible), logs
        the totals and finally runs the sample queries in ``test_query``.
        """
        self.create_collection()

        json_files = sorted(self.embeddings_dir.glob("*.json"))
        logger.info(f"Found {len(json_files)} embedding files to migrate")

        total_chunks = 0
        for file_path in json_files:
            total_chunks += self.migrate_embedding_file(file_path)

        logger.info("\nMigration complete!")
        logger.info(f"Total chunks migrated: {total_chunks}")

        # Verify collection
        count = self.collection.count()
        logger.info(f"ChromaDB collection count: {count}")

        # Test query
        self.test_query()

    def test_query(self):
        """Test the migrated data with a few sample queries and log the results."""
        logger.info("\nTesting ChromaDB queries...")

        # NOTE(review): query_texts is embedded by the collection's embedding
        # function, and create_collection() does not pass one.  Confirm that
        # the function ChromaDB falls back to produces vectors of the same
        # dimensionality as the migrated embeddings.

        # Test 1: Query with product/version filter
        results = self.collection.query(
            query_texts=["How to install Harmony?"],
            n_results=3,
            where={"$and": [{"product": "harmony"}, {"version": "1.8"}]},
        )
        logger.info(f"Test query 1 returned {len(results['ids'][0])} results")
        if results['ids'][0]:
            logger.info(f"  First result metadata: {results['metadatas'][0][0]}")

        # Test 2: Query across all versions
        results = self.collection.query(
            query_texts=["system requirements"],
            n_results=3,
            where={"product": {"$eq": "harmony"}},
        )
        logger.info(f"Test query 2 returned {len(results['ids'][0])} results")

        # Test 3: Get unique products and versions.  Only the metadata is
        # needed, so do not fetch the document texts as well.
        all_data = self.collection.get(include=["metadatas"])
        products_versions = {
            (metadata['product'], metadata['version'])
            for metadata in all_data['metadatas']
        }
        logger.info("\nAvailable products and versions:")
        for product, version in sorted(products_versions):
            logger.info(f"  - {product} {version}")
def main():
    """Run the migration using paths derived from this script's location.

    The project root is taken to be the parent of the directory containing
    this file.  Embeddings are read from ``<root>/data/embeddings`` and the
    ChromaDB store is written to ``<root>/data/chroma_db``.

    Raises:
        SystemExit: If the embeddings directory does not exist.
    """
    # Set up paths
    script_dir = Path(__file__).parent
    project_root = script_dir.parent
    embeddings_dir = project_root / "data" / "embeddings"
    chroma_db_path = project_root / "data" / "chroma_db"

    # Fail before touching the database: the migration starts by dropping the
    # existing collection, so running it without any input directory would
    # replace a populated store with an empty one.
    if not embeddings_dir.is_dir():
        raise SystemExit(f"Embeddings directory not found: {embeddings_dir}")

    # Create ChromaDB directory
    chroma_db_path.mkdir(parents=True, exist_ok=True)

    # Run migration
    migrator = EmbeddingMigrator(embeddings_dir, chroma_db_path)
    migrator.migrate_all()
# Run the migration only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()