Bellok committed on
Commit
a0dbf73
·
1 Parent(s): bfcb0d4

`Refactored background ingestion tracking and function`

Browse files
Files changed (1) hide show
  1. app.py +79 -77
app.py CHANGED
@@ -13,6 +13,85 @@ import spaces
13
  from pathlib import Path
14
  from typing import List, Tuple, Optional, Dict
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  SAMPLE_DOCS = [
17
  {
18
  "id": "wisdom_1",
@@ -123,83 +202,6 @@ class PackManager:
123
 
124
  pack_manager = PackManager()
125
 
126
- # Global variables for background ingestion tracking
127
- ingestion_status = {
128
- "running": False,
129
- "total_docs": 0,
130
- "processed": 0,
131
- "failed": 0,
132
- "start_time": None,
133
- "eta": 0,
134
- "rate": 0,
135
- }
136
-
137
-
138
- def background_ingest_packs(api, pack_docs, pack_manager):
139
- """Background function to ingest packs without blocking app startup"""
140
- global ingestion_status
141
-
142
- ingestion_status["running"] = True
143
- ingestion_status["total_docs"] = len(pack_docs)
144
- ingestion_status["processed"] = 0
145
- ingestion_status["failed"] = 0
146
- ingestion_status["start_time"] = time.time()
147
-
148
- print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
149
- total_docs = len(pack_docs)
150
- processed = 0
151
- failed = 0
152
- start_time = time.time()
153
- batch_size = 1000
154
-
155
- # Process in batches to avoid memory issues and provide progress
156
- for batch_start in range(0, total_docs, batch_size):
157
- batch_end = min(batch_start + batch_size, total_docs)
158
- batch = pack_docs[batch_start:batch_end]
159
-
160
- batch_processed = 0
161
- batch_failed = 0
162
-
163
- for doc in batch:
164
- success = api.add_document(doc["id"], doc["content"], doc["metadata"])
165
- if not success:
166
- batch_failed += 1
167
- failed += 1
168
- if failed <= 5: # Log first few failures
169
- print(f"[WARN] Failed to add document {doc['id']}")
170
-
171
- batch_processed += 1
172
- processed += 1
173
-
174
- # Update global status
175
- ingestion_status["processed"] = processed
176
- ingestion_status["failed"] = failed
177
-
178
- # Progress update after each batch
179
- elapsed = time.time() - start_time
180
- rate = processed / elapsed if elapsed > 0 else 0
181
- eta = (total_docs - processed) / rate if rate > 0 else 0
182
- ingestion_status["rate"] = rate
183
- ingestion_status["eta"] = eta
184
-
185
- print(f"[PROGRESS] {processed}/{total_docs} documents ingested "
186
- f"({processed/total_docs*100:.1f}%) - "
187
- f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min")
188
-
189
- # Force garbage collection after large batches to free memory
190
- if processed % 10000 == 0:
191
- import gc
192
- gc.collect()
193
-
194
- packs_loaded = processed
195
- pack_manager.mark_packs_ingested(1, packs_loaded)
196
- total_time = time.time() - start_time
197
- print(f"[OK] Loaded {packs_loaded} documents from Warbler packs "
198
- f"({failed} failed) in {total_time:.1f} seconds")
199
-
200
- # Mark ingestion complete
201
- ingestion_status["running"] = False
202
-
203
 
204
  try:
205
  from warbler_cda import (
 
13
  from pathlib import Path
14
  from typing import List, Tuple, Optional, Dict
15
 
16
+
17
+ # Global variables for background ingestion tracking
18
+ ingestion_status = {
19
+ "running": False,
20
+ "total_docs": 0,
21
+ "processed": 0,
22
+ "failed": 0,
23
+ "start_time": None,
24
+ "eta": 0,
25
+ "rate": 0,
26
+ }
27
+
28
+
29
+ def background_ingest_packs(api, pack_docs, pack_manager):
30
+ """Background function to ingest packs without blocking app startup"""
31
+ global ingestion_status
32
+
33
+ ingestion_status["running"] = True
34
+ ingestion_status["total_docs"] = len(pack_docs)
35
+ ingestion_status["processed"] = 0
36
+ ingestion_status["failed"] = 0
37
+ ingestion_status["start_time"] = time.time()
38
+
39
+ print(f"[INFO] Ingesting {len(pack_docs)} documents from Warbler packs...")
40
+ total_docs = len(pack_docs)
41
+ processed = 0
42
+ failed = 0
43
+ start_time = time.time()
44
+ batch_size = 1000
45
+
46
+ # Process in batches to avoid memory issues and provide progress
47
+ for batch_start in range(0, total_docs, batch_size):
48
+ batch_end = min(batch_start + batch_size, total_docs)
49
+ batch = pack_docs[batch_start:batch_end]
50
+
51
+ batch_processed = 0
52
+ batch_failed = 0
53
+
54
+ for doc in batch:
55
+ success = api.add_document(doc["id"], doc["content"], doc["metadata"])
56
+ if not success:
57
+ batch_failed += 1
58
+ failed += 1
59
+ if failed <= 5: # Log first few failures
60
+ print(f"[WARN] Failed to add document {doc['id']}")
61
+
62
+ batch_processed += 1
63
+ processed += 1
64
+
65
+ # Update global status
66
+ ingestion_status["processed"] = processed
67
+ ingestion_status["failed"] = failed
68
+
69
+ # Progress update after each batch
70
+ elapsed = time.time() - start_time
71
+ rate = processed / elapsed if elapsed > 0 else 0
72
+ eta = (total_docs - processed) / rate if rate > 0 else 0
73
+ ingestion_status["rate"] = rate
74
+ ingestion_status["eta"] = eta
75
+
76
+ print(f"[PROGRESS] {processed}/{total_docs} documents ingested "
77
+ f"({processed/total_docs*100:.1f}%) - "
78
+ f"{rate:.1f} docs/sec - ETA: {eta/60:.1f} min")
79
+
80
+ # Force garbage collection after large batches to free memory
81
+ if processed % 10000 == 0:
82
+ import gc
83
+ gc.collect()
84
+
85
+ packs_loaded = processed
86
+ pack_manager.mark_packs_ingested(1, packs_loaded)
87
+ total_time = time.time() - start_time
88
+ print(f"[OK] Loaded {packs_loaded} documents from Warbler packs "
89
+ f"({failed} failed) in {total_time:.1f} seconds")
90
+
91
+ # Mark ingestion complete
92
+ ingestion_status["running"] = False
93
+
94
+
95
  SAMPLE_DOCS = [
96
  {
97
  "id": "wisdom_1",
 
202
 
203
  pack_manager = PackManager()
204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205
 
206
  try:
207
  from warbler_cda import (