AkJeond's picture
refactor: PDF 변환 방식을 병렬 처리로 변경
c309c65
from __future__ import annotations
import asyncio
import os
from pathlib import Path
from typing import Dict, List, Optional, Union
from uuid import uuid4
import cv2
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status
from loguru import logger
from sqlalchemy import func
from sqlalchemy.orm import Session
from .. import crud, schemas
from ..database import get_db
from ..models import AnalysisStatusEnum, AnalysisModeEnum, Page, Project
from ..services.pdf_processor import pdf_processor
from ..services.text_version_service import (
get_current_page_text,
save_user_edited_version,
)
# Router for the page endpoints defined in this module: every route is
# registered under the "/api/pages" prefix and carries the "Pages" tag.
router = APIRouter(
prefix="/api/pages",
tags=["Pages"],
)
# Absolute directory that uploaded files are written to. Taken from the
# UPLOAD_DIR environment variable, falling back to "uploads".
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "uploads")).resolve()
# Created at import time (including parents) so handlers can write into it.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Relative root used by upload_page when building the image path that is
# stored on a page record (as opposed to the absolute on-disk UPLOAD_DIR).
PUBLIC_UPLOAD_ROOT = Path("uploads")
# Defaults applied by _create_project_for_upload when an upload arrives
# without a project_id.
DEFAULT_PROJECT_NAME = "temp"
DEFAULT_DOC_TYPE_ID = 1
DEFAULT_ANALYSIS_MODE = AnalysisModeEnum.AUTO
DEFAULT_USER_ID = 1
# Layout class names that get_page_stats counts as anchor elements.
ANCHOR_CLASS_NAMES = {"question number", "question type"}
def _page_to_response(page: Page) -> schemas.PageResponse:
    """Build the ``PageResponse`` schema for a ``Page`` via ``model_validate``."""
    response = schemas.PageResponse.model_validate(page)
    return response
def _create_project_for_upload(db: Session) -> Project:
    """Create a new project to hold the pages of an upload session.

    The project is created through ``crud.create_project`` with the
    module-level defaults for name, document type, analysis mode and user.

    Args:
        db: Database session handed to the CRUD layer.

    Returns:
        The project returned by ``crud.create_project``.
    """
    payload = schemas.ProjectCreate(
        project_name=DEFAULT_PROJECT_NAME,
        doc_type_id=DEFAULT_DOC_TYPE_ID,
        analysis_mode=DEFAULT_ANALYSIS_MODE,
    )
    new_project = crud.create_project(db=db, project=payload, user_id=DEFAULT_USER_ID)
    logger.info("μ—…λ‘œλ“œμš© ν”„λ‘œμ νŠΈ 생성 - ProjectID: {}", new_project.project_id)
    return new_project
def _calculate_next_page_number(db: Session, project_id: int) -> int:
    """Return the page number to assign to the next page of a project.

    Queries the largest ``page_number`` recorded for ``project_id`` and adds
    one. When the query yields no value (or 0), numbering starts at 1.

    Args:
        db: Database session used for the lookup.
        project_id: Project whose pages are inspected.

    Returns:
        The next page number for the project.
    """
    highest_query = db.query(func.max(Page.page_number)).filter(
        Page.project_id == project_id
    )
    max_page = highest_query.scalar()
    next_page_number = max_page + 1 if max_page else 1
    logger.debug(f"λ‹€μŒ νŽ˜μ΄μ§€ 번호 계산 - ProjectID: {project_id}, NextPage: {next_page_number}")
    return next_page_number
