# Source: smarteye-backend/app/services/batch_analysis.py
# Commit 273b109 — "feat: Gemini OCR 비동기 처리 추가 및 환경 변수 설정 개선"
"""
Project Batch Analysis Service
=============================
์‹ค์ œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค(DB) ๊ธฐ๋ฐ˜์œผ๋กœ ํ”„๋กœ์ ํŠธ ๋‚ด ํŽ˜์ด์ง€๋“ค์„ ์ˆœ์ฐจ์ ์œผ๋กœ ๋ถ„์„ํ•˜๊ณ 
์ •๋ ฌ(Question Grouping) ๋ฐ ํฌ๋งทํŒ…(Text Version ์ƒ์„ฑ)๊นŒ์ง€ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
ํŒŒ์ดํ”„๋ผ์ธ (ํŽ˜์ด์ง€ ๋‹จ์œ„)
1. ์ด๋ฏธ์ง€ ๋กœ๋“œ
2. AnalysisService๋กœ ๋ ˆ์ด์•„์›ƒ โ†’ OCR โ†’ (์„ ํƒ) AI ์„ค๋ช… ์ƒ์„ฑ
3. sorter.py๋ฅผ ์ด์šฉํ•œ ์ •๋ ฌ ํ›„ question_groups / question_elements ์ €์žฅ
4. TextFormatter๋กœ ์ž๋™ ํฌ๋งทํŒ… โ†’ text_versions์— ์ตœ์‹  ๋ฒ„์ „ ๊ธฐ๋ก
๊ฒฐ๊ณผ๋Š” ํŽ˜์ด์ง€๋ณ„ ์š”์•ฝ ์ •๋ณด์™€ ํ•จ๊ป˜ ํ”„๋กœ์ ํŠธ ์ƒํƒœ๋ฅผ ๊ฐฑ์‹ ํ•ฉ๋‹ˆ๋‹ค.
๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ์ „๋žต
-------------
- **ํŽ˜์ด์ง€ ๋ ˆ๋ฒจ ๋ณ‘๋ ฌ**: ์—ฌ๋Ÿฌ ํŽ˜์ด์ง€๋ฅผ ๋™์‹œ์— ์ฒ˜๋ฆฌ (asyncio.gather + Semaphore)
- **ํŽ˜์ด์ง€ ๋‚ด๋ถ€ ์ˆœ์ฐจ**: ๋ ˆ์ด์•„์›ƒ/OCR/์ •๋ ฌ์€ ๋™๊ธฐ ์‹คํ–‰
* Tesseract/EasyOCR ์—”์ง„์ด ์Šค๋ ˆ๋“œ ์•ˆ์ „ํ•˜์ง€ ์•Š์•„ asyncio.to_thread() ์‚ฌ์šฉ ๋ถˆ๊ฐ€
* YOLO ๋ชจ๋ธ๋„ ์Šค๋ ˆ๋“œ ์•ˆ์ „์„ฑ ๋ฌธ์ œ๋กœ ๋™๊ธฐ ์‹คํ–‰
- **I/O๋งŒ ๋น„๋™๊ธฐ**: ์ด๋ฏธ์ง€ ๋กœ๋”ฉ, DB ์ž‘์—…๋งŒ asyncio.to_thread() ์‚ฌ์šฉ
์„ฑ๋Šฅ ํŠน์„ฑ
--------
- 10ํŽ˜์ด์ง€ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„: ~60-90์ดˆ (ํŽ˜์ด์ง€๋‹น 6-9์ดˆ)
- max_concurrent_pages=8๋กœ 8๊ฐœ ํŽ˜์ด์ง€ ๋™์‹œ ์ฒ˜๋ฆฌ
- CPU ๋ฐ”์šด๋“œ ์ž‘์—…์ด ๋Œ€๋ถ€๋ถ„์ด๋ฏ€๋กœ CPU ์ฝ”์–ด ์ˆ˜์— ๋น„๋ก€ํ•œ ์„ฑ๋Šฅ
"""
from __future__ import annotations
import asyncio
import io
import os
import threading
import time
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
import aiofiles
import cv2
import numpy as np
from loguru import logger
from PIL import Image
from sqlalchemy.orm import Session, selectinload
from ..models import LayoutElement, Page, Project
from .analysis_service import AnalysisService
from .model_registry import model_registry
from .formatter import TextFormatter
from .mock_models import MockElement
from .sorter import save_sorting_results_to_db, sort_layout_elements
from .text_version_service import create_text_version
# -----------------------------------------------------------------------------
# Internal constants & helpers
# -----------------------------------------------------------------------------
# Root directory where uploaded page images live (two levels above this file).
UPLOADS_ROOT = (Path(__file__).resolve().parents[2] / "uploads").resolve()
# Raised 15 -> 30 to fit within OpenAI's 500 RPM rate limit.
DEFAULT_AI_CONCURRENCY = int(os.getenv("OPENAI_MAX_CONCURRENCY", "30"))
# CPU-environment default (16-32 is suggested on GPU hosts).
DEFAULT_MAX_CONCURRENT_PAGES = int(os.getenv("MAX_CONCURRENT_PAGES", "4"))
# Model-instance cache guarded by _model_lock (thread-safe singleton pattern).
_model_instances: Dict[str, AnalysisService] = {}
_model_lock = threading.Lock()
# Default model per document type, keyed by document_types.doc_type_id.
DOC_TYPE_MODEL_MAP = {
    1: "SmartEyeSsen",
    2: "docstructbench",
}
DEFAULT_MODEL_CHOICE = "SmartEyeSsen"
def _available_model_names() -> Set[str]:
    """Return the names of every model currently known to the model registry."""
    registered = model_registry.list_registered()
    return {name for name in registered}
