NavyDevilDoc commited on
Commit
10f62d5
·
verified ·
1 Parent(s): 25a53e2

Update src/rag_engine.py

Browse files
Files changed (1) hide show
  1. src/rag_engine.py +118 -118
src/rag_engine.py CHANGED
@@ -3,15 +3,17 @@ import shutil
3
  import logging
4
  from typing import List, Tuple, Optional
5
  from huggingface_hub import snapshot_download
6
- from langchain_community.document_loaders import PyPDFLoader, TextLoader, UnstructuredWordDocumentLoader, UnstructuredPowerPointLoader
7
- from langchain_text_splitters import RecursiveCharacterTextSplitter
8
  from langchain_huggingface import HuggingFaceEmbeddings
9
  from langchain_openai import OpenAIEmbeddings
10
- from langchain_community.vectorstores import Pinecone as LangchainPinecone
11
  from langchain_core.documents import Document
 
 
12
  from core.PineconeManager import PineconeManager
13
  from core.AcronymManager import AcronymManager
14
- from flashrank import Ranker, RerankRequest # NEW IMPORT
 
15
 
16
  # CONFIGURATION
17
  PINECONE_KEY = os.getenv("PINECONE_API_KEY")
@@ -19,7 +21,6 @@ UPLOAD_DIR = "source_documents"
19
  logger = logging.getLogger(__name__)
20
 
21
  # Initialize Reranker (Small, fast CPU model)
22
- # Only initializes once when the app starts
23
  try:
24
  reranker = Ranker(model_name="ms-marco-TinyBERT-L-2-v2", cache_dir="/tmp/flashrank_cache")
25
  except Exception as e:
