Spaces:
Paused
Paused
| """ | |
| CPU에 최적화된 문서 처리 모듈 - 병렬 처리 적용 | |
| """ | |
| import os | |
| import time | |
| from typing import List, Dict, Any, Optional | |
| from langchain.schema import Document | |
| from concurrent.futures import ThreadPoolExecutor | |
| # 멀티프로세싱 가져오기 | |
| import multiprocessing | |
| try: | |
| CPU_COUNT = multiprocessing.cpu_count() | |
| except: | |
| CPU_COUNT = 4 | |
| print(f"CPU 코어 수: {CPU_COUNT}") | |
| # docling 라이브러리 존재 여부 확인 | |
| try: | |
| from docling.datamodel.base_models import InputFormat | |
| from docling.document_converter import DocumentConverter, PdfFormatOption | |
| from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode | |
| from docling.chunking import HybridChunker | |
| DOCLING_AVAILABLE = True | |
| print("docling 라이브러리 사용 가능") | |
| except ImportError: | |
| print("docling 라이브러리를 찾을 수 없습니다. PyPDFLoader만 사용합니다.") | |
| DOCLING_AVAILABLE = False | |
| # LangChain 문서 로더 | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| class OptimizedDocumentProcessor: | |
| """ | |
| CPU에 최적화된 병렬 처리 문서 처리 클래스 | |
| """ | |
| def __init__(self, | |
| chunk_size: int = 1000, | |
| chunk_overlap: int = 200, | |
| tokenizer: str = "Alibaba-NLP/gte-multilingual-base", # 올바른 모델 경로로 수정 | |
| max_workers: int = CPU_COUNT): | |
| """ | |
| 문서 처리기 초기화 | |
| Args: | |
| chunk_size: 텍스트 청크 크기 | |
| chunk_overlap: 청크 간 겹침 크기 | |
| tokenizer: HybridChunker에서 사용할 토크나이저 | |
| max_workers: 병렬 처리시 최대 작업자 수 | |
| """ | |
| self.chunk_size = chunk_size | |
| self.chunk_overlap = chunk_overlap | |
| self.tokenizer = tokenizer | |
| self.max_workers = max(1, min(max_workers, CPU_COUNT)) # CPU 코어 수 초과하지 않도록 | |
| print(f"병렬 처리 작업자 수: {self.max_workers}") | |
| # LangChain 텍스트 스플리터 | |
| self.text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap, | |
| separators=["\n\n", "\n", ". ", " ", ""], | |
| ) | |
| # docling 관련 컴포넌트 초기화 | |
| if DOCLING_AVAILABLE: | |
| # 파이프라인 옵션 설정 | |
| self.pipeline_options = PdfPipelineOptions(do_table_structure=True) | |
| self.pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE | |
| # 문서 변환기 초기화 | |
| self.doc_converter = DocumentConverter( | |
| format_options={ | |
| InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline_options) | |
| } | |
| ) | |
| # HybridChunker 초기화 (trust_remote_code=True 추가) | |
| self.hybrid_chunker = HybridChunker( | |
| tokenizer=tokenizer, | |
| chunk_size=chunk_size, | |
| overlap=chunk_overlap, | |
| tokenizer_kwargs={"trust_remote_code": True} # 원격 코드 실행 허용 | |
| ) | |
| print(f"docling 초기화 완료: HybridChunker(청크 크기={chunk_size}, 오버랩={chunk_overlap})") | |
| def process_with_docling(self, pdf_path: str) -> Dict[str, Any]: | |
| """ | |
| docling을 사용하여 PDF 문서 처리 | |
| Args: | |
| pdf_path: PDF 파일 경로 | |
| Returns: | |
| 처리된 문서 데이터 | |
| """ | |
| if not DOCLING_AVAILABLE: | |
| raise ImportError("docling 라이브러리가 설치되지 않았습니다.") | |
| try: | |
| start_time = time.time() | |
| # 문서 변환 | |
| conv_res = self.doc_converter.convert(pdf_path) | |
| doc = conv_res.document | |
| # 성능 측정 | |
| conversion_time = time.time() - start_time | |
| print(f"PDF 변환 시간: {conversion_time:.2f}초") | |
| # 메타데이터 추출 | |
| metadata = { | |
| "source": pdf_path, | |
| "title": os.path.basename(pdf_path), | |
| "processing_time": conversion_time | |
| } | |
| return { | |
| "content": doc.export_to_markdown(), | |
| "metadata": metadata, | |
| "raw_document": doc, | |
| } | |
| except Exception as e: | |
| print(f"docling으로 문서 처리 중 오류 발생: {e}") | |
| raise | |
| def chunk_with_hybrid_chunker(self, doc: Any) -> List[Dict[str, Any]]: | |
| """ | |
| HybridChunker를 사용하여 문서를 청크로 분할 | |
| Args: | |
| doc: docling 문서 객체 | |
| Returns: | |
| 청크 리스트 | |
| """ | |
| start_time = time.time() | |
| # 청킹 수행 | |
| chunk_iter = self.hybrid_chunker.chunk(doc) | |
| chunks = list(chunk_iter) | |
| chunking_time = time.time() - start_time | |
| print(f"청킹 시간: {chunking_time:.2f}초 (청크 수: {len(chunks)})") | |
| return chunks | |
| def create_langchain_documents_from_chunks(self, | |
| chunks: List[Dict[str, Any]], | |
| metadata: Dict[str, Any]) -> List[Document]: | |
| """ | |
| docling 청크를 LangChain Document 객체로 변환 | |
| Args: | |
| chunks: docling HybridChunker로 생성한 청크 리스트 | |
| metadata: 문서 메타데이터 | |
| Returns: | |
| LangChain Document 객체 리스트 | |
| """ | |
| documents = [] | |
| for i, chunk in enumerate(chunks): | |
| # 각 청크에 대한 메타데이터 | |
| chunk_metadata = metadata.copy() | |
| chunk_metadata["chunk_id"] = i | |
| # 청크 내용 추출 | |
| if hasattr(chunk, "text"): | |
| content = chunk.text | |
| elif hasattr(chunk, "content"): | |
| content = chunk.content | |
| else: | |
| content = str(chunk) | |
| document = Document( | |
| page_content=content, | |
| metadata=chunk_metadata | |
| ) | |
| documents.append(document) | |
| return documents | |
| def process_with_langchain(self, pdf_path: str) -> List[Document]: | |
| """ | |
| LangChain의 PyPDFLoader를 사용하여 PDF 문서 로드 | |
| Args: | |
| pdf_path: PDF 파일 경로 | |
| Returns: | |
| LangChain Document 객체 리스트 | |
| """ | |
| start_time = time.time() | |
| try: | |
| loader = PyPDFLoader(pdf_path) | |
| documents = loader.load() | |
| processing_time = time.time() - start_time | |
| print(f"PyPDFLoader 처리 시간: {processing_time:.2f}초") | |
| return documents | |
| except Exception as e: | |
| print(f"PyPDFLoader로 문서 처리 중 오류 발생: {e}") | |
| raise | |
| def process_pdf(self, pdf_path: str, use_docling: bool = True) -> List[Document]: | |
| """ | |
| PDF 파일 처리 | |
| Args: | |
| pdf_path: PDF 파일 경로 | |
| use_docling: docling 사용 여부 | |
| Returns: | |
| 처리된 문서의 청크 리스트 | |
| """ | |
| total_start_time = time.time() | |
| # docling 사용 가능 여부 확인 | |
| can_use_docling = use_docling and DOCLING_AVAILABLE | |
| if can_use_docling: | |
| try: | |
| # 1. docling으로 PDF 처리 | |
| docling_result = self.process_with_docling(pdf_path) | |
| doc = docling_result["raw_document"] | |
| metadata = docling_result["metadata"] | |
| # 2. HybridChunker로 청크 생성 | |
| chunks = self.chunk_with_hybrid_chunker(doc) | |
| # 3. 청크를 LangChain Document로 변환 | |
| documents = self.create_langchain_documents_from_chunks(chunks, metadata) | |
| total_time = time.time() - total_start_time | |
| print(f"docling 처리 완료: '{pdf_path}', {len(documents)} 청크, 총 {total_time:.2f}초") | |
| return documents | |
| except Exception as e: | |
| print(f"docling 처리 실패, PyPDFLoader로 대체: {e}") | |
| can_use_docling = False | |
| if not can_use_docling: | |
| # PyPDFLoader로 처리 (대체 방안) | |
| documents = self.process_with_langchain(pdf_path) | |
| chunks = self.text_splitter.split_documents(documents) | |
| total_time = time.time() - total_start_time | |
| print(f"PyPDFLoader 처리 완료: '{pdf_path}', {len(chunks)} 청크, 총 {total_time:.2f}초") | |
| return chunks | |
| def process_directory_parallel(self, directory: str, use_docling: bool = True) -> List[Document]: | |
| """ | |
| 디렉토리 내 모든 PDF 파일 병렬 처리 (멀티스레딩) | |
| Args: | |
| directory: PDF 파일 디렉토리 경로 | |
| use_docling: docling 사용 여부 | |
| Returns: | |
| 처리된 모든 문서의 청크 리스트 | |
| """ | |
| all_documents = [] | |
| pdf_files = [] | |
| # PDF 파일 목록 수집 | |
| for file in os.listdir(directory): | |
| if file.endswith(".pdf"): | |
| pdf_path = os.path.join(directory, file) | |
| pdf_files.append(pdf_path) | |
| if not pdf_files: | |
| print(f"'{directory}' 디렉토리에 PDF 파일이 없습니다.") | |
| return [] | |
| print(f"총 {len(pdf_files)}개 PDF 파일 병렬 처리 시작 (최대 {self.max_workers} 작업자)") | |
| start_time = time.time() | |
| # 병렬 처리 실행 | |
| with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
| # 각 PDF 파일에 대해 process_pdf 함수 병렬 실행 | |
| future_to_pdf = {executor.submit(self.process_pdf, pdf_path, use_docling): pdf_path | |
| for pdf_path in pdf_files} | |
| # 결과 수집 | |
| for future in future_to_pdf: | |
| pdf_path = future_to_pdf[future] | |
| try: | |
| # 결과 가져오기 | |
| chunks = future.result() | |
| all_documents.extend(chunks) | |
| print(f"'{os.path.basename(pdf_path)}' 처리 완료: {len(chunks)} 청크") | |
| except Exception as e: | |
| print(f"'{pdf_path}' 처리 중 오류 발생: {e}") | |
| total_time = time.time() - start_time | |
| print(f"병렬 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") | |
| return all_documents | |
| def process_directory(self, directory: str, use_docling: bool = True, parallel: bool = True) -> List[Document]: | |
| """ | |
| 디렉토리 내 모든 PDF 파일 처리 | |
| Args: | |
| directory: PDF 파일 디렉토리 경로 | |
| use_docling: docling 사용 여부 | |
| parallel: 병렬 처리 사용 여부 | |
| Returns: | |
| 처리된 모든 문서의 청크 리스트 | |
| """ | |
| # 병렬 처리 사용 | |
| if parallel: | |
| return self.process_directory_parallel(directory, use_docling) | |
| # 순차 처리 | |
| all_documents = [] | |
| start_time = time.time() | |
| for file in os.listdir(directory): | |
| if file.endswith(".pdf"): | |
| pdf_path = os.path.join(directory, file) | |
| print(f"처리 중: {pdf_path}") | |
| try: | |
| chunks = self.process_pdf(pdf_path, use_docling=use_docling) | |
| all_documents.extend(chunks) | |
| except Exception as e: | |
| print(f"'{pdf_path}' 처리 중 오류 발생: {e}") | |
| total_time = time.time() - start_time | |
| print(f"순차 처리 완료: 총 {len(all_documents)} 청크, 처리 시간: {total_time:.2f}초") | |
| return all_documents |