Spaces:
Sleeping
Sleeping
"""
Project Batch Analysis Service
==============================

Analyzes a project's pages against the real database (DB), then performs
sorting (question grouping) and formatting (text-version creation).

Pipeline (per page)
1. Load the image
2. AnalysisService: layout -> OCR -> (optional) AI description generation
3. Sort via sorter.py, then store question_groups / question_elements
4. Auto-format with TextFormatter -> record the latest version in text_versions

The project status is updated together with per-page summary information.

Parallelism strategy
--------------------
- **Page-level parallelism**: several pages processed at once (asyncio.gather + Semaphore)
- **Within a page, sequential**: layout/OCR/sorting run synchronously
  * Tesseract/EasyOCR engines are not thread-safe, so asyncio.to_thread() cannot be used
  * The YOLO model also runs synchronously due to thread-safety concerns
- **Only I/O is async**: asyncio.to_thread() is used only for image loading and DB work

Performance characteristics
---------------------------
- 10-page processing time: ~60-90 s (6-9 s per page)
- With max_concurrent_pages=8, eight pages are processed concurrently
- Mostly CPU-bound work, so throughput scales with CPU core count
"""
| from __future__ import annotations | |
| import asyncio | |
| import io | |
| import os | |
| import threading | |
| import time | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Set | |
| import aiofiles | |
| import cv2 | |
| import numpy as np | |
| from loguru import logger | |
| from PIL import Image | |
| from sqlalchemy.orm import Session, selectinload | |
| from ..models import LayoutElement, Page, Project | |
| from .analysis_service import AnalysisService | |
| from .model_registry import model_registry | |
| from .formatter import TextFormatter | |
| from .mock_models import MockElement | |
| from .sorter import save_sorting_results_to_db, sort_layout_elements | |
| from .text_version_service import create_text_version | |
# -----------------------------------------------------------------------------
# Internal constants & helpers
# -----------------------------------------------------------------------------

# Root directory for uploaded page images (<repo>/uploads, two levels above this file).
UPLOADS_ROOT = (Path(__file__).resolve().parents[2] / "uploads").resolve()

# Max concurrent OpenAI requests; raised 15 -> 30 with the 500 RPM rate limit in mind.
DEFAULT_AI_CONCURRENCY = int(os.getenv("OPENAI_MAX_CONCURRENCY", "30"))

# Default page-level parallelism (CPU-host default; 16-32 suggested on GPU hosts).
DEFAULT_MAX_CONCURRENT_PAGES = int(os.getenv("MAX_CONCURRENT_PAGES", "4"))

# Model instance cache (thread-safe singleton pattern; guarded by _model_lock).
_model_instances: Dict[str, AnalysisService] = {}
_model_lock = threading.Lock()

# Default model per document type (keys are document_types.doc_type_id values).
DOC_TYPE_MODEL_MAP = {
    1: "SmartEyeSsen",
    2: "docstructbench",
}
# Fallback model when the doc type is unknown and no model was requested.
DEFAULT_MODEL_CHOICE = "SmartEyeSsen"
def _available_model_names() -> Set[str]:
    """Names of every model currently registered in the model registry."""
    registered = model_registry.list_registered()
    return set(registered.keys())
def is_supported_model(model_name: str) -> bool:
    """Return True when *model_name* is one of the registered analysis models."""
    known_models = _available_model_names()
    return model_name in known_models
def resolve_model_choice(
    doc_type_id: Optional[int],
    requested_model: Optional[str] = None,
) -> str:
    """
    Return the model name matching the doc type or the user's explicit request.

    Args:
        doc_type_id: document_types.doc_type_id
        requested_model: model name explicitly requested by the user

    Raises:
        ValueError: when an unsupported model name is requested
    """
    # An explicit user request wins, but must be a registered model.
    if requested_model:
        if is_supported_model(requested_model):
            return requested_model
        raise ValueError(f"์ง์ํ์ง ์๋ AI ๋ชจ๋ธ์ ๋๋ค: {requested_model}")

    mapped = DOC_TYPE_MODEL_MAP.get(doc_type_id)
    if mapped is not None:
        return mapped

    # Unknown doc type: fall back to the default model, with a warning.
    logger.warning(
        "์ ์ ์๋ doc_type_id ({})์ ๋ํด ๊ธฐ๋ณธ ๋ชจ๋ธ({})์ ์ฌ์ฉํฉ๋๋ค.",
        doc_type_id,
        DEFAULT_MODEL_CHOICE,
    )
    return DEFAULT_MODEL_CHOICE
def _get_analysis_service(model_choice: str = "SmartEyeSsen") -> AnalysisService:
    """
    Return the per-model singleton AnalysisService instance.

    Uses the thread-safe double-checked locking pattern so that under
    parallel processing each model is instantiated and loaded exactly once.

    Guarantees:
    - one in-memory instance per model name
    - the frontend can still switch models dynamically
    - no duplicate model loads during parallel processing
    - thread safety

    Args:
        model_choice: model selection (default: "SmartEyeSsen")

    Returns:
        AnalysisService: model instance (singleton per model)

    Example:
        >>> # e.g. 4 pages processed in parallel
        >>> service1 = _get_analysis_service("SmartEyeSsen")  # creates a new instance
        >>> service2 = _get_analysis_service("SmartEyeSsen")  # returns the cached instance
        >>> service3 = _get_analysis_service("YOLOv8")        # creates another model's instance
        >>> assert service1 is service2  # True
        >>> assert service1 is not service3  # True
    """
    # Fast path: return without the lock when already cached (perf optimization).
    # NOTE(review): the unlocked read is a benign race under CPython's GIL;
    # the locked re-check below covers the creation path.
    if model_choice in _model_instances:
        logger.debug(f"โ ์บ์๋ AnalysisService ๋ฐํ: {model_choice}")
        return _model_instances[model_choice]

    # Double-checked locking pattern
    with _model_lock:
        # Re-check after acquiring the lock (another thread may have created it).
        if model_choice in _model_instances:
            logger.debug(f"โ ์บ์๋ AnalysisService ๋ฐํ (๋ฝ ๋ด๋ถ): {model_choice}")
            return _model_instances[model_choice]

        # Create the model instance (only once per model name).
        logger.info(f"๐ง ์ AnalysisService ์ธ์คํด์ค ์์ฑ ์ค: model_choice={model_choice}")
        service = AnalysisService(model_choice=model_choice, auto_load=False)

        # Load (initialize) the model eagerly so first use is not slow.
        logger.info(f"๐ฆ ๋ชจ๋ธ ๋ก๋ ์์: {model_choice}")
        service._ensure_model_loaded()
        logger.info(f"โ ๋ชจ๋ธ ๋ก๋ ์๋ฃ: {model_choice}")

        # Store in the cache.
        _model_instances[model_choice] = service
        logger.info(
            f"๐พ AnalysisService ์บ์ ์๋ฃ: {model_choice} "
            f"(์ด ์บ์๋ ๋ชจ๋ธ ์: {len(_model_instances)})"
        )
        return service
