# NOTE(review): removed non-code residue ("Spaces: Sleeping" web-UI status lines)
# that leaked into this file from a hosting-page export.
| """ | |
| ๋ฌธ์ ์ฒ๋ฆฌ ์ ํธ๋ฆฌํฐ ๋ชจ๋ | |
| """ | |
| import os | |
| import re | |
| import csv | |
| import io | |
| import logging | |
| from typing import List, Dict, Any, Optional, Tuple, Union | |
| import numpy as np | |
| logger = logging.getLogger("DocProcessor") | |
| if not logger.hasHandlers(): | |
| handler = logging.StreamHandler() | |
| formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| handler.setFormatter(formatter) | |
| logger.addHandler(handler) | |
| logger.setLevel(logging.INFO) | |
| class DocumentProcessor: | |
| """๋ฌธ์ ์ฒ๋ฆฌ ์ ํธ๋ฆฌํฐ ํด๋์ค""" | |
| def split_text( | |
| text: str, | |
| chunk_size: int = 512, | |
| chunk_overlap: int = 50, | |
| separator: str = "\n" | |
| ) -> List[str]: | |
| """ | |
| ํ ์คํธ๋ฅผ ๋ ์์ ์ฒญํฌ๋ก ๋ถํ | |
| Args: | |
| text: ๋ถํ ํ ํ ์คํธ | |
| chunk_size: ๊ฐ ์ฒญํฌ์ ์ต๋ ๋ฌธ์ ์ | |
| chunk_overlap: ์ฒญํฌ ๊ฐ ์ค์ฒฉ๋๋ ๋ฌธ์ ์ | |
| separator: ๋ถํ ์ ์ฌ์ฉํ ๊ตฌ๋ถ์ | |
| Returns: | |
| ๋ถํ ๋ ํ ์คํธ ์ฒญํฌ ๋ชฉ๋ก | |
| """ | |
| if not text or chunk_size <= 0: | |
| return [] | |
| # ๊ตฌ๋ถ์๋ก ๋ถํ | |
| parts = text.split(separator) | |
| chunks = [] | |
| current_chunk = [] | |
| current_size = 0 | |
| for part in parts: | |
| part_size = len(part) | |
| if current_size + part_size + len(current_chunk) > chunk_size and current_chunk: | |
| # ํ์ฌ ์ฒญํฌ๊ฐ ์ต๋ ํฌ๊ธฐ๋ฅผ ์ด๊ณผํ๋ฉด ์ ์ฅ | |
| chunks.append(separator.join(current_chunk)) | |
| # ์ค์ฒฉ์ ์ํด ์ผ๋ถ ์ฒญํฌ ์ ์ง | |
| overlap_tokens = [] | |
| overlap_size = 0 | |
| for token in reversed(current_chunk): | |
| if overlap_size + len(token) <= chunk_overlap: | |
| overlap_tokens.insert(0, token) | |
| overlap_size += len(token) + 1 # separator ๊ธธ์ด ํฌํจ | |
| else: | |
| break | |
| current_chunk = overlap_tokens | |
| current_size = overlap_size - len(current_chunk) # separator ๊ธธ์ด ์ ์ธ | |
| current_chunk.append(part) | |
| current_size += part_size | |
| # ๋ง์ง๋ง ์ฒญํฌ ์ถ๊ฐ | |
| if current_chunk: | |
| chunks.append(separator.join(current_chunk)) | |
| return chunks | |
| def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str: | |
| """ | |
| ํ ์คํธ ์ ์ | |
| Args: | |
| text: ์ ์ ํ ํ ์คํธ | |
| remove_urls: URL ์ ๊ฑฐ ์ฌ๋ถ | |
| remove_extra_whitespace: ์ฌ๋ถ์ ๊ณต๋ฐฑ ์ ๊ฑฐ ์ฌ๋ถ | |
| Returns: | |
| ์ ์ ๋ ํ ์คํธ | |
| """ | |
| if not text: | |
| return "" | |
| # URL ์ ๊ฑฐ | |
| if remove_urls: | |
| text = re.sub(r'https?://\S+|www\.\S+', '', text) | |
| # ํน์ ๋ฌธ์ ๋ฐ HTML ํ๊ทธ ์ ์ | |
| text = re.sub(r'<.*?>', '', text) # HTML ํ๊ทธ ์ ๊ฑฐ | |
| # ์ฌ๋ถ์ ๊ณต๋ฐฑ ์ ๊ฑฐ | |
| if remove_extra_whitespace: | |
| text = re.sub(r'\s+', ' ', text).strip() | |
| return text | |
| def text_to_documents( | |
| text: str, | |
| metadata: Optional[Dict[str, Any]] = None, | |
| chunk_size: int = 512, | |
| chunk_overlap: int = 50 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| ํ ์คํธ๋ฅผ ๋ฌธ์ ๊ฐ์ฒด ๋ชฉ๋ก์ผ๋ก ๋ณํ | |
| Args: | |
| text: ๋ณํํ ํ ์คํธ | |
| metadata: ๋ฌธ์์ ์ถ๊ฐํ ๋ฉํ๋ฐ์ดํฐ | |
| chunk_size: ๊ฐ ์ฒญํฌ์ ์ต๋ ๋ฌธ์ ์ | |
| chunk_overlap: ์ฒญํฌ ๊ฐ ์ค์ฒฉ๋๋ ๋ฌธ์ ์ | |
| Returns: | |
| ๋ฌธ์ ๊ฐ์ฒด ๋ชฉ๋ก | |
| """ | |
| if not text: | |
| return [] | |
| # ํ ์คํธ ์ ์ | |
| clean = DocumentProcessor.clean_text(text) | |
| # ํ ์คํธ ๋ถํ | |
| chunks = DocumentProcessor.split_text( | |
| clean, | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap | |
| ) | |
| # ๋ฌธ์ ๊ฐ์ฒด ์์ฑ | |
| documents = [] | |
| for i, chunk in enumerate(chunks): | |
| doc = { | |
| "text": chunk, | |
| "index": i, | |
| "chunk_count": len(chunks) | |
| } | |
| # ๋ฉํ๋ฐ์ดํฐ ์ถ๊ฐ | |
| if metadata: | |
| doc.update(metadata) | |
| documents.append(doc) | |
| return documents | |
| def load_documents_from_directory( | |
| directory: str, | |
| extensions: List[str] = [".txt", ".md", ".csv"], | |
| recursive: bool = True, | |
| chunk_size: int = 512, | |
| chunk_overlap: int = 50 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| ๋๋ ํ ๋ฆฌ์์ ๋ฌธ์ ๋ก๋ ๋ฐ ์ฒ๋ฆฌ | |
| Args: | |
| directory: ๋ก๋ํ ๋๋ ํ ๋ฆฌ ๊ฒฝ๋ก | |
| extensions: ์ฒ๋ฆฌํ ํ์ผ ํ์ฅ์ ๋ชฉ๋ก | |
| recursive: ํ์ ๋๋ ํ ๋ฆฌ ๊ฒ์ ์ฌ๋ถ | |
| chunk_size: ๊ฐ ์ฒญํฌ์ ์ต๋ ๋ฌธ์ ์ | |
| chunk_overlap: ์ฒญํฌ ๊ฐ ์ค์ฒฉ๋๋ ๋ฌธ์ ์ | |
| Returns: | |
| ๋ฌธ์ ๊ฐ์ฒด ๋ชฉ๋ก | |
| """ | |
| if not os.path.isdir(directory): | |
| logger.error(f"๋๋ ํ ๋ฆฌ๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค: {directory}") | |
| return [] | |
| documents = [] | |
| for root, dirs, files in os.walk(directory): | |
| if not recursive and root != directory: | |
| continue | |
| for file in files: | |
| _, ext = os.path.splitext(file) | |
| if ext.lower() not in extensions: | |
| continue | |
| file_path = os.path.join(root, file) | |
| rel_path = os.path.relpath(file_path, directory) | |
| try: | |
| logger.info(f"ํ์ผ ๋ก๋ ์ค: {rel_path}") | |
| # ๋จผ์ UTF-8๋ก ์๋ | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| except UnicodeDecodeError: | |
| # UTF-8๋ก ์คํจํ๋ฉด CP949(ํ๊ตญ์ด Windows ๊ธฐ๋ณธ ์ธ์ฝ๋ฉ)๋ก ์๋ | |
| logger.info(f"UTF-8 ๋์ฝ๋ฉ ์คํจ, CP949๋ก ์๋: {rel_path}") | |
| with open(file_path, 'r', encoding='cp949') as f: | |
| content = f.read() | |
| # ๋ฉํ๋ฐ์ดํฐ ์์ฑ | |
| metadata = { | |
| "source": rel_path, | |
| "filename": file, | |
| "filetype": ext.lower()[1:], | |
| "filepath": file_path | |
| } | |
| # CSV ํ์ผ์ ํน๋ณ ์ฒ๋ฆฌ | |
| if ext.lower() == '.csv': | |
| logger.info(f"CSV ํ์ผ ๊ฐ์ง, ํ ๋จ์๋ก ๋ถํ ์ฒ๋ฆฌ: {rel_path}") | |
| file_docs = DocumentProcessor.csv_to_documents(content, metadata) | |
| else: | |
| # ์ผ๋ฐ ํ ์คํธ ๋ฌธ์ ์ฒ๋ฆฌ | |
| file_docs = DocumentProcessor.text_to_documents( | |
| content, | |
| metadata=metadata, | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap | |
| ) | |
| documents.extend(file_docs) | |
| logger.info(f"{len(file_docs)}๊ฐ ์ฒญํฌ ์ถ์ถ: {rel_path}") | |
| except Exception as e: | |
| logger.error(f"ํ์ผ '{rel_path}' ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {e}") | |
| continue | |
| logger.info(f"์ด {len(documents)}๊ฐ ๋ฌธ์ ์ฒญํฌ๋ฅผ ๋ก๋ํ์ต๋๋ค.") | |
| return documents | |
| def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]: | |
| """ | |
| ๊ฒ์ ๊ฒฐ๊ณผ์์ RAG์ ์ฌ์ฉํ ์ปจํ ์คํธ ์ถ์ถ | |
| Args: | |
| results: ๊ฒ์ ๊ฒฐ๊ณผ ๋ชฉ๋ก | |
| field: ํ ์คํธ ๋ด์ฉ์ด ์๋ ํ๋ ์ด๋ฆ | |
| Returns: | |
| ์ปจํ ์คํธ ํ ์คํธ ๋ชฉ๋ก | |
| """ | |
| context = [] | |
| for result in results: | |
| if field in result: | |
| context.append(result[field]) | |
| return context | |
| def csv_to_documents(content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """ | |
| CSV ํ์ผ ๋ด์ฉ์ ํ ๋จ์๋ก ๋ถ๋ฆฌํ์ฌ ๊ฐ ํ์ ๋ณ๋์ ๋ฌธ์๋ก ์ฒ๋ฆฌ | |
| Args: | |
| content: CSV ํ์ผ์ ๋ด์ฉ | |
| metadata: ๊ธฐ๋ณธ ๋ฉํ๋ฐ์ดํฐ | |
| Returns: | |
| ๋ฌธ์ ๊ฐ์ฒด ๋ชฉ๋ก (๊ฐ ํ์ด ๋ณ๋์ ๋ฌธ์) | |
| """ | |
| documents = [] | |
| try: | |
| # ์ผ๋ฐ CSV ํ์ฑ ์๋ (์ฝ๋ง ๊ตฌ๋ถ์ ๊ธฐ๋ณธ) | |
| try: | |
| csv_reader = csv.reader(io.StringIO(content)) | |
| rows = list(csv_reader) | |
| if len(rows) > 0 and len(rows[0]) > 1: | |
| # ์ฝ๋ง๋ก ์ ๋๋ก ๊ตฌ๋ถ๋์๋ค๊ณ ํ๋จ | |
| logger.info(f"CSV ํ์ผ ์ฝ๋ง ๊ตฌ๋ถ์๋ก ์ฒ๋ฆฌ: {metadata.get('source', 'unknown')}") | |
| has_valid_format = True | |
| else: | |
| # ์ฝ๋ง๋ก ์ ๋๋ก ๊ตฌ๋ถ๋์ง ์์ | |
| has_valid_format = False | |
| except Exception: | |
| has_valid_format = False | |
| # ์ฝ๋ง ํ์์ด ์๋ ๊ฒฝ์ฐ, ๊ณต๋ฐฑ ๊ตฌ๋ถ์ ์ฒ๋ฆฌ ์๋ | |
| if not has_valid_format: | |
| logger.warning(f"CSV ํ์ผ์ด ํ์ค ์ฝ๋ง ํ์์ด ์๋๋๋ค. ๊ณต๋ฐฑ ๊ตฌ๋ถ์๋ก ์ฒ๋ฆฌํ๊ฒ ์ต๋๋ค: {metadata.get('source', 'unknown')}") | |
| lines = content.strip().split('\n') | |
| for i, line in enumerate(lines): | |
| # IT๋ก ์์ํ๋ ์ค๋ง ์ฒ๋ฆฌ (๋ฐ์ดํฐ ํ์ผ๋ก ๊ฐ์ฃผ) | |
| if not line.strip().startswith('IT'): | |
| continue | |
| # ๊ณต๋ฐฑ์ผ๋ก ๋ถ๋ฆฌํ๋, ์ต์ 5๊ฐ ์ด๋ก ๋ณด์ฅ | |
| parts = line.split(maxsplit=4) | |
| # ์ ํจํ ํ์ ์ต์ ๊ธธ์ด ํ์ธ | |
| if len(parts) < 5: | |
| logger.warning(f"ํ {i+1} ๋ถ์กฑํ ๋ฐ์ดํฐ: {line[:50]}...") | |
| continue | |
| # ๊ฐ ํ๋ ์ถ์ถ | |
| doc_id = parts[0].strip() # IT ๋ฒํธ | |
| query_type = parts[1].strip() # ์ฟผ๋ฆฌ ์ ํ | |
| question = parts[2].strip() # ์ง๋ฌธ | |
| answer = parts[3].strip() # ๋ต๋ณ | |
| reference = parts[4].strip() if len(parts) > 4 else "" # ์ฐธ์กฐ | |
| # ๋ฌธ์ ํ ์คํธ ์์ฑ - ๊ฐ ํ๋๋ฅผ ๊ตฌ๋ถํ์ฌ ํฌํจ | |
| text = f"ID: {doc_id}\n" | |
| text += f"์ฟผ๋ฆฌ ์ ํ: {query_type}\n" | |
| text += f"์ง์ (Question): {question}\n" | |
| text += f"์๋ต (Answer): {answer}\n" | |
| if reference: | |
| text += f"์ฐธ์กฐ ๋ฌธ์/๋งฅ๋ฝ (Reference/Context): {reference}" | |
| # ๋ฌธ์ ๊ฐ์ฒด ์์ฑ | |
| doc_metadata = metadata.copy() | |
| doc_metadata.update({ | |
| "row": i, | |
| "query_type": query_type, | |
| "question": question, | |
| "answer": answer, | |
| "reference": reference | |
| }) | |
| document = { | |
| "text": text, | |
| "id": doc_id, # IT ๋ฒํธ๋ฅผ ID๋ก ์ฌ์ฉ | |
| **doc_metadata | |
| } | |
| documents.append(document) | |
| logger.debug(f"IT ๋ฌธ์ ์ฒ๋ฆฌ: {doc_id} - {question[:30]}...") | |
| logger.info(f"๊ณต๋ฐฑ ๊ตฌ๋ถ์ CSV ํ์ผ '{metadata.get('source', 'unknown')}'์์ {len(documents)}๊ฐ ํ์ ๋ฌธ์๋ก ๋ณํํ์ต๋๋ค.") | |
| return documents | |
| # ํ์ค CSV ํ์ ์ฒ๋ฆฌ (์ฝ๋ง ๊ตฌ๋ถ์ ์ฌ์ฉ) | |
| if not rows: | |
| logger.warning(f"CSV ํ์ผ์ ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค: {metadata.get('source', 'unknown')}") | |
| return [] | |
| # ์ฒซ ๋ฒ์งธ ํ์ ํค๋๋ก ์ฌ์ฉ | |
| headers = rows[0] | |
| logger.debug(f"CSV ํค๋: {headers}") | |
| # ๊ฐ ํ์ ๋ณ๋์ ๋ฌธ์๋ก ๋ณํ | |
| for i, row in enumerate(rows[1:], 1): # ํค๋ ์ ์ธ, 1๋ถํฐ ์์ | |
| # ํ์ด ํค๋๋ณด๋ค ์งง์ผ๋ฉด ๋น ๊ฐ์ผ๋ก ์ฑ์ | |
| while len(row) < len(headers): | |
| row.append("") | |
| # ํ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ ํ์ผ๋ก ๋ณํ | |
| row_data = {headers[j]: value for j, value in enumerate(row) if j < len(headers)} | |
| # ์ฒซ ๋ฒ์งธ ์ด์ ID๋ก ์ฌ์ฉ (์๋ ๊ฒฝ์ฐ) | |
| row_id = row[0] if row and len(row) > 0 else f"row_{i}" | |
| # ๋ฌธ์ ํ ์คํธ ์์ฑ - ๋ชจ๋ ํ๋๋ฅผ ํฌํจํ ํํ | |
| text_parts = [] | |
| for j, header in enumerate(headers): | |
| if j < len(row) and row[j]: | |
| text_parts.append(f"{header}: {row[j]}") | |
| text = "\n".join(text_parts) | |
| # ๋ฌธ์ ๊ฐ์ฒด ์์ฑ | |
| doc_metadata = metadata.copy() | |
| doc_metadata.update({ | |
| "row": i, | |
| "row_id": row_id, | |
| "total_rows": len(rows) - 1, # ํค๋ ์ ์ธ | |
| "csv_data": row_data # ์๋ณธ ํ ๋ฐ์ดํฐ๋ ์ ์ฅ | |
| }) | |
| document = { | |
| "text": text, | |
| "id": row_id, | |
| **doc_metadata | |
| } | |
| documents.append(document) | |
| logger.info(f"CSV ํ์ผ '{metadata.get('source', 'unknown')}'์์ {len(documents)}๊ฐ ํ์ ๋ฌธ์๋ก ๋ณํํ์ต๋๋ค.") | |
| except Exception as e: | |
| logger.error(f"CSV ํ์ผ ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: {e}") | |
| return documents | |