m97j committed
Commit cda6eee · Parent: 972e4b1

feat: Change qdrant from local mode to server mode.
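In practical terms, the change is a move from Qdrant's embedded (local-path) mode, where the API process owns the storage directory, to client-server mode, where a separate qdrant process serves requests over HTTP/gRPC. A minimal sketch of the two connection styles with the qdrant-client package (the path and URL below are the defaults used elsewhere in this commit):

    from qdrant_client import QdrantClient

    # Before: embedded / local mode — the client process opens the storage directory itself.
    local_client = QdrantClient(path="./data/qdrant")

    # After: server mode — the client talks to a separately running Qdrant server.
    server_client = QdrantClient(url="http://localhost:6333", timeout=60.0)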
Dockerfile CHANGED
@@ -7,16 +7,28 @@ ENV PYTHONUNBUFFERED=1
 
 WORKDIR /app
 
+# Install essential system packages and wget for downloading the Qdrant binary
 RUN apt-get update && apt-get install -y --no-install-recommends \
     build-essential \
+    wget \
     && rm -rf /var/lib/apt/lists/*
 
+# Download the Qdrant binary (v1.16.2, for Linux)
+RUN wget https://github.com/qdrant/qdrant/releases/download/v1.16.2/qdrant-x86_64-unknown-linux-gnu.tar.gz && \
+    tar -xzf qdrant-x86_64-unknown-linux-gnu.tar.gz && \
+    mv qdrant /usr/local/bin/ && \
+    rm qdrant-x86_64-unknown-linux-gnu.tar.gz
+
 COPY requirements.txt .
 RUN pip install --upgrade pip && \
     pip install --no-cache-dir -r requirements.txt
 
 COPY . .
 
+# Grant execution permissions to the startup script
+RUN chmod +x start.sh
+
 VOLUME ["/app/data"]
 
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
+# Control multiple processes via start.sh
+CMD ["./start.sh"]
core/config.py CHANGED
@@ -21,9 +21,10 @@ class Settings(BaseSettings):
     REPO_ID: str = Field(default="m97j/ke-store", description="Hugging Face repository ID")
 
     # 2. Storage Settings (Vector DB & RDBMS)
-    QDRANT_PATH: str = Field(default="./data/qdrant", description="Qdrant local storage path")
+    SQLITE_PATH: str = Field(default="{DATA_DIR}/knowledge_base/corpus.sqlite", description="SQLite DB file path")
+    QDRANT_PATH: str = Field(default="{DATA_DIR}/vector_store/qdrant", description="Qdrant local storage path")
     QDRANT_COLLECTION: str = Field(default="knowledge_base", description="Qdrant collection name")
-    SQLITE_PATH: str = Field(default="./data/corpus/corpus.sqlite", description="SQLite DB file path")
+    QDRANT_URL: str = Field(default="http://localhost:6333", description="Qdrant server URL (if using client-server mode)")
 
     # 3. Model Settings (Embedder & Reranker)
     EMBEDDER_NAME: str = Field(default="BAAI/bge-m3", description="FlagEmbedding model name")
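Because Settings extends pydantic's BaseSettings, each field above can be overridden with an environment variable of the same name, which is how the new QDRANT_URL would be pointed at a non-local server. A small sketch (the host name is illustrative):

    import os

    # Must be set before core.config is imported, since the module-level `settings` is created at import time.
    os.environ["QDRANT_URL"] = "http://qdrant.internal:6333"  # illustrative host

    from core.config import settings

    print(settings.QDRANT_URL)  # -> http://qdrant.internal:6333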
main.py CHANGED
@@ -12,7 +12,6 @@ from core.exceptions import setup_exception_handlers
 from core.logger import setup_logger
 from models.embedder import TextEmbedder
 from models.reranker import TextReranker
-from scripts.setup_db import download_knowledge_base
 from services.search_service import HybridSearchService
 from storage.qdrant_client import QdrantStorage
 from storage.sqlite_client import SQLiteStorage
@@ -34,12 +33,8 @@ async def lifespan(app: FastAPI):
     sqlite_client = None
 
     try:
-        # 0. Prepare dependency data (DB) (Download if unavailable, skip if available)
-        logger.info("Checking and preparing Knowledge Base data...")
-        download_knowledge_base()
-
         # 1. Infrastructure Connection (Database)
-        qdrant_client = QdrantStorage(path=settings.QDRANT_PATH, collection_name=settings.QDRANT_COLLECTION)
+        qdrant_client = QdrantStorage(url=settings.QDRANT_URL, collection_name=settings.QDRANT_COLLECTION)
         sqlite_client = SQLiteStorage(db_path=settings.SQLITE_PATH)
 
         # 2. Load AI Model (Singleton)
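For orientation, a condensed sketch of how the trimmed lifespan now wires the storage layer together. Names follow the diff; storing the clients on app.state and the SQLiteStorage close() call are assumptions about parts of main.py not shown in this hunk:

    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from core.config import settings
    from storage.qdrant_client import QdrantStorage
    from storage.sqlite_client import SQLiteStorage


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # 1. Infrastructure Connection: talk to the Qdrant server instead of opening a local path
        qdrant_client = QdrantStorage(url=settings.QDRANT_URL, collection_name=settings.QDRANT_COLLECTION)
        sqlite_client = SQLiteStorage(db_path=settings.SQLITE_PATH)

        app.state.qdrant = qdrant_client   # assumption: how request handlers reach the clients
        app.state.sqlite = sqlite_client
        yield
        sqlite_client.close()              # assumption: SQLiteStorage exposes close()


    app = FastAPI(lifespan=lifespan)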
scripts/data_pipeline.py CHANGED
@@ -1,49 +1,79 @@
1
  # scripts/data_pipeline.py
2
 
3
- import json
4
  import os
5
  import re
6
  import sqlite3
 
 
 
 
7
 
8
  import numpy as np
 
9
  from datasets import load_dataset
10
  from FlagEmbedding import BGEM3FlagModel
 
11
  from qdrant_client import QdrantClient
12
- from qdrant_client.models import (Distance, OptimizersConfigDiff, PointStruct,
13
- ScalarQuantization, ScalarQuantizationConfig,
14
- ScalarType, SparseIndexParams, SparseVector,
 
 
15
  SparseVectorParams, VectorParams)
16
  from tqdm import tqdm
17
  from transformers import AutoTokenizer
18
 
19
 
20
  class KnowledgeEngineBuilder:
21
- def __init__(self, base_dir="ke_store", dim=1024):
22
  self.base_dir = base_dir
23
  self.dim = dim
24
-
25
- print("Loading BGE-M3 Model and Tokenizer...")
26
- self.model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
27
- self.tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
28
 
29
- self.max_tokens = 384
30
- self.overlap_count = 2
31
-
 
 
 
 
 
32
  self._init_dirs()
33
- self._init_sqlite()
34
- self._init_meta()
35
- self._init_qdrant()
36
 
37
- # ---------------------------
38
- # INIT & SETUP
39
- # ---------------------------
40
- def _init_dirs(self):
41
- for d in ["corpus", "qdrant", "build_cache/embeddings"]:
42
- os.makedirs(os.path.join(self.base_dir, d), exist_ok=True)
43
 
44
- def _init_qdrant(self):
45
- self.qdrant_path = f"{self.base_dir}/qdrant"
46
- self.qdrant_client = QdrantClient(path=self.qdrant_path)
47
  self.collection_name = "knowledge_base"
48
 
49
  if not self.qdrant_client.collection_exists(self.collection_name):
@@ -57,331 +87,464 @@ class KnowledgeEngineBuilder:
57
  "sparse": SparseVectorParams(index=SparseIndexParams(on_disk=True))
58
  },
59
  quantization_config=ScalarQuantization(
60
- scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=True)
 
 
 
61
  ),
62
- optimizers_config=OptimizersConfigDiff(indexing_threshold=0)
 
63
  )
64
 
65
- def _optimize_sqlite(self, conn):
66
- conn.execute("PRAGMA journal_mode=WAL;")
67
- conn.execute("PRAGMA synchronous=NORMAL;")
68
- conn.execute("PRAGMA temp_store=MEMORY;")
69
- conn.execute("PRAGMA cache_size=-2000000")
70
 
71
  def _init_sqlite(self):
72
- self.conn = sqlite3.connect(f"{self.base_dir}/corpus/corpus.sqlite")
73
- self._optimize_sqlite(self.conn)
74
- cur = self.conn.cursor()
 
 
75
 
 
 
76
  cur.execute("""
77
  CREATE TABLE IF NOT EXISTS documents (
78
- doc_id INTEGER PRIMARY KEY AUTOINCREMENT,
79
  external_id TEXT, title TEXT, lang TEXT, url TEXT,
80
  wikidata_id TEXT, date_modified TEXT, full_text TEXT)
81
  """)
82
-
83
  cur.execute("""
84
  CREATE TABLE IF NOT EXISTS chunks (
85
- chunk_id INTEGER PRIMARY KEY AUTOINCREMENT,
86
  doc_id INTEGER, chunk_index INTEGER, text TEXT,
87
- token_length INTEGER, section TEXT, lang TEXT)
 
88
  """)
89
-
90
  cur.execute("""
91
  CREATE TABLE IF NOT EXISTS spans (
92
- span_id INTEGER PRIMARY KEY AUTOINCREMENT,
93
- chunk_id INTEGER, span_index INTEGER, text TEXT, char_length INTEGER)
 
94
  """)
95
-
96
  cur.execute("CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON chunks(doc_id)")
97
  cur.execute("CREATE INDEX IF NOT EXISTS idx_spans_chunk_id ON spans(chunk_id)")
98
  cur.execute("CREATE INDEX IF NOT EXISTS idx_chunks_lang ON chunks(lang)")
99
  self.conn.commit()
100
 
101
- def _init_meta(self):
102
- self.meta_path = f"{self.base_dir}/corpus/meta.json"
103
- cur = self.conn.cursor()
104
- cur.execute("SELECT MAX(doc_id) FROM documents")
105
- db_doc = cur.fetchone()[0] or 0
106
- cur.execute("SELECT MAX(chunk_id) FROM chunks")
107
- db_chunk = cur.fetchone()[0] or 0
108
- cur.execute("SELECT MAX(span_id) FROM spans")
109
- db_span = cur.fetchone()[0] or 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
- self.meta = {
112
- "last_doc_id": db_doc + 1,
113
- "last_chunk_id": db_chunk + 1,
114
- "last_span_id": db_span + 1
115
- }
116
- self._save_meta()
117
-
118
- def _save_meta(self):
119
- with open(self.meta_path, "w") as f:
120
- json.dump(self.meta, f, indent=4)
121
-
122
- # ---------------------------
123
- # TEXT PROCESSING & INGESTION
124
- # ---------------------------
125
- def split_sentences(self, text):
126
- text = re.sub(r'[ \t]+', ' ', text)
127
- pattern = r'(?<=[.!?。!?])(?<![Ar|Dr|Mr|Ms|St]\.)(?<![A-Z]\.)\s+'
128
- sentences = re.split(pattern, text)
129
- final_sentences = []
130
- for s in sentences:
131
- sub_parts = [p.strip() for p in s.split('\n') if p.strip()]
132
- final_sentences.extend(sub_parts)
133
- return [s for s in final_sentences if len(s) > 1]
134
-
135
- def count_tokens(self, text):
136
- return len(self.tokenizer.encode(text, add_special_tokens=False))
137
-
138
- def get_token_counts_batch(self, texts):
139
- if not texts: return []
140
- encodings = self.tokenizer(texts, add_special_tokens=False, padding=False, truncation=False)
141
- return [len(ids) for ids in encodings['input_ids']]
142
-
143
- def _split_monster_sentence(self, sentence):
144
- words = sentence.split(' ')
145
- sub_spans, current_sub, current_toks = [], [], 0
146
-
147
- for word in words:
148
- word_toks = self.count_tokens(word)
149
- if word_toks > self.max_tokens:
150
- if current_sub:
151
- sub_spans.append(" ".join(current_sub))
152
- current_sub, current_toks = [], 0
153
- half = len(word) // 2
154
- sub_spans.extend([word[:half], word[half:]])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  continue
156
 
157
- space_tok = 1 if current_sub else 0
158
- if current_toks + word_toks + space_tok > self.max_tokens and current_sub:
159
- sub_spans.append(" ".join(current_sub))
160
- current_sub, current_toks = [word], word_toks
 
 
 
161
  else:
162
- current_sub.append(word)
163
- current_toks += word_toks + space_tok
 
164
 
165
- if current_sub: sub_spans.append(" ".join(current_sub))
166
- return sub_spans
167
-
168
- def chunk_text(self, text):
169
- raw_sentences = self.split_sentences(text)
170
- sentence_lengths = self.get_token_counts_batch(raw_sentences)
171
-
172
- refined_spans = []
173
- for s, length in zip(raw_sentences, sentence_lengths):
174
- if length > self.max_tokens: refined_spans.extend(self._split_monster_sentence(s))
175
- else: refined_spans.append(s)
176
-
177
- span_toks_list = self.get_token_counts_batch(refined_spans)
178
- chunks, current_spans, current_tokens = [], [], 0
179
-
180
- for span, span_toks in zip(refined_spans, span_toks_list):
181
- if current_tokens + span_toks > self.max_tokens and current_spans:
182
- chunk_text = " ".join(current_spans)
183
- chunks.append((chunk_text, self.count_tokens(chunk_text), list(current_spans)))
184
 
185
- actual_overlap = min(self.overlap_count, len(current_spans) - 1)
186
- if actual_overlap > 0:
187
- current_spans = current_spans[-actual_overlap:]
188
- current_tokens = self.count_tokens(" ".join(current_spans)) + 1
189
- else:
190
- current_spans, current_tokens = [], 0
191
 
192
- current_spans.append(span)
193
- current_tokens += span_toks + 1
 
194
 
 
 
 
195
  if current_spans:
196
- chunk_text = " ".join(current_spans)
197
- chunks.append((chunk_text, self.count_tokens(chunk_text), list(current_spans)))
 
 
 
198
  return chunks
199
 
200
- def ingest(self, lang="ko", batch_size=32, limit=None):
201
- """
202
- - The dataset is read in a streaming manner to handle large corpora without memory issues.
203
- - Each document is processed to create chunks based on token limits, with an overlap strategy to ensure comprehensive coverage of the text.
204
- - The processed documents, chunks, and spans are stored in SQLite with appropriate indexing for efficient retrieval during search.
205
- """
206
  ds = load_dataset("HuggingFaceFW/finewiki", lang, split="train", streaming=True)
207
  cur = self.conn.cursor()
 
 
 
 
 
 
 
 
 
 
208
  count = 0
209
- batch_docs, batch_chunks, batch_spans = [], [], []
210
 
211
- for item in tqdm(ds, desc=f"Ingesting {lang}"):
212
  if limit and count >= limit: break
213
- doc_id = self.meta["last_doc_id"]
214
- batch_docs.append((doc_id, item["id"], item["title"], lang, item["url"], item.get("wikidata_id", ""), item.get("date_modified", ""), item["text"]))
215
-
216
- for c_idx, (chunk_text, token_len, span_list) in enumerate(self.chunk_text(item["text"])):
217
- chunk_id = self.meta["last_chunk_id"]
218
- batch_chunks.append((chunk_id, doc_id, c_idx, chunk_text, token_len, item["title"], lang))
219
- for s_idx, span_text in enumerate(span_list):
220
- batch_spans.append((self.meta["last_span_id"], chunk_id, s_idx, span_text, len(span_text)))
221
- self.meta["last_span_id"] += 1
222
- self.meta["last_chunk_id"] += 1
223
- self.meta["last_doc_id"] += 1
 
 
 
 
 
 
 
 
224
  count += 1
225
 
226
- if len(batch_docs) >= batch_size:
227
- self._commit_batch(cur, batch_docs, batch_chunks, batch_spans)
228
- batch_docs, batch_chunks, batch_spans = [], [], []
229
- if count % (batch_size * 10) == 0: self._save_meta()
230
 
231
- self._commit_batch(cur, batch_docs, batch_chunks, batch_spans)
232
  self.conn.commit()
233
- self.conn.execute("PRAGMA wal_checkpoint(FULL);")
234
- self._save_meta()
235
-
236
- def _commit_batch(self, cur, docs, chunks, spans):
237
- if not docs: return
238
- cur.executemany("INSERT INTO documents VALUES (?,?,?,?,?,?,?,?)", docs)
239
- cur.executemany("INSERT INTO chunks VALUES (?,?,?,?,?,?,?)", chunks)
240
- cur.executemany("INSERT INTO spans VALUES (?,?,?,?,?)", spans)
241
-
242
- # ---------------------------
243
- # EMBED TO DISK
244
- # ---------------------------
245
- def embed_corpus(self, lang="ko", batch_size=128, save_interval=100000):
246
- """
247
- Text is read in batches from SQLite, embeddings are generated using BGE-M3, and then saved to disk.
248
- - Embedding generation is performed on the GPU, and data is saved to disk in fixed batches to manage memory.
249
- - Dense vectors are saved in NumPy's .npz format to ensure fast loading and low disk usage.
250
- - Sparse vectors are saved in JSONL format to provide flexibility and readability.
251
- - The saved embeddings are subsequently uploaded to Qdrant for use in searches.
252
- - This method is designed to reliably generate and save embeddings even on large-scale datasets.
253
- """
254
  cur = self.conn.cursor()
255
- cur.execute("SELECT chunk_id, text FROM chunks WHERE lang=?", (lang,))
256
- rows = cur.fetchall()
257
 
258
- part_id = 0
259
- id_buffer = []
260
- dense_buffer = []
261
- sparse_buffer = []
 
 
 
 
 
 
 
262
 
263
- save_dir = f"{self.base_dir}/build_cache/embeddings"
 
 
 
 
 
 
 
264
 
265
- for i in tqdm(range(0, len(rows), batch_size), desc=f"1/2 GPU Embedding ({lang})"):
266
- batch = rows[i:i+batch_size]
267
  ids = [r[0] for r in batch]
268
  texts = [r[1] for r in batch]
269
 
270
- output = self.model.encode(
271
- texts, batch_size=len(texts), max_length=self.max_tokens,
272
- return_dense=True, return_sparse=True, return_colbert_vecs=False
273
- )
274
-
275
- id_buffer.extend(ids)
276
- dense_buffer.append(output['dense_vecs'])
277
-
278
- for sp_dict in output['lexical_weights']:
279
- sparse_buffer.append({str(k): float(v) for k, v in sp_dict.items()})
280
 
281
- # Save to disk when a certain number is reached (prevents memory explosion)
282
- if len(id_buffer) >= save_interval:
283
- self._save_embedding_part(save_dir, lang, part_id, id_buffer, dense_buffer, sparse_buffer)
284
- part_id += 1
285
- id_buffer, dense_buffer, sparse_buffer = [], [], []
286
 
287
- # Save the last remaining scraps
288
- self._save_embedding_part(save_dir, lang, part_id, id_buffer, dense_buffer, sparse_buffer)
289
- print(f"Embedding Generation Complete. Saved to {save_dir}")
 
290
 
291
- def _save_embedding_part(self, save_dir, lang, part_id, ids, dense_chunks, sparse_list):
292
- if not ids: return
293
-
294
- # Dense & IDs: High-speed storage as NumPy binaries
295
- np.savez(f"{save_dir}/ebd_{lang}_{part_id}.npz",
296
- ids=np.array(ids, dtype=np.int64),
297
- dense=np.vstack(dense_chunks))
298
-
299
- # Sparse: Save in JSONL format (one line at a time)
300
- with open(f"{save_dir}/sparse_{lang}_{part_id}.jsonl", 'w', encoding='utf-8') as f:
301
- for sp in sparse_list:
302
- f.write(json.dumps(sp) + '\n')
303
-
304
- # ---------------------------
305
- # BUILD QDRANT INDEX
306
- # ---------------------------
307
- def build_qdrant_index(self, lang="ko", batch_size=2000):
308
- """
309
- The generated embeddings are read from disk and uploaded to Qdrant in batches.
310
- - This method reads the saved dense and sparse embeddings, constructs the appropriate data structures for Qdrant, and uploads them in batches to manage memory and ensure efficient indexing.
311
- - After all data is uploaded, it triggers Qdrant's indexing process to optimize search performance.
312
- - The use of batch uploads and on-disk storage allows this process to scale to large datasets without overwhelming system memory.
313
- """
314
- save_dir = f"{self.base_dir}/build_cache/embeddings"
315
- files = sorted([f for f in os.listdir(save_dir) if f.startswith(f"ebd_{lang}_") and f.endswith(".npz")])
316
 
317
- for file_name in files:
318
- part_id = file_name.split("_")[-1].split(".")[0]
319
-
320
- # 1. Load file and convert to Qdrant point structure
321
- npz_path = os.path.join(save_dir, file_name)
322
- sparse_path = os.path.join(save_dir, f"sparse_{lang}_{part_id}.jsonl")
323
-
324
- data = np.load(npz_path)
325
- ids = data['ids']
326
- dense_vecs = data['dense']
327
-
328
- with open(sparse_path, 'r', encoding='utf-8') as f:
329
- sparse_vecs = [json.loads(line) for line in f]
330
 
331
- points_batch = []
 
 
332
 
333
- # 2. Qdrant Upload Loop
334
- for i in tqdm(range(len(ids)), desc=f"2/2 Qdrant Uploading (Part {part_id})"):
335
- chunk_id = int(ids[i])
336
- sparse_dict = sparse_vecs[i]
337
-
338
- point = PointStruct(
339
- id=chunk_id,
 
 
 
 
 
340
  vector={
341
- "dense": dense_vecs[i].tolist(),
342
- "sparse": SparseVector(
343
- indices=[int(k) for k in sparse_dict.keys()],
344
- values=list(sparse_dict.values())
345
- )
346
  },
347
- payload={"chunk_id": chunk_id, "lang": lang}
348
- )
349
- points_batch.append(point)
350
-
351
- # Upload when stacked to batch size
352
- if len(points_batch) >= batch_size:
353
- self.qdrant_client.upload_points(
354
- collection_name=self.collection_name,
355
- points=points_batch
356
- )
357
- points_batch = []
358
 
359
- # Uploading leftover scraps
360
- if points_batch:
361
- self.qdrant_client.upload_points(
362
- collection_name=self.collection_name,
363
- points=points_batch
364
- )
365
 
366
- print("Data upload complete. Enabling HNSW Indexing...")
367
-
368
- # 3. [Key] After all uploads are complete, re-enable indexing (default 20,000) to optimize the graph
369
  self.qdrant_client.update_collection(
370
  collection_name=self.collection_name,
371
- optimizer_config=OptimizersConfigDiff(indexing_threshold=20000)
372
  )
373
- print("Qdrant Indexing Complete!")
374
 
375
  def close(self):
376
- if hasattr(self, 'conn') and self.conn:
 
377
  self.conn.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
378
 
379
 
380
  if __name__ == "__main__":
381
- builder = KnowledgeEngineBuilder()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
382
  try:
383
- builder.ingest(lang="ko", batch_size=32, limit=10000) # Process only 10,000 documents as an example
384
- builder.embed_corpus(lang="ko", batch_size=128, save_interval=5000)
385
- builder.build_qdrant_index(lang="ko", batch_size=2000)
386
  finally:
387
- builder.close()
1
  # scripts/data_pipeline.py
2
 
3
+ import argparse
4
  import os
5
  import re
6
  import sqlite3
7
+ import subprocess
8
+ import sys
9
+ import time
10
+ from concurrent.futures import ThreadPoolExecutor
11
 
12
  import numpy as np
13
+ import orjson
14
  from datasets import load_dataset
15
  from FlagEmbedding import BGEM3FlagModel
16
+ from huggingface_hub import HfApi, upload_folder
17
  from qdrant_client import QdrantClient
18
+ from qdrant_client.models import (Distance, HnswConfigDiff,
19
+ OptimizersConfigDiff, PayloadSchemaType,
20
+ PointStruct, ScalarQuantization,
21
+ ScalarQuantizationConfig, ScalarType,
22
+ SparseIndexParams, SparseVector,
23
  SparseVectorParams, VectorParams)
24
  from tqdm import tqdm
25
  from transformers import AutoTokenizer
26
 
27
 
28
  class KnowledgeEngineBuilder:
29
+ def __init__(self, base_dir="ke_store", dim=1024, host="localhost", port=6333, grpc_port=6334):
30
  self.base_dir = base_dir
31
  self.dim = dim
32
+ self.max_tokens = 512
 
 
 
33
 
34
+ # Dynamic Overlap setting constants
35
+ self.overlap_ratio = 0.12 # Use 12% of the chunk length as overlap (Sweet Spot)
36
+ self.min_overlap = 30 # Minimum guaranteed overlap token count
37
+
38
+ self.kb_dir = os.path.join(self.base_dir, "knowledge_base")
39
+ self.artifacts_dir = os.path.join(self.base_dir, "artifacts/bge_m3_cache")
40
+
41
+ print("Loading Initial Setup...")
42
  self._init_dirs()
43
+ self._init_sqlite()
44
+ self._init_qdrant(host, port, grpc_port)
45
+
46
+ self.model = None
47
+ self.tokenizer = None
48
+
49
+ self.prefix_map = {
50
+ "ko": "문서 제목",
51
+ "en": "Document Title",
52
+ "zh": "文档标题",
53
+ "ja": "ドキュメントタイトル",
54
+ "es": "Título del documento",
55
+ "fr": "Titre du document",
56
+ "de": "Dokumenttitel",
57
+ }
58
 
59
+ def _load_models(self):
60
+ if self.model is None:
61
+ print("Loading BGE-M3 Model and Tokenizer to GPU...")
62
+ self.model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=True)
63
+ self.tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-m3')
 
64
 
65
+ def _init_dirs(self):
66
+ os.makedirs(self.kb_dir, exist_ok=True)
67
+ os.makedirs(self.artifacts_dir, exist_ok=True)
68
+
69
+ def _init_qdrant(self, host, port, grpc_port):
70
+ self.qdrant_client = QdrantClient(
71
+ host=host,
72
+ port=port,
73
+ grpc_port=grpc_port,
74
+ prefer_grpc=True,
75
+ timeout=300
76
+ )
77
  self.collection_name = "knowledge_base"
78
 
79
  if not self.qdrant_client.collection_exists(self.collection_name):
 
87
  "sparse": SparseVectorParams(index=SparseIndexParams(on_disk=True))
88
  },
89
  quantization_config=ScalarQuantization(
90
+ scalar=ScalarQuantizationConfig(
91
+ type=ScalarType.INT8,
92
+ always_ram=False
93
+ )
94
  ),
95
+ hnsw_config=HnswConfigDiff(on_disk=True),
96
+ optimizers_config=OptimizersConfigDiff(indexing_threshold=0)
97
  )
98
 
99
+ # Index for metadata-based filtering search (e.g., language)
100
+ self.qdrant_client.create_payload_index(
101
+ collection_name=self.collection_name, field_name="lang", field_schema=PayloadSchemaType.KEYWORD
102
+ )
 
103
 
104
  def _init_sqlite(self):
105
+ self.conn = sqlite3.connect(f"{self.kb_dir}/corpus.sqlite", check_same_thread=False)
106
+ self.conn.execute("PRAGMA journal_mode=WAL;")
107
+ self.conn.execute("PRAGMA synchronous=NORMAL;")
108
+ self.conn.execute("PRAGMA cache_size=-10000000;") # 10GB cache
109
+ self.conn.execute("PRAGMA foreign_keys=ON;")
110
 
111
+ cur = self.conn.cursor()
112
+
113
  cur.execute("""
114
  CREATE TABLE IF NOT EXISTS documents (
115
+ doc_id INTEGER PRIMARY KEY,
116
  external_id TEXT, title TEXT, lang TEXT, url TEXT,
117
  wikidata_id TEXT, date_modified TEXT, full_text TEXT)
118
  """)
119
+
120
  cur.execute("""
121
  CREATE TABLE IF NOT EXISTS chunks (
122
+ chunk_id INTEGER PRIMARY KEY,
123
  doc_id INTEGER, chunk_index INTEGER, text TEXT,
124
+ token_length INTEGER, section TEXT, lang TEXT,
125
+ FOREIGN KEY (doc_id) REFERENCES documents (doc_id) ON DELETE CASCADE)
126
  """)
127
+
128
  cur.execute("""
129
  CREATE TABLE IF NOT EXISTS spans (
130
+ span_id INTEGER PRIMARY KEY,
131
+ chunk_id INTEGER, span_index INTEGER, text TEXT, char_length INTEGER,
132
+ FOREIGN KEY (chunk_id) REFERENCES chunks (chunk_id) ON DELETE CASCADE)
133
  """)
134
+
135
  cur.execute("CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON chunks(doc_id)")
136
  cur.execute("CREATE INDEX IF NOT EXISTS idx_spans_chunk_id ON spans(chunk_id)")
137
  cur.execute("CREATE INDEX IF NOT EXISTS idx_chunks_lang ON chunks(lang)")
138
  self.conn.commit()
139
 
140
+ # ---------------------------------------------------------------
141
+ # PHASE 1: Sophisticated Semantic Chunking and SQLite Ingestion
142
+ # ---------------------------------------------------------------
143
+ def split_sentences(self, text, lang="ko"):
144
+ """
145
+ Global Multilingual Sentence Splitter
146
+ 1st: Physical separation based on line breaks (compatible with table and list data)
147
+ 2nd: Semantic separation based on punctuation
148
+ """
149
+ # 1st physical line break separation (remove empty strings)
150
+ lines = [line.strip() for line in text.split('\n') if line.strip()]
151
+
152
+ # Setting up 2nd Language-Specific Regular Expressions for Punctuation Separation
153
+ if lang in ["ko", "zh", "ja"]:
154
+ # CJK: split immediately after (full-width) sentence-ending punctuation
155
+ pattern = r'(?<=[.!?。!?])\s*'
156
+ else:
157
+ # Global: Abbreviation Defense and Multilingual Period Support
158
+ pattern = r'(?<=[.!?。!?।॥؟۔])(?<!\bMr\.)(?<!\bDr\.)(?<!\bMs\.)(?<!\bSt\.)(?<!\b[A-Z]\.)\s+'
159
+
160
+ final_spans = []
161
+ for line in lines:
162
+ # Normalization of consecutive spaces and tabs within lines
163
+ line = re.sub(r'[ \t]+', ' ', line)
164
+
165
+ # Punctuation-based separation
166
+ spans = [s.strip() for s in re.split(pattern, line) if len(s.strip()) > 0]
167
+ final_spans.extend(spans)
168
+
169
+ return final_spans
170
 
171
+ def chunk_text(self, text, title="", lang="ko"):
172
+ """
173
+ Context-Aware Dynamic Overlap Chunker
174
+ Injects the document's title at the top of each chunk to maximize BGE-M3 embedding context retention.
175
+ """
176
+ raw_sentences = self.split_sentences(text, lang)
177
+ chunks = []
178
+
179
+ # 1. Context Injection Format Settings Optimized for BGE-M3 (Fixed Prefix)
180
+ prefix_label = self.prefix_map.get(lang, "Document Title")
181
+ prefix = f"{prefix_label}: [{title}]\n" if title else ""
182
+ prefix_toks = self.tokenizer.encode(prefix, add_special_tokens=False) if prefix else []
183
+ prefix_len = len(prefix_toks)
184
+
185
+ # [Safety Mechanism] If the title alone is abnormally long, truncate the prefix to at most 100 tokens so it cannot consume the whole budget.
186
+ if prefix_len > 100:
187
+ prefix_toks = prefix_toks[:100]
188
+ prefix = self.tokenizer.decode(prefix_toks) + "...\n"
189
+ prefix_len = len(prefix_toks)
190
+
191
+ # 2. Calculation of the actual maximum number of tokens that can be inserted into the body (Span combinations + Overlap)
192
+ eff_max_tokens = self.max_tokens - prefix_len
193
+
194
+ current_spans = []
195
+ current_tokens = 0 # Cumulative number of tokens in the body (excluding prefix)
196
+
197
+ for span in raw_sentences:
198
+ span_toks = len(self.tokenizer.encode(span, add_special_tokens=False))
199
+
200
+ # ---------------------------------------------------------
201
+ # Case 1: Monster Sentence (when a single Span exceeds eff_max_tokens)
202
+ # ---------------------------------------------------------
203
+ if span_toks > eff_max_tokens:
204
+ # 1. If spans have already accumulated, flush them into a chunk first.
205
+ if current_spans:
206
+ chunk_body = " ".join(current_spans)
207
+ chunk_text_final = prefix + chunk_body
208
+ final_tokens = prefix_len + len(self.tokenizer.encode(chunk_body, add_special_tokens=False))
209
+
210
+ chunks.append((chunk_text_final, final_tokens, list(current_spans)))
211
+
212
+ # Dynamic Overlap Calculation (Based on Emitted 'Body')
213
+ target_overlap = max(self.min_overlap, int(current_tokens * self.overlap_ratio))
214
+ prev_tokens = self.tokenizer.encode(chunk_body, add_special_tokens=False)
215
+ overlap_tokens = prev_tokens[-target_overlap:]
216
+ overlap_text = self.tokenizer.decode(overlap_tokens)
217
+
218
+ current_spans = [overlap_text]
219
+
220
+ # 2. Merging Overlap and Monster Sentences
221
+ combined_text = " ".join(current_spans + [span]) if current_spans else span
222
+ combined_tokens = self.tokenizer.encode(combined_text, add_special_tokens=False)
223
+
224
+ # 3. Slicing Monster Sentences into eff_max_tokens (Sliding Window)
225
+ i = 0
226
+ while i + eff_max_tokens < len(combined_tokens):
227
+ slice_toks = combined_tokens[i : i + eff_max_tokens]
228
+ slice_text = self.tokenizer.decode(slice_toks)
229
+
230
+ chunk_text_final = prefix + slice_text
231
+ # Configure db_spans to store only the text (slice_text)
232
+ chunks.append((chunk_text_final, prefix_len + len(slice_toks), [slice_text]))
233
+
234
+ # Overlap calculation when moving to the next window (Overlap inside monster sentences)
235
+ dyn_overlap = max(self.min_overlap, int(eff_max_tokens * self.overlap_ratio))
236
+ i += (eff_max_tokens - dyn_overlap)
237
+
238
+ # 4. Save the remaining tail portion after the loop
239
+ remainder_toks = combined_tokens[i:]
240
+ if remainder_toks:
241
+ rem_text = self.tokenizer.decode(remainder_toks)
242
+ current_spans = [rem_text]
243
+ current_tokens = len(self.tokenizer.encode(rem_text, add_special_tokens=False))
244
+ else:
245
+ current_spans = []
246
+ current_tokens = 0
247
  continue
248
 
249
+ # ---------------------------------------------------------
250
+ # Case 2: General Sentence (Accumulation of general sentences)
251
+ # ---------------------------------------------------------
252
+ # +1 is a fake calculation that takes into account spacing between sentences
253
+ if current_tokens + span_toks + 1 <= eff_max_tokens:
254
+ current_spans.append(span)
255
+ current_tokens += span_toks + 1
256
  else:
257
+ # 1. Release accumulated span upon overflow
258
+ chunk_body = " ".join(current_spans)
259
+ body_tokens = self.tokenizer.encode(chunk_body, add_special_tokens=False)
260
 
261
+ chunk_text_final = prefix + chunk_body
262
+ final_tokens = prefix_len + len(body_tokens)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
+ chunks.append((chunk_text_final, final_tokens, list(current_spans)))
265
+
266
+ # 2. Dynamic Overlap Calculation (Based on Emitted 'Body')
267
+ target_overlap = max(self.min_overlap, int(len(body_tokens) * self.overlap_ratio))
268
+ overlap_tokens = body_tokens[-target_overlap:]
269
+ overlap_text = self.tokenizer.decode(overlap_tokens)
270
 
271
+ # 3. Start of new chunk (previous chunk overlap + current span)
272
+ current_spans = [overlap_text, span]
273
+ current_tokens = len(self.tokenizer.encode(" ".join(current_spans), add_special_tokens=False))
274
 
275
+ # ---------------------------------------------------------
276
+ # Handle remaining spans after loop termination
277
+ # ---------------------------------------------------------
278
  if current_spans:
279
+ chunk_body = " ".join(current_spans)
280
+ chunk_text_final = prefix + chunk_body
281
+ final_tokens = prefix_len + len(self.tokenizer.encode(chunk_body, add_special_tokens=False))
282
+ chunks.append((chunk_text_final, final_tokens, list(current_spans)))
283
+
284
  return chunks
285
 
286
+ def ingest_to_db(self, lang="ko", chunk_batch_size=10000, limit=None):
287
+ self._load_models()
 
 
 
 
288
  ds = load_dataset("HuggingFaceFW/finewiki", lang, split="train", streaming=True)
289
  cur = self.conn.cursor()
290
+
291
+ cur.execute("SELECT MAX(doc_id) FROM documents")
292
+ next_doc_id = (cur.fetchone()[0] or 0) + 1
293
+
294
+ cur.execute("SELECT MAX(chunk_id) FROM chunks")
295
+ next_chunk_id = (cur.fetchone()[0] or 0) + 1
296
+
297
+ cur.execute("SELECT MAX(span_id) FROM spans")
298
+ next_span_id = (cur.fetchone()[0] or 0) + 1
299
+
300
  count = 0
301
+ b_docs, b_chunks, b_spans = [], [], []
302
 
303
+ for item in tqdm(ds, desc=f"1/3: Ingesting {lang}wiki to SQLite"):
304
  if limit and count >= limit: break
305
+
306
+ doc_id = next_doc_id
307
+ doc_title = item.get("title", "")
308
+
309
+ b_docs.append((doc_id, item["id"], doc_title, lang, item.get("url", ""),
310
+ item.get("wikidata_id", ""), item.get("date_modified", ""), item["text"]))
311
+
312
+ for c_idx, (c_text, c_len, span_list) in enumerate(self.chunk_text(item["text"], doc_title, lang)):
313
+ chunk_id = next_chunk_id
314
+ b_chunks.append((chunk_id, doc_id, c_idx, c_text, c_len, doc_title, lang))
315
+
316
+ for s_idx, s_text in enumerate(span_list):
317
+ span_id = next_span_id
318
+ b_spans.append((span_id, chunk_id, s_idx, s_text, len(s_text)))
319
+ next_span_id += 1
320
+
321
+ next_chunk_id += 1
322
+
323
+ next_doc_id += 1
324
  count += 1
325
 
326
+ if len(b_chunks) >= chunk_batch_size:
327
+ self._commit(cur, b_docs, b_chunks, b_spans)
328
+ b_docs, b_chunks, b_spans = [], [], []
 
329
 
330
+ self._commit(cur, b_docs, b_chunks, b_spans)
331
  self.conn.commit()
332
+
333
+ def _commit(self, cur, d, c, s):
334
+ if d: cur.executemany("INSERT INTO documents VALUES (?,?,?,?,?,?,?,?)", d)
335
+ if c: cur.executemany("INSERT INTO chunks VALUES (?,?,?,?,?,?,?)", c)
336
+ if s: cur.executemany("INSERT INTO spans VALUES (?,?,?,?,?)", s)
337
+
338
+ # --------------------------------------------------------------
339
+ # PHASE 2: GPU Embedding and Disk Caching (Full Resume Support)
340
+ # --------------------------------------------------------------
341
+ def embed_corpus(self, lang="ko", batch_size=1024):
342
+ self._load_models()
 
 
 
 
 
 
 
 
 
 
343
  cur = self.conn.cursor()
 
 
344
 
345
+ cur.execute("SELECT COUNT(*) FROM chunks WHERE lang=?", (lang,))
346
+ total_chunks = cur.fetchone()[0]
347
+
348
+ cur.execute("SELECT chunk_id, text FROM chunks WHERE lang=? ORDER BY chunk_id ASC", (lang,))
349
+
350
+ batch_idx = 0
351
+ pbar = tqdm(total=total_chunks, desc="2/3 GPU Embedding to Disk")
352
+
353
+ while True:
354
+ batch = cur.fetchmany(batch_size)
355
+ if not batch: break
356
 
357
+ npz_path = f"{self.artifacts_dir}/chunk_{lang}_{batch_idx}.npz"
358
+ jsonl_path = f"{self.artifacts_dir}/chunk_{lang}_{batch_idx}.jsonl"
359
+
360
+ # Resume logic: skip this batch if its .npz and .jsonl files already exist (both are checked for safety)
361
+ if os.path.exists(npz_path) and os.path.exists(jsonl_path):
362
+ batch_idx += 1
363
+ pbar.update(len(batch))
364
+ continue
365
 
 
 
366
  ids = [r[0] for r in batch]
367
  texts = [r[1] for r in batch]
368
 
369
+ # GPU Batch Embedding with BGE-M3 (Dense + Sparse Extraction)
370
+ output = self.model.encode(texts, batch_size=len(texts), max_length=self.max_tokens, return_dense=True, return_sparse=True)
 
 
 
 
 
 
 
 
371
 
372
+ np.savez(npz_path, ids=np.array(ids), dense=output['dense_vecs'])
 
 
 
 
373
 
374
+ # Ultra-fast serialization using orjson for sparse vectors (List of Dicts) to JSONL format
375
+ with open(jsonl_path, 'wb') as f:
376
+ for sp in output['lexical_weights']:
377
+ f.write(orjson.dumps({str(k): float(v) for k, v in sp.items()}) + b'\n')
378
 
379
+ batch_idx += 1
380
+ pbar.update(len(batch))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
381
 
382
+ pbar.close()
383
+
384
+ # ----------------------------------------------------------------------
385
+ # PHASE 3: Qdrant Server Parallel Upload and Indexing Finalized on Disk
386
+ # ----------------------------------------------------------------------
387
+ def upload_to_qdrant(self, lang="ko", parallel_workers=None):
388
+ save_dir = self.artifacts_dir
389
+ files = [f for f in os.listdir(save_dir) if f.startswith(f"chunk_{lang}_") and f.endswith(".npz")]
 
 
 
 
 
390
 
391
+ if parallel_workers is None:
392
+ num_cores = os.cpu_count() or 1
393
+ parallel_workers = max(1, min(8, int(num_cores * 0.2))) # Use up to 20% of CPU cores, capped at 8 workers
394
 
395
+ def upload_worker(file_name):
396
+ data = np.load(os.path.join(save_dir, file_name))
397
+ ids, dense = data['ids'], data['dense']
398
+
399
+ # Ultra-fast deserialization using orjson for sparse vectors (List of Dicts) from JSONL format
400
+ with open(os.path.join(save_dir, file_name.replace(".npz", ".jsonl")), 'rb') as f:
401
+ sparse = [orjson.loads(line) for line in f]
402
+
403
+ points = []
404
+ for j in range(len(ids)):
405
+ points.append(PointStruct(
406
+ id=int(ids[j]),
407
  vector={
408
+ "dense": dense[j].tolist(),
409
+ "sparse": SparseVector(indices=[int(k) for k in sparse[j].keys()],
410
+ values=list(sparse[j].values()))
 
 
411
  },
412
+ payload={"lang": lang, "chunk_id": int(ids[j])}
413
+ ))
414
+
415
+ self.qdrant_client.upload_points(
416
+ collection_name=self.collection_name,
417
+ points=points,
418
+ wait=False
419
+ )
 
 
 
420
 
421
+ print(f"3/3 Starting Qdrant parallel upload with {parallel_workers} workers...")
422
+ with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
423
+ list(tqdm(executor.map(upload_worker, files), total=len(files), desc="Qdrant Upload"))
 
 
 
424
 
425
+ print("Upload complete. Finalizing HNSW Index on Disk...")
 
 
426
  self.qdrant_client.update_collection(
427
  collection_name=self.collection_name,
428
+ optimizer_config=OptimizersConfigDiff(indexing_threshold=20000)
429
  )
430
+ print("Pipeline Complete!")
431
 
432
  def close(self):
433
+ """DB Connection Close Method for Safe Resource Management"""
434
+ if hasattr(self, 'conn'):
435
  self.conn.close()
436
+ print("SQLite connection closed.")
437
+
438
+ def wait_for_indexing(self):
439
+ """
440
+ Wait until the collection status reported by Qdrant is 'green' and no optimizations are pending
441
+ (indicating that indexing is complete and the collection is fully optimized on disk)
442
+ """
443
+ print("Waiting for Qdrant to finish indexing (HNSW Merging)...")
444
+ while True:
445
+ try:
446
+ info = self.qdrant_client.get_collection(self.collection_name)
447
+
448
+ if info.status == "green":
449
+ print("Indexing confirmed complete.")
450
+ break
451
+ except Exception as e:
452
+ print(f"Checking index status... (Error: {e})")
453
+ print("Retrying in 10 seconds...")
454
+
455
+ time.sleep(10)
456
+
457
+
458
+ # Magic method to support Python's 'with' statement for automatic resource management
459
+ def __enter__(self):
460
+ return self
461
+
462
+ def __exit__(self, exc_type, exc_val, exc_tb):
463
+ self.close()
464
+ # Wait for indexing only when there are no exceptions (exc_type) (normal exit).
465
+ if exc_type is None:
466
+ self.wait_for_indexing()
467
+ else:
468
+ print(f"Pipeline failed with error, skipping index wait: {exc_val}")
469
+
470
+
471
+ def manage_qdrant_server(storage_path, http_port=6333, grpc_port=6334):
472
+ """Helper function that manages the lifecycle of the Qdrant server"""
473
+ abs_storage_path = os.path.abspath(storage_path)
474
+ os.makedirs(abs_storage_path, exist_ok=True)
475
+
476
+ # 1. Terminate existing processes (prevent port conflicts)
477
+ subprocess.run(["pkill", "-9", "qdrant"], capture_output=True)
478
+
479
+ # 2. Check for Binary Existence (Installation Guide)
480
+ if not os.path.exists("./qdrant"):
481
+ print("Error: 'qdrant' binary not found in current directory.")
482
+ print("Please download it first: wget https://github.com/qdrant/qdrant/releases/download/v1.16.2/qdrant-x86_64-unknown-linux-gnu.tar.gz")
483
+ sys.exit(1)
484
+
485
+ print(f"Starting Qdrant server [Storage: {abs_storage_path}]...")
486
+ env = os.environ.copy()
487
+ env["QDRANT__SERVICE__HTTP_PORT"] = str(http_port)
488
+ env["QDRANT__SERVICE__GRPC_PORT"] = str(grpc_port)
489
+ env["QDRANT__STORAGE__STORAGE_PATH"] = abs_storage_path
490
+
491
+ log_file = open("qdrant_log.txt", "w")
492
+ process = subprocess.Popen(
493
+ ["./qdrant"],
494
+ env=env,
495
+ stdout=log_file,
496
+ stderr=log_file,
497
+ preexec_fn=os.setpgrp
498
+ )
499
+ time.sleep(10) # Waiting for server initialization
500
+ return process
501
+
502
 
503
 
504
  if __name__ == "__main__":
505
+ # ---1. CLI Argument Settings---
506
+ parser = argparse.ArgumentParser(description="Knowledge Engine Data Pipeline Runner")
507
+ parser.add_argument("--lang", type=str, default="ko", help="Language code (e.g., ko, en)")
508
+ parser.add_argument("--chunk_batch_size", type=int, default=10000, help="Batch size for SQLite ingestion")
509
+ parser.add_argument("--limit", type=int, default=50000, help="Ingestion document limit")
510
+ parser.add_argument("--batch_size", type=int, default=1024, help="Embedding batch size")
511
+ parser.add_argument("--workers", type=int, default=4, help="Number of parallel workers for Qdrant upload")
512
+ parser.add_argument("--upload", action="store_true", help="Upload to HuggingFace after completion")
513
+ parser.add_argument("--repo_id", type=str, default="user_id/repo", help="Hugging Face repository ID for upload (e.g., user_id/repo)")
514
+ args = parser.parse_args()
515
+
516
+ # --- 2. Environment Setup ---
517
+ STORAGE_PATH = "./ke_store/qdrant_storage"
518
+
519
+ # --- 3. Server Execution ---
520
+ server_process = manage_qdrant_server(STORAGE_PATH)
521
+
522
+ # --- 4. Pipeline Execution (Utilizing Context Manager) ---
523
  try:
524
+ print(f"--- Starting Pipeline for language: {args.lang} ---")
525
+ with KnowledgeEngineBuilder() as builder:
526
+ builder.ingest_to_db(lang=args.lang, chunk_batch_size=args.chunk_batch_size, limit=args.limit)
527
+ builder.embed_corpus(lang=args.lang, batch_size=args.batch_size)
528
+ builder.upload_to_qdrant(lang=args.lang, parallel_workers=args.workers)
529
+
530
+ print("--- Pipeline Execution Successful ---")
531
+
532
+ except Exception as e:
533
+ print(f"Critical Error during pipeline: {e}")
534
+
535
  finally:
536
+ # --- 5. Graceful Shutdown ---
537
+ print("Shutting down Qdrant server safely...")
538
+ subprocess.run(["pkill", "-15", "qdrant"], check=False)
539
+ time.sleep(5) # Waiting for data flush
540
+
541
+ # --- 6. Hugging Face Upload (Optional) ---
542
+ if args.upload:
543
+ print("Uploading to Hugging Face Hub...")
544
+ api = HfApi()
545
+ upload_folder(
546
+ repo_id=args.repo_id,
547
+ folder_path="ke_store",
548
+ repo_type="dataset"
549
+ )
550
+ print("Upload complete!")
scripts/setup_db.py CHANGED
@@ -6,6 +6,8 @@ import sys
 from huggingface_hub import snapshot_download
 from huggingface_hub.utils import HfHubHTTPError
 
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
 from core.config import settings
 from core.logger import setup_logger
 
@@ -34,8 +36,8 @@ def download_knowledge_base():
         repo_id=repo_id,
         repo_type="dataset",
         local_dir=local_dir,
-        allow_patterns=["corpus/*", "qdrant/*"],
-        ignore_patterns=["build_cache/*", ".gitattributes"],
+        allow_patterns=["knowledge_base/*", "vector_store/qdrant/*"],
+        ignore_patterns=["artifacts/*", ".gitattributes"],
         max_workers=4
     )
     logger.info(f"✅ Download complete! Data is ready at: {download_path}")
services/search_service.py CHANGED
@@ -24,13 +24,15 @@ class HybridSearchService:
         self.embedder = embedder
         self.reranker = reranker
 
-    def search(self, query: str, top_k: int = 5, limit: int = 50) -> Dict[str, Any]:
+    def search(self, query: str, top_k: int = 5, limit: int = 50, include_llm_context: bool = True) -> Dict[str, Any]:
         """
         Receives user queries and performs hybrid search and reranking.
 
         :param query: User search query
         :param top_k: Number of documents to return (after reranking)
         :param limit: Number of candidate documents to fetch from Qdrant (after RRF fusion, before reranking)
+        :param include_llm_context: Whether to include LLM context in the response (formatted text for LLM consumption)
+        :return: A dictionary containing the original query, a list of search results, and latency information. Each search result includes chunk_id, text, relevance score, and metadata.
         """
         start_time = time.time()
         logger.info(f"🔍 Starting search pipeline for query: '{query}'")
@@ -97,12 +99,16 @@ class HybridSearchService:
             latency_ms = int((time.time() - start_time) * 1000)
             logger.info(f"✅ Search completed in {latency_ms}ms. Found {len(final_results)} final chunks.")
 
-            return {
+            response = {
                 "query": query,
                 "results": final_results,
                 "latency_ms": latency_ms
             }
 
+            if include_llm_context:
+                # 7. Optional: Format results into LLM-friendly context (Markdown/XML mixed format)
+                response["llm_context"] = self.format_for_llm(final_results)
+
         except Exception as e:
             # Wrap unexpected errors in custom errors and throw them to the router
             logger.error(f"❌ Pipeline failed: {str(e)}", exc_info=True)
@@ -113,6 +119,7 @@ class HybridSearchService:
             return {
                 "query": query,
                 "results": [],
+                "llm_context": "No relevant knowledge (documents) available.",
                 "latency_ms": int((time.time() - start_time) * 1000)
             }
 
@@ -138,6 +145,7 @@ class HybridSearchService:
                 f"<doc id=\"{i}\" source=\"{source}\" "
                 f"url=\"{meta.get('url', 'N/A')}\" "
                 f"relevance_score=\"{res['score']}\">\n"
+                f"date_modified=\"{meta.get('date_modified', 'N/A')}\">\n"
                 f"{res['text']}\n"
                 f"</doc>"
             )
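A quick illustration of how a caller consumes the new flag. Construction of HybridSearchService is elided; the result keys follow the diff and the query string is just an example:

    result = search_service.search("세종대왕 업적", top_k=5, limit=50, include_llm_context=True)

    print(result["latency_ms"], len(result["results"]))

    # "llm_context" holds the <doc ...> blocks produced by format_for_llm(), ready to be
    # placed into an LLM prompt; pass include_llm_context=False to skip building it.
    prompt_context = result.get("llm_context", "")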
start.sh ADDED
@@ -0,0 +1,23 @@
+#!/bin/bash
+# start.sh
+
+echo "1. Downloading Knowledge Base Data (Syncing with Hugging Face)..."
+# Download the data before starting FastAPI so that Qdrant can recognize it.
+python scripts/setup_db.py
+
+echo "2. Starting Qdrant Server in background..."
+# Run Qdrant in the background with an explicitly specified storage path
+export QDRANT__STORAGE__STORAGE_PATH="/app/data/vector_store/qdrant"
+/usr/local/bin/qdrant &
+
+# Wait until the Qdrant server is fully running before starting FastAPI
+echo "Waiting for Qdrant to initialize..."
+until curl -s http://localhost:6333/readyz > /dev/null; do
+    echo "Qdrant is not ready yet. Retrying in 2 seconds..."
+    sleep 2
+done
+echo "Qdrant is fully initialized!"
+
+echo "3. Starting FastAPI Server..."
+# Run Uvicorn in the foreground
+uvicorn main:app --host 0.0.0.0 --port 7860
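The loop above polls Qdrant's /readyz endpoint with curl; the same readiness check can be expressed in Python, e.g. for a container health probe or an integration test. A sketch assuming the requests package, which is not part of this commit:

    import time

    import requests


    def wait_for_qdrant(url: str = "http://localhost:6333/readyz", retries: int = 30, delay: float = 2.0) -> bool:
        """Poll the Qdrant readiness endpoint until it returns HTTP 200 or retries run out."""
        for _ in range(retries):
            try:
                if requests.get(url, timeout=2).status_code == 200:
                    return True
            except requests.RequestException:
                pass
            time.sleep(delay)
        return False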
storage/qdrant_client.py CHANGED
@@ -13,13 +13,13 @@ class QdrantStorage:
     """
     Qdrant client performing hybrid search based on dense and sparse vectors
     """
-    def __init__(self, path: str, collection_name: str = "knowledge_base"):
-        self.path = path
+    def __init__(self, url: str, collection_name: str = "knowledge_base"):
+        self.url = url
         self.collection_name = collection_name
         try:
             # Local file system-based Qdrant connection (v1.10+)
-            self.client = QdrantClient(path=self.path)
-            logger.info(f"✅ Connected to local Qdrant at {self.path} (Collection: {self.collection_name})")
+            self.client = QdrantClient(url=self.url, timeout=60.0)
+            logger.info(f"✅ Connected to local Qdrant at {self.url} (Collection: {self.collection_name})")
         except Exception as e:
             logger.critical(f"❌ Qdrant connection failed: {e}")
             raise e
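For reference, one way the hybrid dense + sparse search over this collection can be expressed against a Qdrant server with recent qdrant-client releases (1.10+). This is a sketch with placeholder query vectors, not necessarily the exact call QdrantStorage makes internally:

    from qdrant_client import QdrantClient, models

    client = QdrantClient(url="http://localhost:6333", timeout=60.0)

    dense_vec = [0.0] * 1024              # placeholder: BGE-M3 dense embedding of the query
    sparse_idx, sparse_val = [1], [1.0]   # placeholder: BGE-M3 lexical weights of the query

    hits = client.query_points(
        collection_name="knowledge_base",
        prefetch=[
            models.Prefetch(query=dense_vec, using="dense", limit=50),
            models.Prefetch(query=models.SparseVector(indices=sparse_idx, values=sparse_val),
                            using="sparse", limit=50),
        ],
        query=models.FusionQuery(fusion=models.Fusion.RRF),  # reciprocal rank fusion of both candidate lists
        limit=50,
    )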
storage/sqlite_client.py CHANGED
@@ -31,35 +31,40 @@ class SQLiteStorage:
         if not chunk_ids:
             return {}
 
-        placeholders = ",".join("?" * len(chunk_ids))
-
-        query = f"""
-            SELECT
-                c.chunk_id, c.text AS chunk_text,
-                d.doc_id, d.title, d.lang, d.url, d.date_modified
-            FROM chunks c
-            JOIN documents d ON c.doc_id = d.doc_id
-            WHERE c.chunk_id IN ({placeholders})
-        """
+        CHUNK_SIZE_LIMIT = 900  # SQLite has a default limit of 999 variables per query, so we use 900 to be safe
+        result_dict = {}
 
         try:
             cur = self.conn.cursor()
-            cur.execute(query, chunk_ids)
-            rows = cur.fetchall()
-
-            # Transform the result into a dictionary for O(1) access: { chunk_id: { "text": "...", "metadata": {...} } }
-            result_dict = {}
-            for row in rows:
-                result_dict[row["chunk_id"]] = {
-                    "text": row["chunk_text"],
-                    "metadata": {
-                        "doc_id": row["doc_id"],
-                        "title": row["title"],
-                        "lang": row["lang"],
-                        "url": row["url"],
-                        "date_modified": row["date_modified"]
+
+            for i in range(0, len(chunk_ids), CHUNK_SIZE_LIMIT):
+                batch_ids = chunk_ids[i:i + CHUNK_SIZE_LIMIT]
+                placeholders = ",".join("?" * len(batch_ids))
+                query = f"""
+                    SELECT
+                        c.chunk_id, c.text AS chunk_text,
+                        d.doc_id, d.title, d.lang, d.url, d.date_modified
+                    FROM chunks c
+                    JOIN documents d ON c.doc_id = d.doc_id
+                    WHERE c.chunk_id IN ({placeholders})
+                """
+
+                cur.execute(query, batch_ids)
+                rows = cur.fetchall()
+
+                # Transform the result into a dictionary for O(1) access: { chunk_id: { "text": "...", "metadata": {...} } }
+                for row in rows:
+                    result_dict[row["chunk_id"]] = {
+                        "text": row["chunk_text"],
+                        "metadata": {
+                            "doc_id": row["doc_id"],
+                            "title": row["title"],
+                            "lang": row["lang"],
+                            "url": row["url"],
+                            "date_modified": row["date_modified"]
+                        }
                     }
-                }
+
             return result_dict
 
         except sqlite3.Error as e: