NavyDevilDoc commited on
Commit
15383a5
·
verified ·
1 Parent(s): 10f62d5

Update src/rag_engine.py

Browse files
Files changed (1) hide show
  1. src/rag_engine.py +41 -66
src/rag_engine.py CHANGED
@@ -9,10 +9,10 @@ from langchain_huggingface import HuggingFaceEmbeddings
9
  from langchain_openai import OpenAIEmbeddings
10
  from langchain_core.documents import Document
11
 
12
- # Internal Core Imports (The Good Stuff)
13
  from core.PineconeManager import PineconeManager
14
  from core.AcronymManager import AcronymManager
15
- from core.ChunkingManager import ChunkingManager, ChunkingStrategy # NEW: The Traffic Cop
16
  from flashrank import Ranker, RerankRequest
17
 
18
  # CONFIGURATION
@@ -20,7 +20,7 @@ PINECONE_KEY = os.getenv("PINECONE_API_KEY")
20
  UPLOAD_DIR = "source_documents"
21
  logger = logging.getLogger(__name__)
22
 
23
- # Initialize Reranker (Small, fast CPU model)
24
  try:
25
  reranker = Ranker(model_name="ms-marco-TinyBERT-L-2-v2", cache_dir="/tmp/flashrank_cache")
26
  except Exception as e:
@@ -65,36 +65,20 @@ def save_uploaded_file(uploaded_file, username: str) -> str:
65
  f.write(uploaded_file.getbuffer())
66
  return file_path
67
 
68
- # --- CORE LOGIC UPGRADE ---
69
-
70
  def process_file(file_path: str, chunking_strategy: str = "paragraph", embed_model_name: str = "all-mpnet-base-v2") -> List[Document]:
71
- """
72
- Delegates processing to the robust ChunkingManager in src/core.
73
- """
74
  try:
75
  logger.info(f"Initializing ChunkingManager for {file_path} using {chunking_strategy}")
76
-
77
- # Initialize the Manager (it handles loading the right tokenizers/embedders)
78
  manager = ChunkingManager(embedding_model_name=embed_model_name)
 
79
 
80
- # Execute the robust processing pipeline
81
- # This now uses your ParagraphChunker.py or TokenChunker.py logic!
82
- chunks = manager.process_document(
83
- file_path=file_path,
84
- strategy=chunking_strategy,
85
- preprocess=True
86
- )
87
-
88
- # Handle case where process_document returns a Dict (e.g. hierarchical) vs List
89
  if isinstance(chunks, dict):
90
- # For now, flatten dictionary returns if any (future proofing)
91
  flat_chunks = []
92
  for key, val in chunks.items():
93
  if isinstance(val, list): flat_chunks.extend(val)
94
  return flat_chunks
95
 
96
  return chunks
97
-
98
  except Exception as e:
99
  logger.error(f"Error processing {file_path}: {e}")
100
  return []