@@ -33,29 +34,19 @@ def get_embedding_func(model_name: str = "sentence-transformers/all-MiniLM-L6-v2
33
  if not os.getenv("OPENAI_API_KEY"): raise ValueError("OpenAI API Key not found.")
34
  return OpenAIEmbeddings(model=model_name)
35
 
36
- # CHECK 2: YOUR CUSTOM FINE-TUNE (Updated for Subfolders)
37
  elif "navy-custom-models" in model_name:
38
  logger.info(f"Downloading custom model from: {model_name}")
39
-
40
- # 1. Parse the repo and folder from your string
41
- # Input: "NavyDevilDoc/navy-custom-models/bge-finetuned"
42
  parts = model_name.split("/")
43
- # Repo ID is the first two parts: "NavyDevilDoc/navy-custom-models"
44
  repo_id = f"{parts[0]}/{parts[1]}"
45
- # Folder is the rest: "bge-finetuned"
46
  folder_name = parts[2]
47
 
48
- # 2. Download ONLY that folder
49
  storage_path = snapshot_download(
50
  repo_id=repo_id,
51
  repo_type="model",
52
  allow_patterns=f"{folder_name}/*"
53
  )
54
-
55
- # 3. Construct the local path to the inner folder
56
  local_model_path = os.path.join(storage_path, folder_name)
57
-
58
- # 4. Load from the local path
59
  return HuggingFaceEmbeddings(model_name=local_model_path)
60
 
61
  # CHECK 3: Standard Public Models
@@ -74,88 +65,84 @@ def save_uploaded_file(uploaded_file, username: str) -> str:
74
  f.write(uploaded_file.getbuffer())
75
  return file_path
76
 
77
- class ParagraphChunker:
78
- def split_text(self, text):
79
- return [p.strip() for p in text.split('\n\n') if p.strip()]
80
 
81
- def process_file(file_path: str, chunking_strategy: str = "paragraph") -> List[Document]:
82
- ext = os.path.splitext(file_path)[1].lower()
 
 
83
  try:
84
- if ext == ".pdf": loader = PyPDFLoader(file_path)
85
- elif ext == ".txt": loader = TextLoader(file_path, encoding='utf-8')
86
- elif ext == ".docx": loader = UnstructuredWordDocumentLoader(file_path)
87
- elif ext == ".pptx": loader = UnstructuredPowerPointLoader(file_path)
88
- elif ext == ".md": loader = TextLoader(file_path, encoding='utf-8')
89
- else: return []
90
 
91
- raw_docs = loader.load()
92
- text = "\n\n".join([d.page_content for d in raw_docs])
93
 
94
- if chunking_strategy == "token":
95
- splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
96
- chunks = splitter.create_documents([text])
97
- else:
98
- chunker = ParagraphChunker()
99
- texts = chunker.split_text(text)
100
- chunks = [Document(page_content=t) for t in texts]
101
-
102
- # Add metadata
103
- filename = os.path.basename(file_path)
104
- for doc in chunks:
105
- doc.metadata["source"] = filename
106
- doc.metadata["strategy"] = chunking_strategy
 
 
107
 
108
  return chunks
 
109
  except Exception as e:
110
  logger.error(f"Error processing {file_path}: {e}")
111
  return []
112
 
113
- def search_knowledge_base(query: str, username: str, index_name: str, embed_model_name: str, k: int = 5, final_k: int = 5):
114
- """
115
- Searches Pinecone with Reranking.
116
- 1. Fetches 3x candidates (Top 15).
117
- 2. Reranks using TinyBERT.
118
- 3. Returns Top 5.
119
- """
120
- if not PINECONE_KEY or not index_name: return []
121
 
122
  try:
 
 
 
 
 
 
 
 
 
 
 
 
123
  pm = PineconeManager(PINECONE_KEY)
 
 
124
  emb_fn = get_embedding_func(embed_model_name)
 
 
 
 
 
 
 
 
 
 
125
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
 
 
 
126
 
127
- # 1. RETRIEVE BROAD (Fetch 3x what we need)
128
- broad_k = final_k * 3
129
- initial_docs = vstore.similarity_search(query, k=broad_k)
130
-
131
- if not initial_docs or not reranker:
132
- return initial_docs[:final_k]
133
-
134
- # 2. RERANK (The Brain Upgrade)
135
- passages = [
136
- {"id": str(i), "text": doc.page_content, "meta": doc.metadata}
137
- for i, doc in enumerate(initial_docs)
138
- ]
139
-
140
- rerank_request = RerankRequest(query=query, passages=passages)
141
- ranked_results = reranker.rerank(rerank_request)
142
-
143
- # 3. SELECT TOP K
144
- final_docs = []
145
- for res in ranked_results[:final_k]:
146
- meta = res.get("meta", {})
147
- meta["rerank_score"] = res.get("score") # Useful for debugging
148
- final_docs.append(Document(page_content=res["text"], metadata=meta))
149
-
150
- return final_docs
151
 
152
  except Exception as e:
153
- logger.error(f"Search failed: {e}")
154
- return []
155
 
156
  def process_and_add_text(text: str, source_name: str, username: str, index_name: str, embed_model_name: str = None) -> Tuple[bool, str]:
157
  """
158
- Ingests raw text with a specific embedding model.
159
  """
160
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
161
 
@@ -165,7 +152,7 @@ def process_and_add_text(text: str, source_name: str, username: str, index_name:
165
  # 1. PRE-EMPTIVE DELETE
166
  pm.delete_file(index_name, source_name, namespace=username)
167
 
168
- # 2. SAVE BACKUP
169
  user_docs_dir = os.path.join(UPLOAD_DIR, username)
170
  os.makedirs(user_docs_dir, exist_ok=True)
171
  backup_path = os.path.join(user_docs_dir, source_name)
@@ -173,58 +160,68 @@ def process_and_add_text(text: str, source_name: str, username: str, index_name:
173
  with open(backup_path, "w", encoding='utf-8') as f:
174
  f.write(text)
175
 
176
- # 3. UPLOAD (The Fix: Use the passed model name)
177
- emb_fn = get_embedding_func(embed_model_name)
 
 
178
 
179
- doc = Document(
180
- page_content=text,
181
- metadata={"source": source_name, "strategy": "flattened", "file_type": "generated"}
182
- )
183
 
 
 
 
 
 
 
 
184
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
185
- vstore.add_documents([doc], ids=[f"{source_name}_0"])
186
 
187
- return True, f"Updated: {source_name}"
 
 
 
 
188
  except Exception as e:
189
  logger.error(f"Error indexing text: {e}")
190
  return False, str(e)
191
 
192
- def ingest_file(file_path: str, username: str, index_name: str, embed_model_name: str = None, strategy: str = "paragraph") -> Tuple[bool, str]:
193
- if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
 
194
  try:
195
- # 1. Chunking
196
- docs = process_file(file_path, chunking_strategy=strategy)
197
- if not docs: return False, "No valid chunks generated."
198
-
199
- # 2. Acronym Learning
200
- acronym_mgr = AcronymManager()
201
- for doc in docs:
202
- acronym_mgr.scan_text_for_acronyms(doc.page_content)
203
-
204
- # 3. Pinecone Manager
205
  pm = PineconeManager(PINECONE_KEY)
206
-
207
- # 4. SAFETY CHECK
208
  emb_fn = get_embedding_func(embed_model_name)
209
- test_vec = emb_fn.embed_query("test")
210
- model_dim = len(test_vec)
211
- if not pm.check_dimension_compatibility(index_name, model_dim):
212
- return False, f"Dimension Mismatch! Index '{index_name}' expects {model_dim}d vectors."
213
-
214
- # 5. PRE-EMPTIVE DELETE
215
- filename = os.path.basename(file_path)
216
- pm.delete_file(index_name, filename, namespace=username)
217
-
218
- # 6. UPLOAD
219
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
220
- custom_ids = [f"{doc.metadata.get('source', filename)}_{i}" for i, doc in enumerate(docs)]
221
- vstore.add_documents(docs, ids=custom_ids)
222
 
223
- return True, f"Successfully updated {filename} ({len(docs)} chunks)."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
 
225
  except Exception as e:
226
- logger.error(f"Ingestion failed: {e}")
227
- return False, str(e)
228
 
229
  def delete_document(username: str, filename: str, index_name: str):
230
  user_dir = os.path.join(UPLOAD_DIR, username)
@@ -250,6 +247,11 @@ def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, s
250
  ids = pm.get_all_ids(index_name, username)
251
  if not ids: return False, "No data found in Pinecone."
252
 
 
 
 
 
 
253
  batch_size = 100
254
  reconstructed_files = {}
255
  for i in range(0, len(ids), batch_size):
@@ -267,11 +269,9 @@ def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, s
267
  if source not in reconstructed_files: reconstructed_files[source] = []
268
  reconstructed_files[source].append((chunk_index, text))
269
 
270
- user_dir = os.path.join(UPLOAD_DIR, username)
271
- os.makedirs(user_dir, exist_ok=True)
272
  count = 0
273
  for filename, chunks in reconstructed_files.items():
274
- chunks.sort(key=lambda x: x[0]) # SORTING FIX
275
  full_text = "\n\n".join([c[1] for c in chunks])
276
  file_path = os.path.join(user_dir, filename)
277
  with open(file_path, "w", encoding="utf-8") as f: f.write(full_text)
 
3
  import logging
4
  from typing import List, Tuple, Optional
5
  from huggingface_hub import snapshot_download
6
+
7
+ # LangChain Imports
8
  from langchain_huggingface import HuggingFaceEmbeddings
9
  from langchain_openai import OpenAIEmbeddings
 
10
  from langchain_core.documents import Document
11
+
12
+ # Internal Core Imports (The Good Stuff)
13
  from core.PineconeManager import PineconeManager
14
  from core.AcronymManager import AcronymManager
15
+ from core.ChunkingManager import ChunkingManager, ChunkingStrategy # NEW: The Traffic Cop
16
+ from flashrank import Ranker, RerankRequest
17
 
18
  # CONFIGURATION
19
  PINECONE_KEY = os.getenv("PINECONE_API_KEY")
 
21
  logger = logging.getLogger(__name__)
22
 
23
  # Initialize Reranker (Small, fast CPU model)
 
24
  try:
25
  reranker = Ranker(model_name="ms-marco-TinyBERT-L-2-v2", cache_dir="/tmp/flashrank_cache")
26
  except Exception as e:
 
34
  if not os.getenv("OPENAI_API_KEY"): raise ValueError("OpenAI API Key not found.")
35
  return OpenAIEmbeddings(model=model_name)
36
 
37
+ # CHECK 2: YOUR CUSTOM FINE-TUNE
38
  elif "navy-custom-models" in model_name:
39
  logger.info(f"Downloading custom model from: {model_name}")
 
 
 
40
  parts = model_name.split("/")
 
41
  repo_id = f"{parts[0]}/{parts[1]}"
 
42
  folder_name = parts[2]
43
 
 
44
  storage_path = snapshot_download(
45
  repo_id=repo_id,
46
  repo_type="model",
47
  allow_patterns=f"{folder_name}/*"
48
  )
 
 
49
  local_model_path = os.path.join(storage_path, folder_name)
 
 
50
  return HuggingFaceEmbeddings(model_name=local_model_path)
51
 
52
  # CHECK 3: Standard Public Models
 
65
  f.write(uploaded_file.getbuffer())
66
  return file_path
67
 
68
+ # --- CORE LOGIC UPGRADE ---
 
 
69
 
70
+ def process_file(file_path: str, chunking_strategy: str = "paragraph", embed_model_name: str = "all-mpnet-base-v2") -> List[Document]:
71
+ """
72
+ Delegates processing to the robust ChunkingManager in src/core.
73
+ """
74
  try:
75
+ logger.info(f"Initializing ChunkingManager for {file_path} using {chunking_strategy}")
 
 
 
 
 
76
 
77
+ # Initialize the Manager (it handles loading the right tokenizers/embedders)
78
+ manager = ChunkingManager(embedding_model_name=embed_model_name)
79
 
80
+ # Execute the robust processing pipeline
81
+ # This now uses your ParagraphChunker.py or TokenChunker.py logic!
82
+ chunks = manager.process_document(
83
+ file_path=file_path,
84
+ strategy=chunking_strategy,
85
+ preprocess=True
86
+ )
87
+
88
+ # Handle case where process_document returns a Dict (e.g. hierarchical) vs List
89
+ if isinstance(chunks, dict):
90
+ # For now, flatten dictionary returns if any (future proofing)
91
+ flat_chunks = []
92
+ for key, val in chunks.items():
93
+ if isinstance(val, list): flat_chunks.extend(val)
94
+ return flat_chunks
95
 
96
  return chunks
97
+
98
  except Exception as e:
99
  logger.error(f"Error processing {file_path}: {e}")
100
  return []
101
 
102
+ def ingest_file(file_path: str, username: str, index_name: str, embed_model_name: str = "sentence-transformers/all-MiniLM-L6-v2", strategy: str = "paragraph") -> Tuple[bool, str]:
103
+ if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
 
 
 
 
 
 
104
 
105
  try:
106
+ # 1. Chunking (Delegated to Core)
107
+ # Note: We pass the embedding model name so the chunker can initialize correctly
108
+ docs = process_file(file_path, chunking_strategy=strategy, embed_model_name=embed_model_name)
109
+
110
+ if not docs: return False, "No valid chunks generated."
111
+
112
+ # 2. Acronym Learning
113
+ acronym_mgr = AcronymManager()
114
+ for doc in docs:
115
+ acronym_mgr.scan_text_for_acronyms(doc.page_content)
116
+
117
+ # 3. Pinecone Manager
118
  pm = PineconeManager(PINECONE_KEY)
119
+
120
+ # 4. SAFETY CHECK
121
  emb_fn = get_embedding_func(embed_model_name)
122
+ test_vec = emb_fn.embed_query("test")
123
+ model_dim = len(test_vec)
124
+ if not pm.check_dimension_compatibility(index_name, model_dim):
125
+ return False, f"Dimension Mismatch! Index '{index_name}' expects {model_dim}d vectors."
126
+
127
+ # 5. PRE-EMPTIVE DELETE
128
+ filename = os.path.basename(file_path)
129
+ pm.delete_file(index_name, filename, namespace=username)
130
+
131
+ # 6. UPLOAD
132
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
133
+ # Generate stable IDs using the metadata source or filename
134
+ custom_ids = [f"{doc.metadata.get('source', filename)}_{i}" for i, doc in enumerate(docs)]
135
+ vstore.add_documents(docs, ids=custom_ids)
136
 
137
+ return True, f"Successfully updated {filename} ({len(docs)} chunks)."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
138
 
139
  except Exception as e:
140
+ logger.error(f"Ingestion failed: {e}")
141
+ return False, str(e)
142
 
143
  def process_and_add_text(text: str, source_name: str, username: str, index_name: str, embed_model_name: str = None) -> Tuple[bool, str]:
144
  """
145
+ Ingests raw text (e.g. flattened summaries) using the ChunkingManager.
146
  """
147
  if not PINECONE_KEY or not index_name: return False, "Pinecone Configuration Missing."
148
 
 
152
  # 1. PRE-EMPTIVE DELETE
153
  pm.delete_file(index_name, source_name, namespace=username)
154
 
155
+ # 2. SAVE BACKUP (Critical: ChunkingManager works best with files)
156
  user_docs_dir = os.path.join(UPLOAD_DIR, username)
157
  os.makedirs(user_docs_dir, exist_ok=True)
158
  backup_path = os.path.join(user_docs_dir, source_name)
 
160
  with open(backup_path, "w", encoding='utf-8') as f:
161
  f.write(text)
162
 
163
+ # 3. CHUNK & PROCESS (Using the Manager on the backup file)
164
+ # This ensures flattened text gets the same robust metadata/cleaning as regular files
165
+ logger.info(f"Processing flattened text via ChunkingManager: {source_name}")
166
+ manager = ChunkingManager(embedding_model_name=embed_model_name)
167
 
168
+ # We use 'token' strategy for summaries usually, or 'paragraph' if preferred
169
+ docs = manager.process_document(backup_path, strategy="paragraph", preprocess=True)
 
 
170
 
171
+ # Override metadata to ensure it's marked as generated
172
+ for doc in docs:
173
+ doc.metadata["file_type"] = "generated"
174
+ doc.metadata["strategy"] = "flattened"
175
+
176
+ # 4. UPLOAD
177
+ emb_fn = get_embedding_func(embed_model_name)
178
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
 
179
 
180
+ custom_ids = [f"{source_name}_{i}" for i in enumerate(docs)]
181
+ vstore.add_documents(docs, ids=custom_ids)
182
+
183
+ return True, f"Updated: {source_name} ({len(docs)} chunks)"
184
+
185
  except Exception as e:
186
  logger.error(f"Error indexing text: {e}")
187
  return False, str(e)
188
 
189
+ def search_knowledge_base(query: str, username: str, index_name: str, embed_model_name: str, k: int = 5, final_k: int = 5):
190
+ if not PINECONE_KEY or not index_name: return []
191
+
192
  try:
 
 
 
 
 
 
 
 
 
 
193
  pm = PineconeManager(PINECONE_KEY)
 
 
194
  emb_fn = get_embedding_func(embed_model_name)
 
 
 
 
 
 
 
 
 
 
195
  vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
 
 
196
 
197
+ # 1. RETRIEVE BROAD
198
+ broad_k = final_k * 3
199
+ initial_docs = vstore.similarity_search(query, k=broad_k)
200
+
201
+ if not initial_docs or not reranker:
202
+ return initial_docs[:final_k]
203
+
204
+ # 2. RERANK
205
+ passages = [
206
+ {"id": str(i), "text": doc.page_content, "meta": doc.metadata}
207
+ for i, doc in enumerate(initial_docs)
208
+ ]
209
+
210
+ rerank_request = RerankRequest(query=query, passages=passages)
211
+ ranked_results = reranker.rerank(rerank_request)
212
+
213
+ # 3. SELECT TOP K
214
+ final_docs = []
215
+ for res in ranked_results[:final_k]:
216
+ meta = res.get("meta", {})
217
+ meta["rerank_score"] = res.get("score")
218
+ final_docs.append(Document(page_content=res["text"], metadata=meta))
219
+
220
+ return final_docs
221
 
222
  except Exception as e:
223
+ logger.error(f"Search failed: {e}")
224
+ return []
225
 
226
  def delete_document(username: str, filename: str, index_name: str):
227
  user_dir = os.path.join(UPLOAD_DIR, username)
 
247
  ids = pm.get_all_ids(index_name, username)
248
  if not ids: return False, "No data found in Pinecone."
249
 
250
+ # Nuke local folder first to handle deletions
251
+ user_dir = os.path.join(UPLOAD_DIR, username)
252
+ if os.path.exists(user_dir): shutil.rmtree(user_dir)
253
+ os.makedirs(user_dir, exist_ok=True)
254
+
255
  batch_size = 100
256
  reconstructed_files = {}
257
  for i in range(0, len(ids), batch_size):
 
269
  if source not in reconstructed_files: reconstructed_files[source] = []
270
  reconstructed_files[source].append((chunk_index, text))
271
 
 
 
272
  count = 0
273
  for filename, chunks in reconstructed_files.items():
274
+ chunks.sort(key=lambda x: x[0])
275
  full_text = "\n\n".join([c[1] for c in chunks])
276
  file_path = os.path.join(user_dir, filename)
277
  with open(file_path, "w", encoding="utf-8") as f: f.write(full_text)