# RFP_summary_chatbot/src/loader/preprocess_pipeline.py
# (HF Spaces commit 4739096 — "Initial commit for HF Spaces deployment" by Dongjin1203)
"""
RAG ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ
ํ…์ŠคํŠธ ์ถ”์ถœ โ†’ ์ •์ œ โ†’ ์ฒญํ‚น โ†’ ์ €์žฅ
๋ชจ๋“  ์ „์ฒ˜๋ฆฌ ํด๋ž˜์Šค๋ฅผ ํ•˜๋‚˜์˜ ํŒŒ์ผ๋กœ ํ†ตํ•ฉ
"""
import os
import re
import zlib
import struct
import pandas as pd
from tqdm import tqdm
from pypdf import PdfReader
import olefile
from langchain_text_splitters import RecursiveCharacterTextSplitter
from src.utils.config import PreprocessConfig
# ============================================================
# Text extraction class
# ============================================================
class TextExtractor:
    """Extract plain text from PDF and HWP 5.0 files.

    All extractors are best-effort: instead of raising, they return a
    marker string containing "ì¶”ì¶œ ì‹¤íŒ¨" on failure so that the pipeline
    can filter failed documents downstream.
    """

    @staticmethod
    def extract_pdf(filepath: str) -> str:
        """Extract text from a PDF file.

        Args:
            filepath: Path to the PDF file.

        Returns:
            Page texts joined by blank lines, or a failure marker string.
        """
        try:
            reader = PdfReader(filepath)
            page_texts = []
            for page in reader.pages:
                # Call extract_text() only once per page — it re-parses the
                # page content stream each time (the original called it twice).
                page_text = page.extract_text()
                if page_text:
                    page_texts.append(page_text)
            return "\n\n".join(page_texts)
        except Exception as e:
            return f"[PDF ì¶”ì¶œ ì‹¤íŒ¨: {e}]"

    @staticmethod
    def extract_hwp(filepath: str) -> str:
        """Extract text from an HWP 5.0 file (OLE compound document).

        Args:
            filepath: Path to the HWP file.

        Returns:
            Concatenated paragraph text, or a failure marker string.
        """
        ole = None
        try:
            ole = olefile.OleFileIO(filepath)
            dirs = ole.listdir()
            # Validate HWP 5.0 structure: both streams must exist.
            if ["FileHeader"] not in dirs or ["\x05HwpSummaryInformation"] not in dirs:
                return "[HWP ì¶”ì¶œ ì‹¤íŒ¨: ìœ íš¨í•œ HWP 5.0 íŒŒì¼ì´ ì•„ë‹˜]"
            # Compression flag: bit 0 of byte 36 of the FileHeader stream.
            header_data = ole.openstream("FileHeader").read()
            is_compressed = (header_data[36] & 1) == 1
            # Collect BodyText section numbers in ascending order.
            nums = sorted(
                int(d[1][len("Section"):])
                for d in dirs
                if d[0] == "BodyText"
            )
            sections = [f"BodyText/Section{x}" for x in nums]
            # Extract text from every section's record stream.
            text_parts = []
            for section in sections:
                data = ole.openstream(section).read()
                # Compressed sections are raw deflate (zlib wbits=-15).
                unpacked_data = zlib.decompress(data, -15) if is_compressed else data
                # Walk the record stream: each record starts with a 4-byte
                # header dword (tag in bits 0-9, length in bits 20-31).
                i = 0
                size = len(unpacked_data)
                while i < size:
                    rec_header = struct.unpack_from("<I", unpacked_data, i)[0]
                    rec_type = rec_header & 0x3ff
                    rec_len = (rec_header >> 20) & 0xfff
                    # Record tag 67: paragraph text, UTF-16LE encoded.
                    if rec_type == 67:
                        rec_data = unpacked_data[i + 4 : i + 4 + rec_len]
                        text_parts.append(rec_data.decode('utf-16', errors='ignore'))
                    i += 4 + rec_len
            return "".join(text_parts)
        except Exception as e:
            return f"[HWP ì¶”ì¶œ ì‹¤íŒ¨: {e}]"
        finally:
            # Always release the OLE handle — the original leaked it on the
            # validation early-return and on any exception.
            if ole is not None:
                ole.close()

    @staticmethod
    def extract(filepath: str, file_format: str) -> str:
        """Dispatch text extraction by file format.

        Args:
            filepath: Path to the file.
            file_format: File format, 'pdf' or 'hwp' (case-insensitive).

        Returns:
            Extracted text, or a failure marker string.
        """
        if not os.path.exists(filepath):
            return "[ì¶”ì¶œ ì‹¤íŒ¨: íŒŒì¼ ì—†ìŒ]"
        file_format = file_format.lower()
        if file_format == 'pdf':
            return TextExtractor.extract_pdf(filepath)
        elif file_format == 'hwp':
            return TextExtractor.extract_hwp(filepath)
        else:
            return f"[ì¶”ì¶œ ì‹¤íŒ¨: ì•Œ ìˆ˜ ì—†ëŠ” íŒŒì¼ í˜•ì‹ ({file_format})]"
