"""
RAG data preprocessing pipeline.

Text extraction → cleaning → chunking → storage.

Consolidates all preprocessing classes into a single file.
"""

import os
import re
import zlib
import struct

import pandas as pd
from tqdm import tqdm
from pypdf import PdfReader
import olefile
from langchain_text_splitters import RecursiveCharacterTextSplitter

from src.utils.config import PreprocessConfig


class TextExtractor:
    """Extract text from PDF and HWP files."""

    @staticmethod
    def extract_pdf(filepath: str) -> str:
        """
        Extract text from a PDF file.

        Args:
            filepath: Path to the PDF file.

        Returns:
            The extracted text.
        """
        try:
            reader = PdfReader(filepath)
            # Call extract_text() once per page and keep only non-empty pages.
            page_texts = [
                text
                for page in reader.pages
                if (text := page.extract_text())
            ]
            return "\n\n".join(page_texts)
        except Exception as e:
            return f"[PDF extraction failed: {e}]"

    @staticmethod
    def extract_hwp(filepath: str) -> str:
        """
        Extract text from an HWP file.

        Args:
            filepath: Path to the HWP file.

        Returns:
            The extracted text.
        """
        try:
            f = olefile.OleFileIO(filepath)
            dirs = f.listdir()

            # An HWP 5.0 file is an OLE container that must expose these streams.
            if ["FileHeader"] not in dirs or ["\x05HwpSummaryInformation"] not in dirs:
                return "[HWP extraction failed: not a valid HWP 5.0 file]"

            # Bit 0 of byte 36 in FileHeader flags zlib-compressed body text.
            header = f.openstream("FileHeader")
            header_data = header.read()
            is_compressed = (header_data[36] & 1) == 1

            # Collect BodyText/Section0, Section1, ... in numeric order.
            nums = [
                int(d[1][len("Section"):])
                for d in dirs
                if d[0] == "BodyText"
            ]
            sections = [f"BodyText/Section{x}" for x in sorted(nums)]

            text = ""
            for section in sections:
                bodytext = f.openstream(section)
                data = bodytext.read()

                if is_compressed:
                    # Raw deflate stream (no zlib header), hence wbits=-15.
                    unpacked_data = zlib.decompress(data, -15)
                else:
                    unpacked_data = data

                # Walk the record stream: each record starts with a 4-byte
                # header packing tag id (10 bits), level (10 bits), size (12 bits).
                i = 0
                size = len(unpacked_data)
                while i < size:
                    rec_header = struct.unpack_from("<I", unpacked_data, i)[0]
                    rec_type = rec_header & 0x3FF
                    rec_len = (rec_header >> 20) & 0xFFF

                    # Tag 67 (HWPTAG_PARA_TEXT) carries UTF-16LE paragraph text.
                    if rec_type == 67:
                        rec_data = unpacked_data[i + 4 : i + 4 + rec_len]
                        text += rec_data.decode('utf-16-le', errors='ignore')

                    i += 4 + rec_len

            f.close()
            return text

        except Exception as e:
            return f"[HWP extraction failed: {e}]"

    @staticmethod
    def extract(filepath: str, file_format: str) -> str:
        """
        Extract text according to the file format.

        Args:
            filepath: Path to the file.
            file_format: File format ('pdf' or 'hwp').

        Returns:
            The extracted text.
        """
        if not os.path.exists(filepath):
            return "[Extraction failed: file not found]"

        file_format = file_format.lower()

        if file_format == 'pdf':
            return TextExtractor.extract_pdf(filepath)
        elif file_format == 'hwp':
            return TextExtractor.extract_hwp(filepath)
        else:
            return f"[Extraction failed: unknown file format ({file_format})]"
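

# Illustrative sketch (not part of the pipeline); the path below is hypothetical:
#
#   text = TextExtractor.extract("data/raw/sample.pdf", "pdf")
#   if "extraction failed" not in text.lower():
#       print(text[:200])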


class TextCleaner:
    """Clean and validate text."""

    @staticmethod
    def clean(text: str) -> str:
        """
        Clean text:
        - remove special characters (keep Hangul, ASCII letters/digits,
          and basic whitespace only)
        - remove NULL characters

        Args:
            text: Raw text.

        Returns:
            Cleaned text.
        """
        # Keep printable ASCII, newlines/tabs, and Hangul syllables (U+AC00-U+D7AF).
        cleaned = re.sub(
            r'[^\x20-\x7E\n\r\t\uAC00-\uD7AF]',
            '',
            str(text)
        )

        # Defensive: NUL bytes already fall outside the allowed ranges above.
        cleaned = cleaned.replace('\x00', '')

        return cleaned

    @staticmethod
    def validate(text: str, min_length: int = 100) -> bool:
        """
        Check text validity.

        Args:
            text: Text to validate.
            min_length: Minimum length.

        Returns:
            Whether the text is valid.
        """
        if not text or text.strip() == "":
            return False

        # Match any extraction-failure sentinel, including the
        # "[PDF extraction failed" / "[HWP extraction failed" variants.
        if "extraction failed" in text.lower():
            return False

        if len(text) < min_length:
            return False

        return True

    @staticmethod
    def get_stats(text: str) -> dict:
        """
        Text statistics.

        Args:
            text: Text to analyze.

        Returns:
            A dictionary of statistics.
        """
        return {
            'length': len(text),
            'lines': text.count('\n') + 1,
            'words': len(text.split()),
            'is_valid': TextCleaner.validate(text)
        }
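

# Illustrative sketch: clean() keeps ASCII and Hangul, drops everything else.
#
#   TextCleaner.clean("Hello\x00 세계 ©®")            -> "Hello 세계 "
#   TextCleaner.validate("too short", min_length=100) -> False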


class DocumentChunker:
    """Split documents into chunks."""

    def __init__(self, config: PreprocessConfig):
        """
        Initialize.

        Args:
            config: Preprocessing config object.
        """
        self.config = config

        # Recursive splitter: tries each separator in order until the
        # pieces fit within CHUNK_SIZE characters.
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.CHUNK_SIZE,
            chunk_overlap=config.CHUNK_OVERLAP,
            separators=config.SEPARATORS,
            length_function=len,
        )

    def chunk_document(self, text: str, metadata: dict) -> list:
        """
        Chunk a single document.

        Args:
            text: Document text.
            metadata: Document metadata.

        Returns:
            A list of chunk records.
        """
        try:
            chunks = self.splitter.split_text(text)
        except Exception as e:
            print(f"WARNING: failed to split document - {e}")
            return []

        chunk_records = []
        filename = metadata.get('파일명', 'unknown')

        # Each chunk inherits the document metadata plus a stable chunk id.
        for i, chunk_content in enumerate(chunks, 1):
            chunk_record = metadata.copy()
            chunk_record['chunk_id'] = f"{filename}_chunk_{i:04d}"
            chunk_record['chunk_content'] = chunk_content
            chunk_records.append(chunk_record)

        return chunk_records

    def chunk_dataframe(
        self,
        df: pd.DataFrame,
        text_column: str = 'text_content'
    ) -> pd.DataFrame:
        """
        Chunk an entire DataFrame.

        Args:
            df: Source DataFrame.
            text_column: Name of the column holding the text.

        Returns:
            A DataFrame of chunks.
        """
        print(f"Chunking started (size: {self.config.CHUNK_SIZE}, "
              f"overlap: {self.config.CHUNK_OVERLAP})...")

        all_chunks = []

        for _, row in tqdm(df.iterrows(), total=len(df), desc="Chunking"):
            text = row[text_column]

            # Carry every column except the text itself as chunk metadata.
            metadata = row.to_dict()
            metadata.pop(text_column, None)
            metadata.pop('text_length', None)

            chunks = self.chunk_document(text, metadata)
            all_chunks.extend(chunks)

        df_chunks = pd.DataFrame(all_chunks)

        print(f"Chunking complete: {len(df)} documents → {len(df_chunks)} chunks")

        return df_chunks
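

# Illustrative sketch (assumes a PreprocessConfig with CHUNK_SIZE etc. set;
# '파일명' is the filename key carried in the metadata dict):
#
#   chunker = DocumentChunker(PreprocessConfig())
#   records = chunker.chunk_document("..." * 1000, {"파일명": "doc.pdf"})
#   records[0]["chunk_id"]  -> "doc.pdf_chunk_0001"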


class RAGPreprocessPipeline:
    """Full RAG data preprocessing pipeline."""

    def __init__(self, config: PreprocessConfig = None):
        """
        Initialize.

        Args:
            config: Preprocessing config (defaults are used when None).
        """
        self.config = config or PreprocessConfig()
        self.extractor = TextExtractor()
        self.cleaner = TextCleaner()
        self.chunker = DocumentChunker(self.config)

        # Running counters reported by _print_final_stats().
        self.stats = {
            'total_files': 0,
            'success_files': 0,
            'failed_files': 0,
            'total_chunks': 0
        }

    def extract_from_files(self) -> pd.DataFrame:
        """
        Step 1: extract text from files.

        Returns:
            A DataFrame with the extracted text.
        """
        print("\n" + "=" * 60)
        print("Step 1: text extraction")
        print("=" * 60)

        df = pd.read_csv(self.config.META_CSV_PATH)
        self.stats['total_files'] = len(df)
        print(f"Files loaded: {len(df)}")

        extracted_data = []

        for _, row in tqdm(df.iterrows(), total=len(df), desc="Extracting text"):
            filepath = os.path.join(self.config.BASE_FOLDER_PATH, row['파일명'])
            file_format = row['파일형식']

            raw_text = self.extractor.extract(filepath, file_format)

            cleaned_text = self.cleaner.clean(raw_text)

            # HWP extraction can silently yield near-empty text; flag it.
            if file_format == 'hwp' and len(cleaned_text) < self.config.MIN_TEXT_LENGTH:
                if "extraction failed" not in cleaned_text.lower():
                    cleaned_text = "[Extraction failed: HWP text too short]"

            if self.cleaner.validate(cleaned_text):
                self.stats['success_files'] += 1
            else:
                self.stats['failed_files'] += 1

            new_row = row.to_dict()
            new_row['full_text'] = cleaned_text

            # Drop the raw text column from the metadata CSV, if present.
            if '텍스트' in new_row:
                del new_row['텍스트']

            extracted_data.append(new_row)

        result_df = pd.DataFrame(extracted_data)

        print("\nText extraction complete:")
        print(f"  - succeeded: {self.stats['success_files']}")
        print(f"  - failed: {self.stats['failed_files']}")

        return result_df

    def clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Step 2: clean the DataFrame.

        Args:
            df: Source DataFrame.

        Returns:
            The cleaned DataFrame.
        """
        print("\n" + "=" * 60)
        print("Step 2: text cleaning")
        print("=" * 60)

        # Standardize on 'text_content' as the canonical text column.
        df = df.rename(columns={'full_text': 'text_content'})

        df['text_content'] = df['text_content'].fillna('')

        df['text_length'] = df['text_content'].apply(len)

        print("Text cleaning complete")
        print(f"  - mean length: {df['text_length'].mean():.0f} chars")
        print(f"  - min length: {df['text_length'].min()} chars")
        print(f"  - max length: {df['text_length'].max()} chars")

        return df

    def create_chunks(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Step 3: chunking.

        Args:
            df: Cleaned DataFrame.

        Returns:
            A DataFrame of chunks.
        """
        print("\n" + "=" * 60)
        print("Step 3: chunking")
        print("=" * 60)

        original_count = len(df)
        print(f"Documents before filtering: {original_count}")

        # Debug preview of the first document.
        if len(df) > 0:
            sample = df['text_content'].iloc[0]
            print("First document preview:")
            print(f"  head: {sample[:100]}...")
            print(f"  total length: {len(sample)} chars")
            print(f"  contains extraction failure?: "
                  f"{'extraction failed' in sample.lower()}")

        # Drop documents whose text is an extraction-failure sentinel
        # (covers the plain, PDF, and HWP variants in one pass).
        df = df[~df['text_content'].str.contains('extraction failed', case=False, na=False)]

        filtered_count = original_count - len(df)

        print("\nFiltering result:")
        print(f"  excluded documents: {filtered_count}")
        print(f"  remaining documents: {len(df)}")

        if len(df) == 0:
            print("\nWARNING: every document was filtered out!")
            print("  → extraction failed everywhere, or the filter is too strict.")
            return pd.DataFrame()

        if filtered_count > 0:
            print(f"Excluded extraction failures: {filtered_count}")
            print(f"Valid documents: {len(df)}")

        df_chunks = self.chunker.chunk_dataframe(df)
        self.stats['total_chunks'] = len(df_chunks)

        return df_chunks

    def save_chunks(self, df_chunks: pd.DataFrame):
        """
        Step 4: save the result.

        Args:
            df_chunks: DataFrame of chunks.
        """
        print("\n" + "=" * 60)
        print("Step 4: saving results")
        print("=" * 60)

        # utf-8-sig adds a BOM so Excel opens the Korean text correctly.
        df_chunks.to_csv(
            self.config.OUTPUT_CHUNKS_PATH,
            index=False,
            encoding='utf-8-sig'
        )

        print(f"Final chunks saved: {self.config.OUTPUT_CHUNKS_PATH}")
        print(f"Total chunks: {len(df_chunks)}")

    def run(self) -> pd.DataFrame:
        """
        Run the full pipeline.

        Returns:
            The final DataFrame of chunks.
        """
        print("=" * 60)
        print("RAG preprocessing pipeline started")
        print("=" * 60)

        # Fail fast on missing paths or invalid settings.
        self.config.validate()
        print(self.config)

        df_extracted = self.extract_from_files()

        df_cleaned = self.clean_dataframe(df_extracted)

        df_chunks = self.create_chunks(df_cleaned)

        self.save_chunks(df_chunks)

        self._print_final_stats()

        print("\n" + "=" * 60)
        print("RAG preprocessing pipeline complete")
        print("=" * 60)

        return df_chunks

    def _print_final_stats(self):
        """Print final statistics."""
        print("\n" + "=" * 60)
        print("Final statistics")
        print("=" * 60)
        print(f"Total files: {self.stats['total_files']}")

        if self.stats['total_files'] > 0:
            success_rate = self.stats['success_files'] / self.stats['total_files'] * 100
            fail_rate = self.stats['failed_files'] / self.stats['total_files'] * 100

            print(f"  - extraction succeeded: {self.stats['success_files']} ({success_rate:.1f}%)")
            print(f"  - extraction failed: {self.stats['failed_files']} ({fail_rate:.1f}%)")

        print(f"Total chunks: {self.stats['total_chunks']}")

        if self.stats['success_files'] > 0:
            avg_chunks = self.stats['total_chunks'] / self.stats['success_files']
            print(f"Average chunks per file: {avg_chunks:.1f}")