@router.post(
    "/upload",
    response_model=Union[schemas.PageResponse, schemas.MultiPageCreateResponse],
    status_code=status.HTTP_201_CREATED,
)
async def upload_page(
    file: UploadFile = File(..., description="νŽ˜μ΄μ§€ 이미지 λ˜λŠ” PDF 파일"),
    project_id: Optional[int] = Form(None, description="μ—…λ‘œλ“œ λŒ€μƒ ν”„λ‘œμ νŠΈ ID (μƒλž΅ μ‹œ μ‹ κ·œ 생성)"),
    db: Session = Depends(get_db),
) -> Union[schemas.PageResponse, schemas.MultiPageCreateResponse]:
    """Upload a page as a single image or as a PDF.

    Behaviour:
    - First upload: when ``project_id`` is omitted the server creates a
      "temp" project and uses it.
    - Later uploads: passing the same ``project_id`` appends pages to that
      project.
    - ``page_number`` is assigned automatically, continuing from the
      project's current maximum.

    Processing:
    - Image upload: one page is created.
    - PDF upload (``content_type == "application/pdf"``): one page is
      created per converted PDF page.

    Args:
        file: The uploaded image or PDF.
        project_id: Target project, or ``None`` to create a new one.
        db: Database session.

    Returns:
        ``PageResponse`` for an image, ``MultiPageCreateResponse`` for a PDF.

    Raises:
        HTTPException: 404 if ``project_id`` is given but no such project
            exists; 400 if the PDF conversion raises ``ValueError`` or the
            image cannot be read; 500 for any other failure in the PDF path.
    """
    if project_id is not None:
        project = crud.get_project(db, project_id)
        if not project:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="ν”„λ‘œμ νŠΈλ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.",
            )
    else:
        project = _create_project_for_upload(db)
        project_id = project.project_id

    # Next free page number within the project.
    page_number = _calculate_next_page_number(db, project_id)

    # PDF upload: convert to images, then create one page per image.
    if file.content_type == "application/pdf":
        logger.info(f"PDF μ—…λ‘œλ“œ μ‹œμž‘ - ProjectID: {project_id}, StartPage: {page_number}")
        pdf_bytes = await file.read()
        try:
            # Converted pages are numbered starting at page_number.
            # NOTE(review): this call is made synchronously from a coroutine;
            # if the conversion is slow it will hold the event loop for its
            # duration. Confirm whether it should be offloaded to a worker.
            converted_pages = pdf_processor.convert_pdf_to_images_parallel(
                pdf_bytes=pdf_bytes,
                project_id=project_id,
                start_page_number=page_number,
            )
            logger.info(f"PDF λ³€ν™˜ μ™„λ£Œ - {len(converted_pages)}개 νŽ˜μ΄μ§€")

            # Persist pages one at a time on the request's session.
            # crud.create_page is a plain synchronous call, so running it
            # inside asyncio tasks provided no actual concurrency, and a
            # Session is not meant to be shared between concurrent tasks.
            # A sequential loop also stops at the first failure instead of
            # letting already-scheduled tasks keep inserting pages after the
            # error has been turned into a response.
            created_pages = []
            for page_info in converted_pages:
                page_create = schemas.PageCreate(
                    project_id=project_id,
                    page_number=page_info['page_number'],
                    image_path=page_info['image_path'],
                    image_width=page_info['width'],
                    image_height=page_info['height'],
                )
                created_pages.append(crud.create_page(db, page_create))
                logger.debug(f"νŽ˜μ΄μ§€ {page_info['page_number']} μ €μž₯ μ™„λ£Œ")

            logger.info(f"PDF μ—…λ‘œλ“œ μ™„λ£Œ - ProjectID: {project_id}, {len(created_pages)}개 νŽ˜μ΄μ§€ 생성")
            return schemas.MultiPageCreateResponse(
                project_id=project_id,
                total_created=len(created_pages),
                source_type="pdf",
                pages=[_page_to_response(p) for p in created_pages],
            )
        except ValueError as ve:
            logger.error(f"PDF 처리 μ‹€νŒ¨: {str(ve)}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"PDF 처리 μ‹€νŒ¨: {str(ve)}",
            ) from ve
        except Exception as exc:
            # logger.exception records the traceback along with the message.
            logger.exception(f"PDF μ—…λ‘œλ“œ 쀑 였λ₯˜: {str(exc)}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"PDF μ—…λ‘œλ“œ μ‹€νŒ¨: {str(exc)}",
            ) from exc

    # Single image upload.
    logger.info(f"이미지 μ—…λ‘œλ“œ - ProjectID: {project_id}, PageNumber: {page_number}")
    # Keep the client's file extension when present; default to ".png".
    suffix = Path(file.filename or "").suffix or ".png"
    filename = f"page_{page_number}_{uuid4().hex}{suffix}"
    project_dir = UPLOAD_DIR / str(project_id)
    project_dir.mkdir(parents=True, exist_ok=True)
    file_path = project_dir / filename
    content = await file.read()
    file_path.write_bytes(content)

    # Read the saved file back to validate it and obtain its dimensions;
    # remove it again if it is not a readable image.
    try:
        image = cv2.imread(str(file_path))
        if image is None:
            raise ValueError("이미지λ₯Ό 읽을 수 μ—†μŠ΅λ‹ˆλ‹€.")
        height, width = image.shape[:2]
    except Exception as exc:  # pylint: disable=broad-except
        file_path.unlink(missing_ok=True)
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"이미지 처리 μ‹€νŒ¨: {exc}") from exc

    # The stored path is relative to PUBLIC_UPLOAD_ROOT and always uses
    # forward slashes, regardless of the host OS path separator.
    public_path = PUBLIC_UPLOAD_ROOT / str(project_id) / filename
    stored_path = str(public_path).replace("\\", "/")
    page_create = schemas.PageCreate(
        project_id=project_id,
        page_number=page_number,
        image_path=stored_path,
        image_width=width,
        image_height=height,
    )
    page = crud.create_page(db, page_create)
    logger.info(f"이미지 μ—…λ‘œλ“œ μ™„λ£Œ - ProjectID: {project_id}, PageID: {page.page_id}")
    return _page_to_response(page)