# ============================================================
# Text cleaning class
# ============================================================
class TextCleaner:
    """Cleanup and validation helpers for extracted text."""

    @staticmethod
    def clean(text: str) -> str:
        """Strip disallowed characters from *text*.

        Keeps printable ASCII, tab/newline/carriage-return, and Hangul
        syllables (U+AC00..U+D7AF); everything else is removed. Any
        remaining NUL characters are dropped as a final pass.

        Args:
            text: Raw text (coerced to str if needed).

        Returns:
            The cleaned text.
        """
        allowed_only = re.sub(r'[^\x20-\x7E\n\r\t\uAC00-\uD7AF]', '', str(text))
        return allowed_only.replace('\x00', '')

    @staticmethod
    def validate(text: str, min_length: int = 100) -> bool:
        """Check whether *text* is usable extracted content.

        Args:
            text: Text to validate.
            min_length: Minimum acceptable character count.

        Returns:
            True when the text is non-blank, is not an extraction-failure
            marker, and meets the minimum length.
        """
        if not text or not text.strip():
            return False
        # Failure markers produced by TextExtractor are never valid.
        return "[ì¶”ì¶œ ì‹¤íŒ¨" not in text and len(text) >= min_length

    @staticmethod
    def get_stats(text: str) -> dict:
        """Compute summary statistics for *text*.

        Args:
            text: Text to analyze.

        Returns:
            Dict with 'length', 'lines', 'words', and 'is_valid'.
        """
        stats = {
            'length': len(text),
            'lines': text.count('\n') + 1,
            'words': len(text.split()),
        }
        stats['is_valid'] = TextCleaner.validate(text)
        return stats
# ============================================================
# Document chunking class
# ============================================================
class DocumentChunker:
    """Split documents into overlapping chunks for RAG indexing."""

    def __init__(self, config: "PreprocessConfig"):
        """
        Initialize the chunker.

        Args:
            config: Preprocessing settings (chunk size, overlap, separators).
        """
        self.config = config
        # LangChain recursive splitter: tries separators in order until
        # chunks fit under CHUNK_SIZE (measured in characters via len).
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.CHUNK_SIZE,
            chunk_overlap=config.CHUNK_OVERLAP,
            separators=config.SEPARATORS,
            length_function=len,
        )

    def chunk_document(self, text: str, metadata: dict) -> list:
        """Split a single document into chunk records.

        Args:
            text: The document text.
            metadata: Per-document metadata copied onto every chunk.

        Returns:
            List of dicts (metadata plus 'chunk_id' and 'chunk_content');
            empty list when splitting fails.
        """
        try:
            chunks = self.splitter.split_text(text)
        except Exception as e:
            print(f"WARNING: ë¬¸ì„œ ë¶„í•  ì‹¤íŒ¨ - {e}")
            return []
        chunk_records = []
        filename = metadata.get('íŒŒì¼ëª…', 'unknown')
        for i, chunk_content in enumerate(chunks, 1):
            chunk_record = metadata.copy()
            # BUG FIX: chunk_id previously used the literal "(unknown)"
            # prefix for every chunk; it now uses the document's filename
            # (the `filename` variable was computed but never used).
            chunk_record['chunk_id'] = f"{filename}_chunk_{i:04d}"
            chunk_record['chunk_content'] = chunk_content
            chunk_records.append(chunk_record)
        return chunk_records

    def chunk_dataframe(
        self,
        df: pd.DataFrame,
        text_column: str = 'text_content'
    ) -> pd.DataFrame:
        """Chunk every row of *df*.

        Args:
            df: Source DataFrame, one row per document.
            text_column: Name of the column holding the document text.

        Returns:
            DataFrame with one row per chunk.
        """
        print(f"ì²­í‚¹ ì‹œìž‘ (í¬ê¸°: {self.config.CHUNK_SIZE}, "
              f"ì˜¤ë²„ëž©: {self.config.CHUNK_OVERLAP})...")
        all_chunks = []
        for index, row in tqdm(df.iterrows(), total=len(df), desc="ì²­í‚¹"):
            text = row[text_column]
            # Metadata = every column except the text itself and its length.
            metadata = row.to_dict()
            metadata.pop(text_column, None)
            metadata.pop('text_length', None)
            # Chunk this document and accumulate the records.
            chunks = self.chunk_document(text, metadata)
            all_chunks.extend(chunks)
        df_chunks = pd.DataFrame(all_chunks)
        print(f"ì²­í‚¹ ì™„ë£Œ: ì›ë³¸ {len(df)}ê°œ â†’ ì²­í¬ {len(df_chunks)}ê°œ")
        return df_chunks