def is_supported_model(model_name: str) -> bool:
    """Return True when *model_name* is one of the registered analysis models."""
    supported = _available_model_names()
    return model_name in supported
def resolve_model_choice(
    doc_type_id: Optional[int],
    requested_model: Optional[str] = None,
) -> str:
    """
    Pick the analysis model name for a document type or explicit request.

    An explicitly requested model wins (after validation). Otherwise the
    doc-type's mapped default is used, falling back to DEFAULT_MODEL_CHOICE
    with a warning for unknown doc types.

    Args:
        doc_type_id: document_types.doc_type_id
        requested_model: model name explicitly chosen by the caller

    Raises:
        ValueError: when the requested model name is not supported
    """
    if requested_model:
        if is_supported_model(requested_model):
            return requested_model
        raise ValueError(f"지원하지 않는 AI 모델입니다: {requested_model}")
    mapped = DOC_TYPE_MODEL_MAP.get(doc_type_id)
    if mapped is not None:
        return mapped
    logger.warning(
        "알 수 없는 doc_type_id ({})에 대해 기본 모델({})을 사용합니다.",
        doc_type_id,
        DEFAULT_MODEL_CHOICE,
    )
    return DEFAULT_MODEL_CHOICE
def _get_analysis_service(model_choice: str = "SmartEyeSsen") -> AnalysisService:
    """
    Return the per-model singleton AnalysisService instance.

    Uses thread-safe double-checked locking so that even under parallel page
    processing each model is instantiated and loaded exactly once.

    Guarantees:
    - one in-memory instance per model name
    - the frontend can select different models dynamically
    - no duplicate model loads during parallel processing
    - thread safety via _model_lock

    Args:
        model_choice: model name (default: "SmartEyeSsen")

    Returns:
        AnalysisService: the cached (or newly created) instance for the model

    Example:
        >>> # while processing 4 pages in parallel
        >>> service1 = _get_analysis_service("SmartEyeSsen")  # creates instance
        >>> service2 = _get_analysis_service("SmartEyeSsen")  # returns cached one
        >>> service3 = _get_analysis_service("YOLOv8")        # different model
        >>> assert service1 is service2  # True
        >>> assert service1 is not service3  # True
    """
    # Fast path: already loaded — return without taking the lock.
    if model_choice in _model_instances:
        logger.debug(f"✅ 캐시된 AnalysisService 반환: {model_choice}")
        return _model_instances[model_choice]
    # Double-checked locking pattern.
    with _model_lock:
        # Re-check under the lock (another thread may have created it meanwhile).
        if model_choice in _model_instances:
            logger.debug(f"✅ 캐시된 AnalysisService 반환 (락 내부): {model_choice}")
            return _model_instances[model_choice]
        # Create the model instance (only once per model name).
        logger.info(f"🔧 새 AnalysisService 인스턴스 생성 중: model_choice={model_choice}")
        service = AnalysisService(model_choice=model_choice, auto_load=False)
        # Load (initialize) the underlying model.
        logger.info(f"📦 모델 로드 시작: {model_choice}")
        service._ensure_model_loaded()
        logger.info(f"✅ 모델 로드 완료: {model_choice}")
        # Store in the cache.
        _model_instances[model_choice] = service
        logger.info(
            f"💾 AnalysisService 캐시 완료: {model_choice} "
            f"(총 캐시된 모델 수: {len(_model_instances)})"
        )
        return service
@asynccontextmanager
async def get_async_db_session():
    """
    Async-friendly DB session context manager.

    Opens a fresh session from the connection pool, commits on successful
    exit, rolls back on error, and always closes the session. Blocking
    SQLAlchemy calls run in a worker thread via asyncio.to_thread() so the
    event loop is never stalled.

    Yields:
        Session: a SQLAlchemy session object

    Example:
        >>> async with get_async_db_session() as session:
        ...     page = session.query(Page).first()

    Note:
        Each parallel task acquires its own session through this manager,
        preventing session sharing across concurrent tasks.
    """
    from ..database import SessionLocal

    db = SessionLocal()
    try:
        yield db
        await asyncio.to_thread(db.commit)
    except Exception:
        await asyncio.to_thread(db.rollback)
        raise
    finally:
        await asyncio.to_thread(db.close)
def _resolve_image_path(image_path: str) -> Path:
"""
Page.image_path ๊ฐ’์„ ์ ˆ๋Œ€ ๊ฒฝ๋กœ๋กœ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
"""
raw_path = Path(image_path)
candidates = []
if raw_path.is_absolute():
candidates.append(raw_path)
else:
candidates.append((UPLOADS_ROOT / raw_path).resolve())
candidates.append((Path.cwd() / "uploads" / raw_path).resolve())
candidates.append((Path.cwd() / raw_path).resolve())
for candidate in candidates:
if candidate.exists():
return candidate
raise FileNotFoundError(
"์ด๋ฏธ์ง€ ํŒŒ์ผ์„ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. "
f"ํ™•์ธ๋œ ๊ฒฝ๋กœ: {[str(path) for path in candidates]}"
)
def _load_page_image(page: Page) -> np.ndarray:
    """
    Synchronously read the page's image from disk and refresh its stored size.

    Note:
        Blocking version. Inside an async context prefer
        _load_page_image_async().
    """
    path = _resolve_image_path(page.image_path)
    image = cv2.imread(str(path))
    if image is None:
        raise ValueError(f"이미지 파일을 읽을 수 없습니다: {path}")
    height, width = image.shape[:2]
    # Keep the stored resolution in sync with the actual file.
    if (page.image_width, page.image_height) != (width, height):
        page.image_width = width
        page.image_height = height
    return image
async def _load_page_image_async(page: Page) -> np.ndarray:
    """
    Asynchronously load the page image and refresh the stored resolution.

    The file read is non-blocking (aiofiles); the CPU-bound decode is
    delegated to the default thread pool via asyncio.to_thread().

    Args:
        page: page object whose image_path locates the file

    Returns:
        np.ndarray: image in OpenCV BGR format (always 3-channel)

    Raises:
        FileNotFoundError: if the image file cannot be located
        ValueError: if image decoding fails

    Example:
        >>> image = await _load_page_image_async(page)
        >>> height, width = image.shape[:2]
    """
    resolved_path = _resolve_image_path(page.image_path)
    # Non-blocking file read keeps the event loop responsive.
    async with aiofiles.open(resolved_path, 'rb') as f:
        image_data = await f.read()

    def decode_image(data: bytes) -> np.ndarray:
        """Decode with PIL, then convert to OpenCV BGR."""
        pil_image = Image.open(io.BytesIO(data))
        # BUGFIX: normalize to RGB first. Grayscale ('L'), palette ('P') and
        # RGBA images previously reached cv2.COLOR_RGB2BGR with the wrong
        # channel count, which fails or mis-converts.
        rgb_image = pil_image.convert("RGB")
        return cv2.cvtColor(np.array(rgb_image), cv2.COLOR_RGB2BGR)

    # CPU-bound decode runs in a worker thread.
    image = await asyncio.to_thread(decode_image, image_data)
    if image is None:
        raise ValueError(f"이미지 디코딩 실패: {resolved_path}")
    # Keep the stored resolution in sync with the actual file.
    height, width = image.shape[:2]
    if page.image_width != width or page.image_height != height:
        page.image_width = width
        page.image_height = height
    return image
def _layout_to_mock(elements: List[LayoutElement]) -> List[MockElement]:
    """
    Convert SQLAlchemy LayoutElement rows into the lightweight MockElement
    objects consumed by the sorter.
    """
    return [
        MockElement(
            element_id=element.element_id,
            class_name=element.class_name,
            confidence=float(element.confidence or 0.0),
            bbox_x=int(element.bbox_x),
            bbox_y=int(element.bbox_y),
            bbox_width=int(element.bbox_width),
            bbox_height=int(element.bbox_height),
            page_id=element.page_id,
        )
        for element in elements
    ]
def _sync_layout_runtime_fields(
layout_elements: List[LayoutElement],
mock_elements: List[MockElement],
) -> List[LayoutElement]:
"""
sorter๊ฐ€ ๊ณ„์‚ฐํ•œ order_in_question, group_id ๋“ฑ์„ ์‹ค์ œ LayoutElement์— ๋ฐ˜์˜ํ•ฉ๋‹ˆ๋‹ค.
"""
element_map: Dict[int, LayoutElement] = {
elem.element_id: elem for elem in layout_elements
}
synced_elements: List[LayoutElement] = []
for mock in mock_elements:
target = element_map.get(mock.element_id)
if not target:
logger.warning(
"์ •๋ ฌ ๊ฒฐ๊ณผ์— ์กด์žฌํ•˜์ง€๋งŒ DB์— ์—†๋Š” element_id={}", mock.element_id
)
continue
setattr(target, "order_in_question", getattr(mock, "order_in_question", None))
setattr(target, "group_id", getattr(mock, "group_id", None))
setattr(target, "order_in_group", getattr(mock, "order_in_group", None))
setattr(target, "y_position", getattr(mock, "y_position", target.bbox_y))
setattr(target, "x_position", getattr(mock, "x_position", target.bbox_x))
setattr(
target,
"area",
getattr(mock, "area", target.bbox_width * target.bbox_height),
)
synced_elements.append(target)
return synced_elements
def _update_page_status(
page: Page,
*,
status: str,
processing_time: float,
) -> None:
"""
ํŽ˜์ด์ง€์˜ ์ƒํƒœ/์ฒ˜๋ฆฌ์‹œ๊ฐ„/๋ถ„์„ ์™„๋ฃŒ ์‹œ๊ฐ„์„ ๊ฐฑ์‹ ํ•ฉ๋‹ˆ๋‹ค.
"""
page.analysis_status = status
page.processing_time = processing_time
page.analyzed_at = datetime.utcnow()
def _update_project_status(project: Project, status: str) -> None:
"""
ํ”„๋กœ์ ํŠธ ์ƒํƒœ๋ฅผ ๊ฐฑ์‹ ํ•ฉ๋‹ˆ๋‹ค.
"""
project.status = status
project.updated_at = datetime.utcnow()
async def _process_single_page_async(
    *,
    db: Session,
    project: Project,
    page: Page,
    formatter: TextFormatter,
    analysis_service: AnalysisService,
    use_ai_descriptions: bool,
    api_key: Optional[str],
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
) -> Dict[str, Any]:
    """
    Run the full analysis pipeline for one page and return a result summary.

    Pipeline: image load → layout analysis → OCR → (optional) AI
    descriptions → sorting → DB persist → formatting → text version →
    page status update + commit. On failure the session is rolled back,
    the page is marked 'error' and the exception message is carried in the
    summary — this function does not re-raise.

    Args:
        db: session used for every write in this page's pipeline
        project: owning project (used for logging only here)
        page: the page to analyze
        formatter: TextFormatter used for sorting context and formatting
        analysis_service: model service performing layout/OCR/AI calls
        use_ai_descriptions: whether to request AI element descriptions
        api_key: OpenAI key; falls back to the OPENAI_API_KEY env var
        ai_max_concurrency: max concurrent AI API requests

    Returns:
        Dict with page_id, page_number, status, message, layout_count,
        ocr_count, ai_description_count and processing_time.
    """
    logger.info(
        "페이지 분석 시작: project_id={} / page_id={}",
        project.project_id,
        page.page_id,
    )
    page_start = time.time()
    # Pessimistic defaults — overwritten on success.
    summary: Dict[str, Any] = {
        "page_id": page.page_id,
        "page_number": page.page_number,
        "status": "error",
        "message": "",
        "layout_count": 0,
        "ocr_count": 0,
        "ai_description_count": 0,
        "processing_time": 0.0,
    }
    try:
        # Async image load (minimizes I/O wait time).
        image = await _load_page_image_async(page)
        # Layout analysis (CPU-bound → run synchronously).
        # ⚠️ The model engines are not thread-safe, so asyncio.to_thread() cannot be used.
        layout_elements = analysis_service.analyze_layout(
            image=image,
            page_id=page.page_id,
            db=db,
            model_choice=analysis_service.model_choice,
        )
        if not layout_elements:
            raise ValueError("레이아웃 분석 결과가 비어 있습니다.")
        summary["layout_count"] = len(layout_elements)
        # OCR (CPU-bound → synchronous).
        # ⚠️ Tesseract/EasyOCR are not thread-safe, so asyncio.to_thread() cannot be used.
        text_contents = await analysis_service.perform_ocr_async(
            image=image,
            layout_elements=layout_elements,
            db=db,
        )
        summary["ocr_count"] = len(text_contents)
        ai_descriptions: Dict[int, str] = {}
        if use_ai_descriptions:
            # API key: the request parameter wins; otherwise fall back to the environment.
            effective_api_key = api_key or os.getenv("OPENAI_API_KEY")
            if effective_api_key:
                logger.info(f"AI 설명 생성 시작: page_id={page.page_id}")
                try:
                    ai_descriptions = await analysis_service.call_openai_api_async(
                        image=image,
                        layout_elements=layout_elements,
                        api_key=effective_api_key,
                        db=db,
                        max_concurrent_requests=ai_max_concurrency,
                    )
                    summary["ai_description_count"] = len(ai_descriptions)
                    logger.info(
                        f"AI 설명 생성 완료: {len(ai_descriptions)}개 요소 처리"
                    )
                except Exception as ai_error:
                    # AI descriptions are best-effort: log and continue the pipeline.
                    logger.error(
                        "AI 설명 생성 비동기 처리 실패: page_id={} / error={}",
                        page.page_id,
                        ai_error,
                    )
            else:
                logger.warning(
                    f"AI 설명 생성 요청되었으나 API 키가 없습니다 (page_id={page.page_id})"
                )
        # Sorting input preparation (synchronous conversion).
        mock_elements = _layout_to_mock(layout_elements)
        # Sorting (CPU-bound → synchronous).
        # Fast computation, so thread-dispatch overhead is not worth it.
        sorted_mock = sort_layout_elements(
            mock_elements,
            document_type=formatter.document_type,
            page_width=page.image_width or 0,
            page_height=page.image_height or 0,
        )
        synced_layouts = _sync_layout_runtime_fields(layout_elements, sorted_mock)
        # Persist sorting results (synchronous — avoids deadlocks).
        save_sorting_results_to_db(
            db,
            page.page_id,
            synced_layouts,
        )
        # Formatting (CPU-bound → synchronous).
        # Fast text processing, so thread-dispatch overhead is not worth it.
        formatted_text = formatter.format_page(
            synced_layouts,
            text_contents,
            ai_descriptions=ai_descriptions,
        )
        # Record the newest text version (DB I/O).
        create_text_version(
            db,
            page,
            formatted_text or "",
        )
        # Final status update.
        processing_time = time.time() - page_start
        _update_page_status(page, status="completed", processing_time=processing_time)
        summary["status"] = "completed"
        summary["processing_time"] = processing_time
        summary["message"] = "success"
        # DB commit (synchronous — avoids deadlocks).
        db.commit()
        return summary
    except Exception as error:  # pylint: disable=broad-except
        logger.error(f"페이지 분석 실패: page_id={page.page_id} / error={str(error)}")
        logger.exception("상세 스택 트레이스:")  # full stack trace
        # DB rollback (synchronous — avoids deadlocks).
        db.rollback()
        processing_time = time.time() - page_start
        _update_page_status(page, status="error", processing_time=processing_time)
        summary["processing_time"] = processing_time
        summary["message"] = str(error)
        # Commit the error status (synchronous — avoids deadlocks).
        db.commit()
        return summary