@router.get(
    "/{page_id}",
    response_model=schemas.PageResponse,
)
def get_page_detail(
    page_id: int,
    include_layout: bool = Query(False, description="λ ˆμ΄μ•„μ›ƒ μš”μ†Œ 포함 μ—¬λΆ€"),
    include_text: bool = Query(False, description="ν…μŠ€νŠΈ μ½˜ν…μΈ  포함 μ—¬λΆ€"),  # noqa: ARG001
    db: Session = Depends(get_db),
) -> schemas.PageResponse:
    """Return a single page by ID.

    Args:
        page_id: ID of the page to fetch.
        include_layout: When true, the page is fetched through
            ``crud.get_page_with_elements`` instead of ``crud.get_page``.
        include_text: Accepted as a query parameter but not used here.
        db: Database session.

    Raises:
        HTTPException: 404 if no page is found.
    """
    # Choose the loader up front; only the "with elements" variant is used
    # when the caller asked for layout data.
    loader = crud.get_page_with_elements if include_layout else crud.get_page
    page = loader(db, page_id)
    if page:
        return _page_to_response(page)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="νŽ˜μ΄μ§€λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.",
    )
@router.get(
    "/project/{project_id}",
    response_model=List[schemas.PageResponse],
)
def list_project_pages(
    project_id: int,
    db: Session = Depends(get_db),
    include_error: bool = False,
) -> List[schemas.PageResponse]:
    """List the pages of a project.

    Pages whose ``analysis_status`` is ``AnalysisStatusEnum.ERROR`` are
    left out unless ``include_error`` is true.
    """
    all_pages = crud.get_pages_by_project(db, project_id)
    if include_error:
        visible_pages = all_pages
    else:
        visible_pages = [
            candidate
            for candidate in all_pages
            if candidate.analysis_status != AnalysisStatusEnum.ERROR
        ]
    return [_page_to_response(entry) for entry in visible_pages]
