Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
SmartEyeSsen PDF processing service
===================================
Provides conversion of a PDF file into one image per page.
Uses PyMuPDF (fitz) to perform high-quality image rendering.
| """ | |
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import fitz  # PyMuPDF
from loguru import logger
from PIL import Image
# Fallback rendering resolution (dots per inch), used when neither the
# constructor argument nor the PDF_PROCESSOR_DPI environment variable
# supplies a positive integer.
DEFAULT_PDF_DPI = 300
class PDFProcessor:
    """Convert PDF documents into per-page JPEG images stored on disk.

    Pages are rasterised with PyMuPDF (``fitz``) at ``self.dpi`` and written
    to ``<upload_directory>/<project_id>/page_<n>.jpg``. The submitted PDF is
    also stored next to the images as ``original.pdf``.
    """

    def __init__(self, upload_directory: str = "uploads", dpi: Optional[int] = None):
        """Initialise the processor and make sure the upload directory exists.

        Args:
            upload_directory: Base directory for stored files; resolved to an
                absolute path.
            dpi: Rendering resolution. When omitted or not positive, the
                ``PDF_PROCESSOR_DPI`` environment variable and then
                ``DEFAULT_PDF_DPI`` are consulted.
        """
        self.upload_directory = Path(upload_directory).resolve()
        self.dpi = self._resolve_dpi(dpi)
        self.jpeg_quality = 95
        os.makedirs(self.upload_directory, exist_ok=True)
        logger.info(
            f"PDFProcessor 초기화 완료 - DPI: {self.dpi}, 저장 경로: {self.upload_directory}"
        )

    @staticmethod
    def _resolve_dpi(provided_dpi: Optional[int]) -> int:
        """Pick the DPI from the argument, the environment, or the default.

        Declared as a static method because it takes no ``self`` while
        ``__init__`` calls it through the instance.

        Args:
            provided_dpi: Explicit DPI; used when it is a positive number.

        Returns:
            ``provided_dpi`` if positive, else a positive integer parsed from
            ``PDF_PROCESSOR_DPI``, else ``DEFAULT_PDF_DPI``.
        """
        if provided_dpi is not None and provided_dpi > 0:
            return int(provided_dpi)

        env_value = os.getenv("PDF_PROCESSOR_DPI")
        if env_value:
            try:
                parsed = int(env_value)
            except ValueError:
                logger.warning(
                    f"환경 변수 PDF_PROCESSOR_DPI 값 '{env_value}'을(를) 정수로 변환할 수 없어 기본값 {DEFAULT_PDF_DPI}을 사용합니다."
                )
            else:
                if parsed > 0:
                    logger.debug(
                        f"환경 변수 PDF_PROCESSOR_DPI 적용: {parsed} (인자 미지정)"
                    )
                    return parsed
        return DEFAULT_PDF_DPI

    def _save_original_pdf(self, project_dir: Path, pdf_bytes: bytes) -> None:
        """Write the submitted PDF bytes to ``project_dir/original.pdf``."""
        original_pdf_path = project_dir / "original.pdf"
        with open(original_pdf_path, "wb") as f:
            f.write(pdf_bytes)
        logger.info(f"PDF 원본 저장 완료: {original_pdf_path}")

    def _render_page_to_jpeg(
        self,
        document,
        page_index: int,
        total_pages: int,
        page_number: int,
        project_id: int,
        project_dir: Path,
    ) -> Dict[str, Any]:
        """Rasterise one page of ``document`` and save it as ``page_<n>.jpg``.

        Args:
            document: An open PyMuPDF document.
            page_index: Zero-based index of the page inside ``document``.
            total_pages: Page count of the document (used for logging only).
            page_number: Number used in the output file name and result.
            project_id: Project identifier used in the public path.
            project_dir: Directory the JPEG is written to.

        Returns:
            Dict with ``page_number``, ``image_path``, ``full_path``,
            ``width``, ``height`` and ``dpi``.
        """
        # The original code treats 72 DPI as the base resolution, so
        # dpi / 72 is the zoom factor passed to the render matrix.
        zoom = self.dpi / 72
        pix = document[page_index].get_pixmap(
            matrix=fitz.Matrix(zoom, zoom), alpha=False
        )
        # Build the PIL image directly from the raw RGB samples so the page is
        # JPEG-encoded only once (on save) instead of encode -> decode -> encode.
        # NOTE(review): assumes the default RGB colorspace (3 bytes per pixel)
        # because alpha=False and no colorspace override is passed — confirm.
        img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        width, height = img.size

        filename = f"page_{page_number}.jpg"
        full_path = project_dir / filename
        # Public path is always rooted at the literal "uploads" segment,
        # independent of self.upload_directory, and uses forward slashes.
        public_path = Path("uploads") / str(project_id) / filename

        img.save(str(full_path), "JPEG", quality=self.jpeg_quality, optimize=True)
        logger.debug(
            f"페이지 {page_index + 1}/{total_pages} 변환 완료 - "
            f"페이지 번호: {page_number}, 크기: {width}x{height}"
        )
        return {
            'page_number': page_number,
            'image_path': public_path.as_posix(),
            'full_path': str(full_path),
            'width': width,
            'height': height,
            'dpi': self.dpi,
        }

    def convert_pdf_to_images(
        self,
        pdf_bytes: bytes,
        project_id: int,
        start_page_number: int
    ) -> List[Dict[str, Any]]:
        """Convert every page of a PDF to a JPEG file, one page at a time.

        Args:
            pdf_bytes: Raw bytes of the PDF file.
            project_id: Project identifier; names the storage sub-directory.
            start_page_number: Page number assigned to the first PDF page;
                later pages count up from it.

        Returns:
            One dict per page, in page order, e.g.::

                {
                    'page_number': 1,
                    'image_path': 'uploads/123/page_1.jpg',
                    'full_path': '<upload_directory>/123/page_1.jpg',
                    'width': 2480,
                    'height': 3508,
                    'dpi': 300,
                }

        Raises:
            ValueError: The PDF is corrupt/unreadable, has no pages, or a page
                failed to convert.
            OSError: Writing ``original.pdf`` failed.
        """
        logger.info(f"PDF 변환 시작 - ProjectID: {project_id}, 시작 페이지: {start_page_number}")

        project_dir = self.upload_directory / str(project_id)
        project_dir.mkdir(parents=True, exist_ok=True)

        converted_pages: List[Dict[str, Any]] = []
        pdf_document = None
        try:
            pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
            total_pages = len(pdf_document)
            logger.info(f"PDF 페이지 수: {total_pages}")
            if total_pages == 0:
                raise ValueError("PDF 파일에 페이지가 없습니다.")

            self._save_original_pdf(project_dir, pdf_bytes)

            for page_index in range(total_pages):
                try:
                    page_info = self._render_page_to_jpeg(
                        pdf_document,
                        page_index,
                        total_pages,
                        start_page_number + page_index,
                        project_id,
                        project_dir,
                    )
                except Exception as e:
                    logger.error(f"페이지 {page_index + 1} 변환 실패: {str(e)}")
                    # Rollback happens once, in the outer handler below.
                    raise ValueError(
                        f"PDF 페이지 {page_index + 1} 변환 실패: {str(e)}"
                    ) from e
                converted_pages.append(page_info)

            logger.info(
                f"PDF 변환 완료 - ProjectID: {project_id}, "
                f"총 {len(converted_pages)}개 페이지 변환"
            )
            return converted_pages
        # NOTE(review): `fitz.fitz` is believed to be absent in newer PyMuPDF
        # releases; the top-level name is used instead — confirm against the
        # pinned PyMuPDF version.
        except fitz.FileDataError as e:
            logger.error(f"PDF 파일 오류: {str(e)}")
            raise ValueError(f"PDF 파일이 손상되었거나 읽을 수 없습니다: {str(e)}") from e
        except Exception as e:
            logger.error(f"PDF 변환 중 예상치 못한 오류: {str(e)}")
            if converted_pages:
                self._rollback_conversion(converted_pages)
            raise
        finally:
            # Compare against None: a Document defines __len__, so a zero-page
            # document would be falsy and never closed.
            if pdf_document is not None:
                pdf_document.close()

    def convert_pdf_to_images_parallel(
        self,
        pdf_bytes: bytes,
        project_id: int,
        start_page_number: int,
        max_workers: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Convert every page of a PDF to a JPEG file using a thread pool.

        Each worker opens its own document from ``pdf_bytes`` and renders one
        page. Results are returned sorted by page number.

        Args:
            pdf_bytes: Raw bytes of the PDF file.
            project_id: Project identifier; names the storage sub-directory.
            start_page_number: Page number assigned to the first PDF page.
            max_workers: Thread count. ``None`` means the CPU count capped
                at 4.

        Returns:
            Same structure as ``convert_pdf_to_images``.

        Raises:
            ValueError: The PDF is corrupt/unreadable, has no pages, or a page
                failed to convert.
            OSError: Writing ``original.pdf`` failed.

        Note:
            NOTE(review): PyMuPDF's documentation is understood to state that
            multithreaded use is unsupported even with separate Document
            objects — confirm, and consider process-based parallelism.
            A larger ``max_workers`` increases memory use.
        """
        logger.info(
            f"PDF 병렬 변환 시작 - ProjectID: {project_id}, 시작 페이지: {start_page_number}"
        )

        project_dir = self.upload_directory / str(project_id)
        project_dir.mkdir(parents=True, exist_ok=True)

        pdf_document = None
        converted_pages: List[Dict[str, Any]] = []
        try:
            pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
            total_pages = len(pdf_document)
            logger.info(f"PDF 페이지 수: {total_pages}")
            if total_pages == 0:
                raise ValueError("PDF 파일에 페이지가 없습니다.")

            self._save_original_pdf(project_dir, pdf_bytes)

            if max_workers is None:
                max_workers = min(os.cpu_count() or 4, 4)
            logger.info(f"병렬 변환 시작: {max_workers}개 워커 사용")

            def convert_single_page(page_index: int) -> Dict[str, Any]:
                """Render one page using a document private to this call."""
                worker_doc = None
                try:
                    worker_doc = fitz.open(stream=pdf_bytes, filetype="pdf")
                    return self._render_page_to_jpeg(
                        worker_doc,
                        page_index,
                        total_pages,
                        start_page_number + page_index,
                        project_id,
                        project_dir,
                    )
                except Exception as e:
                    logger.error(f"페이지 {page_index + 1} 병렬 변환 실패: {str(e)}")
                    raise ValueError(
                        f"PDF 페이지 {page_index + 1} 변환 실패: {str(e)}"
                    ) from e
                finally:
                    # Close the per-worker document on failure as well.
                    if worker_doc is not None:
                        worker_doc.close()

            first_error: Optional[BaseException] = None
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_page = {
                    executor.submit(convert_single_page, i): i
                    for i in range(total_pages)
                }
                for future in as_completed(future_to_page):
                    if future.cancelled():
                        continue
                    page_index = future_to_page[future]
                    try:
                        converted_pages.append(future.result())
                    except Exception as e:
                        logger.error(f"페이지 {page_index + 1} 처리 실패: {str(e)}")
                        if first_error is None:
                            first_error = e
                            # Stop pages that have not started; pages already
                            # running still finish and are collected above so
                            # the rollback can remove their files too.
                            for pending in future_to_page:
                                pending.cancel()

            # The pool has drained here, so no worker can write a file after
            # the rollback triggered by this raise.
            if first_error is not None:
                raise first_error

            converted_pages.sort(key=lambda x: x['page_number'])
            logger.info(
                f"PDF 병렬 변환 완료 - ProjectID: {project_id}, "
                f"총 {len(converted_pages)}개 페이지 변환"
            )
            return converted_pages
        # NOTE(review): `fitz.fitz` is believed to be absent in newer PyMuPDF
        # releases; the top-level name is used instead — confirm against the
        # pinned PyMuPDF version.
        except fitz.FileDataError as e:
            logger.error(f"PDF 파일 오류: {str(e)}")
            raise ValueError(f"PDF 파일이 손상되었거나 읽을 수 없습니다: {str(e)}") from e
        except Exception as e:
            logger.error(f"PDF 병렬 변환 중 예상치 못한 오류: {str(e)}")
            if converted_pages:
                self._rollback_conversion(converted_pages)
            raise
        finally:
            if pdf_document is not None:
                pdf_document.close()

    def _rollback_conversion(self, converted_pages: List[Dict[str, Any]]) -> None:
        """Delete the image files produced by a failed conversion.

        Deletion is best-effort: a failure on one file is logged and the
        remaining files are still processed.

        Args:
            converted_pages: Page-info dicts whose ``full_path`` files are
                removed.
        """
        logger.warning(f"변환 롤백 시작 - {len(converted_pages)}개 파일 삭제")
        for page_info in converted_pages:
            full_path = None  # keep the name bound for the error log below
            try:
                full_path = page_info.get('full_path')
                if full_path and os.path.exists(full_path):
                    os.remove(full_path)
                    logger.debug(f"파일 삭제: {full_path}")
            except Exception as e:
                logger.error(f"롤백 중 파일 삭제 실패: {full_path}, 오류: {str(e)}")
        logger.info("변환 롤백 완료")

    def get_pdf_info(self, pdf_bytes: bytes) -> Dict[str, Any]:
        """Extract the page count and document metadata of a PDF.

        Args:
            pdf_bytes: Raw bytes of the PDF file.

        Returns:
            Dict with ``total_pages``, ``title``, ``author``, ``subject``,
            ``creator``, ``producer`` and ``creation_date``.

        Raises:
            ValueError: The PDF could not be opened or read.
        """
        pdf_document = None
        try:
            pdf_document = fitz.open(stream=pdf_bytes, filetype="pdf")
            metadata = pdf_document.metadata or {}
            info = {
                'total_pages': len(pdf_document),
                'title': metadata.get('title', ''),
                'author': metadata.get('author', ''),
                'subject': metadata.get('subject', ''),
                'creator': metadata.get('creator', ''),
                'producer': metadata.get('producer', ''),
                'creation_date': metadata.get('creationDate', '')
            }
            logger.debug(f"PDF 메타데이터 추출 완료: {info}")
            return info
        except Exception as e:
            logger.error(f"PDF 메타데이터 추출 실패: {str(e)}")
            raise ValueError(f"PDF 파일 정보를 읽을 수 없습니다: {str(e)}") from e
        finally:
            # Close the document on the error path too.
            if pdf_document is not None:
                pdf_document.close()
# Shared module-level instance (singleton-style): the storage root comes from
# the UPLOAD_DIR environment variable and falls back to "uploads".
UPLOAD_ROOT = os.environ.get("UPLOAD_DIR", "uploads")
pdf_processor = PDFProcessor(upload_directory=UPLOAD_ROOT)