def _process_single_page(
    *,
    db: Session,
    project: Project,
    page: Page,
    formatter: TextFormatter,
    analysis_service: AnalysisService,
    use_ai_descriptions: bool,
    api_key: Optional[str],
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
) -> Dict[str, Any]:
    """
    Synchronous wrapper for _process_single_page_async.

    Starts a fresh event loop via asyncio.run(); not callable from inside
    an already-running loop.
    """
    coro = _process_single_page_async(
        db=db,
        project=project,
        page=page,
        formatter=formatter,
        analysis_service=analysis_service,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
    )
    return asyncio.run(coro)
# -----------------------------------------------------------------------------
# ๊ณต๊ฐœ API
# -----------------------------------------------------------------------------
async def analyze_project_batch_async(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Sequentially analyze the project's 'pending'/'error' pages and return a summary.

    Args:
        db: database session shared by every page in this run
        project_id: project ID
        use_ai_descriptions: whether to generate per-element AI descriptions
        api_key: OpenAI API key (page pipeline falls back to OPENAI_API_KEY)
        ai_max_concurrency: max concurrent AI API requests
        analysis_model: explicit model name; overrides the doc-type default

    Returns:
        Summary dict with per-page results and the final project status.

    Raises:
        ValueError: if the project does not exist.
    """
    logger.info("프로젝트 배치 분석 시작: project_id={}", project_id)
    started_at = time.time()
    project = (
        db.query(Project)
        .options(selectinload(Project.pages))
        .filter(Project.project_id == project_id)
        .one_or_none()
    )
    if not project:
        raise ValueError(f"프로젝트 ID {project_id}를 찾을 수 없습니다.")
    # Pages still waiting, plus previously failed ones (retried here).
    pending_pages = [
        page for page in project.pages if page.analysis_status in {"pending", "error"}
    ]
    pending_pages.sort(key=lambda p: p.page_number)
    result_summary: Dict[str, Any] = {
        "project_id": project.project_id,
        "project_status_before": project.status,
        "processed_pages": 0,
        "successful_pages": 0,
        "failed_pages": 0,
        "total_pages": len(pending_pages),
        "status": "completed" if pending_pages else "no_pending_pages",
        "page_results": [],
        "total_time": 0.0,
    }
    if not pending_pages:
        logger.warning("분석할 페이지가 없습니다. project_id={}", project.project_id)
        return result_summary
    _update_project_status(project, "in_progress")
    db.commit()
    model_choice = resolve_model_choice(project.doc_type_id, analysis_model)
    logger.info(
        "프로젝트 분석 모델 선택: project_id={}, doc_type_id={}, model={}",
        project.project_id,
        project.doc_type_id,
        model_choice,
    )
    analysis_service = _get_analysis_service(model_choice)
    formatter = TextFormatter(
        doc_type_id=project.doc_type_id,
        db=db,
        use_db_rules=True,
    )
    # One page at a time; each page commits/rolls back its own work.
    for page in pending_pages:
        page_summary = await _process_single_page_async(
            db=db,
            project=project,
            page=page,
            formatter=formatter,
            analysis_service=analysis_service,
            use_ai_descriptions=use_ai_descriptions,
            api_key=api_key,
            ai_max_concurrency=ai_max_concurrency,
        )
        result_summary["page_results"].append(page_summary)
        result_summary["processed_pages"] += 1
        if page_summary["status"] == "completed":
            result_summary["successful_pages"] += 1
        else:
            result_summary["failed_pages"] += 1
    if result_summary["failed_pages"] == 0:
        final_status = "completed"
    elif result_summary["successful_pages"] == 0:
        final_status = "error"
    else:
        # Partial success / partial failure → keep the project in_progress.
        final_status = "in_progress"
    _update_project_status(project, final_status)
    db.commit()
    result_summary["status"] = final_status
    result_summary["project_status_after"] = project.status
    result_summary["total_time"] = time.time() - started_at
    logger.info(
        "프로젝트 배치 분석 종료: project_id={} / status={} / success={} / fail={} / {:.2f}s",
        project.project_id,
        final_status,
        result_summary["successful_pages"],
        result_summary["failed_pages"],
        result_summary["total_time"],
    )
    return result_summary
def analyze_project_batch(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Synchronous wrapper for analyze_project_batch_async.

    Starts a fresh event loop via asyncio.run(); not callable from inside
    an already-running loop.
    """
    coro = analyze_project_batch_async(
        db=db,
        project_id=project_id,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
        analysis_model=analysis_model,
    )
    return asyncio.run(coro)
async def analyze_project_batch_async_parallel(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    max_concurrent_pages: int = DEFAULT_MAX_CONCURRENT_PAGES,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Analyze the project's 'pending'/'error' pages in parallel and return a summary.

    Args:
        db: database session (project-level reads/updates; page tasks open their own)
        project_id: project ID
        use_ai_descriptions: whether to generate AI descriptions
        api_key: OpenAI API key
        ai_max_concurrency: max concurrent AI API requests
        max_concurrent_pages: max pages processed concurrently.
            BUGFIX: now defaults to DEFAULT_MAX_CONCURRENT_PAGES (env var
            MAX_CONCURRENT_PAGES) instead of a hard-coded 8, which silently
            ignored the env var for direct async callers and disagreed with
            the sync wrapper analyze_project_batch_parallel.

    Returns:
        Analysis result summary (same shape as the sequential variant, plus
        processing_mode="parallel").

    Raises:
        ValueError: if the project does not exist.

    Note:
        Functionally identical to analyze_project_batch_async, but pages run
        concurrently via asyncio.gather bounded by a Semaphore.
        NOTE(review): `formatter` and `analysis_service` are created once with
        the outer session and shared by all parallel tasks — presumed safe for
        read-only use; confirm TextFormatter never writes through its db.
    """
    logger.info(
        "프로젝트 병렬 배치 분석 시작: project_id={}, max_concurrent={}",
        project_id,
        max_concurrent_pages,
    )
    started_at = time.time()
    project = (
        db.query(Project)
        .options(selectinload(Project.pages))
        .filter(Project.project_id == project_id)
        .one_or_none()
    )
    if not project:
        raise ValueError(f"프로젝트 ID {project_id}를 찾을 수 없습니다.")
    pending_pages = [
        page for page in project.pages if page.analysis_status in {"pending", "error"}
    ]
    pending_pages.sort(key=lambda p: p.page_number)
    result_summary: Dict[str, Any] = {
        "project_id": project.project_id,
        "project_status_before": project.status,
        "processed_pages": 0,
        "successful_pages": 0,
        "failed_pages": 0,
        "total_pages": len(pending_pages),
        "status": "completed" if pending_pages else "no_pending_pages",
        "page_results": [],
        "total_time": 0.0,
        "processing_mode": "parallel",
    }
    if not pending_pages:
        logger.warning("분석할 페이지가 없습니다. project_id={}", project.project_id)
        return result_summary
    _update_project_status(project, "in_progress")
    db.commit()
    model_choice = resolve_model_choice(project.doc_type_id, analysis_model)
    logger.info(
        "병렬 프로젝트 분석 모델 선택: project_id={}, doc_type_id={}, model={}",
        project.project_id,
        project.doc_type_id,
        model_choice,
    )
    analysis_service = _get_analysis_service(model_choice)
    formatter = TextFormatter(
        doc_type_id=project.doc_type_id,
        db=db,
        use_db_rules=True,
    )
    # Bound the number of pages in flight at once.
    semaphore = asyncio.Semaphore(max_concurrent_pages)

    async def process_with_semaphore(page: Page) -> Dict[str, Any]:
        """
        Process one page while holding the concurrency semaphore.

        Each task opens its own DB session (via get_async_db_session) so
        parallel tasks never share a session, and re-loads the Page and
        Project rows inside that session.
        """
        async with semaphore:
            async with get_async_db_session() as task_db:
                # Re-fetch ORM rows in this task's session — the outer
                # objects belong to a different session.
                task_page = await asyncio.to_thread(
                    task_db.query(Page).filter(Page.page_id == page.page_id).first
                )
                task_project = await asyncio.to_thread(
                    task_db.query(Project).filter(Project.project_id == project.project_id).first
                )
                if not task_page or not task_project:
                    raise ValueError(f"페이지 또는 프로젝트를 찾을 수 없습니다: page_id={page.page_id}")
                return await _process_single_page_async(
                    db=task_db,
                    project=task_project,
                    page=task_page,
                    formatter=formatter,
                    analysis_service=analysis_service,
                    use_ai_descriptions=use_ai_descriptions,
                    api_key=api_key,
                    ai_max_concurrency=ai_max_concurrency,
                )

    # Fan out all pages; per-task exceptions are captured, not raised.
    logger.info(f"총 {len(pending_pages)}개 페이지를 최대 {max_concurrent_pages}개씩 병렬 처리 시작")
    tasks = [process_with_semaphore(page) for page in pending_pages]
    page_results = await asyncio.gather(*tasks, return_exceptions=True)
    # Aggregate per-page results.
    for page_result in page_results:
        if isinstance(page_result, Exception):
            logger.error(f"페이지 처리 중 예외 발생: {page_result}")
            result_summary["page_results"].append({
                "status": "error",
                "message": str(page_result),
            })
            result_summary["failed_pages"] += 1
        else:
            result_summary["page_results"].append(page_result)
            if page_result["status"] == "completed":
                result_summary["successful_pages"] += 1
            else:
                result_summary["failed_pages"] += 1
        result_summary["processed_pages"] += 1
    # Decide the final project status.
    if result_summary["failed_pages"] == 0:
        final_status = "completed"
    elif result_summary["successful_pages"] == 0:
        final_status = "error"
    else:
        # Partial success / partial failure → keep the project in_progress.
        final_status = "in_progress"
    _update_project_status(project, final_status)
    db.commit()
    result_summary["status"] = final_status
    result_summary["project_status_after"] = project.status
    result_summary["total_time"] = time.time() - started_at
    logger.info(
        "프로젝트 병렬 배치 분석 종료: project_id={} / status={} / success={} / fail={} / {:.2f}s",
        project.project_id,
        final_status,
        result_summary["successful_pages"],
        result_summary["failed_pages"],
        result_summary["total_time"],
    )
    return result_summary
def analyze_project_batch_parallel(
    db: Session,
    project_id: int,
    *,
    use_ai_descriptions: bool = True,
    api_key: Optional[str] = None,
    ai_max_concurrency: int = DEFAULT_AI_CONCURRENCY,
    max_concurrent_pages: int = DEFAULT_MAX_CONCURRENT_PAGES,
    analysis_model: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Synchronous wrapper for analyze_project_batch_async_parallel.

    Starts a fresh event loop via asyncio.run(); not callable from inside
    an already-running loop.
    """
    coro = analyze_project_batch_async_parallel(
        db=db,
        project_id=project_id,
        use_ai_descriptions=use_ai_descriptions,
        api_key=api_key,
        ai_max_concurrency=ai_max_concurrency,
        max_concurrent_pages=max_concurrent_pages,
        analysis_model=analysis_model,
    )
    return asyncio.run(coro)
# Explicit public surface of this module. The leading-underscore names are
# exported despite the convention — presumably for tests/internal callers;
# confirm before removing them.
__all__ = [
    "analyze_project_batch",
    "analyze_project_batch_async",
    "analyze_project_batch_parallel",
    "analyze_project_batch_async_parallel",
    "_get_analysis_service",
    "_process_single_page",
    "_process_single_page_async",
    "DEFAULT_AI_CONCURRENCY",
    "is_supported_model",
    "resolve_model_choice",
]