Dheeraj-13 committed on
Commit
d3a38ee
·
1 Parent(s): 8367f0a

Feature: Additive Ingestion - Allow adding new docs without wiping old ones. Added Clear button.

Browse files
Files changed (2) hide show
  1. apps/web/app.py +45 -15
  2. services/rag/index.py +15 -16
apps/web/app.py CHANGED
@@ -91,45 +91,69 @@ def chat_fn(message, history, backend):
91
  return final_response
92
 
93
 
94
- def admin_ingest(files, use_sample):
95
- # 1. Clean Data & Temp Dirs (Fresh Start)
96
- temp_in = "temp_ingest"
97
- dirs_to_clean = [temp_in, PROCESSED_DIR, INDEX_DIR]
98
-
99
- for d in dirs_to_clean:
100
  if os.path.exists(d):
101
  shutil.rmtree(d)
102
  os.makedirs(d)
103
 
104
- status = "Starting ingestion...\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
105
 
106
  # Handle Source Selection
 
 
107
  if use_sample:
108
  # Copy from samples dir
109
  sample_file = os.path.join(SAMPLES_DIR, "sports_legends.txt")
110
  if os.path.exists(sample_file):
111
  shutil.copy(sample_file, temp_in)
112
- status += f"Loaded sample data: {sample_file}\n"
 
113
  else:
114
  return "Error: Sample data not found on server."
115
- elif files:
 
116
  # Copy uploaded files
117
  for file in files:
118
  shutil.copy(file.name, temp_in)
119
- status += f"Loaded {len(files)} uploaded files.\n"
120
- else:
121
- return "No files selected and 'Use Sample' not checked."
 
 
122
 
123
  yield status
124
 
125
  # Run Ingest
126
  try:
 
127
  ingest(temp_in, PROCESSED_DIR)
128
- status += "Ingestion complete.\nBuilding Index...\n"
129
  yield status
130
 
 
131
  build_index(PROCESSED_DIR, INDEX_DIR)
132
- status += "Index built successfully.\nReloading services...\n"
133
  yield status
134
 
135
  # FORCE RELOAD: Clear singletons
@@ -137,7 +161,7 @@ def admin_ingest(files, use_sample):
137
  services.rag.retrieve._shared_retriever = None
138
 
139
  init_services()
140
- status += "Services reloaded. Index updated successfully."
141
  except Exception as e:
142
  print(f"Ingestion Failed: {e}") # Print to server logs
143
  import traceback
@@ -172,6 +196,7 @@ with gr.Blocks(title="RAG Knowledge Assistant", theme=gr.themes.Soft()) as demo:
172
  )
173
 
174
  ingest_btn = gr.Button("Process Documents", variant="primary", size="sm")
 
175
 
176
  # Status Log - Visible by default
177
  with gr.Accordion("System Logs", open=True):
@@ -190,6 +215,11 @@ with gr.Blocks(title="RAG Knowledge Assistant", theme=gr.themes.Soft()) as demo:
190
  outputs=[status_box]
191
  )
192
 
 
 
 
 
 
193
  with gr.Group():
