scipious commited on
Commit
6358489
·
verified ·
1 Parent(s): 4f73c92

Update reg_embedding_system.py

Browse files
Files changed (1) hide show
  1. reg_embedding_system.py +469 -527
reg_embedding_system.py CHANGED
@@ -1,527 +1,469 @@
1
- import gc
2
- import json
3
- import sqlite3 # SQLite 모듈 추가
4
- from pathlib import Path
5
- from typing import Optional, Tuple, Any, Dict, List, Set
6
- from collections import Counter # 빈도 계산을 위한 추가
7
- import numpy as np
8
-
9
- import faiss
10
- from langchain.retrievers import BM25Retriever, EnsembleRetriever
11
- from langchain_core.documents import Document
12
- from langchain_community.vectorstores import FAISS
13
- from sentence_transformers import SentenceTransformer
14
-
15
- # 런타임에 Embeddings 클래스를 찾기 위한 로직 (원본 코드 유지)
16
- try:
17
- from langchain_core.embeddings import Embeddings
18
- except ImportError:
19
- try:
20
- from langchain.embeddings.base import Embeddings
21
- except ImportError:
22
- Embeddings = object
23
-
24
- # --- SQLite 헬퍼 함수 ---
25
- SQLITE_DB_NAME = "metadata_mapping.db"
26
-
27
- # === IDSelector 클래스 정의 (파일 상단 또는 함수 외부에 위치) ===
28
- # IDSelector 대신 IDSelectorBatch 사용
29
- class MetadataIDSelector(faiss.IDSelectorBatch):
30
- def __init__(self, allowed_ids: Set[int]):
31
- # IDSelectorBatch는 allowed_ids 리스트를 직접 받음
32
- super().__init__(list(allowed_ids)) # 리스트로 변환해서 전달
33
-
34
- def get_db_connection(persist_directory: str) -> sqlite3.Connection:
35
- """FAISS 저장 경로를 기반으로 SQLite 연결을 설정하고 반환합니다."""
36
- db_path = Path(persist_directory) / SQLITE_DB_NAME
37
- conn = sqlite3.connect(db_path)
38
- return conn
39
-
40
- def _create_and_populate_sqlite_db(chunks: List[Document], persist_directory: str):
41
- """문서 청크를 기반으로 SQLite DB를 생성하고 채웁니다. (save_embedding_system 내부용)"""
42
- conn = get_db_connection(persist_directory)
43
- cursor = conn.cursor()
44
-
45
- # 1. 테이블 생성
46
- cursor.execute("""
47
- CREATE TABLE IF NOT EXISTS documents (
48
- faiss_id INTEGER PRIMARY KEY,
49
- regulation_part TEXT,
50
- regulation_section TEXT,
51
- chapter_section TEXT,
52
- jo TEXT,
53
- json_metadata TEXT
54
- )
55
- """)
56
- conn.commit()
57
-
58
- # 2. 데이터 채우기: FAISS ID는 청크 인덱스 i와 동일하게 매핑
59
- for i, doc in enumerate(chunks):
60
- faiss_id = i
61
- metadata_json = json.dumps(doc.metadata, ensure_ascii=False)
62
- reg_part = doc.metadata.get('regulation_part')
63
- reg_section = doc.metadata.get('regulation_section')
64
- reg_chapter = doc.metadata.get('chapter_section')
65
- reg_jo = doc.metadata.get('jo')
66
-
67
- # 변수가 리스트인 경우, 쉼표로 구분된 문자열로 변환
68
- if isinstance(reg_section, list):
69
- reg_section = ', '.join(map(str, reg_section))
70
- if isinstance(reg_part, list):
71
- reg_part = ', '.join(map(str, reg_part))
72
- if isinstance(reg_chapter, list):
73
- reg_chapter = ', '.join(map(str, reg_chapter))
74
- if isinstance(reg_jo, list):
75
- reg_jo = ', '.join(map(str, reg_jo))
76
-
77
- # 문서 메타데이터에 FAISS ID 추가 (검색 후 필터링 함수에서 활용)
78
- doc.metadata['faiss_id'] = faiss_id
79
-
80
- cursor.execute(
81
- "INSERT OR REPLACE INTO documents (faiss_id, regulation_part, regulation_section, chapter_section, jo, json_metadata) VALUES (?, ?, ?, ?, ?, ?)",
82
- (faiss_id, reg_part, reg_section, reg_chapter, reg_jo, metadata_json)
83
- )
84
-
85
- conn.commit()
86
- conn.close()
87
-
88
- # --- LocalSentenceTransformerEmbeddings (변경 없음) ---
89
-
90
- class LocalSentenceTransformerEmbeddings(Embeddings):
91
- """SentenceTransformer를 LangChain Embeddings 인터페이스로 래핑"""
92
-
93
- def __init__(self, st_model, normalize_embeddings: bool = True, encode_batch_size: int = 32):
94
- self.model = st_model
95
- self.normalize = normalize_embeddings
96
- self.encode_batch_size = encode_batch_size
97
-
98
- def embed_documents(self, texts):
99
- vecs = self.model.encode(
100
- texts,
101
- batch_size=self.encode_batch_size,
102
- show_progress_bar=False,
103
- normalize_embeddings=self.normalize,
104
- convert_to_numpy=True,
105
- )
106
- return vecs.tolist()
107
-
108
- def embed_query(self, text: str):
109
- vec = self.model.encode(
110
- [text],
111
- batch_size=self.encode_batch_size,
112
- show_progress_bar=False,
113
- normalize_embeddings=self.normalize,
114
- convert_to_numpy=True,
115
- )[0]
116
- return vec.tolist()
117
-
118
- # --- save_embedding_system (SQLite 저장 로직 추가) ---
119
-
120
- def save_embedding_system(
121
- chunks,
122
- persist_directory: str = r"D:/Project AI/RAG",
123
- batch_size: int = 32,
124
- device: str = 'cuda'
125
- ):
126
- """
127
- 청크를 임베딩하여 FAISS 벡터스토어와 앙상블 리트리버를 생성하고,
128
- SQLite DB에 메타데이터를 저장합니다.
129
- """
130
- Path(persist_directory).mkdir(parents=True, exist_ok=True)
131
-
132
- # 1) SQLite DB에 메타데이터 저장 및 청크에 faiss_id 추가
133
- # 함수 내에서 chunks 리스트의 metadata에 'faiss_id'가 추가됩니다.
134
- _create_and_populate_sqlite_db(chunks, persist_directory)
135
-
136
- # 2) SentenceTransformer 로드 (원래 코드와 동일)
137
- model = SentenceTransformer(
138
- 'nomic-ai/nomic-embed-text-v2-moe',
139
- trust_remote_code=True,
140
- device=device
141
- )
142
-
143
- embeddings = LocalSentenceTransformerEmbeddings(
144
- st_model=model,
145
- normalize_embeddings=True,
146
- encode_batch_size=batch_size
147
- )
148
-
149
- # 3) FAISS 벡터스토어 생성 (원래 코드와 동일)
150
- vectorstore = None
151
- for i in range(0, len(chunks), batch_size):
152
- batch = chunks[i:i + batch_size]
153
- if vectorstore is None:
154
- vectorstore = FAISS.from_documents(documents=batch, embedding=embeddings)
155
- else:
156
- vectorstore.add_documents(documents=batch)
157
- gc.collect()
158
-
159
- # 4) BM25 + 벡터 앙상블 리트리버 생성 (원래 코드와 동일)
160
- bm25_retriever = BM25Retriever.from_documents(chunks)
161
- bm25_retriever.k = 5
162
-
163
- vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
164
-
165
- ensemble_retriever = EnsembleRetriever(
166
- retrievers=[vector_retriever, bm25_retriever],
167
- weights=[0.6, 0.4]
168
- )
169
-
170
- # 5) FAISS 인덱스 저장 (원래 코드와 동일)
171
- vectorstore.save_local(persist_directory)
172
-
173
- # 6) SQLite 연결
174
- sqlite_conn = get_db_connection(persist_directory)
175
- gc.collect()
176
-
177
- return ensemble_retriever, vectorstore, sqlite_conn
178
-
179
- # --- load_embedding_from_faiss (SQLite 연결 반환 추가) ---
180
-
181
- def load_embedding_from_faiss(
182
- persist_directory: str = r"D:/Project AI/RAG",
183
- top_k: int = 10,
184
- bm25_k: int = 10,
185
- weights: Tuple[float, float] = (0.6, 0.4),
186
- embeddings: Optional[Any] = None,
187
- device: str = 'cpu'
188
- ) -> Tuple[Any, FAISS, sqlite3.Connection]: # 반환 값에 SQLite 연결 추가
189
- """
190
- 저장된 FAISS 인덱스와 SQLite 연결을 로드하여 앙상블 리트리버를 생성합니다.
191
- """
192
- # 1) Embeddings 준비 (원래 코드와 동일)
193
- if embeddings is None:
194
- st_model = SentenceTransformer(
195
- 'nomic-ai/nomic-embed-text-v2-moe',
196
- trust_remote_code=True,
197
- device=device
198
- )
199
- embeddings = LocalSentenceTransformerEmbeddings(
200
- st_model=st_model,
201
- normalize_embeddings=True,
202
- encode_batch_size=32
203
- )
204
-
205
- # 2) FAISS 벡터스토어 로드 (원래 코드와 동일)
206
- persist_dir = Path(persist_directory)
207
- if not persist_dir.exists():
208
- raise FileNotFoundError(f"FAISS 경로가 없습니다: {persist_dir}")
209
-
210
- vectorstore = FAISS.load_local(
211
- folder_path=str(persist_dir),
212
- embeddings=embeddings,
213
- allow_dangerous_deserialization=True
214
- )
215
-
216
- # 3) BM25를 위한 문서 추출 (원래 코드와 동일)
217
- docs = []
218
- try:
219
- # FAISS docstore에서 문서 추출
220
- if hasattr(vectorstore, "docstore") and hasattr(vectorstore.docstore, "_dict"):
221
- docs = list(vectorstore.docstore._dict.values())
222
- except Exception as e:
223
- print(f"[경고] 저장된 문서를 읽는 중 문제가 발생했습니다: {e}")
224
-
225
- # 4) 앙상블 리트리버 구성 (원래 코드와 동일)
226
- vector_retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
227
-
228
- if docs:
229
- bm25_retriever = BM25Retriever.from_documents(docs)
230
- bm25_retriever.k = bm25_k
231
- ensemble_retriever = EnsembleRetriever(
232
- retrievers=[vector_retriever, bm25_retriever],
233
- weights=list(weights)
234
- )
235
- else:
236
- print("[안내] 문서를 찾지 못해 BM25 없이 벡터 리트리버만 반환합니다.")
237
- ensemble_retriever = vector_retriever
238
-
239
- # 5) SQLite 연결
240
- sqlite_conn = get_db_connection(persist_directory)
241
-
242
- return ensemble_retriever, vectorstore, sqlite_conn # SQLite 연결 반환
243
-
244
- # --- search_vectorstore (변경 없음) ---
245
-
246
- def search_vectorstore(retriever, query, k=5):
247
- """리트리버를 사용해 쿼리와 관련된 문서를 검색합니다."""
248
- results = retriever.invoke(query)
249
- return results[:k]
250
-
251
- # === search_with_metadata_filter (사전 필터링 버전) ===
252
- def search_with_metadata_filter(
253
- ensemble_retriever: EnsembleRetriever,
254
- vectorstore: FAISS,
255
- query: str,
256
- k: int = 5,
257
- metadata_filter: Optional[Dict[str, Any]] = None,
258
- sqlite_conn: Optional[sqlite3.Connection] = None,
259
- exact_match: bool = True
260
- ) -> List[Document]:
261
- """
262
- SQLite로 사전 필터링 → FAISS ID 추출 → IDSelector로 FAISS 검색 제한
263
- BM25는 post-filtering (BM25는 IDSelector 미지원)
264
- """
265
- vector_ret, bm25_ret = ensemble_retriever.retrievers
266
-
267
- # === 1. SQLite에서 필터링된 FAISS ID 추출 ===
268
- filtered_ids = None
269
- if metadata_filter and sqlite_conn:
270
- cursor = sqlite_conn.cursor()
271
- where_clauses = []
272
- params = []
273
-
274
- for key, value in metadata_filter.items():
275
- print(f"[key] {key}")
276
- print(f"[value] {value}")
277
- if isinstance(value, list):
278
- # IN 쿼리: 리스트 값 지원
279
- if not value:
280
- continue # 리스트면 무시
281
- placeholders = ', '.join(['?'] * len(value))
282
- where_clauses.append(f"{key} IN ({placeholders})")
283
- params.extend(value)
284
- else:
285
- # 단일 값
286
- where_clauses.append(f"{key} = ?")
287
- params.append(value)
288
-
289
-
290
- if where_clauses:
291
- where_sql = " OR ".join(where_clauses)
292
- sql_query = f"SELECT faiss_id FROM documents WHERE {where_sql}"
293
-
294
- try:
295
- cursor.execute(sql_query, params)
296
- filtered_ids = {row[0] for row in cursor.fetchall()}
297
- print(f"[사전 필터링] {len(filtered_ids)}개 ID 획득 → FAISS 검색 제한")
298
- except Exception as e:
299
- print(f"[경고] SQLite 필터링 실패: {e}")
300
- filtered_ids = None
301
- else:
302
- print("[안내] 필터 조건 없음 전체 검색")
303
- else:
304
- print("[안내] 필터 또는 DB 없음 → 전체 검색")
305
-
306
- # === 2. FAISS 벡터 검색 (IDSelector 기반 사전 필터링) ===
307
- if filtered_ids and len(filtered_ids) > 0:
308
- # IDSelector 생성
309
- selector = MetadataIDSelector(filtered_ids)
310
-
311
- # FAISS 인덱스 추출
312
- index: faiss.Index = vectorstore.index
313
- if not hasattr(index, "search"):
314
- raise ValueError("FAISS 인덱스가 검색을 지원하지 않습니다.")
315
-
316
- # 쿼리 임베딩
317
- query_embedding = np.array(vectorstore.embeddings.embed_query(query)).astype('float32')
318
- query_embedding = query_embedding.reshape(1, -1)
319
-
320
- # 검색 파라미터 설정
321
- search_params = faiss.SearchParametersIVF(
322
- sel=selector,
323
- nprobe=50 # 필요시 조정 (성능 vs 재현율)
324
- )
325
-
326
- # 여유 있게 k * 10개 후보 요청 (필터 후 부족 방지)
327
- _k = max(k * 10, 100)
328
- D, I = index.search(query_embedding, _k, params=search_params)
329
-
330
- # 유효한 결과만 추출
331
- valid_indices = [i for i in I[0] if i != -1]
332
- vector_docs = []
333
- for idx in valid_indices[:k]:
334
- doc_id = vectorstore.index_to_docstore_id[idx]
335
- doc = vectorstore.docstore.search(doc_id)
336
- if isinstance(doc, Document):
337
- vector_docs.append(doc)
338
-
339
- print(f"[벡터 검색] {len(valid_indices)}개 후보 {len(vector_docs)}개 유효")
340
- else:
341
- # 필터 없거나 실패 일반 검색 (기존 방식)
342
- search_k = k * 5
343
- vector_docs = vector_ret.invoke(query, config={"search_kwargs": {"k": search_k}})
344
- print(f"[벡터 검색] 전체 검색 → {len(vector_docs)}개 후보")
345
-
346
- # === 3. BM25 검색 (post-filtering, BM25는 IDSelector 미지원) ===
347
- bm25_docs = []
348
- if hasattr(bm25_ret, "invoke"):
349
- search_k = k * 5
350
- candidates = bm25_ret.invoke(query, config={"search_kwargs": {"k": search_k}})
351
- if filtered_ids:
352
- bm25_docs = [d for d in candidates if d.metadata.get('faiss_id') in filtered_ids]
353
- else:
354
- bm25_docs = candidates[:k]
355
- print(f"[BM25 검색] {len(candidates)}개 후보 → {len(bm25_docs)}개 필터링 후")
356
-
357
- # === 4. 병합 및 최종 k개 반환 ===
358
- combined = {id(d): d for d in (vector_docs + bm25_docs)}.values()
359
- final_results = list(combined)[:k]
360
-
361
- print(f"[최종 결과] {len(final_results)}개 문서 반환")
362
- return final_results
363
-
364
-
365
- def get_unique_metadata_values(
366
- sqlite_conn: sqlite3.Connection,
367
- key_name: str,
368
- partial_match: Optional[str] = None
369
- ) -> List[str]:
370
- """
371
- SQLite 'documents' 테이블에서 특정 컬럼(key_name)의 중복되지 않은
372
- 모든 고유 값 리스트를 반환합니다.
373
-
374
- Args:
375
- sqlite_conn: SQLite 데이터베이스 연결 객체.
376
- key_name: 고유한 값을 가져올 컬럼 이름 (예: 'regulation_name', 'part_name').
377
- partial_match: (선택 사항) 해당 문자열을 포함하는 값만 검색할 때 사용.
378
-
379
- Returns:
380
- 중복이 제거된 고유한 값들의 리스트.
381
- """
382
- if not sqlite_conn:
383
- print("[경고] SQLite 연결이 없어 고유 값 검색을 수행할 수 없습니다.")
384
- return []
385
-
386
- cursor = sqlite_conn.cursor()
387
-
388
- # SQL 쿼리 구성
389
- # 1. 컬럼 이름에 백틱(`)을 사���하여 안전성 확보
390
- # 2. DISTINCT를 사용하여 중복 제거
391
-
392
- sql_query = f"SELECT DISTINCT `{key_name}` FROM documents"
393
- params = []
394
-
395
- # 부분 문자열 검색 (LIKE) 조건 추가
396
- if partial_match:
397
- sql_query += f" WHERE `{key_name}` LIKE ?"
398
- params.append(f"%{partial_match}%")
399
-
400
- try:
401
- cursor.execute(sql_query, params)
402
-
403
- # 쿼리 결과에서 첫 번째 항목 ()만 추출
404
- unique_values = [row[0] for row in cursor.fetchall() if row[0] is not None]
405
-
406
- return unique_values
407
-
408
- except sqlite3.OperationalError as e:
409
- # 컬럼 이름이 DB에 없을 때 발생하는 에러 처리
410
- print(f"[에러] SQLite 쿼리 실행 실패 (컬럼 '{key_name}' 이름 오류 가능): {e}")
411
- return []
412
- except Exception as e:
413
- print(f"[에러] 고유 값 검색 중 알 수 없는 오류 발생: {e}")
414
- return []
415
-
416
- def smart_search_vectorstore(
417
- retriever,
418
- query,
419
- k=5,
420
- vectorstore=None,
421
- sqlite_conn=None,
422
- enable_detailed_search=True
423
- ):
424
- """
425
- 리트리버를 사용해 쿼리와 관련된 문서를 검색하고,
426
- 선택적으로 가장 빈번한 regulation_part 카테고리에 대해 상세 검색을 수행합니다.
427
-
428
- Args:
429
- retriever: 기본 검색에 사용할 리트리버
430
- query: 검색 쿼리
431
- k: 반환할 문서 수
432
- vectorstore: FAISS 벡터스토어 (상세 검색용)
433
- sqlite_conn: SQLite 연결 (상세 검색용)
434
- enable_detailed_search: 상세 검색 활성화 여부
435
-
436
- Returns:
437
- 검색된 문서 리스트 (기본 검색 + 상세 검색 결과)
438
- """
439
- # 1. 기본 검색 수행
440
- basic_results = retriever.invoke(query)
441
- basic_results = basic_results[:k]
442
-
443
- print(f"[기본 검색] {len(basic_results)}개 문서 검색 완료")
444
-
445
- # 상세 검색이 비활성화되었거나 필요한 컴포넌트가 없으면 기본 결과만 반환
446
- if not enable_detailed_search or not vectorstore or not sqlite_conn:
447
- print("[안내] 상세 검색 비활성화 또는 컴포넌트 부족 → 기본 검색 결과만 반환")
448
- return basic_results
449
-
450
- # 2. regulation_part 메타데이터 빈도 분석
451
- regulation_parts = []
452
- for doc in basic_results:
453
- reg_part = doc.metadata.get('regulation_part')
454
- if reg_part:
455
- # regulation_part가 리스트인 경우 모든 항목 추가
456
- if isinstance(reg_part, list):
457
- regulation_parts.extend(reg_part)
458
- elif isinstance(reg_part, str):
459
- # 쉼표로 구분된 문자열인 경우 분리
460
- if ',' in reg_part:
461
- regulation_parts.extend([part.strip() for part in reg_part.split(',')])
462
- else:
463
- regulation_parts.append(reg_part)
464
-
465
- # 빈도 계산 및 상위 카테고리 추출
466
- if not regulation_parts:
467
- print("[안내] regulation_part 메타데이터 없음 → 기본 검색 결과만 반환")
468
- return basic_results
469
-
470
- counter = Counter(regulation_parts)
471
- most_extracted_category = counter.most_common(2) # 상위 2개 카테고리
472
-
473
- print(f"[빈도 분석] regulation_part 빈도: {dict(counter)}")
474
- print(f"[상위 카테고리] {most_extracted_category}")
475
-
476
- # 3. 상위 카테고리에 대한 상세 검색 수행
477
- detailed_results = []
478
-
479
- for rank, (category, count) in enumerate(most_extracted_category, 1):
480
- print(f"[상세 검색 {rank}순위] '{category}' 카테고리 검색 시작 (빈도: {count})")
481
-
482
- # metadata_filter 구성
483
- metadata_filter = {'regulation_part': category}
484
-
485
- try:
486
- # search_with_metadata_filter 호출
487
- category_results = search_with_metadata_filter(
488
- ensemble_retriever=retriever,
489
- vectorstore=vectorstore,
490
- query=query,
491
- k=k,
492
- metadata_filter=metadata_filter,
493
- sqlite_conn=sqlite_conn
494
- )
495
-
496
- detailed_results.extend(category_results)
497
- print(f"[상세 검색 {rank}순위] {len(category_results)}개 추가 문서 검색 완료")
498
-
499
- except Exception as e:
500
- print(f"[경고] 상세 검색 {rank}순위 실패 ({category}): {e}")
501
- continue
502
-
503
- # 4. 결과 병합 (중복 제거)
504
- # Document 객체의 고유성을 위해 page_content와 metadata의 조합으로 중복 판단
505
- seen = set()
506
- final_results = []
507
-
508
- # 기본 검색 결과 우선 추가
509
- for doc in basic_results:
510
- doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
511
- if doc_signature not in seen:
512
- seen.add(doc_signature)
513
- final_results.append(doc)
514
-
515
- # 상세 검색 결과 추가 (중복 제거)
516
- for doc in detailed_results:
517
- doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
518
- if doc_signature not in seen:
519
- seen.add(doc_signature)
520
- final_results.append(doc)
521
-
522
- # 최종 k개로 제한
523
- final_results = final_results[:k]
524
-
525
- print(f"[최종 결과] 기본 {len(basic_results)}개 + 상세 {len(detailed_results)}개 → 중복 제거 후 {len(final_results)}개 반환")
526
-
527
- return final_results
 
