scipious commited on
Commit
bad7edf
·
verified ·
1 Parent(s): 78f9356

Update reg_embedding_system_v02.py

Browse files
Files changed (1) hide show
  1. reg_embedding_system_v02.py +544 -453
reg_embedding_system_v02.py CHANGED
@@ -1,453 +1,544 @@
1
- import gc
2
- import json
3
- import sqlite3
4
- from pathlib import Path
5
- from typing import Optional, Tuple, Any, Dict, List, Set, Union
6
- import numpy as np
7
-
8
- import faiss
9
-
10
- # [수정됨] 패키지 구조 변경 반영 및 EnsembleRetriever 제거
11
- from langchain_community.retrievers import BM25Retriever
12
- from langchain_core.documents import Document
13
- from langchain_community.vectorstores import FAISS
14
- from sentence_transformers import SentenceTransformer
15
-
16
- # 런타임에 Embeddings 클래스를 찾기 위한 로직
17
- try:
18
- from langchain_core.embeddings import Embeddings
19
- except ImportError:
20
- try:
21
- from langchain.embeddings.base import Embeddings
22
- except ImportError:
23
- Embeddings = object
24
-
25
- # --- SQLite 헬퍼 함수 ---
26
- SQLITE_DB_NAME = "metadata_mapping.db"
27
-
28
- # === IDSelector 클래스 정의 ===
29
- class MetadataIDSelector(faiss.IDSelectorBatch):
30
- def __init__(self, allowed_ids: Set[int]):
31
- super().__init__(list(allowed_ids))
32
-
33
- def get_db_connection(persist_directory: str) -> sqlite3.Connection:
34
- """FAISS 저장 경로를 기반으로 SQLite 연결을 설정하고 반환합니다."""
35
- db_path = Path(persist_directory) / SQLITE_DB_NAME
36
- conn = sqlite3.connect(db_path)
37
- return conn
38
-
39
- def _create_and_populate_sqlite_db(chunks: List[Document], persist_directory: str):
40
- """
41
- 문서 청크를 기반으로 SQLite DB를 생성하고 채웁니다.
42
- [업데이트 반영] 메타데이터 구조: regulation, chapter, section, standard
43
- """
44
- # 1. 입력 데이터 확인 (가장 중요한 체크 포인트)
45
- if not chunks:
46
- print("🚨 [오류] _create_and_populate_sqlite_db 함수에 전달된 chunks 리스트가 비어 있습니다!")
47
- print(" -> load_chunks_from_jsonl 함수가 정상적으로 파일을 읽었는지 확인해주세요.")
48
- return
49
-
50
- # 2. 저장 경로 확인 및 생성
51
- save_dir = Path(persist_directory)
52
- save_dir.mkdir(parents=True, exist_ok=True)
53
-
54
- conn = get_db_connection(persist_directory)
55
-
56
- try:
57
- cursor = conn.cursor()
58
-
59
- # 3. 테이블 생성 (기존 테이블 삭제 후 재생성 옵션 고려)
60
- # 스키마가 변경되었으므로 기존 테이블이 있다면 충돌날 있습니다.
61
- # 안전하게 지우고 다시 만드는 방법을 추천합니다. (개발 단계)
62
- cursor.execute("DROP TABLE IF EXISTS documents")
63
-
64
- cursor.execute("""
65
- CREATE TABLE documents (
66
- faiss_id INTEGER PRIMARY KEY,
67
- source TEXT,
68
- regulation TEXT,
69
- chapter TEXT,
70
- section TEXT,
71
- standard TEXT,
72
- json_metadata TEXT
73
- )
74
- """)
75
- # 테이블 생성 직후 커밋 (파일에 스키마 기록)
76
- conn.commit()
77
- print(f"📂 DB 테이블 생성 완료 (경로: {save_dir}/{SQLITE_DB_NAME})")
78
-
79
- # 4. 데이터 채우기
80
- inserted_count = 0
81
- for i, doc in enumerate(chunks):
82
- faiss_id = i
83
- metadata_json = json.dumps(doc.metadata, ensure_ascii=False)
84
-
85
- source_val = doc.metadata.get('source', '')
86
- regulation_val = doc.metadata.get('regulation', '')
87
- chapter_val = doc.metadata.get('chapter', '')
88
- section_val = doc.metadata.get('section', '')
89
- standard_val = doc.metadata.get('standard', '')
90
-
91
- if isinstance(regulation_val, list): regulation_val = ', '.join(map(str, regulation_val))
92
- if isinstance(chapter_val, list): chapter_val = ', '.join(map(str, chapter_val))
93
- if isinstance(section_val, list): section_val = ', '.join(map(str, section_val))
94
- if isinstance(standard_val, list): standard_val = ', '.join(map(str, standard_val))
95
-
96
- doc.metadata['faiss_id'] = faiss_id
97
-
98
- cursor.execute(
99
- """
100
- INSERT OR REPLACE INTO documents
101
- (faiss_id, source, regulation, chapter, section, standard, json_metadata)
102
- VALUES (?, ?, ?, ?, ?, ?, ?)
103
- """,
104
- (faiss_id, source_val, regulation_val, chapter_val, section_val, standard_val, metadata_json)
105
- )
106
- inserted_count += 1
107
-
108
- # 5. 최종 커밋
109
- conn.commit()
110
- print(f"✅ SQLite 데이터 저장 완료: 총 {inserted_count}행이 삽입되었습니다.")
111
-
112
- except Exception as e:
113
- print(f"🚨 [DB 저장 중 에러 발생] {e}")
114
- # 에러가 나도 traceback을 있게 함
115
- import traceback
116
- traceback.print_exc()
117
-
118
- finally:
119
- # 6. 연결 확실히 종료
120
- conn.close()
121
-
122
- # --- LocalSentenceTransformerEmbeddings ---
123
- class LocalSentenceTransformerEmbeddings(Embeddings):
124
- def __init__(self, st_model, normalize_embeddings: bool = True, encode_batch_size: int = 32):
125
- self.model = st_model
126
- self.normalize = normalize_embeddings
127
- self.encode_batch_size = encode_batch_size
128
-
129
- def embed_documents(self, texts):
130
- vecs = self.model.encode(
131
- texts,
132
- batch_size=self.encode_batch_size,
133
- show_progress_bar=False,
134
- normalize_embeddings=self.normalize,
135
- convert_to_numpy=True,
136
- )
137
- return vecs.tolist()
138
-
139
- def embed_query(self, text: str):
140
- vec = self.model.encode(
141
- [text],
142
- batch_size=self.encode_batch_size,
143
- show_progress_bar=False,
144
- normalize_embeddings=self.normalize,
145
- convert_to_numpy=True,
146
- )[0]
147
- return vec.tolist()
148
-
149
- def load_chunks_from_jsonl(file_paths: Union[str, List[str]]):
150
- """
151
- JSONL 파일 로드 함수
152
- """
153
- if isinstance(file_paths, str):
154
- file_paths = [file_paths]
155
-
156
- restored_documents = []
157
- print(f" {len(file_paths)}개의 파일 병합 로드를 시작합니다...")
158
-
159
- for file_path in file_paths:
160
- try:
161
- file_doc_count = 0
162
- with open(file_path, 'r', encoding='utf-8') as f:
163
- for line_number, line in enumerate(f):
164
- line = line.strip()
165
- if not line: continue
166
- data = json.loads(line)
167
- doc = Document(
168
- page_content=data.get('page_content', ""),
169
- metadata=data.get('metadata', {})
170
- )
171
- restored_documents.append(doc)
172
- file_doc_count += 1
173
- print(f" - [성공] {file_path}: {file_doc_count}개 Chunk")
174
-
175
- except Exception as e:
176
- print(f" [실패] 오류 ({file_path}): {e}")
177
- continue
178
-
179
- print(f"✅ 전체 로드 완료: 총 {len(restored_documents)}개의 Chunk가 복원되었습니다.")
180
- return restored_documents
181
-
182
- # --- save_embedding_system (수정됨: Ensemble 제거 및 개별 반환) ---
183
- def save_embedding_system(
184
- chunks,
185
- persist_directory: str = r"D:/Project AI/RAG",
186
- batch_size: int = 32,
187
- device: str = 'cuda'
188
- ):
189
- Path(persist_directory).mkdir(parents=True, exist_ok=True)
190
-
191
- # 1) SQLite DB 저장
192
- _create_and_populate_sqlite_db(chunks, persist_directory)
193
-
194
- # 2) 모델 로드
195
- model = SentenceTransformer(
196
- 'nomic-ai/nomic-embed-text-v2-moe',
197
- trust_remote_code=True,
198
- device=device
199
- )
200
-
201
- embeddings = LocalSentenceTransformerEmbeddings(
202
- st_model=model,
203
- normalize_embeddings=True,
204
- encode_batch_size=batch_size
205
- )
206
-
207
- # 3) FAISS 생성
208
- vectorstore = None
209
- for i in range(0, len(chunks), batch_size):
210
- batch = chunks[i:i + batch_size]
211
- if vectorstore is None:
212
- vectorstore = FAISS.from_documents(documents=batch, embedding=embeddings)
213
- else:
214
- vectorstore.add_documents(documents=batch)
215
- gc.collect()
216
-
217
- # 4) BM25 생성 (Ensemble 없이 독립 생성)
218
- bm25_retriever = BM25Retriever.from_documents(chunks)
219
- bm25_retriever.k = 5
220
-
221
- # 5) 저장
222
- vectorstore.save_local(persist_directory)
223
-
224
- # 6) 연결 반환 (개별 요소 반환)
225
- sqlite_conn = get_db_connection(persist_directory)
226
- gc.collect()
227
-
228
- return bm25_retriever, vectorstore, sqlite_conn
229
-
230
- # --- load_embedding_from_faiss (수정됨: Ensemble 제거 및 개별 반환) ---
231
- def load_embedding_from_faiss(
232
- persist_directory: str = r"D:/Project AI/RAG",
233
- top_k: int = 10,
234
- bm25_k: int = 10,
235
- embeddings: Optional[Any] = None,
236
- device: str = 'cpu'
237
- ) -> Tuple[Any, FAISS, sqlite3.Connection]:
238
-
239
- if embeddings is None:
240
- st_model = SentenceTransformer(
241
- 'nomic-ai/nomic-embed-text-v2-moe',
242
- trust_remote_code=True,
243
- device=device
244
- )
245
- embeddings = LocalSentenceTransformerEmbeddings(
246
- st_model=st_model,
247
- normalize_embeddings=True,
248
- encode_batch_size=32
249
- )
250
-
251
- persist_dir = Path(persist_directory)
252
- if not persist_dir.exists():
253
- raise FileNotFoundError(f"FAISS 경로가 없습니다: {persist_dir}")
254
-
255
- # FAISS 로드
256
- vectorstore = FAISS.load_local(
257
- folder_path=str(persist_dir),
258
- embeddings=embeddings,
259
- allow_dangerous_deserialization=True
260
- )
261
-
262
- # BM25 복원 (저장된 문서로부터 재생성)
263
- bm25_retriever = None
264
- docs = []
265
- try:
266
- if hasattr(vectorstore, "docstore") and hasattr(vectorstore.docstore, "_dict"):
267
- docs = list(vectorstore.docstore._dict.values())
268
- if docs:
269
- bm25_retriever = BM25Retriever.from_documents(docs)
270
- bm25_retriever.k = bm25_k
271
- else:
272
- print("[경고] 저장된 문서를 찾을 수 없어 BM25를 생성하지 못했습니다.")
273
- except Exception as e:
274
- print(f"[경고] 저장된 문서를 읽는 중 문제가 발생했습니다: {e}")
275
-
276
- sqlite_conn = get_db_connection(persist_directory)
277
-
278
- return bm25_retriever, vectorstore, sqlite_conn
279
-
280
- # --- search_vectorstore (단순 벡터 검색 헬퍼) ---
281
- def search_vectorstore(bm25_retriever, vectorstore, query, k=5):
282
- """
283
- vectorstore와 bm25_retriever를 받아 앙상블(Hybrid) 검색을 수행하는 함수.
284
- EnsembleRetriever(weights=[0.6, 0.4]) 유사한 결과를 반환합니다.
285
- """
286
- weights=[0.6, 0.4]
287
- # 1. 벡터 검색 수행 (Vector Search)
288
- # FAISS를 리트리버로 변환하여 검색
289
- vec_retriever = vectorstore.as_retriever(search_kwargs={"k": k})
290
- vec_docs = vec_retriever.invoke(query)
291
-
292
- # 2. 키워드 검색 수행 (BM25 Search)
293
- # 검색 개수를 k개로 맞춰서 실행
294
- bm25_docs = bm25_retriever.invoke(query, config={"search_kwargs": {"k": k}})
295
-
296
- # 3. 랭킹 퓨전 (Weighted Reciprocal Rank Fusion)
297
- # 리스트의 순위를 기반으로 가중치를 적용해 점수를 매깁니다.
298
-
299
- doc_scores = {} # 문서 내용(또는 ID) -> 점수
300
- doc_map = {} # 문서 내용 -> 문서 객체 저장 (나중에 반환하기 위해)
301
-
302
- # 내부 함수: 순위에 따른 점수 계산 (Weight / (Rank + 1))
303
- def apply_rank_score(docs, weight):
304
- for rank, doc in enumerate(docs):
305
- # 고유 생성 (page_content가 고유하다고 가정하거나, doc_id가 있다면 사용)
306
- doc_key = doc.page_content
307
- doc_map[doc_key] = doc
308
-
309
- if doc_key not in doc_scores:
310
- doc_scores[doc_key] = 0.0
311
-
312
- # 순위가 높을수록(rank가 작을수록) 점수가 높음
313
- score = weight / (rank + 1)
314
- doc_scores[doc_key] += score
315
-
316
- # 벡터 검색 결과 점수 반영 (가중치 0.6)
317
- apply_rank_score(vec_docs, weights[0])
318
-
319
- # BM25 검색 결과 점수 반영 (가중치 0.4)
320
- apply_rank_score(bm25_docs, weights[1])
321
-
322
- # 4. 점수순 정렬 (높은 점수가 상위)
323
- # 점수(item[1])를 기준으로 내림차순 정렬
324
- sorted_docs = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)
325
-
326
- # 5. Top-K 추출 및 문서 객체 반환
327
- final_results = [doc_map[key] for key, score in sorted_docs[:k]]
328
-
329
- return final_results
330
-
331
- # --- search_with_metadata_filter (수정됨: 수동 병합 로직 구현) ---
332
- def search_with_metadata_filter(
333
- bm25_retriever: Any, # [변경] Ensemble 대신 BM25를 직접 받음
334
- vectorstore: FAISS,
335
- query: str,
336
- k: int = 5,
337
- metadata_filter: Optional[Dict[str, Any]] = None,
338
- sqlite_conn: Optional[sqlite3.Connection] = None
339
- ) -> List[Document]:
340
- """
341
- SQLite 사전 필터링 -> FAISS 벡터 검색 + BM25 검색 -> 결과 병합
342
- """
343
-
344
- # === 1. SQLite에서 필터링된 FAISS ID 추출 ===
345
- filtered_ids = None
346
- if metadata_filter and sqlite_conn:
347
- cursor = sqlite_conn.cursor()
348
- where_clauses = []
349
- params = []
350
-
351
- for key, value in metadata_filter.items():
352
- if isinstance(value, list):
353
- if not value: continue
354
- placeholders = ', '.join(['?'] * len(value))
355
- where_clauses.append(f"{key} IN ({placeholders})")
356
- params.extend(value)
357
- else:
358
- where_clauses.append(f"{key} = ?")
359
- params.append(value)
360
-
361
- if where_clauses:
362
- where_sql = " OR ".join(where_clauses)
363
- sql_query = f"SELECT faiss_id FROM documents WHERE {where_sql}"
364
-
365
- try:
366
- cursor.execute(sql_query, params)
367
- filtered_ids = {row[0] for row in cursor.fetchall()}
368
- print(f"[사전 필터링] {len(filtered_ids)}개 ID 획득 FAISS 검색 제한")
369
- except Exception as e:
370
- print(f"[경고] SQLite 필터링 실패: {e}")
371
- filtered_ids = None
372
- else:
373
- print("[안내] 필터 조건 없음 → 전체 검색")
374
- else:
375
- print("[안내] 필터 또는 DB 없음 → 전체 검색")
376
-
377
- # === 2. FAISS 벡터 검색 ===
378
- vector_docs = []
379
- if filtered_ids and len(filtered_ids) > 0:
380
- selector = MetadataIDSelector(filtered_ids)
381
- index: faiss.Index = vectorstore.index
382
-
383
- query_embedding = np.array(vectorstore.embeddings.embed_query(query)).astype('float32')
384
- query_embedding = query_embedding.reshape(1, -1)
385
-
386
- search_params = faiss.SearchParametersIVF(sel=selector, nprobe=20)
387
- _k = max(k * 10, 100)
388
- D, I = index.search(query_embedding, _k, params=search_params)
389
-
390
- valid_indices = [i for i in I[0] if i != -1]
391
- for idx in valid_indices[:k]:
392
- doc_id = vectorstore.index_to_docstore_id[idx]
393
- doc = vectorstore.docstore.search(doc_id)
394
- if isinstance(doc, Document):
395
- vector_docs.append(doc)
396
- print(f"[벡터 검색] {len(valid_indices)}개 후보 → {len(vector_docs)}개 유효")
397
- else:
398
- # 전체 검색
399
- vector_retriever = vectorstore.as_retriever(search_kwargs={"k": k})
400
- vector_docs = vector_retriever.invoke(query)
401
- print(f"[벡터 검색] 전체 검색 → {len(vector_docs)}개 후보")
402
-
403
- # === 3. BM25 검색 ===
404
- bm25_docs = []
405
- if bm25_retriever:
406
- search_k = k * 5
407
- candidates = bm25_retriever.invoke(query, config={"search_kwargs": {"k": search_k}})
408
- if filtered_ids:
409
- bm25_docs = [d for d in candidates if d.metadata.get('faiss_id') in filtered_ids]
410
- else:
411
- bm25_docs = candidates
412
-
413
- # Top K 자르기
414
- bm25_docs = bm25_docs[:k]
415
- print(f"[BM25 검색] {len(candidates)}개 후보 → {len(bm25_docs)}개 필터링 후")
416
-
417
- # === 4. 병합 (Vector 우선 + 중복 제거) ===
418
- combined = {id(d): d for d in (vector_docs + bm25_docs)}.values()
419
- final_results = list(combined)[:k]
420
-
421
- print(f"[최종 결과] {len(final_results)}개 문서 반환")
422
- return final_results
423
-
424
- # --- get_unique_metadata_values (빠진 함수 추가) ---
425
- def get_unique_metadata_values(
426
- sqlite_conn: sqlite3.Connection,
427
- key_name: str,
428
- partial_match: Optional[str] = None
429
- ) -> List[str]:
430
- """
431
- 고유 값 검색 함수.
432
- key_name 인자로 'part', 'subpart', 'section', 'source' 등을 사용할 수 있습니다.
433
- """
434
- if not sqlite_conn:
435
- return []
436
-
437
- cursor = sqlite_conn.cursor()
438
- # 안전을 위해 key_name은 컬럼명으로 직접 사용 (SQL Injection 주의: 내부 사용 전제)
439
- # 실제 프로덕션에서는 key_name 화이트리스트로 검증하는 것이 좋습니다.
440
- sql_query = f"SELECT DISTINCT `{key_name}` FROM documents"
441
- params = []
442
-
443
- if partial_match:
444
- sql_query += f" WHERE `{key_name}` LIKE ?"
445
- params.append(f"%{partial_match}%")
446
-
447
- try:
448
- cursor.execute(sql_query, params)
449
- unique_values = [row[0] for row in cursor.fetchall() if row[0] is not None]
450
- return unique_values
451
- except Exception as e:
452
- print(f"[에러] 고유 검색 실패 ({key_name}): {e}")
453
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gc
2
+ import json
3
+ import sqlite3
4
+ from pathlib import Path
5
+ from typing import Optional, Tuple, Any, Dict, List, Set, Union
6
+ from collections import Counter
7
+ import numpy as np
8
+
9
+ import faiss
10
+
11
+ # [수정됨] 패키지 구조 변경 반영 및 EnsembleRetriever 제거
12
+ from langchain_community.retrievers import BM25Retriever
13
+ from langchain_core.documents import Document
14
+ from langchain_community.vectorstores import FAISS
15
+ from sentence_transformers import SentenceTransformer
16
+
17
+ # 런타임에 Embeddings 클래스를 찾기 위한 로직
18
+ try:
19
+ from langchain_core.embeddings import Embeddings
20
+ except ImportError:
21
+ try:
22
+ from langchain.embeddings.base import Embeddings
23
+ except ImportError:
24
+ Embeddings = object
25
+
26
+ # --- SQLite 헬퍼 함수 ---
27
+ SQLITE_DB_NAME = "metadata_mapping.db"
28
+
29
+ # === IDSelector 클래스 정의 ===
30
+ class MetadataIDSelector(faiss.IDSelectorBatch):
31
+ def __init__(self, allowed_ids: Set[int]):
32
+ super().__init__(list(allowed_ids))
33
+
34
+ def get_db_connection(persist_directory: str) -> sqlite3.Connection:
35
+ """FAISS 저장 경로를 기반으로 SQLite 연결을 설정하고 반환합니다."""
36
+ db_path = Path(persist_directory) / SQLITE_DB_NAME
37
+ conn = sqlite3.connect(db_path)
38
+ return conn
39
+
40
+ def _create_and_populate_sqlite_db(chunks: List[Document], persist_directory: str):
41
+ """
42
+ 문서 청크를 기반으로 SQLite DB를 생성하고 채웁니다.
43
+ [업데이트 반영] 메타데이터 구조: regulation, chapter, section, standard
44
+ """
45
+ # 1. 입력 데이터 확인 (가장 중요한 체크 포인트)
46
+ if not chunks:
47
+ print("🚨 [오류] _create_and_populate_sqlite_db 함수에 전달된 chunks 리스트가 비어 있습니다!")
48
+ print(" -> load_chunks_from_jsonl 함수가 정상적으로 파일을 읽었는지 확인해주세요.")
49
+ return
50
+
51
+ # 2. 저장 경로 확인 및 생성
52
+ save_dir = Path(persist_directory)
53
+ save_dir.mkdir(parents=True, exist_ok=True)
54
+
55
+ conn = get_db_connection(persist_directory)
56
+
57
+ try:
58
+ cursor = conn.cursor()
59
+
60
+ # 3. 테이블 생성 (기존 테이블 삭제 재생성 옵션 고려)
61
+ # 스키마가 변경되었으므로 기존 테이블이 있다면 충돌날 있습니다.
62
+ # 안전하게 지우고 다시 만드는 방법을 추천합니다. (개발 단계)
63
+ cursor.execute("DROP TABLE IF EXISTS documents")
64
+
65
+ cursor.execute("""
66
+ CREATE TABLE documents (
67
+ faiss_id INTEGER PRIMARY KEY,
68
+ source TEXT,
69
+ regulation TEXT,
70
+ chapter TEXT,
71
+ section TEXT,
72
+ standard TEXT,
73
+ json_metadata TEXT
74
+ )
75
+ """)
76
+ # 테이블 생성 직후 커밋 (파일에 스키마 기록)
77
+ conn.commit()
78
+ print(f"📂 DB 테이블 생성 완료 (경로: {save_dir}/{SQLITE_DB_NAME})")
79
+
80
+ # 4. 데이터 채우기
81
+ inserted_count = 0
82
+ for i, doc in enumerate(chunks):
83
+ faiss_id = i
84
+ metadata_json = json.dumps(doc.metadata, ensure_ascii=False)
85
+
86
+ source_val = doc.metadata.get('source', '')
87
+ regulation_val = doc.metadata.get('regulation', '')
88
+ chapter_val = doc.metadata.get('chapter', '')
89
+ section_val = doc.metadata.get('section', '')
90
+ standard_val = doc.metadata.get('standard', '')
91
+
92
+ if isinstance(regulation_val, list): regulation_val = ', '.join(map(str, regulation_val))
93
+ if isinstance(chapter_val, list): chapter_val = ', '.join(map(str, chapter_val))
94
+ if isinstance(section_val, list): section_val = ', '.join(map(str, section_val))
95
+ if isinstance(standard_val, list): standard_val = ', '.join(map(str, standard_val))
96
+
97
+ doc.metadata['faiss_id'] = faiss_id
98
+
99
+ cursor.execute(
100
+ """
101
+ INSERT OR REPLACE INTO documents
102
+ (faiss_id, source, regulation, chapter, section, standard, json_metadata)
103
+ VALUES (?, ?, ?, ?, ?, ?, ?)
104
+ """,
105
+ (faiss_id, source_val, regulation_val, chapter_val, section_val, standard_val, metadata_json)
106
+ )
107
+ inserted_count += 1
108
+
109
+ # 5. 최종 커밋
110
+ conn.commit()
111
+ print(f"✅ SQLite 데이터 저장 완료: 총 {inserted_count}행이 삽입되었습니다.")
112
+
113
+ except Exception as e:
114
+ print(f"🚨 [DB 저장 에러 발생] {e}")
115
+ # 에러가 나도 traceback을 볼 수 있게 함
116
+ import traceback
117
+ traceback.print_exc()
118
+
119
+ finally:
120
+ # 6. 연결 확실히 종료
121
+ conn.close()
122
+
123
+ # --- LocalSentenceTransformerEmbeddings ---
124
+ class LocalSentenceTransformerEmbeddings(Embeddings):
125
+ def __init__(self, st_model, normalize_embeddings: bool = True, encode_batch_size: int = 32):
126
+ self.model = st_model
127
+ self.normalize = normalize_embeddings
128
+ self.encode_batch_size = encode_batch_size
129
+
130
+ def embed_documents(self, texts):
131
+ vecs = self.model.encode(
132
+ texts,
133
+ batch_size=self.encode_batch_size,
134
+ show_progress_bar=False,
135
+ normalize_embeddings=self.normalize,
136
+ convert_to_numpy=True,
137
+ )
138
+ return vecs.tolist()
139
+
140
+ def embed_query(self, text: str):
141
+ vec = self.model.encode(
142
+ [text],
143
+ batch_size=self.encode_batch_size,
144
+ show_progress_bar=False,
145
+ normalize_embeddings=self.normalize,
146
+ convert_to_numpy=True,
147
+ )[0]
148
+ return vec.tolist()
149
+
150
+ def load_chunks_from_jsonl(file_paths: Union[str, List[str]]):
151
+ """
152
+ JSONL 파일 로드 함수
153
+ """
154
+ if isinstance(file_paths, str):
155
+ file_paths = [file_paths]
156
+
157
+ restored_documents = []
158
+ print(f" 총 {len(file_paths)}개의 파일 병합 로드를 시작합니다...")
159
+
160
+ for file_path in file_paths:
161
+ try:
162
+ file_doc_count = 0
163
+ with open(file_path, 'r', encoding='utf-8') as f:
164
+ for line_number, line in enumerate(f):
165
+ line = line.strip()
166
+ if not line: continue
167
+ data = json.loads(line)
168
+ doc = Document(
169
+ page_content=data.get('page_content', ""),
170
+ metadata=data.get('metadata', {})
171
+ )
172
+ restored_documents.append(doc)
173
+ file_doc_count += 1
174
+ print(f" - [성공] {file_path}: {file_doc_count}개 Chunk")
175
+
176
+ except Exception as e:
177
+ print(f" [실패] 오류 ({file_path}): {e}")
178
+ continue
179
+
180
+ print(f"✅ 전체 로드 완료: 총 {len(restored_documents)}개의 Chunk가 복원되었습니다.")
181
+ return restored_documents
182
+
183
+ # --- save_embedding_system (수정됨: Ensemble 제거 및 개별 반환) ---
184
+ def save_embedding_system(
185
+ chunks,
186
+ persist_directory: str = r"D:/Project AI/RAG",
187
+ batch_size: int = 32,
188
+ device: str = 'cuda'
189
+ ):
190
+ Path(persist_directory).mkdir(parents=True, exist_ok=True)
191
+
192
+ # 1) SQLite DB 저장
193
+ _create_and_populate_sqlite_db(chunks, persist_directory)
194
+
195
+ # 2) 모델 로드
196
+ model = SentenceTransformer(
197
+ 'nomic-ai/nomic-embed-text-v2-moe',
198
+ trust_remote_code=True,
199
+ device=device
200
+ )
201
+
202
+ embeddings = LocalSentenceTransformerEmbeddings(
203
+ st_model=model,
204
+ normalize_embeddings=True,
205
+ encode_batch_size=batch_size
206
+ )
207
+
208
+ # 3) FAISS 생성
209
+ vectorstore = None
210
+ for i in range(0, len(chunks), batch_size):
211
+ batch = chunks[i:i + batch_size]
212
+ if vectorstore is None:
213
+ vectorstore = FAISS.from_documents(documents=batch, embedding=embeddings)
214
+ else:
215
+ vectorstore.add_documents(documents=batch)
216
+ gc.collect()
217
+
218
+ # 4) BM25 생성 (Ensemble 없이 독립 생성)
219
+ bm25_retriever = BM25Retriever.from_documents(chunks)
220
+ bm25_retriever.k = 5
221
+
222
+ # 5) 저장
223
+ vectorstore.save_local(persist_directory)
224
+
225
+ # 6) 연결 반환 (개별 요소 반환)
226
+ sqlite_conn = get_db_connection(persist_directory)
227
+ gc.collect()
228
+
229
+ return bm25_retriever, vectorstore, sqlite_conn
230
+
231
+ # --- load_embedding_from_faiss (수정됨: Ensemble 제거 및 개별 반환) ---
232
+ def load_embedding_from_faiss(
233
+ persist_directory: str = r"D:/Project AI/RAG",
234
+ top_k: int = 10,
235
+ bm25_k: int = 10,
236
+ embeddings: Optional[Any] = None,
237
+ device: str = 'cpu'
238
+ ) -> Tuple[Any, FAISS, sqlite3.Connection]:
239
+
240
+ if embeddings is None:
241
+ st_model = SentenceTransformer(
242
+ 'nomic-ai/nomic-embed-text-v2-moe',
243
+ trust_remote_code=True,
244
+ device=device
245
+ )
246
+ embeddings = LocalSentenceTransformerEmbeddings(
247
+ st_model=st_model,
248
+ normalize_embeddings=True,
249
+ encode_batch_size=32
250
+ )
251
+
252
+ persist_dir = Path(persist_directory)
253
+ if not persist_dir.exists():
254
+ raise FileNotFoundError(f"FAISS 경로가 없습니다: {persist_dir}")
255
+
256
+ # FAISS 로드
257
+ vectorstore = FAISS.load_local(
258
+ folder_path=str(persist_dir),
259
+ embeddings=embeddings,
260
+ allow_dangerous_deserialization=True
261
+ )
262
+
263
+ # BM25 복원 (저장된 문서로부터 재생성)
264
+ bm25_retriever = None
265
+ docs = []
266
+ try:
267
+ if hasattr(vectorstore, "docstore") and hasattr(vectorstore.docstore, "_dict"):
268
+ docs = list(vectorstore.docstore._dict.values())
269
+ if docs:
270
+ bm25_retriever = BM25Retriever.from_documents(docs)
271
+ bm25_retriever.k = bm25_k
272
+ else:
273
+ print("[경고] 저장된 문서를 찾을 수 없어 BM25를 생성하지 못했습니다.")
274
+ except Exception as e:
275
+ print(f"[경고] 저장된 문서를 읽는 중 문제가 발생했습니다: {e}")
276
+
277
+ sqlite_conn = get_db_connection(persist_directory)
278
+
279
+ return bm25_retriever, vectorstore, sqlite_conn
280
+
281
+ # --- search_vectorstore (단순 벡터 검색 헬퍼) ---
282
+ def search_vectorstore(bm25_retriever, vectorstore, query, k=5):
283
+ """
284
+ vectorstore와 bm25_retriever를 받아 앙상블(Hybrid) 검색을 수행하는 함수.
285
+ EnsembleRetriever(weights=[0.6, 0.4])와 유사한 결과를 반환합니다.
286
+ """
287
+ weights=[0.6, 0.4]
288
+ # 1. 벡터 검색 수행 (Vector Search)
289
+ # FAISS를 리트리버로 변환하여 검색
290
+ vec_retriever = vectorstore.as_retriever(search_kwargs={"k": k})
291
+ vec_docs = vec_retriever.invoke(query)
292
+
293
+ # 2. 키워드 검색 수행 (BM25 Search)
294
+ # 검색 개수를 k개로 맞춰서 실행
295
+ bm25_docs = bm25_retriever.invoke(query, config={"search_kwargs": {"k": k}})
296
+
297
+ # 3. 랭킹 퓨전 (Weighted Reciprocal Rank Fusion)
298
+ # 두 리스트의 순위를 기반으로 가중치를 적용해 점수를 매깁니다.
299
+
300
+ doc_scores = {} # 문서 내용(또는 ID) -> 점수
301
+ doc_map = {} # 문서 내용 -> 문서 객체 저장 (나중에 반환하기 위해)
302
+
303
+ # 내부 함수: 순위에 따른 점수 계산 (Weight / (Rank + 1))
304
+ def apply_rank_score(docs, weight):
305
+ for rank, doc in enumerate(docs):
306
+ # 고유 키 생성 (page_content가 고유하다고 가정하거나, doc_id가 있다면 사용)
307
+ doc_key = doc.page_content
308
+ doc_map[doc_key] = doc
309
+
310
+ if doc_key not in doc_scores:
311
+ doc_scores[doc_key] = 0.0
312
+
313
+ # 순위가 높을수록(rank 작을수록) 점수가 높음
314
+ score = weight / (rank + 1)
315
+ doc_scores[doc_key] += score
316
+
317
+ # 벡터 검색 결과 점수 반영 (가중치 0.6)
318
+ apply_rank_score(vec_docs, weights[0])
319
+
320
+ # BM25 검색 결과 점수 반영 (가중치 0.4)
321
+ apply_rank_score(bm25_docs, weights[1])
322
+
323
+ # 4. 점수순 정렬 (높은 점수가 상위)
324
+ # 점수(item[1])를 기준으로 내림차순 정렬
325
+ sorted_docs = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)
326
+
327
+ # 5. Top-K 추출 문서 객체 반환
328
+ final_results = [doc_map[key] for key, score in sorted_docs[:k]]
329
+
330
+ return final_results
331
+
332
+ # --- search_with_metadata_filter (수정됨: 수동 병합 로직 구현) ---
333
+ def search_with_metadata_filter(
334
+ bm25_retriever: Any, # [변경] Ensemble 대신 BM25를 직접 받음
335
+ vectorstore: FAISS,
336
+ query: str,
337
+ k: int = 5,
338
+ metadata_filter: Optional[Dict[str, Any]] = None,
339
+ sqlite_conn: Optional[sqlite3.Connection] = None
340
+ ) -> List[Document]:
341
+ """
342
+ SQLite 사전 필터링 -> FAISS 벡터 검색 + BM25 검색 -> 결과 병합
343
+ """
344
+
345
+ # === 1. SQLite에서 필터링된 FAISS ID 추출 ===
346
+ filtered_ids = None
347
+ if metadata_filter and sqlite_conn:
348
+ cursor = sqlite_conn.cursor()
349
+ where_clauses = []
350
+ params = []
351
+
352
+ for key, value in metadata_filter.items():
353
+ if isinstance(value, list):
354
+ if not value: continue
355
+ placeholders = ', '.join(['?'] * len(value))
356
+ where_clauses.append(f"{key} IN ({placeholders})")
357
+ params.extend(value)
358
+ else:
359
+ where_clauses.append(f"{key} = ?")
360
+ params.append(value)
361
+
362
+ if where_clauses:
363
+ where_sql = " OR ".join(where_clauses)
364
+ sql_query = f"SELECT faiss_id FROM documents WHERE {where_sql}"
365
+
366
+ try:
367
+ cursor.execute(sql_query, params)
368
+ filtered_ids = {row[0] for row in cursor.fetchall()}
369
+ print(f"[사전 필터링] {len(filtered_ids)}개 ID 획득 → FAISS 검색 제한")
370
+ except Exception as e:
371
+ print(f"[경고] SQLite 필터링 실패: {e}")
372
+ filtered_ids = None
373
+ else:
374
+ print("[안내] 필터 조건 없음 → 전체 검색")
375
+ else:
376
+ print("[안내] 필터 또는 DB 없음 → 전체 검색")
377
+
378
+ # === 2. FAISS 벡터 검색 ===
379
+ vector_docs = []
380
+ if filtered_ids and len(filtered_ids) > 0:
381
+ selector = MetadataIDSelector(filtered_ids)
382
+ index: faiss.Index = vectorstore.index
383
+
384
+ query_embedding = np.array(vectorstore.embeddings.embed_query(query)).astype('float32')
385
+ query_embedding = query_embedding.reshape(1, -1)
386
+
387
+ search_params = faiss.SearchParametersIVF(sel=selector, nprobe=20)
388
+ _k = max(k * 10, 100)
389
+ D, I = index.search(query_embedding, _k, params=search_params)
390
+
391
+ valid_indices = [i for i in I[0] if i != -1]
392
+ for idx in valid_indices[:k]:
393
+ doc_id = vectorstore.index_to_docstore_id[idx]
394
+ doc = vectorstore.docstore.search(doc_id)
395
+ if isinstance(doc, Document):
396
+ vector_docs.append(doc)
397
+ print(f"[벡터 검색] {len(valid_indices)}개 후보 → {len(vector_docs)}개 유효")
398
+ else:
399
+ # 전체 검색
400
+ vector_retriever = vectorstore.as_retriever(search_kwargs={"k": k})
401
+ vector_docs = vector_retriever.invoke(query)
402
+ print(f"[벡터 검색] 전체 검색 → {len(vector_docs)}개 후보")
403
+
404
+ # === 3. BM25 검색 ===
405
+ bm25_docs = []
406
+ if bm25_retriever:
407
+ search_k = k * 5
408
+ candidates = bm25_retriever.invoke(query, config={"search_kwargs": {"k": search_k}})
409
+ if filtered_ids:
410
+ bm25_docs = [d for d in candidates if d.metadata.get('faiss_id') in filtered_ids]
411
+ else:
412
+ bm25_docs = candidates
413
+
414
+ # Top K 자르기
415
+ bm25_docs = bm25_docs[:k]
416
+ print(f"[BM25 검색] {len(candidates)}개 후보 → {len(bm25_docs)}개 필터링 후")
417
+
418
+ # === 4. 병합 (Vector 우선 + 중복 제거) ===
419
+ combined = {id(d): d for d in (vector_docs + bm25_docs)}.values()
420
+ final_results = list(combined)[:k]
421
+
422
+ print(f"[최종 결과] {len(final_results)}개 문서 반환")
423
+ return final_results
424
+
425
+ # --- get_unique_metadata_values (빠진 함수 추가) ---
426
+ def get_unique_metadata_values(
427
+ sqlite_conn: sqlite3.Connection,
428
+ key_name: str,
429
+ partial_match: Optional[str] = None
430
+ ) -> List[str]:
431
+ """
432
+ 고유 검색 함수.
433
+ key_name 인자로 'part', 'subpart', 'section', 'source' 등을 사용할 수 있습니다.
434
+ """
435
+ if not sqlite_conn:
436
+ return []
437
+
438
+ cursor = sqlite_conn.cursor()
439
+ # 안전을 위해 key_name 컬럼명으로 직접 사용 (SQL Injection 주의: 내부 사용 전제)
440
+ # 실제 프로덕션에서는 key_name 화이트리스트로 검증하는 것이 좋습니다.
441
+ sql_query = f"SELECT DISTINCT `{key_name}` FROM documents"
442
+ params = []
443
+
444
+ if partial_match:
445
+ sql_query += f" WHERE `{key_name}` LIKE ?"
446
+ params.append(f"%{partial_match}%")
447
+
448
+ try:
449
+ cursor.execute(sql_query, params)
450
+ unique_values = [row[0] for row in cursor.fetchall() if row[0] is not None]
451
+ return unique_values
452
+ except Exception as e:
453
+ print(f"[에러] 고유 값 검색 실패 ({key_name}): {e}")
454
+ return []
455
+
456
+ def smart_search_vectorstore(
457
+ retriever,
458
+ query,
459
+ k=5,
460
+ vectorstore=None,
461
+ sqlite_conn=None,
462
+ enable_detailed_search=True
463
+ ):
464
+ """기본 검색 + 상세 검색 수행"""
465
+ # 1. 기본 검색
466
+ basic_results = retriever.invoke(query)
467
+ basic_results = basic_results[:k]
468
+ #logger.info(f"[기본 검색] {len(basic_results)}개 문서 검색 완료")
469
+
470
+ if not enable_detailed_search or not vectorstore or not sqlite_conn:
471
+ #logger.info("[안내] 상세 검색 비활성화 또는 컴포넌트 부족 → 기본 검색 결과만 반환")
472
+ return basic_results
473
+
474
+ # 2. regulation_part 빈도 분석
475
+ regulation_parts = []
476
+ for doc in basic_results:
477
+ reg_part = doc.metadata.get('regulation_part')
478
+ if reg_part:
479
+ if isinstance(reg_part, list):
480
+ regulation_parts.extend(reg_part)
481
+ elif isinstance(reg_part, str):
482
+ if ',' in reg_part:
483
+ regulation_parts.extend([part.strip() for part in reg_part.split(',')])
484
+ else:
485
+ regulation_parts.append(reg_part)
486
+
487
+ if not regulation_parts:
488
+ #logger.info("[안내] regulation_part 메타데이터 없음 → 기본 검색 결과만 반환")
489
+ return basic_results
490
+
491
+ counter = Counter(regulation_parts)
492
+ most_extracted_category = counter.most_common(2)
493
+ #logger.info(f"[빈도 분석] regulation_part 빈도: {dict(counter)}")
494
+ #logger.info(f"[상위 카테고리] {most_extracted_category}")
495
+
496
+ # 3. 상세 검색
497
+ detailed_results = []
498
+ for rank, (category, count) in enumerate(most_extracted_category, 1):
499
+ #logger.info(f"[상세 검색 {rank}순위] '{category}' 카테고리 검색 시작 (빈도: {count})")
500
+ metadata_filter = {'regulation_part': category}
501
+
502
+ try:
503
+ category_results = search_with_metadata_filter(
504
+ ensemble_retriever=retriever,
505
+ vectorstore=vectorstore,
506
+ query=query,
507
+ k=k,
508
+ metadata_filter=metadata_filter,
509
+ sqlite_conn=sqlite_conn
510
+ )
511
+ detailed_results.extend(category_results)
512
+ #logger.info(f"[상세 검색 {rank}순위] {len(category_results)}개 추가 문서 검색 완료")
513
+ except Exception as e:
514
+ #logger.info(f"[경고] 상세 검색 {rank}순위 실패 ({category}): {e}")
515
+ continue
516
+
517
+ # 4. 결과 병합
518
+ seen = set()
519
+ final_results = []
520
+
521
+ #Detailed 검색 결과를 먼저 추가
522
+ for doc in detailed_results:
523
+ doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
524
+ if doc_signature not in seen:
525
+ seen.add(doc_signature)
526
+ final_results.append(doc)
527
+
528
+ for doc in basic_results:
529
+ doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
530
+ if doc_signature not in seen:
531
+ seen.add(doc_signature)
532
+ final_results.append(doc)
533
+
534
+ final_results = final_results[:k]
535
+ #logger.info(f"[최종 결과] 기본 {len(basic_results)}개 + 상세 {len(detailed_results)}개 → 중복 제거 후 {len(final_results)}개 반환")
536
+
537
+ return final_results
538
+
539
+ # natural_sort_key 함수 추가 (app.py에서 사용됨)
540
+ import re
541
+
542
+ def natural_sort_key(s):
543
+ """자연스러운 정렬을 위한 키 함수"""
544
+ return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', str(s))]