Qar-Raz committed on
Commit
f1d2c2b
·
verified ·
1 Parent(s): c27a4e3

Sync backend Docker context from GitHub main

Browse files
Files changed (3) hide show
  1. data/data_loader.py +91 -0
  2. data/ingest.py +310 -0
  3. data/vector_db.py +247 -0
data/data_loader.py ADDED
@@ -0,0 +1,91 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+ import pandas as pd
3
+ from typing import List, Dict, Any
4
+
5
+
6
def load_cbt_book(file_path: str = "EntireBookCleaned.txt") -> pd.DataFrame:
    """
    Parse the cleaned CBT book text into one document per page.

    Pages are delimited by "--- Page X ---" (optionally "--- Page X of Y ---")
    markers; empty pages are dropped.

    Args:
        file_path: Path to the cleaned book text file

    Returns:
        DataFrame with columns: id, title, url, full_text
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except FileNotFoundError:
        raise FileNotFoundError(f"Book file not found: {file_path}")

    # Capturing the page number makes re.split return:
    # [preamble, num_1, body_1, num_2, body_2, ...]
    page_pattern = r'---\s*Page\s+(\d+)(?:\s+of\s+\d+)?\s*---'
    parts = re.split(page_pattern, content)

    documents = []
    # Walk the (number, body) pairs; parts[0] (any text before the first
    # marker) is intentionally ignored.
    for raw_num, raw_body in zip(parts[1::2], parts[2::2]):
        page_num = raw_num.strip()

        # Collapse runs of 3+ newlines into plain paragraph breaks.
        body = re.sub(r'\n{3,}', '\n\n', raw_body.strip()).strip()
        if not body:
            continue

        # Prefer the page's first line as a title when it looks like one.
        first_line = body.split('\n', 1)[0].strip()
        if 10 < len(first_line) < 200:
            title = first_line
        else:
            title = f"CBT Book - Page {page_num}"

        documents.append({
            "id": f"cbt-page-{page_num}",
            "title": title,
            "url": f"https://res.cloudinary.com/dajb4c1g5/image/upload/v1774864993/topic_pdfs/93/merged_pdf_1774864989649.pdf.pdf#page={page_num}",
            "full_text": body,
        })

    if not documents:
        raise ValueError("No documents were parsed from the book file")

    df = pd.DataFrame(documents)
    print(f"Loaded {len(df)} pages from CBT book")
    return df
70
+
71
+
72
def get_book_stats(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Summarize page-count and character-length statistics for the loaded book.

    Args:
        df: DataFrame containing book pages

    Returns:
        Dictionary with statistics
    """
    # Compute the per-page character lengths once and reuse the series.
    lengths = df['full_text'].str.len()

    return {
        "total_pages": len(df),
        "total_characters": lengths.sum(),
        "average_chars_per_page": round(lengths.mean(), 2),
        "min_chars": lengths.min(),
        "max_chars": lengths.max(),
    }
data/ingest.py ADDED
@@ -0,0 +1,310 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Script to ingest CBT book data into Pinecone vector database.
3
+ Ingests the book 6 times with different chunking formats for ablation study.
4
+ All chunks are stored in a SINGLE index with metadata to differentiate.
5
+ Run this once before starting the API server.
6
+ """
7
+ import os
8
+ import time
9
+ from dotenv import load_dotenv
10
+ from config_loader import cfg
11
+ from data.data_loader import load_cbt_book, get_book_stats
12
+ from data.vector_db import get_pinecone_index, refresh_pinecone_index
13
+ from retriever.processor import ChunkProcessor
14
+
15
+
16
+ # 6 different chunking techniques for ablation study
17
# Chunking configurations for the ablation study.  Each dict is fed to
# ChunkProcessor.process(); "kwargs" carries splitter-specific options.
CHUNKING_TECHNIQUES = [
    {
        "name": "fixed",
        "description": "Fixed-size chunking - splits every N characters (may cut sentences mid-way)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {"separator": ""},  # No separator for fixed splitting
    },
    {
        "name": "sentence",
        "description": "Sentence-level chunking - respects sentence boundaries (NLTK)",
        "chunk_size": 1000,
        "chunk_overlap": 100,
        "kwargs": {},
    },
    {
        "name": "paragraph",
        "description": "Paragraph-level chunking - uses natural paragraph breaks",
        "chunk_size": 2500,
        "chunk_overlap": 100,
        "kwargs": {"separator": "\n\n"},  # Split on paragraph breaks
    },
    # NOTE: the semantic technique is deliberately disabled; kept for reference.
    # {
    #     "name": "semantic",
    #     "description": "Semantic chunking - splits where topic/meaning shifts (embedding similarity)",
    #     "chunk_size": 2000,
    #     "chunk_overlap": 100,
    #     "kwargs": {"breakpoint_threshold_type": "percentile", "breakpoint_threshold_amount": 70},
    # },
    {
        "name": "recursive",
        "description": "Recursive chunking - hierarchical splitting (paragraphs → sentences → words → chars)",
        "chunk_size": 2000,
        "chunk_overlap": 100,
        "kwargs": {"separators": ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""], "keep_separator": True},
    },
    {
        "name": "page",
        "description": "Page-level chunking - uses entire book pages as-is",
        "chunk_size": 10000,  # Very large to keep full pages
        "chunk_overlap": 0,  # No overlap between pages
        "kwargs": {"separator": "--- Page"},  # Split on page markers
    },
    {
        "name": "markdown",
        "description": "Markdown header chunking - splits by headers (#, ##, ###, ####) with 4k char limit",
        "chunk_size": 4000,  # Max 4k chars per chunk
        "chunk_overlap": 0,  # No overlap for markdown
        "kwargs": {},  # Custom implementation
    },
]
68
+
69
+
70
def ingest_single_technique(
    raw_data,
    proc,
    technique_config,
    technique_index,
    total_techniques,
):
    """Chunk the book with one technique and return its tagged chunks.

    The chunks come back from the processor already embedded; this function
    only adds per-technique identification on top.
    """
    name = technique_config["name"]
    size = technique_config["chunk_size"]
    overlap = technique_config["chunk_overlap"]
    extra = technique_config.get("kwargs", {})

    print(f"\n[{technique_index}/{total_techniques}] Processing '{name}'...")
    print(f" Description: {technique_config['description']}")
    print(f" Chunk size: {size}, Overlap: {overlap}")

    # Delegate chunking + embedding to the shared processor instance.
    final_chunks = proc.process(
        raw_data,
        technique=name,
        chunk_size=size,
        chunk_overlap=overlap,
        max_docs=cfg.project.get("doc_limit"),
        verbose=False,
        **extra,
    )

    # Tag every chunk so all techniques can coexist in one index:
    # the metadata field allows filtering, and the id prefix keeps IDs
    # unique across techniques.
    for chunk in final_chunks:
        chunk["metadata"]["chunking_technique"] = name
        chunk["id"] = f"{name}-{chunk['id']}"

    print(f" Created {len(final_chunks)} chunks")

    return final_chunks
107
+
108
+
109
def ingest_data():
    """Load CBT book, chunk it 6 ways, and upload ALL to a SINGLE Pinecone index.

    Requires PINECONE_API_KEY in the environment (loaded via .env).

    Returns:
        Tuple of (all_chunks, configured_technique_chunks, proc, index) for
        reuse in the retrieval pipeline.
    """
    load_dotenv()

    pinecone_key = os.getenv("PINECONE_API_KEY")
    if not pinecone_key:
        raise RuntimeError("PINECONE_API_KEY not found in environment variables")

    print("=" * 80)
    print("CBT BOOK INGESTION PIPELINE - 6 TECHNIQUES (SINGLE INDEX)")
    print("=" * 80)
    print(f"\nTechniques to process: {len(CHUNKING_TECHNIQUES)}")
    for i, tech in enumerate(CHUNKING_TECHNIQUES, 1):
        print(f" {i}. {tech['name']}: {tech['description']}")
    print(f"\nAll chunks will be stored in a SINGLE index: {cfg.db['base_index_name']}-{cfg.processing['technique']}")
    print("Chunks are differentiated by 'chunking_technique' metadata field.")

    # 1. Load the CBT book (once, reused for all techniques)
    print(f"\n{'='*80}")
    print("STEP 1: LOADING CBT BOOK")
    print(f"{'='*80}")
    print("\nLoading CBT book from EntireBookCleaned.txt...")
    raw_data = load_cbt_book("data/EntireBookCleaned.txt")
    stats = get_book_stats(raw_data)
    print(f" Loaded {stats['total_pages']} pages")
    print(f" Total characters: {stats['total_characters']:,}")
    print(f" Average chars per page: {stats['average_chars_per_page']:.0f}")

    # 2. Initialize processor (once, reused for all techniques)
    print(f"\nInitializing embedding model: {cfg.processing['embedding_model']}")
    proc = ChunkProcessor(model_name=cfg.processing['embedding_model'], verbose=False)

    # 3. Process each technique sequentially and collect all chunks.
    # A failure in one technique is recorded in `results` and does NOT
    # abort the run — the remaining techniques are still processed.
    print(f"\n{'='*80}")
    print("STEP 2: CHUNKING WITH 6 TECHNIQUES")
    print(f"{'='*80}")

    all_chunks = []
    configured_technique_chunks = []
    results = {}

    for i, technique in enumerate(CHUNKING_TECHNIQUES, 1):
        try:
            chunks = ingest_single_technique(
                raw_data=raw_data,
                proc=proc,
                technique_config=technique,
                technique_index=i,
                total_techniques=len(CHUNKING_TECHNIQUES),
            )
            all_chunks.extend(chunks)

            # Save chunks for the configured technique (for retrieval pipeline)
            if technique["name"] == cfg.processing['technique']:
                configured_technique_chunks = chunks

            results[technique["name"]] = {
                "status": "success",
                "chunks": len(chunks),
            }

            # Wait between techniques to avoid rate limits (for embedding API)
            if i < len(CHUNKING_TECHNIQUES):
                print(f" Waiting 5 seconds before next technique (rate limit protection)...")
                # NOTE(review): `time` is already imported at module level; this
                # local import is redundant but harmless.
                import time
                time.sleep(5)

        except Exception as e:
            print(f" ERROR with technique '{technique['name']}': {e}")
            results[technique["name"]] = {
                "status": "failed",
                "error": str(e),
            }

    # 4. Upload ALL chunks to a SINGLE Pinecone index
    print(f"\n{'='*80}")
    print("STEP 3: UPLOADING TO SINGLE PINECONE INDEX")
    print(f"{'='*80}")

    index_name = f"{cfg.db['base_index_name']}-{cfg.processing['technique']}"
    print(f"\nIndex name: {index_name}")
    print(f"Dimension: {cfg.db['dimension']}")
    print(f"Metric: {cfg.db['metric']}")
    print(f"Total chunks to upload: {len(all_chunks)}")

    index = get_pinecone_index(
        pinecone_key,
        cfg.db['base_index_name'],
        technique=cfg.processing['technique'],
        dimension=cfg.db['dimension'],
        metric=cfg.db['metric'],
    )

    print("Uploading " + str(len(all_chunks)) + " vectors to Pinecone...")
    refresh_pinecone_index(index, all_chunks, batch_size=cfg.db['batch_size'])

    # Upload sparse vectors to a separate index. This whole section is
    # best-effort: any failure is reported and the pipeline continues,
    # since dense retrieval still works without the BM25 sparse index.

    print("Preparing to upload sparse vectors for BM25...")
    try:
        from pinecone import Pinecone, ServerlessSpec
        try:
            from pinecone_text.sparse import BM25Encoder
        except ImportError:
            # Optional dependency: skip sparse indexing if unavailable.
            print("Skipping BM25 indexing - run pip install pinecone-text")
            return all_chunks, configured_technique_chunks, proc, index
        pc = Pinecone(api_key=pinecone_key)

        sparse_index_name = "cbt-book-sparse"
        existing_indexes = [idx.name for idx in pc.list_indexes()]
        if sparse_index_name not in existing_indexes:
            print(f"Creating sparse index: {sparse_index_name}")
            pc.create_index(
                name=sparse_index_name,
                dimension=512,  # required space-filler dimension
                metric="dotproduct",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # wait for index
            # NOTE(review): redundant local import — `time` is module-level.
            import time
            while not pc.describe_index(sparse_index_name).status["ready"]:
                time.sleep(1)

        sparse_index = pc.Index(sparse_index_name)

        # Encode sparse vectors
        print("Encoding sparse vectors...")
        bm25 = BM25Encoder().default()
        sparse_chunks = []

        # Fit BM25 on the full corpus so document frequencies are global.
        corpus = [chunk["metadata"]["text"] for chunk in all_chunks]
        bm25.fit(corpus)

        for chunk in all_chunks:
            sparse_values = bm25.encode_documents(chunk["metadata"]["text"])

            # Skip empty sparse vectors to prevent Pinecone errors
            if not sparse_values.get("indices") or len(sparse_values.get("indices", [])) == 0:
                continue

            new_chunk = {
                "id": chunk["id"],

                "sparse_values": sparse_values,
                "metadata": chunk["metadata"]
            }
            sparse_chunks.append(new_chunk)

        print(f"Upserting {len(sparse_chunks)} valid sparse vectors to {sparse_index_name}...")

        # Upsert sparse vectors
        if sparse_chunks:
            batch_size = cfg.db.get("batch_size", 100)
            for i in range(0, len(sparse_chunks), batch_size):
                batch = sparse_chunks[i:i+batch_size]
                sparse_index.upsert(vectors=batch)
            print("Sparse vector upsert complete.")
        else:
            print("No valid sparse vectors to upsert.")

    except Exception as e:
        # Deliberate best-effort: sparse indexing failure is non-fatal.
        print(f"Error during sparse vector upload: {e}")

    # 5. Summary
    print(f"\n{'='*80}")
    print("INGESTION COMPLETE - SUMMARY")
    print(f"{'='*80}")
    print(f"\n{'Technique':<15} {'Status':<12} {'Chunks':<10}")
    print("-" * 40)
    total_chunks = 0
    for tech in CHUNKING_TECHNIQUES:
        name = tech["name"]
        result = results.get(name, {})
        status = result.get("status", "unknown")
        chunks = result.get("chunks", 0)
        if status == "success":
            total_chunks += chunks
        print(f"{name:<15} {status:<12} {chunks:<10}")
    print("-" * 40)
    print(f"{'TOTAL':<15} {'':<12} {total_chunks:<10}")

    print(f"\nSingle index: {index_name}")
    print(f"Total vectors: {len(all_chunks)}")
    print("\nChunks can be filtered by 'chunking_technique' metadata field:")
    for tech in CHUNKING_TECHNIQUES:
        if results.get(tech["name"], {}).get("status") == "success":
            print(f" - chunking_technique: '{tech['name']}'")

    print("\nYou can now start the API server with:")
    print(" python -m uvicorn api:app --host 0.0.0.0 --port 8000")

    # Return chunks and processor for reuse in retrieval pipeline
    return all_chunks, configured_technique_chunks, proc, index
307
+
308
+
309
+ if __name__ == "__main__":
310
+ ingest_data()
data/vector_db.py ADDED
@@ -0,0 +1,247 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import re
3
+ import json
4
+ from pathlib import Path
5
+ from typing import Any, Dict, List
6
+ from pinecone import Pinecone, ServerlessSpec
7
+
8
+
9
+ # Added cacheing to reduce consecutive startup time
10
+ # --@Qamar
11
+
12
def slugify_technique(name):
    """Normalize a technique label into a Pinecone-safe slug.

    Example: 'Sentence Splitter' -> 'sentence-splitter'.
    """
    collapsed = re.sub(r'[^a-z0-9]+', '-', name.lower())
    return collapsed.strip('-')
15
+
16
def get_index_by_name(api_key: str, index_name: str):
    """
    Connect directly to a Pinecone index by its full string name.

    Intended for the API/production side where the exact name is already
    known.  Raises ValueError when the index is absent, instead of letting
    the connection attempt fail with a 404.
    """
    client = Pinecone(api_key=api_key)

    # Verify existence up front to avoid a 404 crash on connect.
    available = [idx.name for idx in client.list_indexes()]
    if index_name not in available:
        raise ValueError(f"Index '{index_name}' does not exist in your Pinecone project.")

    print(f" Connecting to Index: {index_name}")
    return client.Index(index_name)
30
+
31
def get_pinecone_index(api_key, base_name, technique, dimension=384, metric="cosine"):
    """
    Create (if needed) and return the index dedicated to one technique.

    The index name is '<base_name>-<slugified technique>',
    e.g. 'arxiv-index-token'.
    """
    client = Pinecone(api_key=api_key)
    full_index_name = f"{base_name}-{slugify_technique(technique)}"

    if full_index_name not in [idx.name for idx in client.list_indexes()]:
        print(f" Creating specialized index: {full_index_name}...")
        client.create_index(
            name=full_index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )
        # Block until the new serverless index reports ready.
        while not client.describe_index(full_index_name).status['ready']:
            time.sleep(1)

    # Reuse the lookup helper so connection logic lives in one place.
    return get_index_by_name(api_key, full_index_name)
56
+
57
def refresh_pinecone_index(index, final_chunks, batch_size=100):
    """
    Ensure this technique-specific index holds the provided chunks.

    Upserts only when the index is empty or holds fewer vectors than the
    chunk list.  Returns True when an upsert happened, False otherwise
    (including on error, which is reported but not raised).
    """
    if not final_chunks:
        print("No chunks provided to refresh.")
        return False

    try:
        # Compare the live vector count against the freshly produced chunks.
        existing = index.describe_index_stats().get('total_vector_count', 0)
        expected = len(final_chunks)

        print(f" Index Stats -> Existing: {existing} | New Chunks: {expected}")

        if existing == 0:
            print(f"➕ Index is empty. Upserting {expected} vectors...")
        elif existing < expected:
            print(f" Vector count mismatch ({existing} < {expected}). Updating index...")
        else:
            # Enough vectors already present — nothing to do.
            print(f" Index is already populated with {existing} vectors. Ready for search.")
            return False

        upsert_to_pinecone(index, prepare_vectors_for_upsert(final_chunks), batch_size)
        return True

    except Exception as e:
        # Best-effort: report the failure and let the caller continue.
        print(f" Error refreshing index: {e}")
        return False
94
+
95
+ # Utility functions remain the same as previous version
96
def prepare_vectors_for_upsert(final_chunks):
    """Shape raw chunk dicts into Pinecone upsert records.

    Metadata is projected onto a fixed schema; missing fields fall back
    to empty strings, 0, or "unknown".
    """
    schema_defaults = (
        ('text', ""),
        ('title', ""),
        ('url', ""),
        ('chunk_index', 0),
        ('technique', "unknown"),
        ('chunking_technique', "unknown"),
    )

    vectors = []
    for chunk in final_chunks:
        meta = chunk.get('metadata', {})
        vectors.append({
            'id': chunk['id'],
            'values': chunk['values'],
            'metadata': {key: meta.get(key, fallback) for key, fallback in schema_defaults},
        })
    return vectors
116
+
117
def upsert_to_pinecone(index, chunks, batch_size=100):
    """Upsert the prepared vectors in fixed-size batches."""
    start = 0
    while start < len(chunks):
        index.upsert(vectors=chunks[start : start + batch_size])
        start += batch_size
121
+
122
+ # Some methods for loading chunks back from Pinecone with local caching to speed up BM25 initialization
123
+
124
def _sanitize_index_name(index_name: str) -> str:
    """Collapse filesystem-unsafe characters to '-'; never return empty."""
    cleaned = re.sub(r'[^a-zA-Z0-9._-]+', '-', index_name).strip('-')
    return cleaned if cleaned else 'default-index'
126
+
127
+
128
def _chunk_cache_path(cache_dir: str, index_name: str) -> Path:
    """Return the BM25 cache-file path for *index_name*, creating *cache_dir* if needed."""
    root = Path(cache_dir)
    root.mkdir(parents=True, exist_ok=True)
    return root / f"bm25_chunks_{_sanitize_index_name(index_name)}.json"
133
+
134
+
135
def _read_chunk_cache(path: Path) -> Dict[str, Any]:
    """Load a cached chunk payload from disk as a dict."""
    return json.loads(path.read_text(encoding="utf-8"))
138
+
139
+
140
def _write_chunk_cache(path: Path, payload: Dict[str, Any]) -> None:
    """Serialize *payload* to *path* as UTF-8 JSON."""
    path.write_text(json.dumps(payload), encoding="utf-8")
143
+
144
+
145
def load_chunks_with_local_cache(
    index,
    index_name: str,
    cache_dir: str = ".cache",
    batch_size: int = 100,
    force_refresh: bool = False,
) -> tuple[List[Dict[str, Any]], str]:
    """Load BM25 corpus chunks, preferring a local JSON cache over a full Pinecone scan.

    The cache is considered valid only when its recorded vector count matches
    the live index's current 'total_vector_count'; otherwise the chunks are
    re-fetched from Pinecone and the cache file is rewritten.

    Args:
        index: Connected Pinecone index object.
        index_name: Full index name (used to derive the cache file name).
        cache_dir: Directory for cache files (created if missing).
        batch_size: Batch size for the Pinecone scan on a cache miss.
        force_refresh: When True, skip the cache and always scan Pinecone.

    Returns:
        Tuple of (chunks, source) where source is "cache" or "pinecone".
    """
    cache_file = _chunk_cache_path(cache_dir=cache_dir, index_name=index_name)
    stats = index.describe_index_stats()
    current_count = stats.get("total_vector_count", 0)

    if not force_refresh and cache_file.exists():
        try:
            cached_payload = _read_chunk_cache(cache_file)
            cached_meta = cached_payload.get("meta", {})
            cached_count = cached_meta.get("vector_count", -1)
            cached_chunks = cached_payload.get("chunks", [])

            # Cache hit requires an exact vector-count match AND a non-empty chunk list.
            if cached_count == current_count and cached_chunks:
                print(
                    f" Loaded BM25 chunk cache: {cache_file} "
                    f"(chunks={len(cached_chunks)}, vectors={cached_count})"
                )
                return cached_chunks, "cache"

            print(
                " BM25 cache stale or empty. "
                f"cache_vectors={cached_count}, pinecone_vectors={current_count}. Refreshing..."
            )
        except Exception as e:
            # A corrupt/unreadable cache is non-fatal: fall through to a full rescan.
            print(f" Failed to read BM25 cache ({cache_file}): {e}. Refreshing from Pinecone...")

    # Cache miss (or force_refresh): scan Pinecone and rebuild the cache payload.
    chunks = load_chunks_from_pinecone(index=index, batch_size=batch_size)
    payload = {
        "meta": {
            "index_name": index_name,
            "vector_count": current_count,
            "updated_at_epoch_s": int(time.time()),
        },
        "chunks": chunks,
    }

    try:
        _write_chunk_cache(cache_file, payload)
        print(f" Saved BM25 chunk cache: {cache_file} (chunks={len(chunks)})")
    except Exception as e:
        # A failed cache write only costs the next startup another rescan.
        print(f" Failed to write BM25 cache ({cache_file}): {e}")

    return chunks, "pinecone"
195
+
196
+
197
def load_chunks_from_pinecone(index, batch_size: int = 100) -> List[Dict[str, Any]]:
    """
    Scan the Pinecone index and retrieve all text metadata for the BM25 corpus.

    Fix: the return annotation previously used the builtin ``any`` function
    (``list[dict[str, any]]``) instead of ``typing.Any``; it now matches the
    typing imports used elsewhere in this module.

    Args:
        index: Connected Pinecone index object.
        batch_size: Number of vector IDs to list/fetch per request.

    Returns:
        List of {"id": ..., "metadata": ...} dicts for every vector whose
        metadata contains a non-empty "text" field.
    """
    stats = index.describe_index_stats()
    namespaces = list(stats.get('namespaces', {}).keys())
    # If no namespaces are explicitly named, Pinecone uses an empty string for the default
    if not namespaces:
        namespaces = [""]

    all_chunks: List[Dict[str, Any]] = []
    seen_ids = set()  # guards against duplicate IDs across list() pages

    print(f"Loading vectors for BM25 from namespaces: {namespaces}")

    for ns in namespaces:
        # Pinecone's list() generator returns batches of IDs
        for id_batch in index.list(namespace=ns, limit=batch_size):
            if not id_batch:
                continue

            # Fetch the actual content (metadata) for this batch of IDs
            fetched = index.fetch(ids=id_batch, namespace=ns)
            vectors = getattr(fetched, "vectors", {})

            for vector_id, vector_data in vectors.items():
                if vector_id in seen_ids:
                    continue
                seen_ids.add(vector_id)

                # Metadata may be absent or a non-dict mapping; normalize to dict.
                metadata = getattr(vector_data, "metadata", {})
                if metadata is None:
                    metadata = {}
                if not isinstance(metadata, dict):
                    metadata = dict(metadata)

                # Chunks without text are useless for BM25 scoring.
                text = metadata.get("text")
                if not text:
                    continue

                all_chunks.append({
                    "id": vector_id,
                    "metadata": metadata
                })

        print(f" Finished namespace: '{ns if ns else 'default'}'")

    print(f"Total chunks loaded into memory: {len(all_chunks)}")
    return all_chunks