194
  backend_radio = gr.Radio(
195
  choices=["openai", "gemini", "local"],
 
91
  return final_response
92
 
93
 
94
+
95
+ def clear_knowledge_base():
96
+ # Helper to wipe data
97
+ for d in [PROCESSED_DIR, INDEX_DIR]:
 
 
98
  if os.path.exists(d):
99
  shutil.rmtree(d)
100
  os.makedirs(d)
101
 
102
+ # Reset helper
103
+ import services.rag.retrieve
104
+ services.rag.retrieve._shared_retriever = None
105
+ init_services()
106
+
107
+ return "Knowledge Base Cleared. System is empty."
108
+
109
+ def admin_ingest(files, use_sample):
110
+ # 1. Clean Temp Input ONLY (Keep Processed/Index for additive)
111
+ temp_in = "temp_ingest"
112
+ if os.path.exists(temp_in):
113
+ shutil.rmtree(temp_in)
114
+ os.makedirs(temp_in)
115
+
116
+ # Ensure processed/index dirs exist
117
+ os.makedirs(PROCESSED_DIR, exist_ok=True)
118
+ os.makedirs(INDEX_DIR, exist_ok=True)
119
+
120
+ status = "Starting processing...\n"
121
 
122
  # Handle Source Selection
123
+ files_found = False
124
+
125
  if use_sample:
126
  # Copy from samples dir
127
  sample_file = os.path.join(SAMPLES_DIR, "sports_legends.txt")
128
  if os.path.exists(sample_file):
129
  shutil.copy(sample_file, temp_in)
130
+ status += f"Loaded: Sports Legends Dataset\n"
131
+ files_found = True
132
  else:
133
  return "Error: Sample data not found on server."
134
+
135
+ if files:
136
  # Copy uploaded files
137
  for file in files:
138
  shutil.copy(file.name, temp_in)
139
+ status += f"Loaded: {len(files)} new files.\n"
140
+ files_found = True
141
+
142
+ if not files_found:
143
+ return "No new files selected. Select files or sample data."
144
 
145
  yield status
146
 
147
  # Run Ingest
148
  try:
149
+ # Ingest new files to PROCESSED_DIR (Additive)
150
  ingest(temp_in, PROCESSED_DIR)
151
+ status += "Processing new files complete.\nRebuilding Index...\n"
152
  yield status
153
 
154
+ # Build Index (scans ALL files in PROCESSED_DIR)
155
  build_index(PROCESSED_DIR, INDEX_DIR)
156
+ status += "Index rebuilt with all documents.\nReloading services...\n"
157
  yield status
158
 
159
  # FORCE RELOAD: Clear singletons
 
161
  services.rag.retrieve._shared_retriever = None
162
 
163
  init_services()
164
+ status += "Services reloaded. Knowledge Base Updated successfully!"
165
  except Exception as e:
166
  print(f"Ingestion Failed: {e}") # Print to server logs
167
  import traceback
 
196
  )
197
 
198
  ingest_btn = gr.Button("Process Documents", variant="primary", size="sm")
199
+ clear_btn = gr.Button("Clear Knowledge Base", variant="stop", size="sm")
200
 
201
  # Status Log - Visible by default
202
  with gr.Accordion("System Logs", open=True):
 
215
  outputs=[status_box]
216
  )
217
 
218
+ clear_btn.click(
219
+ clear_knowledge_base,
220
+ outputs=[status_box]
221
+ )
222
+
223
  with gr.Group():
224
  backend_radio = gr.Radio(
225
  choices=["openai", "gemini", "local"],
services/rag/index.py CHANGED
@@ -9,23 +9,22 @@ from .embed import get_embedder
9
 
10
  def load_processed_data(processed_dir: str) -> List[Dict]:
11
  chunks = []
12
- # Read manifest if exists, or just iterate JSONs
13
- manifest_path = os.path.join(processed_dir, "manifest.json")
14
- if os.path.exists(manifest_path):
15
- with open(manifest_path, 'r') as f:
16
- manifest = json.load(f)
17
- for entry in manifest:
18
- with open(entry['path'], 'r') as f:
19
- doc_data = json.load(f)
20
- chunks.extend(doc_data['chunks'])
21
- else:
22
- # Fallback to glob
23
- import glob
24
- for f_path in glob.glob(os.path.join(processed_dir, "*.json")):
25
- if f_path.endswith("manifest.json"): continue
26
- with open(f_path, 'r') as f:
27
  doc_data = json.load(f)
28
- chunks.extend(doc_data['chunks'])
 
 
 
 
29
  return chunks
30
 
31
  def build_index(processed_dir: str, output_dir: str):
 
9
 
10
  def load_processed_data(processed_dir: str) -> List[Dict]:
11
  chunks = []
12
+ # Always glob for all JSONs to support additive ingestion
13
+ import glob
14
+ json_files = glob.glob(os.path.join(processed_dir, "*.json"))
15
+
16
+ print(f"Found {len(json_files)} existing documents to index.")
17
+
18
+ for f_path in json_files:
19
+ if f_path.endswith("manifest.json"): continue
20
+ try:
21
+ with open(f_path, 'r') as f:
 
 
 
 
 
22
  doc_data = json.load(f)
23
+ if 'chunks' in doc_data:
24
+ chunks.extend(doc_data['chunks'])
25
+ except Exception as e:
26
+ print(f"Error loading {f_path}: {e}")
27
+
28
  return chunks
29
 
30
  def build_index(processed_dir: str, output_dir: str):