# ============================================================
# RAG preprocessing pipeline
# ============================================================
class RAGPreprocessPipeline:
    """End-to-end RAG data preprocessing pipeline.

    Stages: (1) extract text from the files listed in the metadata CSV,
    (2) clean the extracted text, (3) split documents into chunks,
    (4) save the chunks to CSV. Progress and statistics are printed to
    stdout at every stage.
    """

    def __init__(self, config: PreprocessConfig = None):
        """
        Initialize the pipeline.

        Args:
            config: Preprocessing settings (a default PreprocessConfig is
                created when None).
        """
        self.config = config or PreprocessConfig()
        self.extractor = TextExtractor()
        self.cleaner = TextCleaner()
        self.chunker = DocumentChunker(self.config)
        # Running counters, updated by extract_from_files() and create_chunks().
        self.stats = {
            'total_files': 0,
            'success_files': 0,
            'failed_files': 0,
            'total_chunks': 0
        }

    def extract_from_files(self) -> pd.DataFrame:
        """
        Stage 1: extract text from the files listed in the metadata CSV.

        Returns:
            DataFrame with a 'full_text' column holding the cleaned text
            (or a failure marker string) for each file.
        """
        print("\n" + "="*60)
        print("1ë‹¨ê³„: í…ìŠ¤íŠ¸ ì¶”ì¶œ")
        print("="*60)
        # Load the file metadata CSV (expects 'íŒŒì¼ëª…'/'íŒŒì¼í˜•ì‹' columns).
        df = pd.read_csv(self.config.META_CSV_PATH)
        self.stats['total_files'] = len(df)
        print(f"íŒŒì¼ ë¡œë“œ ì™„ë£Œ: {len(df)}ê°œ")
        extracted_data = []
        for index, row in tqdm(df.iterrows(), total=len(df), desc="í…ìŠ¤íŠ¸ ì¶”ì¶œ"):
            filepath = os.path.join(self.config.BASE_FOLDER_PATH, row['íŒŒì¼ëª…'])
            file_format = row['íŒŒì¼í˜•ì‹']
            # Extract raw text for this file.
            raw_text = self.extractor.extract(filepath, file_format)
            # Clean it (strip disallowed characters and NULs).
            cleaned_text = self.cleaner.clean(raw_text)
            # HWP special case: treat a too-short result as a failure,
            # unless it is already a failure marker.
            if file_format == 'hwp' and len(cleaned_text) < self.config.MIN_TEXT_LENGTH:
                if "[ì¶”ì¶œ ì‹¤íŒ¨" not in cleaned_text:
                    cleaned_text = "[ì¶”ì¶œ ì‹¤íŒ¨: HWP í…ìŠ¤íŠ¸ ë„ˆë¬´ ì§§ìŒ]"
            # Update the success/failure counters.
            if self.cleaner.validate(cleaned_text):
                self.stats['success_files'] += 1
            else:
                self.stats['failed_files'] += 1
            # Store the result row.
            new_row = row.to_dict()
            new_row['full_text'] = cleaned_text
            # Drop the redundant source-text column if present.
            if 'í…ìŠ¤íŠ¸' in new_row:
                del new_row['í…ìŠ¤íŠ¸']
            extracted_data.append(new_row)
        result_df = pd.DataFrame(extracted_data)
        print(f"\ní…ìŠ¤íŠ¸ ì¶”ì¶œ ì™„ë£Œ:")
        print(f"  - ì„±ê³µ: {self.stats['success_files']}ê°œ")
        print(f"  - ì‹¤íŒ¨: {self.stats['failed_files']}ê°œ")
        return result_df

    def clean_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Stage 2: normalize the extracted DataFrame.

        Renames 'full_text' to 'text_content', fills missing text with '',
        and adds a 'text_length' column. NOTE(review): mutates *df* in
        place before reassigning — callers should use the returned frame.

        Args:
            df: DataFrame produced by extract_from_files().

        Returns:
            The cleaned DataFrame.
        """
        print("\n" + "="*60)
        print("2ë‹¨ê³„: í…ìŠ¤íŠ¸ ì •ì œ")
        print("="*60)
        # Rename the text column: full_text -> text_content.
        df['text_content'] = df['full_text']
        df = df.drop(columns=['full_text'])
        # Replace missing text with the empty string.
        df['text_content'] = df['text_content'].fillna('')
        # Add a per-document length column for statistics/chunk filtering.
        df['text_length'] = df['text_content'].apply(len)
        print(f"í…ìŠ¤íŠ¸ ì •ì œ ì™„ë£Œ")
        print(f"  - í‰ê·  ê¸¸ì´: {df['text_length'].mean():.0f} ë¬¸ìž")
        print(f"  - ìµœì†Œ ê¸¸ì´: {df['text_length'].min()} ë¬¸ìž")
        print(f"  - ìµœëŒ€ ê¸¸ì´: {df['text_length'].max()} ë¬¸ìž")
        return df

    def create_chunks(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Stage 3: filter out failed extractions and chunk the rest.

        Args:
            df: Cleaned DataFrame from clean_dataframe().

        Returns:
            Chunk DataFrame (empty DataFrame when every document was
            filtered out).
        """
        print("\n" + "="*60)
        print("3ë‹¨ê³„: ì²­í‚¹")
        print("="*60)
        # Snapshot the document count before filtering.
        original_count = len(df)
        print(f"ðŸ” í•„í„°ë§ ì „ ë¬¸ì„œ ìˆ˜: {original_count}")
        # Preview the first document to aid debugging.
        if len(df) > 0:
            sample = df['text_content'].iloc[0]
            print(f"ðŸ” ì²« ë²ˆì§¸ ë¬¸ì„œ ë¯¸ë¦¬ë³´ê¸°:")
            print(f"   ì‹œìž‘ ë¶€ë¶„: {sample[:100]}...")
            print(f"   ì „ì²´ ê¸¸ì´: {len(sample)}ìž")
            # Report whether the sample contains an extraction-failure marker.
            has_failure = any([
                '[ì¶”ì¶œ ì‹¤íŒ¨' in sample,
                '[PDF ì¶”ì¶œ ì‹¤íŒ¨' in sample,
                '[HWP ì¶”ì¶œ ì‹¤íŒ¨' in sample
            ])
            print(f"   ì¶”ì¶œ ì‹¤íŒ¨ í¬í•¨?: {has_failure}")
        # Filter out documents carrying failure markers (raw-string regexes;
        # the three patterns are distinct — none is a substring of another).
        df = df[~df['text_content'].str.contains(r'\[ì¶”ì¶œ ì‹¤íŒ¨', na=False)]
        df = df[~df['text_content'].str.contains(r'\[PDF ì¶”ì¶œ ì‹¤íŒ¨', na=False)]
        df = df[~df['text_content'].str.contains(r'\[HWP ì¶”ì¶œ ì‹¤íŒ¨', na=False)]
        filtered_count = original_count - len(df)
        print(f"\nðŸ“Š í•„í„°ë§ ê²°ê³¼:")
        print(f"   ì œì™¸ëœ ë¬¸ì„œ: {filtered_count}ê°œ")
        print(f"   ë‚¨ì€ ë¬¸ì„œ: {len(df)}ê°œ")
        # Bail out early if nothing survived the filter.
        if len(df) == 0:
            print("\nâŒ ê²½ê³ : ëª¨ë“  ë¬¸ì„œê°€ í•„í„°ë§ë˜ì—ˆìŠµë‹ˆë‹¤!")
            print("   â†’ ì¶”ì¶œì´ ëª¨ë‘ ì‹¤íŒ¨í–ˆê±°ë‚˜ í•„í„°ë§ ì¡°ê±´ì´ ë„ˆë¬´ ì—„ê²©í•©ë‹ˆë‹¤.")
            return pd.DataFrame()
        if filtered_count > 0:
            print(f"âš ï¸ ì¶”ì¶œ ì‹¤íŒ¨ ë¬¸ì„œ ì œì™¸: {filtered_count}ê°œ")
        print(f"âœ… ìœ íš¨í•œ ë¬¸ì„œ: {len(df)}ê°œ")
        # Run the chunker over the surviving documents.
        df_chunks = self.chunker.chunk_dataframe(df)
        self.stats['total_chunks'] = len(df_chunks)
        return df_chunks

    def save_chunks(self, df_chunks: pd.DataFrame):
        """
        Stage 4: save the chunk DataFrame to CSV.

        Args:
            df_chunks: Chunk DataFrame to write (utf-8-sig so Excel opens
                it correctly).
        """
        print("\n" + "="*60)
        print("4ë‹¨ê³„: ê²°ê³¼ ì €ìž¥")
        print("="*60)
        df_chunks.to_csv(
            self.config.OUTPUT_CHUNKS_PATH,
            index=False,
            encoding='utf-8-sig'
        )
        print(f"ìµœì¢… ì²­í¬ ì €ìž¥ ì™„ë£Œ: {self.config.OUTPUT_CHUNKS_PATH}")
        print(f"ì´ ì²­í¬ ìˆ˜: {len(df_chunks)}")

    def run(self) -> pd.DataFrame:
        """
        Run the full pipeline: extract -> clean -> chunk -> save.

        Returns:
            The final chunk DataFrame.
        """
        print("="*60)
        print("RAG ì „ì²˜ë¦¬ íŒŒì´í”„ë¼ì¸ ì‹œìž‘")
        print("="*60)
        # Validate settings before doing any work.
        self.config.validate()
        print(self.config)
        # 1. Text extraction
        df_extracted = self.extract_from_files()
        # 2. Text cleaning
        df_cleaned = self.clean_dataframe(df_extracted)
        # 3. Chunking
        df_chunks = self.create_chunks(df_cleaned)
        # 4. Saving
        self.save_chunks(df_chunks)
        # Final statistics report
        self._print_final_stats()
        print("\n" + "="*60)
        print("âœ… RAG ì „ì²˜ë¦¬ íŒŒì´í”„ë¼ì¸ ì™„ë£Œ")
        print("="*60)
        return df_chunks

    def _print_final_stats(self):
        """Print the final success/failure/chunk statistics."""
        print("\n" + "="*60)
        print("ðŸ“Š ìµœì¢… í†µê³„")
        print("="*60)
        print(f"ì´ íŒŒì¼ ìˆ˜: {self.stats['total_files']}")
        # Guard against division by zero when no files were processed.
        if self.stats['total_files'] > 0:
            success_rate = self.stats['success_files'] / self.stats['total_files'] * 100
            fail_rate = self.stats['failed_files'] / self.stats['total_files'] * 100
            print(f"  - ì¶”ì¶œ ì„±ê³µ: {self.stats['success_files']} ({success_rate:.1f}%)")
            print(f"  - ì¶”ì¶œ ì‹¤íŒ¨: {self.stats['failed_files']} ({fail_rate:.1f}%)")
        print(f"ì´ ì²­í¬ ìˆ˜: {self.stats['total_chunks']}")
        if self.stats['success_files'] > 0:
            avg_chunks = self.stats['total_chunks'] / self.stats['success_files']
            print(f"íŒŒì¼ë‹¹ í‰ê·  ì²­í¬: {avg_chunks:.1f}ê°œ")