@@ -103,84 +87,83 @@ def ingest_file(file_path: str, username: str, index_name: str, embed_model_name
103
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
104
 
105
  try:
106
- # 1. Chunking (Delegated to Core)
107
- # Note: We pass the embedding model name so the chunker can initialize correctly
108
  docs = process_file(file_path, chunking_strategy=strategy, embed_model_name=embed_model_name)
109
-
110
  if not docs: return False, "No valid chunks generated."
111
 
112
- # 2. Acronym Learning
 
 
 
 
 
 
 
 
113
  acronym_mgr = AcronymManager()
114
  for doc in docs:
115
  acronym_mgr.scan_text_for_acronyms(doc.page_content)
116
 
117
- # 3. Pinecone Manager
118
  pm = PineconeManager(PINECONE_KEY)
119
 
120
- # 4. SAFETY CHECK
121
  emb_fn = get_embedding_func(embed_model_name)
122
  test_vec = emb_fn.embed_query("test")
123
  model_dim = len(test_vec)
124
  if not pm.check_dimension_compatibility(index_name, model_dim):
125
  return False, f"Dimension Mismatch! Index '{index_name}' expects {model_dim}d vectors."
126
 
127
- # 5. PRE-EMPTIVE DELETE
128
- filename = os.path.basename(file_path)
129
- pm.delete_file(index_name, filename, namespace=username)
130
 
131
- # 6. UPLOAD
132
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
133
- # Generate stable IDs using the metadata source or filename
134
- custom_ids = [f"{doc.metadata.get('source', filename)}_{i}" for i, doc in enumerate(docs)]
135
  vstore.add_documents(docs, ids=custom_ids)
136
 
137
- return True, f"Successfully updated {filename} ({len(docs)} chunks)."
138
 
139
  except Exception as e:
140
  logger.error(f"Ingestion failed: {e}")
141
  return False, str(e)
142
 
143
  def process_and_add_text(text: str, source_name: str, username: str, index_name: str, embed_model_name: str = None) -> Tuple[bool, str]:
144
- """
145
- Ingests raw text (e.g. flattened summaries) using the ChunkingManager.
146
- """
147
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
148
 
149
  try:
150
  pm = PineconeManager(PINECONE_KEY)
 
151
 
152
- # 1. PRE-EMPTIVE DELETE
153
- pm.delete_file(index_name, source_name, namespace=username)
154
 
155
- # 2. SAVE BACKUP (Critical: ChunkingManager works best with files)
156
  user_docs_dir = os.path.join(UPLOAD_DIR, username)
157
  os.makedirs(user_docs_dir, exist_ok=True)
158
- backup_path = os.path.join(user_docs_dir, source_name)
159
 
160
  with open(backup_path, "w", encoding='utf-8') as f:
161
  f.write(text)
162
 
163
- # 3. CHUNK & PROCESS (Using the Manager on the backup file)
164
- # This ensures flattened text gets the same robust metadata/cleaning as regular files
165
- logger.info(f"Processing flattened text via ChunkingManager: {source_name}")
166
  manager = ChunkingManager(embedding_model_name=embed_model_name)
167
-
168
- # We use 'token' strategy for summaries usually, or 'paragraph' if preferred
169
  docs = manager.process_document(backup_path, strategy="paragraph", preprocess=True)
170
 
171
- # Override metadata to ensure it's marked as generated
172
  for doc in docs:
 
173
  doc.metadata["file_type"] = "generated"
174
  doc.metadata["strategy"] = "flattened"
175
 
176
- # 4. UPLOAD
177
  emb_fn = get_embedding_func(embed_model_name)
178
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
179
-
180
- custom_ids = [f"{source_name}_{i}" for i in enumerate(docs)]
181
  vstore.add_documents(docs, ids=custom_ids)
182
 
183
- return True, f"Updated: {source_name} ({len(docs)} chunks)"
184
 
185
  except Exception as e:
186
  logger.error(f"Error indexing text: {e}")
@@ -188,37 +171,27 @@ def process_and_add_text(text: str, source_name: str, username: str, index_name:
188
 
189
  def search_knowledge_base(query: str, username: str, index_name: str, embed_model_name: str, k: int = 5, final_k: int = 5):
190
  if not PINECONE_KEY or not index_name: return []
191
-
192
  try:
193
  pm = PineconeManager(PINECONE_KEY)
194
  emb_fn = get_embedding_func(embed_model_name)
195
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
196
 
197
- # 1. RETRIEVE BROAD
198
  broad_k = final_k * 3
199
  initial_docs = vstore.similarity_search(query, k=broad_k)
200
 
201
  if not initial_docs or not reranker:
202
  return initial_docs[:final_k]
203
 
204
- # 2. RERANK
205
- passages = [
206
- {"id": str(i), "text": doc.page_content, "meta": doc.metadata}
207
- for i, doc in enumerate(initial_docs)
208
- ]
209
-
210
  rerank_request = RerankRequest(query=query, passages=passages)
211
  ranked_results = reranker.rerank(rerank_request)
212
 
213
- # 3. SELECT TOP K
214
  final_docs = []
215
  for res in ranked_results[:final_k]:
216
  meta = res.get("meta", {})
217
  meta["rerank_score"] = res.get("score")
218
  final_docs.append(Document(page_content=res["text"], metadata=meta))
219
-
220
  return final_docs
221
-
222
  except Exception as e:
223
  logger.error(f"Search failed: {e}")
224
  return []
@@ -227,7 +200,6 @@ def delete_document(username: str, filename: str, index_name: str):
227
  user_dir = os.path.join(UPLOAD_DIR, username)
228
  file_path = os.path.join(user_dir, filename)
229
  if os.path.exists(file_path): os.remove(file_path)
230
-
231
  if PINECONE_KEY and index_name:
232
  try:
233
  pm = PineconeManager(PINECONE_KEY)
@@ -247,8 +219,8 @@ def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, s
247
  ids = pm.get_all_ids(index_name, username)
248
  if not ids: return False, "No data found in Pinecone."
249
 
250
- # Nuke local folder first to handle deletions
251
  user_dir = os.path.join(UPLOAD_DIR, username)
 
252
  if os.path.exists(user_dir): shutil.rmtree(user_dir)
253
  os.makedirs(user_dir, exist_ok=True)
254
 
@@ -260,7 +232,10 @@ def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, s
260
  vectors = response.vectors
261
  for vec_id, vec_data in vectors.items():
262
  meta = vec_data.metadata or {}
263
- source = meta.get('source', 'unknown.txt')
 
 
 
264
  text = meta.get('text') or meta.get('page_content') or ''
265
  try:
266
  if "_" in vec_id: chunk_index = int(vec_id.rsplit('_', 1)[-1])
@@ -276,7 +251,7 @@ def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, s
276
  file_path = os.path.join(user_dir, filename)
277
  with open(file_path, "w", encoding="utf-8") as f: f.write(full_text)
278
  count += 1
279
- return True, f"Restored {count} files (Sorted) from Pinecone!"
280
  except Exception as e:
281
  logger.error(f"Cache rebuild failed: {e}")
282
  return False, str(e)
 
9
  from langchain_openai import OpenAIEmbeddings
10
  from langchain_core.documents import Document
11
 
12
+ # Internal Core Imports
13
  from core.PineconeManager import PineconeManager
14
  from core.AcronymManager import AcronymManager
15
+ from core.ChunkingManager import ChunkingManager
16
  from flashrank import Ranker, RerankRequest
17
 
18
  # CONFIGURATION
 
20
  UPLOAD_DIR = "source_documents"
21
  logger = logging.getLogger(__name__)
22
 
23
+ # Initialize Reranker
24
  try:
25
  reranker = Ranker(model_name="ms-marco-TinyBERT-L-2-v2", cache_dir="/tmp/flashrank_cache")
26
  except Exception as e:
 
65
  f.write(uploaded_file.getbuffer())
66
  return file_path
67
 
 
 
68
  def process_file(file_path: str, chunking_strategy: str = "paragraph", embed_model_name: str = "all-mpnet-base-v2") -> List[Document]:
69
+ """Delegates to ChunkingManager."""
 
 
70
  try:
71
  logger.info(f"Initializing ChunkingManager for {file_path} using {chunking_strategy}")
 
 
72
  manager = ChunkingManager(embedding_model_name=embed_model_name)
73
+ chunks = manager.process_document(file_path=file_path, strategy=chunking_strategy, preprocess=True)
74
 
 
 
 
 
 
 
 
 
 
75
  if isinstance(chunks, dict):
 
76
  flat_chunks = []
77
  for key, val in chunks.items():
78
  if isinstance(val, list): flat_chunks.extend(val)
79
  return flat_chunks
80
 
81
  return chunks
 
82
  except Exception as e:
83
  logger.error(f"Error processing {file_path}: {e}")
84
  return []
 
87
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
88
 
89
  try:
90
+ # 1. Chunking
 
91
  docs = process_file(file_path, chunking_strategy=strategy, embed_model_name=embed_model_name)
 
92
  if not docs: return False, "No valid chunks generated."
93
 
94
+ # 2. METADATA SANITIZATION (The Fix for Pinecone IDs)
95
+ # We enforce that 'source' is just the filename, stripping the path.
96
+ clean_filename = os.path.basename(file_path)
97
+ for doc in docs:
98
+ doc.metadata["source"] = clean_filename
99
+ # Remove any absolute paths that might have leaked into metadata
100
+ if "file_path" in doc.metadata: del doc.metadata["file_path"]
101
+
102
+ # 3. Acronym Learning
103
  acronym_mgr = AcronymManager()
104
  for doc in docs:
105
  acronym_mgr.scan_text_for_acronyms(doc.page_content)
106
 
107
+ # 4. Pinecone Manager
108
  pm = PineconeManager(PINECONE_KEY)
109
 
110
+ # 5. SAFETY CHECK
111
  emb_fn = get_embedding_func(embed_model_name)
112
  test_vec = emb_fn.embed_query("test")
113
  model_dim = len(test_vec)
114
  if not pm.check_dimension_compatibility(index_name, model_dim):
115
  return False, f"Dimension Mismatch! Index '{index_name}' expects {model_dim}d vectors."
116
 
117
+ # 6. PRE-EMPTIVE DELETE
118
+ pm.delete_file(index_name, clean_filename, namespace=username)
 
119
 
120
+ # 7. UPLOAD
121
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
122
+ # Now IDs will be "filename.txt_0", "filename.txt_1" etc.
123
+ custom_ids = [f"{clean_filename}_{i}" for i, doc in enumerate(docs)]
124
  vstore.add_documents(docs, ids=custom_ids)
125
 
126
+ return True, f"Successfully updated {clean_filename} ({len(docs)} chunks)."
127
 
128
  except Exception as e:
129
  logger.error(f"Ingestion failed: {e}")
130
  return False, str(e)
131
 
132
  def process_and_add_text(text: str, source_name: str, username: str, index_name: str, embed_model_name: str = None) -> Tuple[bool, str]:
 
 
 
133
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
134
 
135
  try:
136
  pm = PineconeManager(PINECONE_KEY)
137
+ clean_source = os.path.basename(source_name)
138
 
139
+ # 1. DELETE OLD
140
+ pm.delete_file(index_name, clean_source, namespace=username)
141
 
142
+ # 2. BACKUP
143
  user_docs_dir = os.path.join(UPLOAD_DIR, username)
144
  os.makedirs(user_docs_dir, exist_ok=True)
145
+ backup_path = os.path.join(user_docs_dir, clean_source)
146
 
147
  with open(backup_path, "w", encoding='utf-8') as f:
148
  f.write(text)
149
 
150
+ # 3. CHUNK
 
 
151
  manager = ChunkingManager(embedding_model_name=embed_model_name)
 
 
152
  docs = manager.process_document(backup_path, strategy="paragraph", preprocess=True)
153
 
154
+ # 4. SANITIZE METADATA
155
  for doc in docs:
156
+ doc.metadata["source"] = clean_source
157
  doc.metadata["file_type"] = "generated"
158
  doc.metadata["strategy"] = "flattened"
159
 
160
+ # 5. UPLOAD
161
  emb_fn = get_embedding_func(embed_model_name)
162
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
163
+ custom_ids = [f"{clean_source}_{i}" for i, _ in enumerate(docs)]
 
164
  vstore.add_documents(docs, ids=custom_ids)
165
 
166
+ return True, f"Updated: {clean_source} ({len(docs)} chunks)"
167
 
168
  except Exception as e:
169
  logger.error(f"Error indexing text: {e}")
 
171
 
172
  def search_knowledge_base(query: str, username: str, index_name: str, embed_model_name: str, k: int = 5, final_k: int = 5):
173
  if not PINECONE_KEY or not index_name: return []
 
174
  try:
175
  pm = PineconeManager(PINECONE_KEY)
176
  emb_fn = get_embedding_func(embed_model_name)
177
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
178
 
 
179
  broad_k = final_k * 3
180
  initial_docs = vstore.similarity_search(query, k=broad_k)
181
 
182
  if not initial_docs or not reranker:
183
  return initial_docs[:final_k]
184
 
185
+ passages = [{"id": str(i), "text": doc.page_content, "meta": doc.metadata} for i, doc in enumerate(initial_docs)]
 
 
 
 
 
186
  rerank_request = RerankRequest(query=query, passages=passages)
187
  ranked_results = reranker.rerank(rerank_request)
188
 
 
189
  final_docs = []
190
  for res in ranked_results[:final_k]:
191
  meta = res.get("meta", {})
192
  meta["rerank_score"] = res.get("score")
193
  final_docs.append(Document(page_content=res["text"], metadata=meta))
 
194
  return final_docs
 
195
  except Exception as e:
196
  logger.error(f"Search failed: {e}")
197
  return []
 
200
  user_dir = os.path.join(UPLOAD_DIR, username)
201
  file_path = os.path.join(user_dir, filename)
202
  if os.path.exists(file_path): os.remove(file_path)
 
203
  if PINECONE_KEY and index_name:
204
  try:
205
  pm = PineconeManager(PINECONE_KEY)
 
219
  ids = pm.get_all_ids(index_name, username)
220
  if not ids: return False, "No data found in Pinecone."
221
 
 
222
  user_dir = os.path.join(UPLOAD_DIR, username)
223
+ # We wipe it clean first
224
  if os.path.exists(user_dir): shutil.rmtree(user_dir)
225
  os.makedirs(user_dir, exist_ok=True)
226
 
 
232
  vectors = response.vectors
233
  for vec_id, vec_data in vectors.items():
234
  meta = vec_data.metadata or {}
235
+ # THE RESYNC FIX: Force basename to avoid "dir/dir/file" bugs
236
+ raw_source = meta.get('source', 'unknown.txt')
237
+ source = os.path.basename(raw_source)
238
+
239
  text = meta.get('text') or meta.get('page_content') or ''
240
  try:
241
  if "_" in vec_id: chunk_index = int(vec_id.rsplit('_', 1)[-1])
 
251
  file_path = os.path.join(user_dir, filename)
252
  with open(file_path, "w", encoding="utf-8") as f: f.write(full_text)
253
  count += 1
254
+ return True, f"Restored {count} files from Pinecone!"
255
  except Exception as e:
256
  logger.error(f"Cache rebuild failed: {e}")
257
  return False, str(e)