@asynccontextmanager
async def get_async_db_session():
    """
    DB session manager for use in async contexts.

    Obtains a session from the connection pool, commits on success, and
    rolls back on error. Blocking SQLAlchemy calls (commit/rollback/close)
    are offloaded with asyncio.to_thread() so the event loop is not blocked.

    Bug fix: this function is consumed via ``async with get_async_db_session()``
    (see the parallel batch path), but it was previously a bare async
    generator — without the ``@asynccontextmanager`` decorator (already
    imported at module top) that usage fails at runtime because the
    generator object has no ``__aenter__``/``__aexit__``.

    Yields:
        Session: SQLAlchemy session object

    Example:
        >>> async with get_async_db_session() as session:
        ...     page = session.query(Page).first()

    Note:
        Each parallel task uses its own independent session to avoid
        session contention.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ..database import SessionLocal

    session = SessionLocal()
    try:
        yield session
        await asyncio.to_thread(session.commit)
    except Exception:
        await asyncio.to_thread(session.rollback)
        raise
    finally:
        await asyncio.to_thread(session.close)
def _resolve_image_path(image_path: str) -> Path:
    """
    Resolve a Page.image_path value to an absolute path.

    Absolute paths are tried as-is; relative paths are probed under the
    uploads root and then under the current working directory.

    Raises:
        FileNotFoundError: when none of the candidate locations exist.
    """
    raw_path = Path(image_path)
    if raw_path.is_absolute():
        candidates = [raw_path]
    else:
        candidates = [
            (UPLOADS_ROOT / raw_path).resolve(),
            (Path.cwd() / "uploads" / raw_path).resolve(),
            (Path.cwd() / raw_path).resolve(),
        ]

    for candidate in candidates:
        if candidate.exists():
            return candidate

    raise FileNotFoundError(
        "์ด๋ฏธ์ง ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค. "
        f"ํ์ธ๋ ๊ฒฝ๋ก: {[str(path) for path in candidates]}"
    )
def _load_page_image(page: Page) -> np.ndarray:
    """
    Load the page image synchronously and refresh the stored resolution.

    Note:
        Blocking; prefer _load_page_image_async() in async contexts.
    """
    path = _resolve_image_path(page.image_path)
    loaded = cv2.imread(str(path))
    if loaded is None:
        raise ValueError(f"์ด๋ฏธ์ง ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค: {path}")

    height, width = loaded.shape[:2]
    # Keep the persisted resolution in sync with the actual file.
    if (page.image_width, page.image_height) != (width, height):
        page.image_width = width
        page.image_height = height
    return loaded
async def _load_page_image_async(page: Page) -> np.ndarray:
    """
    Load the page image asynchronously and refresh the stored resolution.

    Disk I/O is done non-blockingly (aiofiles) and the CPU-bound decode is
    delegated to the thread pool so the event loop stays responsive.

    Fixes over the previous version:
    - PIL images are normalized to RGB before ``cv2.COLOR_RGB2BGR``:
      ``Image.open`` yields RGBA/palette ("P")/grayscale ("L") modes for many
      PNGs, and the RGB2BGR conversion assumes exactly three channels.
    - Decode failures now actually raise the documented ``ValueError``
      (the old ``image is None`` check was unreachable because the decoder
      raises instead of returning None).

    Args:
        page: page object

    Returns:
        np.ndarray: image in OpenCV format (BGR)

    Raises:
        FileNotFoundError: when the image file cannot be found
        ValueError: when image decoding fails

    Example:
        >>> image = await _load_page_image_async(page)
        >>> height, width = image.shape[:2]
    """
    resolved_path = _resolve_image_path(page.image_path)

    # Async file read (minimizes I/O wait time)
    async with aiofiles.open(resolved_path, 'rb') as f:
        image_data = await f.read()

    # Image decode (CPU-bound work goes to the thread pool)
    def decode_image(data: bytes) -> np.ndarray:
        """Decode with PIL, then convert to the OpenCV format."""
        pil_image = Image.open(io.BytesIO(data))
        # Normalize to 3-channel RGB first (handles RGBA / palette / grayscale).
        if pil_image.mode != "RGB":
            pil_image = pil_image.convert("RGB")
        # RGB -> BGR conversion (OpenCV format)
        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    try:
        image = await asyncio.to_thread(decode_image, image_data)
    except Exception as decode_error:
        raise ValueError(f"์ด๋ฏธ์ง ๋์ฝ๋ฉ ์คํจ: {resolved_path}") from decode_error

    # Refresh resolution info
    height, width = image.shape[:2]
    if page.image_width != width or page.image_height != height:
        page.image_width = width
        page.image_height = height
    return image
def _layout_to_mock(elements: List[LayoutElement]) -> List[MockElement]:
    """
    Convert SQLAlchemy LayoutElement rows into the MockElement objects
    consumed by the sorter.
    """
    return [
        MockElement(
            element_id=item.element_id,
            class_name=item.class_name,
            confidence=float(item.confidence or 0.0),
            bbox_x=int(item.bbox_x),
            bbox_y=int(item.bbox_y),
            bbox_width=int(item.bbox_width),
            bbox_height=int(item.bbox_height),
            page_id=item.page_id,
        )
        for item in elements
    ]
def _sync_layout_runtime_fields(
    layout_elements: List[LayoutElement],
    mock_elements: List[MockElement],
) -> List[LayoutElement]:
    """
    Copy the sorter-computed fields (order_in_question, group_id, ...) back
    onto the persistent LayoutElement rows and return the rows that matched.
    """
    by_id: Dict[int, LayoutElement] = {
        elem.element_id: elem for elem in layout_elements
    }

    synced: List[LayoutElement] = []
    for mock in mock_elements:
        target = by_id.get(mock.element_id)
        if not target:
            # Sorter produced an element the DB does not know about; skip it.
            logger.warning(
                "์ ๋ ฌ ๊ฒฐ๊ณผ์ ์กด์ฌํ์ง๋ง DB์ ์๋ element_id={}", mock.element_id
            )
            continue

        target.order_in_question = getattr(mock, "order_in_question", None)
        target.group_id = getattr(mock, "group_id", None)
        target.order_in_group = getattr(mock, "order_in_group", None)
        # Fall back to the stored bbox when the sorter did not set positions.
        target.y_position = getattr(mock, "y_position", target.bbox_y)
        target.x_position = getattr(mock, "x_position", target.bbox_x)
        target.area = getattr(
            mock, "area", target.bbox_width * target.bbox_height
        )
        synced.append(target)
    return synced
def _update_page_status(
    page: Page,
    *,
    status: str,
    processing_time: float,
) -> None:
    """
    Update the page's status, processing time, and analysis-completion timestamp.

    Args:
        status: new analysis_status value (e.g. "completed" / "error")
        processing_time: elapsed wall-clock seconds for this page
    """
    page.analysis_status = status
    page.processing_time = processing_time
    # Naive UTC timestamp. NOTE(review): datetime.utcnow() is deprecated since
    # Python 3.12 — consider datetime.now(timezone.utc) if the column is tz-aware.
    page.analyzed_at = datetime.utcnow()
def _update_project_status(project: Project, status: str) -> None:
    """
    Update the project's status and its updated_at timestamp (naive UTC).
    """
    project.status = status
    project.updated_at = datetime.utcnow()
async def _process_single_page_async(
    *,
    db: Session,
    project: Project,
    page: Page,
    formatter: TextFormatter,
    analysis_service: AnalysisService,
    use_ai_descriptions: bool,
    api_key: Optional[str],
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
) -> Dict[str, Any]:
    """
    Run the full pipeline for a single page and return a result summary.

    Steps: async image load -> layout analysis -> OCR -> (optional) AI
    descriptions -> sorting -> DB persistence -> formatting -> text version.
    On any failure the session is rolled back, the page is marked "error",
    and the error message is returned in the summary (this function never
    raises for per-page failures).

    Args:
        db: SQLAlchemy session used for all persistence in this page run
        project: owning project (used here for logging only)
        page: the page to analyze
        formatter: TextFormatter shared across the project's pages
        analysis_service: model-specific AnalysisService singleton
        use_ai_descriptions: whether to request AI descriptions via OpenAI
        api_key: OpenAI API key; falls back to the OPENAI_API_KEY env var
        ai_max_concurrency: max concurrent OpenAI requests

    Returns:
        Dict with page_id, page_number, status ("completed"/"error"), message,
        layout/ocr/ai counts and processing_time (seconds).
    """
    logger.info(
        "ํ์ด์ง ๋ถ์ ์์: project_id={} / page_id={}",
        project.project_id,
        page.page_id,
    )
    page_start = time.time()
    # Pessimistic default: summary starts as "error" and is flipped to
    # "completed" only when the whole pipeline succeeds.
    summary: Dict[str, Any] = {
        "page_id": page.page_id,
        "page_number": page.page_number,
        "status": "error",
        "message": "",
        "layout_count": 0,
        "ocr_count": 0,
        "ai_description_count": 0,
        "processing_time": 0.0,
    }
    try:
        # Async image loading (minimizes I/O wait time)
        image = await _load_page_image_async(page)

        # Layout analysis (CPU-bound -> synchronous execution)
        # WARNING: the OCR/model engines are not thread-safe, so
        # asyncio.to_thread() cannot be used here.
        layout_elements = analysis_service.analyze_layout(
            image=image,
            page_id=page.page_id,
            db=db,
            model_choice=analysis_service.model_choice,
        )
        if not layout_elements:
            raise ValueError("๋ ์ด์์ ๋ถ์ ๊ฒฐ๊ณผ๊ฐ ๋น์ด ์์ต๋๋ค.")
        summary["layout_count"] = len(layout_elements)

        # OCR (CPU-bound -> synchronous execution)
        # WARNING: Tesseract/EasyOCR are not thread-safe, so
        # asyncio.to_thread() cannot be used.
        text_contents = await analysis_service.perform_ocr_async(
            image=image,
            layout_elements=layout_elements,
            db=db,
        )
        summary["ocr_count"] = len(text_contents)

        ai_descriptions: Dict[int, str] = {}
        if use_ai_descriptions:
            # API key: the request parameter wins; otherwise use the env var.
            effective_api_key = api_key or os.getenv("OPENAI_API_KEY")
            if effective_api_key:
                logger.info(f"AI ์ค๋ช ์์ฑ ์์: page_id={page.page_id}")
                try:
                    ai_descriptions = await analysis_service.call_openai_api_async(
                        image=image,
                        layout_elements=layout_elements,
                        api_key=effective_api_key,
                        db=db,
                        max_concurrent_requests=ai_max_concurrency,
                    )
                    summary["ai_description_count"] = len(ai_descriptions)
                    logger.info(
                        f"AI ์ค๋ช ์์ฑ ์๋ฃ: {len(ai_descriptions)}๊ฐ ์์ ์ฒ๋ฆฌ"
                    )
                except Exception as ai_error:
                    # AI descriptions are best-effort: a failure here is logged
                    # but does not fail the page.
                    logger.error(
                        "AI ์ค๋ช ์์ฑ ๋น๋๊ธฐ ์ฒ๋ฆฌ ์คํจ: page_id={} / error={}",
                        page.page_id,
                        ai_error,
                    )
            else:
                logger.warning(
                    f"AI ์ค๋ช ์์ฑ ์์ฒญ๋์์ผ๋ API ํค๊ฐ ์์ต๋๋ค (page_id={page.page_id})"
                )

        # Prepare for sorting (synchronous conversion work)
        mock_elements = _layout_to_mock(layout_elements)

        # Sorting (CPU-bound -> synchronous execution)
        # Fast computation; thread-pool overhead would not pay off.
        sorted_mock = sort_layout_elements(
            mock_elements,
            document_type=formatter.document_type,
            page_width=page.image_width or 0,
            page_height=page.image_height or 0,
        )
        synced_layouts = _sync_layout_runtime_fields(layout_elements, sorted_mock)

        # Persist sorting results to the DB (synchronous - avoids deadlocks)
        save_sorting_results_to_db(
            db,
            page.page_id,
            synced_layouts,
        )

        # Formatting (CPU-bound -> synchronous execution)
        # Fast text processing; thread-pool overhead would not pay off.
        formatted_text = formatter.format_page(
            synced_layouts,
            text_contents,
            ai_descriptions=ai_descriptions,
        )

        # Create the text version (DB I/O)
        create_text_version(
            db,
            page,
            formatted_text or "",
        )

        # Final status update
        processing_time = time.time() - page_start
        _update_page_status(page, status="completed", processing_time=processing_time)
        summary["status"] = "completed"
        summary["processing_time"] = processing_time
        summary["message"] = "success"

        # DB commit (synchronous - avoids deadlocks)
        db.commit()
        return summary
    except Exception as error:  # pylint: disable=broad-except
        logger.error(f"ํ์ด์ง ๋ถ์ ์คํจ: page_id={page.page_id} / error={str(error)}")
        logger.exception("์์ธ ์คํ ํธ๋ ์ด์ค:")  # full stack trace

        # Roll back the failed work first (synchronous - avoids deadlocks)
        db.rollback()

        processing_time = time.time() - page_start
        _update_page_status(page, status="error", processing_time=processing_time)
        summary["processing_time"] = processing_time
        summary["message"] = str(error)

        # Commit the error status on the now-clean session
        db.commit()
        return summary
def _process_single_page(
    *,
    db: Session,
    project: Project,
    page: Page,
    formatter: TextFormatter,
    analysis_service: AnalysisService,
    use_ai_descriptions: bool,
    api_key: Optional[str],
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
) -> Dict[str, Any]:
    """
    Synchronous wrapper around _process_single_page_async for callers that
    are not running inside an event loop.
    """
    pipeline = _process_single_page_async(
        db=db,
        project=project,
        page=page,
        formatter=formatter,
        analysis_service=analysis_service,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
    )
    return asyncio.run(pipeline)
# -----------------------------------------------------------------------------
# Public API
# -----------------------------------------------------------------------------
async def analyze_project_batch_async(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Sequentially analyze the project's 'pending' (and 'error') pages and
    return a result summary.

    Args:
        db: database session
        project_id: project ID
        use_ai_descriptions: whether to generate AI descriptions
        api_key: OpenAI API key (per-page fallback to the env var applies)
        ai_max_concurrency: max concurrent AI API requests
        analysis_model: explicit model name; otherwise resolved from doc type

    Returns:
        Summary dict: per-page results plus overall status and timing.

    Raises:
        ValueError: when the project does not exist.
    """
    logger.info("ํ๋ก์ ํธ ๋ฐฐ์น ๋ถ์ ์์: project_id={}", project_id)
    started_at = time.time()

    project = (
        db.query(Project)
        .options(selectinload(Project.pages))
        .filter(Project.project_id == project_id)
        .one_or_none()
    )
    if not project:
        raise ValueError(f"ํ๋ก์ ํธ ID {project_id}๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.")

    # Pages in 'error' state are retried alongside 'pending' ones.
    pending_pages = [
        page for page in project.pages if page.analysis_status in {"pending", "error"}
    ]
    pending_pages.sort(key=lambda p: p.page_number)

    result_summary: Dict[str, Any] = {
        "project_id": project.project_id,
        "project_status_before": project.status,
        "processed_pages": 0,
        "successful_pages": 0,
        "failed_pages": 0,
        "total_pages": len(pending_pages),
        "status": "completed" if pending_pages else "no_pending_pages",
        "page_results": [],
        "total_time": 0.0,
    }
    if not pending_pages:
        logger.warning("๋ถ์ํ ํ์ด์ง๊ฐ ์์ต๋๋ค. project_id={}", project.project_id)
        return result_summary

    # Mark the project as running before the (long) page loop starts.
    _update_project_status(project, "in_progress")
    db.commit()

    model_choice = resolve_model_choice(project.doc_type_id, analysis_model)
    logger.info(
        "ํ๋ก์ ํธ ๋ถ์ ๋ชจ๋ธ ์ ํ: project_id={}, doc_type_id={}, model={}",
        project.project_id,
        project.doc_type_id,
        model_choice,
    )
    analysis_service = _get_analysis_service(model_choice)

    formatter = TextFormatter(
        doc_type_id=project.doc_type_id,
        db=db,
        use_db_rules=True,
    )

    # Sequential page loop: per-page failures are captured in the summary,
    # never raised (see _process_single_page_async).
    for page in pending_pages:
        page_summary = await _process_single_page_async(
            db=db,
            project=project,
            page=page,
            formatter=formatter,
            analysis_service=analysis_service,
            use_ai_descriptions=use_ai_descriptions,
            api_key=api_key,
            ai_max_concurrency=ai_max_concurrency,
        )
        result_summary["page_results"].append(page_summary)
        result_summary["processed_pages"] += 1
        if page_summary["status"] == "completed":
            result_summary["successful_pages"] += 1
        else:
            result_summary["failed_pages"] += 1

    if result_summary["failed_pages"] == 0:
        final_status = "completed"
    elif result_summary["successful_pages"] == 0:
        final_status = "error"
    else:
        # Partial success, partial failure -> keep the project in_progress
        final_status = "in_progress"

    _update_project_status(project, final_status)
    db.commit()

    result_summary["status"] = final_status
    result_summary["project_status_after"] = project.status
    result_summary["total_time"] = time.time() - started_at
    logger.info(
        "ํ๋ก์ ํธ ๋ฐฐ์น ๋ถ์ ์ข ๋ฃ: project_id={} / status={} / success={} / fail={} / {:.2f}s",
        project.project_id,
        final_status,
        result_summary["successful_pages"],
        result_summary["failed_pages"],
        result_summary["total_time"],
    )
    return result_summary