@router.get(
    "/{page_id}/text",
    response_model=schemas.PageTextResponse,
    summary="ν˜„μž¬ νŽ˜μ΄μ§€ ν…μŠ€νŠΈ 쑰회",
)
def get_page_text(
    page_id: int,
    db: Session = Depends(get_db),
) -> schemas.PageTextResponse:
    """Return the current text version of a page.

    The version is looked up with ``get_current_page_text`` (the latest
    version flagged ``is_current=True``).

    Raises:
        HTTPException: 404 if the lookup returns nothing.
    """
    current_version = get_current_page_text(db, page_id)
    if current_version:
        return schemas.PageTextResponse.model_validate(current_version)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="ν•΄λ‹Ή νŽ˜μ΄μ§€μ˜ ν…μŠ€νŠΈ 버전을 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.",
    )
@router.post(
    "/{page_id}/text",
    response_model=schemas.PageTextResponse,
    summary="μ‚¬μš©μž μˆ˜μ • ν…μŠ€νŠΈ μ €μž₯",
)
def save_page_text(
    page_id: int,
    payload: schemas.PageTextUpdate,
    db: Session = Depends(get_db),
) -> schemas.PageTextResponse:
    """Save the user's edited text as a new text version of the page.

    Args:
        page_id: ID of the page being edited.
        payload: Edited content and the ID of the editing user.
        db: Database session.

    Returns:
        The saved version, validated as ``PageTextResponse``.

    Raises:
        HTTPException: 404 when a ``ValueError`` is raised while saving;
            500 for any other failure.
    """
    try:
        version_data = save_user_edited_version(
            db,
            page_id,
            payload.content,
            user_id=payload.user_id,
        )
        return schemas.PageTextResponse.model_validate(version_data)
    except ValueError as value_error:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(value_error),
        ) from value_error
    except Exception as error:  # pylint: disable=broad-except
        # Loguru does not understand the stdlib ``exc_info=True`` keyword (it
        # would just be treated as a formatting/extra value), so use
        # logger.exception to actually attach the traceback.
        logger.exception(
            "νŽ˜μ΄μ§€ ν…μŠ€νŠΈ μ €μž₯ μ‹€νŒ¨: page_id={} / error={}",
            page_id,
            error,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="νŽ˜μ΄μ§€ ν…μŠ€νŠΈ μ €μž₯ 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€.",
        ) from error
@router.get(
    "/{page_id}/stats",
    response_model=schemas.PageStatsResponse,
    summary="νŽ˜μ΄μ§€ 톡계 쑰회",
)
def get_page_stats(
    page_id: int,
    db: Session = Depends(get_db),
) -> schemas.PageStatsResponse:
    """Compute and return layout statistics for one page.

    The response contains:
    - the total number of layout elements,
    - the number of anchor elements (class name in ``ANCHOR_CLASS_NAMES``),
    - the element count per class,
    - the mean confidence per class,
    - the page's processing time.

    Raises:
        HTTPException: 404 if the page is not found.
    """
    page = crud.get_page_with_elements(db, page_id)
    if not page:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="νŽ˜μ΄μ§€λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.",
        )

    elements = page.layout_elements
    distribution: Dict[str, int] = {}
    confidence_sums: Dict[str, float] = {}
    confidence_counts: Dict[str, int] = {}
    anchor_count = 0

    # One pass gathers per-class counts, confidence totals and anchor hits.
    for item in elements:
        # Elements without a class name are grouped under "unknown".
        label = item.class_name or "unknown"
        distribution[label] = distribution.get(label, 0) + 1
        if item.class_name in ANCHOR_CLASS_NAMES:
            anchor_count += 1
        if item.confidence is None:
            # No confidence recorded: counted in the distribution only.
            continue
        confidence_sums[label] = confidence_sums.get(label, 0.0) + float(item.confidence)
        confidence_counts[label] = confidence_counts.get(label, 0) + 1

    # Every key in confidence_sums has at least one counted sample, so the
    # division is always defined.
    confidence_scores: Dict[str, float] = {
        label: confidence_sums[label] / confidence_counts[label]
        for label in confidence_sums
    }

    return schemas.PageStatsResponse(
        page_id=page.page_id,
        project_id=page.project_id,
        total_elements=len(elements),
        anchor_element_count=anchor_count,
        processing_time=page.processing_time,
        class_distribution=distribution,
        confidence_scores=confidence_scores,
    )