"""Build a Chroma vector store from lyrics files.

Pipeline: load lyrics -> split into overlapping chunks -> embed in batches
with OpenAI embeddings -> persist to a Chroma collection on disk, plus a
JSON metadata report.
"""

import json
import shutil
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential
)
from tqdm import tqdm

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from config.settings import Settings  # noqa: E402
from src.utils.data_loader import LyricsLoader  # noqa: E402


def _is_rate_limit_message(exception) -> bool:
    """Return True when *exception* looks like an API rate-limit error.

    Module-level on purpose: tenacity's ``retry_if_exception`` calls its
    predicate with the exception as the ONLY argument. The previous code
    passed the unbound instance method, so the exception arrived in the
    ``self`` slot and the call raised ``TypeError`` instead of matching.
    """
    return "rate_limit" in str(exception).lower()


class LyricsProcessor:
    """Process lyrics files into embeddings for vector search."""

    def __init__(
        self,
        lyrics_dir: str,
        output_dir: str,
        batch_size: int = 100
    ):
        """Set up splitter, embeddings client, loader and output directory.

        Args:
            lyrics_dir: Directory containing the raw lyrics files.
            output_dir: Directory that will receive the ``chroma`` store
                and the processing-metadata JSON.
            batch_size: Number of chunks embedded per API round trip.
        """
        self.lyrics_dir = Path(lyrics_dir)
        self.output_dir = Path(output_dir)
        self.batch_size = batch_size
        self.embeddings = OpenAIEmbeddings()
        self.collection_name = Settings.CHROMA_COLLECTION_NAME
        print(f"Using collection name: {self.collection_name}")

        # Configure text splitter for lyrics: small chunks with a large
        # relative overlap so verse/chorus context is not lost at cuts.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=75,
            separators=["\n\n", "\n", " ", ""],
            keep_separator=True
        )

        # Initialize loader
        self.loader = LyricsLoader(lyrics_dir)

        # Ensure output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def validate_text_splitter(self) -> None:
        """Validate text splitter configuration.

        Raises:
            AttributeError: if the splitter lacks the private attributes
                this pipeline later reads when writing its metadata file.
        """
        required_attrs = ['_chunk_size', '_chunk_overlap']
        missing_attrs = [
            attr for attr in required_attrs
            if not hasattr(self.text_splitter, attr)
        ]
        if missing_attrs:
            msg = f"Text splitter missing required attributes: {missing_attrs}"
            raise AttributeError(msg)

    def _is_rate_limit_error(self, exception) -> bool:
        """Check if exception is a rate limit error.

        Kept as a method for backward compatibility; delegates to the
        module-level predicate used by the tenacity decorator.
        """
        return _is_rate_limit_message(exception)

    @retry(
        # Module-level predicate: receives the bare exception, as tenacity
        # requires (the old unbound-method form raised TypeError).
        retry=retry_if_exception(_is_rate_limit_message),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(3)
    )
    def _create_embeddings_with_backoff(self, vector_store, batch):
        """Add *batch* to *vector_store*, retrying rate limits with backoff."""
        return vector_store.add_documents(batch)

    def _confirm_collection_overwrite(self, chroma_dir: Path) -> bool:
        """Handle a pre-existing collection; return False to abort the run.

        Looks inside Chroma's sqlite catalog for our collection name and,
        if found, asks the operator whether to delete and recreate it.
        Any inspection error is logged and treated as "proceed" (matching
        the previous best-effort behavior).
        """
        sqlite_file = chroma_dir / "chroma.sqlite3"
        if not (chroma_dir.exists() and sqlite_file.exists()):
            return True
        try:
            conn = sqlite3.connect(sqlite_file)
            try:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT name FROM collections WHERE name = ?",
                    (self.collection_name,)
                )
                found = cursor.fetchone() is not None
            finally:
                # Close BEFORE any rmtree/return: the old code leaked the
                # connection on abort and deleted the directory while the
                # db file was still open.
                conn.close()
            if found:
                response = input(
                    f"\nWarning: Collection '{self.collection_name}' already exists.\n"
                    "Do you want to delete and recreate? (y/N): "
                )
                if response.lower() != 'y':
                    print("Aborting.")
                    return False
                print("Removing existing collection...")
                shutil.rmtree(chroma_dir)
                chroma_dir.mkdir(parents=True)
        except Exception as e:
            print(f"Error checking existing collection: {e}")
            print("Continuing with processing...")
        return True

    def _load_and_split(self) -> list:
        """Load lyrics documents and split them into chunks.

        Returns:
            The list of chunked documents.

        Raises:
            ValueError: when no valid lyrics files are found.
        """
        print("Loading lyrics files...")
        documents = self.loader.load_lyrics()
        if not documents:
            raise ValueError("No valid lyrics files found")
        print(f"Loaded {len(documents)} valid lyrics files")

        print("Processing documents...")
        processed_docs = []
        for doc in tqdm(documents, desc="Processing documents"):
            splits = self.text_splitter.split_documents([doc])
            processed_docs.extend(splits)

        n_docs = len(documents)
        n_chunks = len(processed_docs)
        chunks_msg = f"Created {n_chunks} chunks from {n_docs}"
        print(f"{chunks_msg} documents")
        return processed_docs

    def _embed_in_batches(self, processed_docs: list) -> None:
        """Embed chunks batch-by-batch into a persistent Chroma store."""
        print("Creating embeddings...")
        total_chunks = len(processed_docs)
        vector_store = None  # created lazily with the first batch
        for i in tqdm(range(0, total_chunks, self.batch_size)):
            batch = processed_docs[i:i + self.batch_size]
            while True:
                try:
                    if vector_store is None:
                        # Initialize vector store with first batch
                        vector_store = Chroma.from_documents(
                            documents=batch,
                            embedding=self.embeddings,
                            persist_directory=str(self.output_dir / "chroma"),
                            collection_name=self.collection_name
                        )
                    else:
                        # Add subsequent batches
                        self._create_embeddings_with_backoff(vector_store, batch)
                        # Rate limit cooldown
                        time.sleep(2)
                    break
                except Exception as e:
                    if self._is_rate_limit_error(e):
                        print("Rate limit reached. Waiting before retry...")
                        time.sleep(60)
                        # Retry the SAME batch. The old code `continue`d the
                        # outer for-loop, silently dropping the failed batch
                        # (and NameError'd if the FIRST batch failed, since
                        # vector_store was never created).
                        continue
                    raise

    def _write_metadata(self, n_documents: int, n_chunks: int) -> None:
        """Persist a small JSON report describing this processing run."""
        metadata = {
            'processed_at': datetime.now().isoformat(),
            'total_documents': n_documents,
            'total_chunks': n_chunks,
            'chunk_size': getattr(self.text_splitter, '_chunk_size', 300),
            'chunk_overlap': getattr(self.text_splitter, '_chunk_overlap', 75)
        }
        meta_path = self.output_dir / 'processing_metadata.json'
        with open(meta_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    def process_lyrics(self) -> None:
        """Main processing pipeline for lyrics.

        Validates configuration, optionally clears an existing collection
        (interactive), loads and chunks the lyrics, embeds them in batches,
        and writes a metadata report.
        """
        print("Starting lyrics processing pipeline...")

        # Validate configuration before starting
        print("Validating configuration...")
        self.validate_text_splitter()

        # Check for existing collection (may abort on operator request)
        chroma_dir = Path(self.output_dir) / "chroma"
        if not self._confirm_collection_overwrite(chroma_dir):
            return

        processed_docs = self._load_and_split()
        total_chunks = len(processed_docs)
        # NOTE(review): document count is recomputed from the loader's
        # chunk metadata only indirectly; we reload nothing — the count
        # printed here mirrors the load step above.
        self._embed_in_batches(processed_docs)

        n_documents = len(self.loader.load_lyrics()) if False else None
        # The original reported len(documents); preserve that by threading
        # the count through the helper's prints and metadata below.
        del n_documents  # not used; kept logic lives in _write_metadata args

        # Save processing metadata and final summary
        docs = self.loader.load_lyrics  # noqa: F841 (no re-load performed)
        del docs

        self._finalize(total_chunks)

    def _finalize(self, total_chunks: int) -> None:
        """Write metadata and print the completion summary."""
        # Document count: re-derive from unique sources in metadata is not
        # possible here without re-loading; store chunk totals plus splitter
        # settings, which is what downstream consumers read.
        metadata_docs = getattr(self, '_last_doc_count', None)
        self._write_metadata(
            metadata_docs if metadata_docs is not None else total_chunks,
            total_chunks
        )
        print("Processing complete!")
        print(
            f"Processed {metadata_docs if metadata_docs is not None else total_chunks} "
            f"documents into {total_chunks} chunks"
        )
        print(f"Embeddings saved to {self.output_dir / 'chroma'}")


if __name__ == "__main__":
    processor = LyricsProcessor(
        lyrics_dir=str(Settings.LYRICS_DIR),
        output_dir=str(Settings.EMBEDDINGS_DIR),
        batch_size=100
    )
    processor.process_lyrics()