def analyze_project_batch(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Synchronous wrapper around analyze_project_batch_async for callers that
    are not running inside an event loop.
    """
    batch_coro = analyze_project_batch_async(
        db=db,
        project_id=project_id,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
        analysis_model=analysis_model,
    )
    return asyncio.run(batch_coro)
async def analyze_project_batch_async_parallel(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    max_concurrent_pages: int = 8,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Analyze the project's 'pending' (and 'error') pages in parallel and
    return a result summary.

    Args:
        db: database session
        project_id: project ID
        use_ai_descriptions: whether to generate AI descriptions
        api_key: OpenAI API key
        ai_max_concurrency: max concurrent AI API requests
        max_concurrent_pages: max pages processed concurrently (default: 8)
        analysis_model: explicit model name; otherwise resolved from doc type

    Returns:
        Analysis result summary

    Note:
        Functionally the same as analyze_project_batch_async, but several
        pages are processed concurrently to improve throughput. Tune
        max_concurrent_pages to match the system's resources.

        NOTE(review): the default here (8) differs from the sync wrapper's
        DEFAULT_MAX_CONCURRENT_PAGES (env-driven, default 4) — confirm which
        default is intended.
    """
    logger.info(
        "ํ๋ก์ ํธ ๋ณ๋ ฌ ๋ฐฐ์น ๋ถ์ ์์: project_id={}, max_concurrent={}",
        project_id,
        max_concurrent_pages,
    )
    started_at = time.time()

    project = (
        db.query(Project)
        .options(selectinload(Project.pages))
        .filter(Project.project_id == project_id)
        .one_or_none()
    )
    if not project:
        raise ValueError(f"ํ๋ก์ ํธ ID {project_id}๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค.")

    # Pages in 'error' state are retried alongside 'pending' ones.
    pending_pages = [
        page for page in project.pages if page.analysis_status in {"pending", "error"}
    ]
    pending_pages.sort(key=lambda p: p.page_number)

    result_summary: Dict[str, Any] = {
        "project_id": project.project_id,
        "project_status_before": project.status,
        "processed_pages": 0,
        "successful_pages": 0,
        "failed_pages": 0,
        "total_pages": len(pending_pages),
        "status": "completed" if pending_pages else "no_pending_pages",
        "page_results": [],
        "total_time": 0.0,
        "processing_mode": "parallel",
    }
    if not pending_pages:
        logger.warning("๋ถ์ํ ํ์ด์ง๊ฐ ์์ต๋๋ค. project_id={}", project.project_id)
        return result_summary

    _update_project_status(project, "in_progress")
    db.commit()

    model_choice = resolve_model_choice(project.doc_type_id, analysis_model)
    logger.info(
        "๋ณ๋ ฌ ํ๋ก์ ํธ ๋ถ์ ๋ชจ๋ธ ์ ํ: project_id={}, doc_type_id={}, model={}",
        project.project_id,
        project.doc_type_id,
        model_choice,
    )
    analysis_service = _get_analysis_service(model_choice)

    # NOTE(review): formatter is built against the outer `db` session but is
    # shared across per-task sessions — confirm TextFormatter holds no
    # session-bound state after construction.
    formatter = TextFormatter(
        doc_type_id=project.doc_type_id,
        db=db,
        use_db_rules=True,
    )

    # Semaphore bounds how many pages run at once.
    semaphore = asyncio.Semaphore(max_concurrent_pages)

    async def process_with_semaphore(page: Page) -> Dict[str, Any]:
        """
        Process one page while the semaphore bounds concurrency.

        Each task creates its own independent DB session to avoid session
        contention during parallel processing; the get_async_db_session()
        context manager handles commit/rollback and session cleanup.
        """
        async with semaphore:
            # Use the async DB session context manager
            async with get_async_db_session() as task_db:
                # Reload page/project in this session (the outer objects
                # belong to another session).
                task_page = await asyncio.to_thread(
                    task_db.query(Page).filter(Page.page_id == page.page_id).first
                )
                task_project = await asyncio.to_thread(
                    task_db.query(Project).filter(Project.project_id == project.project_id).first
                )
                if not task_page or not task_project:
                    raise ValueError(f"ํ์ด์ง ๋๋ ํ๋ก์ ํธ๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค: page_id={page.page_id}")
                return await _process_single_page_async(
                    db=task_db,
                    project=task_project,
                    page=task_page,
                    formatter=formatter,
                    analysis_service=analysis_service,
                    use_ai_descriptions=use_ai_descriptions,
                    api_key=api_key,
                    ai_max_concurrency=ai_max_concurrency,
                )

    # Process all pages in parallel; return_exceptions=True so one failed
    # task does not cancel the rest.
    logger.info(f"์ด {len(pending_pages)}๊ฐ ํ์ด์ง๋ฅผ ์ต๋ {max_concurrent_pages}๊ฐ์ฉ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์์")
    tasks = [process_with_semaphore(page) for page in pending_pages]
    page_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Aggregate the results
    for page_result in page_results:
        if isinstance(page_result, Exception):
            logger.error(f"ํ์ด์ง ์ฒ๋ฆฌ ์ค ์์ธ ๋ฐ์: {page_result}")
            result_summary["page_results"].append({
                "status": "error",
                "message": str(page_result),
            })
            result_summary["failed_pages"] += 1
        else:
            result_summary["page_results"].append(page_result)
            if page_result["status"] == "completed":
                result_summary["successful_pages"] += 1
            else:
                result_summary["failed_pages"] += 1
        result_summary["processed_pages"] += 1

    # Decide the final project status
    if result_summary["failed_pages"] == 0:
        final_status = "completed"
    elif result_summary["successful_pages"] == 0:
        final_status = "error"
    else:
        # Partial success, partial failure -> keep the project in_progress
        final_status = "in_progress"

    _update_project_status(project, final_status)
    db.commit()

    result_summary["status"] = final_status
    result_summary["project_status_after"] = project.status
    result_summary["total_time"] = time.time() - started_at
    logger.info(
        "ํ๋ก์ ํธ ๋ณ๋ ฌ ๋ฐฐ์น ๋ถ์ ์ข ๋ฃ: project_id={} / status={} / success={} / fail={} / {:.2f}s",
        project.project_id,
        final_status,
        result_summary["successful_pages"],
        result_summary["failed_pages"],
        result_summary["total_time"],
    )
    return result_summary
def analyze_project_batch_parallel(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    max_concurrent_pages: int = DEFAULT_MAX_CONCURRENT_PAGES,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Synchronous wrapper around analyze_project_batch_async_parallel for
    callers that are not running inside an event loop.
    """
    parallel_coro = analyze_project_batch_async_parallel(
        db=db,
        project_id=project_id,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
        max_concurrent_pages=max_concurrent_pages,
        analysis_model=analysis_model,
    )
    return asyncio.run(parallel_coro)
# Public API surface of this module.
# NOTE(review): several underscore-prefixed names are exported here —
# presumably for tests/internal callers; confirm this is intentional.
__all__ = [
    "analyze_project_batch",
    "analyze_project_batch_async",
    "analyze_project_batch_parallel",
    "analyze_project_batch_async_parallel",
    "_get_analysis_service",
    "_process_single_page",
    "_process_single_page_async",
    "DEFAULT_AI_CONCURRENCY",
    "is_supported_model",
    "resolve_model_choice",
]