import pandas as pd
from langchain_chroma import Chroma
from langchain_openai.embeddings import OpenAIEmbeddings
import os
from tqdm import tqdm
import time
from src.utils.config import RAGConfig


class DataValidator:
    """Data validation and cleaning."""

    def __init__(self, config: RAGConfig):
        self.config = config

    def validate_and_clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run the full validation and cleaning pipeline."""
        df = self._check_required_columns(df)
        df = self._remove_duplicates(df)
        df = self._remove_nan(df)
        df = self._filter_by_length(df)
        df = self._clean_metadata(df)

        return df

    def _check_required_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Check that the required columns are present."""
        required = ['chunk_content', 'chunk_id']
        missing = [col for col in required if col not in df.columns]

        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        return df

    def _remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Remove rows with duplicate chunk IDs."""
        return df.drop_duplicates(subset=['chunk_id'], keep='first')

    def _remove_nan(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop rows with missing chunk content or chunk ID."""
        return df.dropna(subset=['chunk_content', 'chunk_id'])

    def _filter_by_length(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter chunks by content length."""
        df['_temp_length'] = df['chunk_content'].str.len()

        df = df[
            (df['_temp_length'] >= self.config.MIN_CHUNK_LENGTH) &
            (df['_temp_length'] <= self.config.MAX_CHUNK_LENGTH)
        ]

        return df.drop(columns=['_temp_length'])

    def _clean_metadata(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clean metadata columns."""
        df = df.fillna('')

        metadata_cols = [col for col in df.columns
                         if col not in ['chunk_content', 'chunk_id']]

        # Cast metadata columns to strings so every value is a simple scalar type.
        for col in metadata_cols:
            df[col] = df[col].astype(str)

        return df


class ChromaDBBuilder:
    """Build the ChromaDB vector database."""

    def __init__(self, config: RAGConfig):
        self.config = config
        self.vectorstore = None
        self.embeddings = None

        self._initialize_embeddings()

    def _initialize_embeddings(self):
        """Initialize the embedding model."""
        os.environ["OPENAI_API_KEY"] = self.config.OPENAI_API_KEY

        self.embeddings = OpenAIEmbeddings(
            model=self.config.EMBEDDING_MODEL_NAME
        )

    def build_from_dataframe(self, df: pd.DataFrame):
        """Build the vector DB from a DataFrame."""
        documents, ids, metadatas = self._prepare_data(df)
        self._validate_data_consistency(documents, ids, metadatas)
        self._create_vectorstore()
        self._add_documents_in_batches(documents, ids, metadatas)

        return self.vectorstore

    def _prepare_data(self, df: pd.DataFrame):
        """Prepare documents, IDs, and metadata for ChromaDB."""
        documents = df['chunk_content'].tolist()
        ids = df['chunk_id'].tolist()

        metadata_cols = [col for col in df.columns
                         if col not in ['chunk_content', 'chunk_id']]

        # Keep only non-empty metadata values for each row.
        metadatas = []
        for _, row in df.iterrows():
            metadata = {
                col: row[col]
                for col in metadata_cols
                if row[col] and row[col] != 'nan' and row[col] != ''
            }
            metadatas.append(metadata)

        return documents, ids, metadatas

    def _validate_data_consistency(self, documents, ids, metadatas):
        """Validate data consistency."""
        if not (len(documents) == len(ids) == len(metadatas)):
            raise ValueError("Data length mismatch")

    def _create_vectorstore(self):
        """Create an empty vectorstore."""
        self.vectorstore = Chroma(
            embedding_function=self.embeddings,
            persist_directory=self.config.DB_DIRECTORY,
            collection_name=self.config.COLLECTION_NAME
        )

    def _add_documents_in_batches(self, documents, ids, metadatas):
        """Add documents in batches."""
        batch_size = self.config.BATCH_SIZE
        total_batches = (len(documents) + batch_size - 1) // batch_size

        for i in tqdm(range(0, len(documents), batch_size),
                      desc="Embedding and storing",
                      total=total_batches):

            batch_docs = documents[i:i + batch_size]
            batch_ids = ids[i:i + batch_size]
            batch_metas = metadatas[i:i + batch_size]

            self._add_batch_with_retry(batch_docs, batch_ids, batch_metas)
            time.sleep(1)  # short pause between batches

    def _add_batch_with_retry(self, docs, ids, metas):
        """Add a batch, splitting it or retrying in smaller pieces on failure."""
        # Rough token estimate: roughly 4 characters per token.
        batch_tokens = sum(len(doc) for doc in docs) / 4

        if batch_tokens > self.config.MAX_TOKENS_PER_BATCH:
            # Batch exceeds the token limit: split it in half (step of at least 1).
            smaller_size = max(1, len(docs) // 2)
            for j in range(0, len(docs), smaller_size):
                self.vectorstore.add_texts(
                    texts=docs[j:j + smaller_size],
                    metadatas=metas[j:j + smaller_size],
                    ids=ids[j:j + smaller_size]
                )
                time.sleep(0.5)
        else:
            try:
                self.vectorstore.add_texts(
                    texts=docs,
                    metadatas=metas,
                    ids=ids
                )
            except Exception:
                # On failure, retry in sub-batches of 10 documents.
                for j in range(0, len(docs), 10):
                    self.vectorstore.add_texts(
                        texts=docs[j:j + 10],
                        metadatas=metas[j:j + 10],
                        ids=ids[j:j + 10]
                    )
                    time.sleep(0.5)

    def get_collection_count(self):
        """Return the number of stored documents."""
        if self.vectorstore:
            return self.vectorstore._collection.count()
        return 0

    def search(self, query: str, k: int = 5):
        """Run a similarity search."""
        if not self.vectorstore:
            raise ValueError("Vectorstore has not been initialized")

        return self.vectorstore.similarity_search_with_score(query, k=k)


class RAGVectorDBPipeline:
    """End-to-end RAG vector DB build pipeline."""

    def __init__(self, config: RAGConfig = None):
        self.config = config or RAGConfig()
        self.validator = DataValidator(self.config)
        self.builder = ChromaDBBuilder(self.config)

    def build(self):
        """Run the full pipeline."""
        df = pd.read_csv(self.config.RAG_INPUT_PATH)
        print(f"Raw data: {len(df)} chunks")

        df_cleaned = self.validator.validate_and_clean(df)
        print(f"Cleaned data: {len(df_cleaned)} chunks")

        vectorstore = self.builder.build_from_dataframe(df_cleaned)

        count = self.builder.get_collection_count()
        print(f"ChromaDB save complete: {count} documents")
        print(f"Save location: {self.config.DB_DIRECTORY}")

        return vectorstore

    def test_search(self, query: str = "company information system", k: int = 3):
        """Run a quick search test."""
        results = self.builder.search(query, k=k)

        print(f"\nTest query: '{query}'")
        print(f"Search results: {len(results)}\n")

        for i, (doc, score) in enumerate(results, 1):
            print(f"[{i}] Distance: {score:.4f}")
            print(f"Content: {doc.page_content[:100]}...")
            print(f"Metadata: {doc.metadata}\n")

        return results
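

# Example entry point: a minimal usage sketch, not part of the original module.
# It assumes RAGConfig supplies RAG_INPUT_PATH, DB_DIRECTORY, COLLECTION_NAME,
# OPENAI_API_KEY, and the other settings referenced above.
if __name__ == "__main__":
    pipeline = RAGVectorDBPipeline()   # default RAGConfig
    pipeline.build()                   # validate, clean, embed, and persist
    pipeline.test_search()             # quick similarity-search sanity check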