File size: 6,979 Bytes
df5d609
4994b71
 
 
df5d609
d147321
 
4994b71
 
df5d609
 
4994b71
 
df5d609
4994b71
df5d609
4994b71
df5d609
4994b71
 
df5d609
 
 
 
 
4994b71
 
df5d609
 
4994b71
 
 
 
 
 
 
 
 
 
d147321
 
 
4994b71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df5d609
 
 
 
 
 
 
 
 
 
4994b71
 
df5d609
4994b71
 
 
 
 
 
 
 
df5d609
4994b71
 
 
df5d609
4994b71
 
df5d609
 
 
 
d147321
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4994b71
 
 
 
 
 
 
 
 
df5d609
 
4994b71
 
 
 
 
df5d609
 
 
 
4994b71
 
 
 
 
 
 
 
 
 
 
 
 
 
0152ed4
 
4994b71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
df5d609
 
4994b71
 
df5d609
 
4994b71
 
 
 
 
 
df5d609
4994b71
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import json
import sys
import time
from datetime import datetime
from pathlib import Path
import sqlite3
import shutil

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential
)
from tqdm import tqdm

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from config.settings import Settings  # noqa: E402
from src.utils.data_loader import LyricsLoader  # noqa: E402


class LyricsProcessor:
    """Process lyrics files into Chroma embeddings for vector search.

    Pipeline: load lyrics via ``LyricsLoader``, split them into overlapping
    chunks, embed the chunks with OpenAI embeddings in rate-limited batches,
    persist the Chroma collection, and write a JSON metadata summary.
    """

    def __init__(
        self,
        lyrics_dir: str,
        output_dir: str,
        batch_size: int = 100
    ):
        """
        Args:
            lyrics_dir: Directory containing the raw lyrics files.
            output_dir: Directory for the Chroma store and metadata file.
            batch_size: Number of chunks embedded per API batch.
        """
        self.lyrics_dir = Path(lyrics_dir)
        self.output_dir = Path(output_dir)
        self.batch_size = batch_size
        self.embeddings = OpenAIEmbeddings()
        self.collection_name = Settings.CHROMA_COLLECTION_NAME

        print(f"Using collection name: {self.collection_name}")

        # Small chunks with heavy overlap (300/75) — presumably tuned for
        # short, repetitive lyric lines; TODO confirm against retrieval quality.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=75,
            separators=["\n\n", "\n", " ", ""],
            keep_separator=True
        )

        # Initialize loader
        self.loader = LyricsLoader(lyrics_dir)

        # Ensure output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def validate_text_splitter(self):
        """Raise AttributeError unless the splitter has the private
        attributes this pipeline later reads for its metadata report."""
        required_attrs = ['_chunk_size', '_chunk_overlap']
        missing_attrs = [
            attr for attr in required_attrs
            if not hasattr(self.text_splitter, attr)
        ]
        if missing_attrs:
            msg = f"Text splitter missing required attributes: {missing_attrs}"
            raise AttributeError(msg)

    def _is_rate_limit_error(self, exception):
        """Return True if *exception* looks like an API rate-limit error."""
        return "rate_limit" in str(exception).lower()

    # BUGFIX: tenacity calls the retry predicate with exactly one argument
    # (the exception). The original passed the unbound two-argument method
    # ``_is_rate_limit_error``, so the exception landed in ``self`` and the
    # call raised TypeError instead of classifying the error. A plain
    # one-argument lambda implements the same check correctly.
    @retry(
        retry=retry_if_exception(lambda exc: "rate_limit" in str(exc).lower()),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(3)
    )
    def _create_embeddings_with_backoff(self, vector_store, batch):
        """Add *batch* to *vector_store*, retrying on rate-limit errors."""
        return vector_store.add_documents(batch)

    def _confirm_collection_overwrite(self) -> bool:
        """Check for an existing collection and optionally delete it.

        Returns:
            False when the user declines to overwrite (caller should abort);
            True otherwise.
        """
        chroma_dir = self.output_dir / "chroma"
        sqlite_file = chroma_dir / "chroma.sqlite3"
        if not sqlite_file.exists():
            return True
        try:
            conn = sqlite3.connect(sqlite_file)
            try:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT name FROM collections WHERE name = ?",
                    (self.collection_name,)
                )
                exists = cursor.fetchone() is not None
            finally:
                # BUGFIX: always close — the original leaked the connection
                # on the early "Aborting." return, and deleted the sqlite
                # file while the connection was still open (fails on Windows).
                conn.close()
            if exists:
                response = input(
                    f"\nWarning: Collection '{self.collection_name}' already exists.\n"
                    "Do you want to delete and recreate? (y/N): "
                )
                if response.lower() != 'y':
                    print("Aborting.")
                    return False
                print("Removing existing collection...")
                shutil.rmtree(chroma_dir)
                chroma_dir.mkdir(parents=True)
        except Exception as e:
            # Best-effort check: a corrupt store must not block reprocessing.
            print(f"Error checking existing collection: {e}")
            print("Continuing with processing...")
        return True

    def _embed_batches(self, processed_docs) -> None:
        """Embed chunks batch by batch, creating the store with batch 0.

        BUGFIX: on a rate-limit error the SAME batch is retried after a
        cooldown. The original ``continue`` advanced the ``for`` loop,
        silently dropping the failed batch — and if batch 0 was dropped,
        ``vector_store`` was never bound, causing a NameError on batch 1.
        """
        vector_store = None
        total_chunks = len(processed_docs)
        for i in tqdm(range(0, total_chunks, self.batch_size)):
            batch = processed_docs[i:i + self.batch_size]
            while True:
                try:
                    if vector_store is None:
                        # First successful batch bootstraps the collection.
                        vector_store = Chroma.from_documents(
                            documents=batch,
                            embedding=self.embeddings,
                            persist_directory=str(self.output_dir / "chroma"),
                            collection_name=self.collection_name
                        )
                    else:
                        self._create_embeddings_with_backoff(vector_store, batch)
                    # Rate limit cooldown between batches.
                    time.sleep(2)
                    break
                except Exception as e:
                    if self._is_rate_limit_error(e):
                        print("Rate limit reached. Waiting before retry...")
                        time.sleep(60)
                        continue  # retry the same batch, not the next one
                    raise

    def _write_metadata(self, total_documents: int, total_chunks: int) -> None:
        """Persist a JSON summary of this processing run in output_dir."""
        metadata = {
            'processed_at': datetime.now().isoformat(),
            'total_documents': total_documents,
            'total_chunks': total_chunks,
            'chunk_size': getattr(self.text_splitter, '_chunk_size', 300),
            'chunk_overlap': getattr(self.text_splitter, '_chunk_overlap', 75)
        }
        meta_path = self.output_dir / 'processing_metadata.json'
        with open(meta_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    def process_lyrics(self) -> None:
        """Main processing pipeline for lyrics.

        Raises:
            ValueError: If no valid lyrics files are found.
        """
        print("Starting lyrics processing pipeline...")

        # Validate configuration before starting
        print("Validating configuration...")
        self.validate_text_splitter()

        # Check for existing collection; abort if the user declines overwrite.
        if not self._confirm_collection_overwrite():
            return

        # Load all lyrics documents
        print("Loading lyrics files...")
        documents = self.loader.load_lyrics()

        if not documents:
            raise ValueError("No valid lyrics files found")

        print(f"Loaded {len(documents)} valid lyrics files")

        # Split documents into overlapping chunks.
        print("Processing documents...")
        processed_docs = []
        for doc in tqdm(documents, desc="Processing documents"):
            processed_docs.extend(self.text_splitter.split_documents([doc]))

        n_docs = len(documents)
        n_chunks = len(processed_docs)
        chunks_msg = f"Created {n_chunks} chunks from {n_docs}"
        print(f"{chunks_msg} documents")

        # Create embeddings in batches
        print("Creating embeddings...")
        total_chunks = len(processed_docs)
        self._embed_batches(processed_docs)

        # Save processing metadata
        self._write_metadata(len(documents), total_chunks)

        print("Processing complete!")
        print(f"Processed {len(documents)} documents into {total_chunks} chunks")
        print(f"Embeddings saved to {self.output_dir / 'chroma'}")


def main() -> None:
    """Script entry point: build a LyricsProcessor from Settings and run it."""
    pipeline = LyricsProcessor(
        lyrics_dir=str(Settings.LYRICS_DIR),
        output_dir=str(Settings.EMBEDDINGS_DIR),
        batch_size=100,
    )
    pipeline.process_lyrics()


if __name__ == "__main__":
    main()