# Spaces:
# Running
# Running
# File size: 8,531 Bytes
# 968e24d
"""
Generate embeddings in CHUNKS - GPU-safe with resume capability
Processes 20K paragraphs at a time with cooling breaks
"""
from sentence_transformers import SentenceTransformer
import json
import numpy as np
from pathlib import Path
from tqdm import tqdm
import torch
import time
class ChunkedEmbeddingGenerator:
    """Generate paragraph embeddings in fixed-size chunks with resume capability.

    Paragraphs are read from a JSONL index and encoded ``chunk_size`` records
    at a time. Every finished chunk is written to ``<output_dir>/chunks``
    before the next one starts, so an interrupted run can continue from the
    first chunk that is not fully on disk.
    """

    def __init__(self, model_name: str = "BAAI/bge-base-en-v1.5", chunk_size: int = 20000):
        """
        Initialize with sentence transformer

        Args:
            model_name: Name passed to ``SentenceTransformer``; also recorded
                in the metadata file written after a run.
            chunk_size: Process this many paragraphs at a time.

        Raises:
            ValueError: If ``chunk_size`` is not a positive integer.
        """
        # Fail before the (slow) model load rather than with a
        # ZeroDivisionError when the chunk count is computed later.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

        print(f"Loading model: {model_name}")
        # Check CUDA availability
        if torch.cuda.is_available():
            print(f"β CUDA available! GPU: {torch.cuda.get_device_name(0)}")
            print(f" GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
            self.device = 'cuda'
        else:
            print("β οΈ CUDA not available, using CPU")
            self.device = 'cpu'

        # Remember the name so the metadata reports the model actually used.
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)
        self.model.to(self.device)
        self.chunk_size = chunk_size
        print(f"β Model loaded on: {self.device}")
        print(f"β Chunk size: {chunk_size:,} paragraphs")

    def load_paragraphs(self, index_file: Path) -> list:
        """Load paragraphs from JSONL.

        Args:
            index_file: Path to a UTF-8 file holding one JSON document per line.

        Returns:
            The decoded JSON documents, in file order. Blank lines are skipped.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        print("\nLoading paragraphs from index...")

        # First pass only counts lines so the progress bar can show a total.
        with open(index_file, 'r', encoding='utf-8') as f:
            total_lines = sum(1 for _ in f)

        paragraphs = []
        with open(index_file, 'r', encoding='utf-8') as f:
            for line in tqdm(f, total=total_lines, desc="Loading index"):
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                if not line.strip():
                    continue
                paragraphs.append(json.loads(line))

        print(f"β Loaded {len(paragraphs):,} paragraphs")
        return paragraphs

    def process_chunk(self, texts, batch_size=64) -> np.ndarray:
        """Process one chunk of texts.

        Args:
            texts: Sequence of strings to encode.
            batch_size: Number of texts handed to the model per call; also
                forwarded to ``encode`` so the model really runs batches of
                this size instead of its own default.

        Returns:
            Array with one normalized embedding row per input text. For empty
            input an array with zero rows is returned.

        Raises:
            ValueError: If ``batch_size`` is not a positive integer.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

        if len(texts) == 0:
            # np.vstack([]) would raise; return a well-formed empty result.
            dim = self.model.get_sentence_embedding_dimension() or 0
            return np.empty((0, dim), dtype=np.float32)

        embeddings_list = []
        with tqdm(total=len(texts), desc="Processing chunk", unit="para") as pbar:
            for batch_no, start in enumerate(range(0, len(texts), batch_size)):
                batch = texts[start:start + batch_size]
                # Generate embeddings
                batch_embeddings = self.model.encode(
                    batch,
                    batch_size=batch_size,
                    convert_to_numpy=True,
                    normalize_embeddings=True,
                    show_progress_bar=False,
                    device=self.device,
                )
                embeddings_list.append(batch_embeddings)
                pbar.update(len(batch))
                # Clear GPU cache every 10 batches
                if self.device == 'cuda' and batch_no % 10 == 0:
                    torch.cuda.empty_cache()
        return np.vstack(embeddings_list)

    @staticmethod
    def _chunk_paths(chunks_dir: Path, chunk_idx: int) -> tuple:
        """Return the (embeddings ``.npy``, ids ``.json``) paths for one chunk."""
        stem = f"chunk_{chunk_idx:03d}"
        return chunks_dir / f"{stem}.npy", chunks_dir / f"{stem}_ids.json"

    @staticmethod
    def _count_completed_chunks(chunks_dir: Path, num_chunks: int) -> int:
        """Count leading chunks (0, 1, 2, ...) that have both files on disk.

        Counting stops at the first chunk missing either file, so a gap or a
        half-written chunk is never treated as finished.
        """
        chunks_dir = Path(chunks_dir)
        completed = 0
        for chunk_idx in range(num_chunks):
            chunk_file, ids_file = ChunkedEmbeddingGenerator._chunk_paths(chunks_dir, chunk_idx)
            if not (chunk_file.is_file() and ids_file.is_file()):
                break
            completed += 1
        return completed

    @staticmethod
    def _save_chunk(chunk_file: Path, ids_file: Path, embeddings, ids) -> None:
        """Write one chunk via temp files that are renamed into place.

        The ids file is committed first and the ``.npy`` last, so a visible
        ``.npy`` always has its ids file and is never a truncated write.
        """
        ids_tmp = ids_file.with_name(ids_file.name + ".tmp")
        with open(ids_tmp, 'w') as f:
            json.dump(ids, f)
        ids_tmp.replace(ids_file)

        npy_tmp = chunk_file.with_name(chunk_file.name + ".tmp")
        # Passing a file object stops np.save from appending ".npy" to the name.
        with open(npy_tmp, 'wb') as f:
            np.save(f, embeddings)
        npy_tmp.replace(chunk_file)

    def generate_embeddings_chunked(self, index_file: Path, output_dir: Path, batch_size: int = 64) -> tuple:
        """Generate embeddings in chunks with resume capability.

        Writes per-chunk files to ``<output_dir>/chunks`` and, once all chunks
        exist, ``paragraph_embeddings.npy``, ``paragraph_ids.json`` and
        ``embedding_metadata.json`` to ``output_dir``. If finished chunks are
        found the user is asked on stdin whether to resume.

        Args:
            index_file: JSONL index; every record must have ``'text'`` and ``'id'``.
            output_dir: Directory for all outputs (created if missing).
            batch_size: Texts encoded per model call.

        Returns:
            ``(final_embeddings, all_ids)``: the combined embedding array and
            the ids in matching row order.

        Raises:
            ValueError: If the index contains no paragraphs.
            RuntimeError: If the combined chunks do not match the index size,
                e.g. resumed chunks stem from another index or ``chunk_size``.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        chunks_dir = output_dir / "chunks"
        chunks_dir.mkdir(exist_ok=True)

        # Load paragraphs
        paragraphs = self.load_paragraphs(index_file)
        total_paras = len(paragraphs)
        if total_paras == 0:
            raise ValueError(f"No paragraphs found in {index_file}; nothing to embed")

        # Calculate chunks
        num_chunks = (total_paras + self.chunk_size - 1) // self.chunk_size
        print(f"\nβ Will process in {num_chunks} chunks of {self.chunk_size:,} paragraphs each")

        # Check for existing chunks (only complete, contiguous ones count)
        completed_chunks = self._count_completed_chunks(chunks_dir, num_chunks)
        if completed_chunks > 0:
            print(f"β Found {completed_chunks} existing chunks")
            response = input(f"Resume from chunk {completed_chunks + 1}? (yes/no): ")
            if response.strip().lower() != 'yes':
                print("Starting fresh...")
                # Remove embeddings, ids and leftover temp files alike.
                for stale in chunks_dir.glob("chunk_*"):
                    if stale.is_file():
                        stale.unlink()
                completed_chunks = 0

        # Process chunks. NOTE: the timer covers this session only, so after a
        # resume the reported total excludes time spent in earlier runs.
        start_time = time.time()
        for chunk_idx in range(completed_chunks, num_chunks):
            print(f"\n{'='*70}")
            print(f"CHUNK {chunk_idx + 1}/{num_chunks}")
            print(f"{'='*70}")

            # Get chunk data
            start_idx = chunk_idx * self.chunk_size
            end_idx = min(start_idx + self.chunk_size, total_paras)
            chunk_paras = paragraphs[start_idx:end_idx]
            print(f"Processing paragraphs {start_idx:,} to {end_idx:,}")

            # Extract texts
            texts = [p['text'] for p in chunk_paras]
            ids = [p['id'] for p in chunk_paras]

            # Process chunk
            chunk_start = time.time()
            embeddings = self.process_chunk(texts, batch_size)
            chunk_time = time.time() - chunk_start
            print(f"β Chunk completed in {chunk_time/60:.1f} minutes")
            print(f" Speed: {len(texts)/chunk_time:.1f} para/s")

            # Save chunk
            chunk_file, ids_file = self._chunk_paths(chunks_dir, chunk_idx)
            self._save_chunk(chunk_file, ids_file, embeddings, ids)
            print(f"β Saved to: {chunk_file.name}")

            # Clear GPU and add cooling break
            if self.device == 'cuda':
                torch.cuda.empty_cache()
                if chunk_idx < num_chunks - 1:  # Not last chunk
                    print("\nβΈοΈ Cooling break: 10 seconds...")
                    time.sleep(10)

        # Combine all chunks
        print(f"\n{'='*70}")
        print("COMBINING CHUNKS...")
        print(f"{'='*70}")
        all_embeddings = []
        all_ids = []
        for chunk_idx in tqdm(range(num_chunks), desc="Loading chunks"):
            chunk_file, ids_file = self._chunk_paths(chunks_dir, chunk_idx)
            all_embeddings.append(np.load(chunk_file))
            with open(ids_file, 'r') as f:
                all_ids.extend(json.load(f))
        final_embeddings = np.vstack(all_embeddings)

        # Resumed chunks from another index or chunk_size would otherwise
        # produce silently misaligned output.
        if final_embeddings.shape[0] != total_paras or len(all_ids) != total_paras:
            raise RuntimeError(
                f"Combined chunks hold {final_embeddings.shape[0]} embeddings and "
                f"{len(all_ids)} ids but the index has {total_paras} paragraphs; "
                f"the files in {chunks_dir} likely come from a different index or "
                f"chunk_size. Delete them and re-run."
            )
        print(f"β Combined shape: {final_embeddings.shape}")

        # Save final files
        print("\nSaving final files...")
        embeddings_file = output_dir / "paragraph_embeddings.npy"
        np.save(embeddings_file, final_embeddings)
        print(f"β Saved: {embeddings_file}")
        ids_file = output_dir / "paragraph_ids.json"
        with open(ids_file, 'w') as f:
            json.dump(all_ids, f)
        print(f"β Saved: {ids_file}")

        # Save metadata
        total_time = time.time() - start_time
        metadata = {
            'model_name': self.model_name,
            'embedding_dim': int(final_embeddings.shape[1]),
            'total_paragraphs': len(all_ids),
            'device_used': self.device,
            'batch_size': batch_size,
            'chunk_size': self.chunk_size,
            'num_chunks': num_chunks,
            'total_time_minutes': total_time / 60,
        }
        with open(output_dir / "embedding_metadata.json", 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"\nβ Total time: {total_time/60:.1f} minutes")
        print(f" Average speed: {len(all_ids)/total_time:.1f} para/s")
        return final_embeddings, all_ids
if __name__ == "__main__":
    # Script entry point: embed the indexed paragraphs with the default model
    # and report how many embeddings were produced.
    rule = "=" * 70

    # Opening banner, followed by a blank line.
    for banner_line in (rule, "NyayLens - Chunked Embedding Generation (GPU-Safe)", rule):
        print(banner_line)
    print()

    index_path = Path("data/processed/indexed/paragraph_index.jsonl")
    embeddings_dir = Path("data/processed/embeddings")

    # 20K paragraphs per chunk; 64 texts per encode call.
    generator = ChunkedEmbeddingGenerator(model_name="BAAI/bge-base-en-v1.5", chunk_size=20000)
    embeddings, ids = generator.generate_embeddings_chunked(
        index_file=index_path,
        output_dir=embeddings_dir,
        batch_size=64,
    )

    # Closing banner with the final count.
    print()
    print(rule)
    print(f"β COMPLETE! Generated {len(embeddings):,} embeddings")
    print(rule)
# end of file