1
+ # ===== Pydantic v1 호환 설정 (파일 최상단에 배치) =====
2
+ import os
3
+ os.environ["PYDANTIC_V1_STYLE"] = "1"
4
+ os.environ["PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS"] = "1"
5
+
6
+ import gc
7
+ import json
8
+ import sqlite3
9
+ from pathlib import Path
10
+ from typing import Optional, Tuple, Any, Dict, List, Set
11
+ from collections import Counter
12
+ import numpy as np
13
+
14
+ import faiss
15
+ from langchain.retrievers import BM25Retriever, EnsembleRetriever
16
+ from langchain_core.documents import Document
17
+ from langchain_community.vectorstores import FAISS
18
+ from sentence_transformers import SentenceTransformer
19
+
20
+ # 런타임에 Embeddings 클래스를 찾기 위한 로직
21
+ try:
22
+ from langchain_core.embeddings import Embeddings
23
+ except ImportError:
24
+ try:
25
+ from langchain.embeddings.base import Embeddings
26
+ except ImportError:
27
+ Embeddings = object
28
+
29
+ # --- SQLite 헬퍼 함수 ---
30
+ SQLITE_DB_NAME = "metadata_mapping.db"
31
+
32
+ # === IDSelector 클래스 정의 ===
33
+ class MetadataIDSelector(faiss.IDSelectorBatch):
34
+ def __init__(self, allowed_ids: Set[int]):
35
+ super().__init__(list(allowed_ids))
36
+
37
+ def get_db_connection(persist_directory: str) -> sqlite3.Connection:
38
+ """FAISS 저장 경로를 기반으로 SQLite 연결을 설정하고 반환합니다."""
39
+ db_path = Path(persist_directory) / SQLITE_DB_NAME
40
+ conn = sqlite3.connect(db_path)
41
+ return conn
42
+
43
+ def _create_and_populate_sqlite_db(chunks: List[Document], persist_directory: str):
44
+ """문서 청크를 기반으로 SQLite DB를 생성하고 채웁니다."""
45
+ conn = get_db_connection(persist_directory)
46
+ cursor = conn.cursor()
47
+
48
+ # 1. 테이블 생성
49
+ cursor.execute("""
50
+ CREATE TABLE IF NOT EXISTS documents (
51
+ faiss_id INTEGER PRIMARY KEY,
52
+ regulation_part TEXT,
53
+ regulation_section TEXT,
54
+ chapter_section TEXT,
55
+ jo TEXT,
56
+ json_metadata TEXT
57
+ )
58
+ """)
59
+ conn.commit()
60
+
61
+ # 2. 데이터 채우기
62
+ for i, doc in enumerate(chunks):
63
+ faiss_id = i
64
+ metadata_json = json.dumps(doc.metadata, ensure_ascii=False)
65
+ reg_part = doc.metadata.get('regulation_part')
66
+ reg_section = doc.metadata.get('regulation_section')
67
+ reg_chapter = doc.metadata.get('chapter_section')
68
+ reg_jo = doc.metadata.get('jo')
69
+
70
+ # 변수가 리스트인 경우, 쉼표로 구분된 문자열로 변환
71
+ if isinstance(reg_section, list):
72
+ reg_section = ', '.join(map(str, reg_section))
73
+ if isinstance(reg_part, list):
74
+ reg_part = ', '.join(map(str, reg_part))
75
+ if isinstance(reg_chapter, list):
76
+ reg_chapter = ', '.join(map(str, reg_chapter))
77
+ if isinstance(reg_jo, list):
78
+ reg_jo = ', '.join(map(str, reg_jo))
79
+
80
+ # 문서 메타데이터에 FAISS ID 추가
81
+ doc.metadata['faiss_id'] = faiss_id
82
+
83
+ cursor.execute(
84
+ "INSERT OR REPLACE INTO documents (faiss_id, regulation_part, regulation_section, chapter_section, jo, json_metadata) VALUES (?, ?, ?, ?, ?, ?)",
85
+ (faiss_id, reg_part, reg_section, reg_chapter, reg_jo, metadata_json)
86
+ )
87
+
88
+ conn.commit()
89
+ conn.close()
90
+
91
+ # --- LocalSentenceTransformerEmbeddings ---
92
+ class LocalSentenceTransformerEmbeddings(Embeddings):
93
+ """SentenceTransformer를 LangChain Embeddings 인터페이스로 래핑"""
94
+
95
+ def __init__(self, st_model, normalize_embeddings: bool = True, encode_batch_size: int = 32):
96
+ self.model = st_model
97
+ self.normalize = normalize_embeddings
98
+ self.encode_batch_size = encode_batch_size
99
+
100
+ def embed_documents(self, texts):
101
+ vecs = self.model.encode(
102
+ texts,
103
+ batch_size=self.encode_batch_size,
104
+ show_progress_bar=False,
105
+ normalize_embeddings=self.normalize,
106
+ convert_to_numpy=True,
107
+ )
108
+ return vecs.tolist()
109
+
110
+ def embed_query(self, text: str):
111
+ vec = self.model.encode(
112
+ [text],
113
+ batch_size=self.encode_batch_size,
114
+ show_progress_bar=False,
115
+ normalize_embeddings=self.normalize,
116
+ convert_to_numpy=True,
117
+ )[0]
118
+ return vec.tolist()
119
+
120
+ # --- save_embedding_system ---
121
+ def save_embedding_system(
122
+ chunks,
123
+ persist_directory: str = r"D:/Project AI/RAG",
124
+ batch_size: int = 32,
125
+ device: str = 'cuda'
126
+ ):
127
+ """
128
+ 청크를 임베딩���여 FAISS 벡터스토어와 앙상블 리트리버를 생성하고,
129
+ SQLite DB에 메타데이터를 저장합니다.
130
+ """
131
+ Path(persist_directory).mkdir(parents=True, exist_ok=True)
132
+
133
+ # 1) SQLite DB에 메타데이터 저장 청크에 faiss_id 추가
134
+ _create_and_populate_sqlite_db(chunks, persist_directory)
135
+
136
+ # 2) SentenceTransformer 로드
137
+ model = SentenceTransformer(
138
+ 'nomic-ai/nomic-embed-text-v2-moe',
139
+ trust_remote_code=True,
140
+ device=device
141
+ )
142
+
143
+ embeddings = LocalSentenceTransformerEmbeddings(
144
+ st_model=model,
145
+ normalize_embeddings=True,
146
+ encode_batch_size=batch_size
147
+ )
148
+
149
+ # 3) FAISS 벡터스토어 생성
150
+ vectorstore = None
151
+ for i in range(0, len(chunks), batch_size):
152
+ batch = chunks[i:i + batch_size]
153
+ if vectorstore is None:
154
+ vectorstore = FAISS.from_documents(documents=batch, embedding=embeddings)
155
+ else:
156
+ vectorstore.add_documents(documents=batch)
157
+ gc.collect()
158
+
159
+ # 4) BM25 + 벡터 앙상블 리트리버 생성
160
+ bm25_retriever = BM25Retriever.from_documents(chunks)
161
+ bm25_retriever.k = 5
162
+
163
+ vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
164
+
165
+ ensemble_retriever = EnsembleRetriever(
166
+ retrievers=[vector_retriever, bm25_retriever],
167
+ weights=[0.6, 0.4]
168
+ )
169
+
170
+ # 5) FAISS 인덱스 저장
171
+ vectorstore.save_local(persist_directory)
172
+
173
+ # 6) SQLite 연결
174
+ sqlite_conn = get_db_connection(persist_directory)
175
+ gc.collect()
176
+
177
+ return ensemble_retriever, vectorstore, sqlite_conn
178
+
179
+ # --- load_embedding_from_faiss ---
180
+ def load_embedding_from_faiss(
181
+ persist_directory: str = r"D:/Project AI/RAG",
182
+ top_k: int = 10,
183
+ bm25_k: int = 10,
184
+ weights: Tuple[float, float] = (0.6, 0.4),
185
+ embeddings: Optional[Any] = None,
186
+ device: str = 'cpu'
187
+ ) -> Tuple[Any, FAISS, sqlite3.Connection]:
188
+ """
189
+ 저장된 FAISS 인덱스와 SQLite 연결을 로드하여 앙상블 리트리버를 생성합니다.
190
+ """
191
+ # 1) Embeddings 준비
192
+ if embeddings is None:
193
+ st_model = SentenceTransformer(
194
+ 'nomic-ai/nomic-embed-text-v2-moe',
195
+ trust_remote_code=True,
196
+ device=device
197
+ )
198
+ embeddings = LocalSentenceTransformerEmbeddings(
199
+ st_model=st_model,
200
+ normalize_embeddings=True,
201
+ encode_batch_size=32
202
+ )
203
+
204
+ # 2) FAISS 벡터스토어 로드 (Pydantic v1 호환 옵션 추가)
205
+ persist_dir = Path(persist_directory)
206
+ if not persist_dir.exists():
207
+ raise FileNotFoundError(f"FAISS 경로가 없습니다: {persist_dir}")
208
+
209
+ try:
210
+ vectorstore = FAISS.load_local(
211
+ folder_path=str(persist_dir),
212
+ embeddings=embeddings,
213
+ allow_dangerous_deserialization=True
214
+ )
215
+ print(f"[로드 성공] FAISS 인덱스 로드 완료: {persist_dir}")
216
+ except Exception as e:
217
+ print(f"[로드 오류] FAISS 로드 실패: {e}")
218
+ raise
219
+
220
+ # 3) BM25를 위한 문서 추출
221
+ docs = []
222
+ try:
223
+ if hasattr(vectorstore, "docstore") and hasattr(vectorstore.docstore, "_dict"):
224
+ docs = list(vectorstore.docstore._dict.values())
225
+ except Exception as e:
226
+ print(f"[경고] 저장된 문서를 읽는 중 문제가 발생했습니다: {e}")
227
+
228
+ # 4) 앙상블 리트리버 구성
229
+ vector_retriever = vectorstore.as_retriever(search_kwargs={"k": top_k})
230
+
231
+ if docs:
232
+ bm25_retriever = BM25Retriever.from_documents(docs)
233
+ bm25_retriever.k = bm25_k
234
+ ensemble_retriever = EnsembleRetriever(
235
+ retrievers=[vector_retriever, bm25_retriever],
236
+ weights=list(weights)
237
+ )
238
+ else:
239
+ print("[안내] 문서를 찾지 못해 BM25 없이 벡터 리트리버만 반환합니다.")
240
+ ensemble_retriever = vector_retriever
241
+
242
+ # 5) SQLite 연결
243
+ sqlite_conn = get_db_connection(persist_directory)
244
+
245
+ return ensemble_retriever, vectorstore, sqlite_conn
246
+
247
+ # --- search_vectorstore ---
248
+ def search_vectorstore(retriever, query, k=5):
249
+ """리트리버를 사용해 쿼리와 관련된 문서를 검색합니다."""
250
+ results = retriever.invoke(query)
251
+ return results[:k]
252
+
253
+ # === search_with_metadata_filter ===
254
+ def search_with_metadata_filter(
255
+ ensemble_retriever: EnsembleRetriever,
256
+ vectorstore: FAISS,
257
+ query: str,
258
+ k: int = 5,
259
+ metadata_filter: Optional[Dict[str, Any]] = None,
260
+ sqlite_conn: Optional[sqlite3.Connection] = None,
261
+ exact_match: bool = True
262
+ ) -> List[Document]:
263
+ """SQLite로 사전 필터링 FAISS 검색"""
264
+ vector_ret, bm25_ret = ensemble_retriever.retrievers
265
+
266
+ # === 1. SQLite에서 필터링된 FAISS ID 추출 ===
267
+ filtered_ids = None
268
+ if metadata_filter and sqlite_conn:
269
+ cursor = sqlite_conn.cursor()
270
+ where_clauses = []
271
+ params = []
272
+
273
+ for key, value in metadata_filter.items():
274
+ print(f"[key] {key}")
275
+ print(f"[value] {value}")
276
+ if isinstance(value, list):
277
+ if not value:
278
+ continue
279
+ placeholders = ', '.join(['?'] * len(value))
280
+ where_clauses.append(f"{key} IN ({placeholders})")
281
+ params.extend(value)
282
+ else:
283
+ where_clauses.append(f"{key} = ?")
284
+ params.append(value)
285
+
286
+ if where_clauses:
287
+ where_sql = " OR ".join(where_clauses)
288
+ sql_query = f"SELECT faiss_id FROM documents WHERE {where_sql}"
289
+
290
+ try:
291
+ cursor.execute(sql_query, params)
292
+ filtered_ids = {row[0] for row in cursor.fetchall()}
293
+ print(f"[사전 필터링] {len(filtered_ids)}개 ID 획득 → FAISS 검색 제한")
294
+ except Exception as e:
295
+ print(f"[경고] SQLite 필터링 실패: {e}")
296
+ filtered_ids = None
297
+ else:
298
+ print("[안내] 필터 조건 없음 → 전체 검색")
299
+ else:
300
+ print("[안내] 필터 또는 DB 없음 → 전체 검색")
301
+
302
+ # === 2. FAISS 벡터 검색 ===
303
+ if filtered_ids and len(filtered_ids) > 0:
304
+ selector = MetadataIDSelector(filtered_ids)
305
+ index: faiss.Index = vectorstore.index
306
+
307
+ if not hasattr(index, "search"):
308
+ raise ValueError("FAISS 인덱스가 검색을 지원하지 않습니다.")
309
+
310
+ query_embedding = np.array(vectorstore.embeddings.embed_query(query)).astype('float32')
311
+ query_embedding = query_embedding.reshape(1, -1)
312
+
313
+ search_params = faiss.SearchParametersIVF(
314
+ sel=selector,
315
+ nprobe=50
316
+ )
317
+
318
+ _k = max(k * 10, 100)
319
+ D, I = index.search(query_embedding, _k, params=search_params)
320
+
321
+ valid_indices = [i for i in I[0] if i != -1]
322
+ vector_docs = []
323
+ for idx in valid_indices[:k]:
324
+ doc_id = vectorstore.index_to_docstore_id[idx]
325
+ doc = vectorstore.docstore.search(doc_id)
326
+ if isinstance(doc, Document):
327
+ vector_docs.append(doc)
328
+
329
+ print(f"[벡터 검색] {len(valid_indices)}개 후보 → {len(vector_docs)}개 유효")
330
+ else:
331
+ search_k = k * 5
332
+ vector_docs = vector_ret.invoke(query, config={"search_kwargs": {"k": search_k}})
333
+ print(f"[벡터 검색] 전체 검색 → {len(vector_docs)}개 후보")
334
+
335
+ # === 3. BM25 검색 ===
336
+ bm25_docs = []
337
+ if hasattr(bm25_ret, "invoke"):
338
+ search_k = k * 5
339
+ candidates = bm25_ret.invoke(query, config={"search_kwargs": {"k": search_k}})
340
+ if filtered_ids:
341
+ bm25_docs = [d for d in candidates if d.metadata.get('faiss_id') in filtered_ids]
342
+ else:
343
+ bm25_docs = candidates[:k]
344
+ print(f"[BM25 검색] {len(candidates)}개 후보 → {len(bm25_docs)}개 필터링 후")
345
+
346
+ # === 4. 병합 최종 k개 반환 ===
347
+ combined = {id(d): d for d in (vector_docs + bm25_docs)}.values()
348
+ final_results = list(combined)[:k]
349
+
350
+ print(f"[최종 결과] {len(final_results)}개 문서 반환")
351
+ return final_results
352
+
353
+ def get_unique_metadata_values(
354
+ sqlite_conn: sqlite3.Connection,
355
+ key_name: str,
356
+ partial_match: Optional[str] = None
357
+ ) -> List[str]:
358
+ """SQLite에서 특정 컬럼의 고유한 리스트를 반환합니다."""
359
+ if not sqlite_conn:
360
+ print("[경고] SQLite 연결이 없어 고유 값 검색을 수행할 수 없습니다.")
361
+ return []
362
+
363
+ cursor = sqlite_conn.cursor()
364
+ sql_query = f"SELECT DISTINCT `{key_name}` FROM documents"
365
+ params = []
366
+
367
+ if partial_match:
368
+ sql_query += f" WHERE `{key_name}` LIKE ?"
369
+ params.append(f"%{partial_match}%")
370
+
371
+ try:
372
+ cursor.execute(sql_query, params)
373
+ unique_values = [row[0] for row in cursor.fetchall() if row[0] is not None]
374
+ return unique_values
375
+ except sqlite3.OperationalError as e:
376
+ print(f"[에러] SQLite 쿼리 실행 실패 (컬럼 '{key_name}' 이름 오류 가능): {e}")
377
+ return []
378
+ except Exception as e:
379
+ print(f"[에러] 고유 값 검색 중 알 수 없는 오류 발생: {e}")
380
+ return []
381
+
382
+ def smart_search_vectorstore(
383
+ retriever,
384
+ query,
385
+ k=5,
386
+ vectorstore=None,
387
+ sqlite_conn=None,
388
+ enable_detailed_search=True
389
+ ):
390
+ """기본 검색 + 상세 검색 수행"""
391
+ # 1. 기본 검색
392
+ basic_results = retriever.invoke(query)
393
+ basic_results = basic_results[:k]
394
+ print(f"[기본 검색] {len(basic_results)}개 문서 검색 완료")
395
+
396
+ if not enable_detailed_search or not vectorstore or not sqlite_conn:
397
+ print("[안내] 상세 검색 비활성화 또는 컴포넌트 부족 → 기본 검색 결과만 반환")
398
+ return basic_results
399
+
400
+ # 2. regulation_part 빈도 분석
401
+ regulation_parts = []
402
+ for doc in basic_results:
403
+ reg_part = doc.metadata.get('regulation_part')
404
+ if reg_part:
405
+ if isinstance(reg_part, list):
406
+ regulation_parts.extend(reg_part)
407
+ elif isinstance(reg_part, str):
408
+ if ',' in reg_part:
409
+ regulation_parts.extend([part.strip() for part in reg_part.split(',')])
410
+ else:
411
+ regulation_parts.append(reg_part)
412
+
413
+ if not regulation_parts:
414
+ print("[안내] regulation_part 메타데이터 없음 → 기본 검색 결과만 반환")
415
+ return basic_results
416
+
417
+ counter = Counter(regulation_parts)
418
+ most_extracted_category = counter.most_common(2)
419
+ print(f"[빈도 분석] regulation_part 빈도: {dict(counter)}")
420
+ print(f"[상위 카테고리] {most_extracted_category}")
421
+
422
+ # 3. 상세 검색
423
+ detailed_results = []
424
+ for rank, (category, count) in enumerate(most_extracted_category, 1):
425
+ print(f"[상세 검색 {rank}순위] '{category}' 카테고리 검색 시작 (빈도: {count})")
426
+ metadata_filter = {'regulation_part': category}
427
+
428
+ try:
429
+ category_results = search_with_metadata_filter(
430
+ ensemble_retriever=retriever,
431
+ vectorstore=vectorstore,
432
+ query=query,
433
+ k=k,
434
+ metadata_filter=metadata_filter,
435
+ sqlite_conn=sqlite_conn
436
+ )
437
+ detailed_results.extend(category_results)
438
+ print(f"[상세 검색 {rank}순위] {len(category_results)}개 추가 문서 검색 완료")
439
+ except Exception as e:
440
+ print(f"[경고] 상세 검색 {rank}순위 실패 ({category}): {e}")
441
+ continue
442
+
443
+ # 4. 결과 병합
444
+ seen = set()
445
+ final_results = []
446
+
447
+ for doc in basic_results:
448
+ doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
449
+ if doc_signature not in seen:
450
+ seen.add(doc_signature)
451
+ final_results.append(doc)
452
+
453
+ for doc in detailed_results:
454
+ doc_signature = (doc.page_content, str(sorted(doc.metadata.items())))
455
+ if doc_signature not in seen:
456
+ seen.add(doc_signature)
457
+ final_results.append(doc)
458
+
459
+ final_results = final_results[:k]
460
+ print(f"[최종 결과] 기본 {len(basic_results)}개 + 상세 {len(detailed_results)}개 → 중복 제거 후 {len(final_results)}개 반환")
461
+
462
+ return final_results
463
+
464
+ # natural_sort_key 함수 추가 (app.py에서 사용됨)
465
+ import re
466
+
467
+ def natural_sort_key(s):
468
+ """자연스러운 정렬을 위한 키 함수"""
469
+ return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', str(s))]