Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Union | |
| from uuid import uuid4 | |
| import cv2 | |
| from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile, status | |
| from loguru import logger | |
| from sqlalchemy import func | |
| from sqlalchemy.orm import Session | |
| from .. import crud, schemas | |
| from ..database import get_db | |
| from ..models import AnalysisStatusEnum, AnalysisModeEnum, Page, Project | |
| from ..services.pdf_processor import pdf_processor | |
| from ..services.text_version_service import ( | |
| get_current_page_text, | |
| save_user_edited_version, | |
| ) | |
# FastAPI router for page-level endpoints (upload, detail, text versions, stats).
router = APIRouter(
    prefix="/api/pages",
    tags=["Pages"],
)

# Filesystem root for stored uploads; overridable via the UPLOAD_DIR env var.
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "uploads")).resolve()
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)  # ensure the directory exists at import time

# Relative prefix used when recording a page's publicly served image path.
PUBLIC_UPLOAD_ROOT = Path("uploads")

# Defaults applied when an upload arrives without an existing project.
DEFAULT_PROJECT_NAME = "temp"
DEFAULT_DOC_TYPE_ID = 1
DEFAULT_ANALYSIS_MODE = AnalysisModeEnum.AUTO
DEFAULT_USER_ID = 1

# Layout classes counted as "anchor" elements in page statistics
# (see get_page_stats); values must match LayoutElement.class_name exactly.
ANCHOR_CLASS_NAMES = {"question number", "question type"}
def _page_to_response(page: Page) -> schemas.PageResponse:
    """Convert a Page ORM row into its API response schema."""
    response = schemas.PageResponse.model_validate(page)
    return response
def _create_project_for_upload(db: Session) -> Project:
    """Create a fresh default project for an upload session that arrived without one."""
    payload = schemas.ProjectCreate(
        project_name=DEFAULT_PROJECT_NAME,
        doc_type_id=DEFAULT_DOC_TYPE_ID,
        analysis_mode=DEFAULT_ANALYSIS_MODE,
    )
    new_project = crud.create_project(db=db, project=payload, user_id=DEFAULT_USER_ID)
    logger.info("μ λ‘λμ© νλ‘μ νΈ μμ± - ProjectID: {}", new_project.project_id)
    return new_project
def _calculate_next_page_number(db: Session, project_id: int) -> int:
    """Return the next page number for a project.

    Looks up the largest existing page_number within the project and returns
    it plus one; a project with no pages yields 1.
    """
    current_max = (
        db.query(func.max(Page.page_number))
        .filter(Page.project_id == project_id)
        .scalar()
    )
    next_page_number = 1 if not current_max else current_max + 1
    logger.debug(f"λ€μ νμ΄μ§ λ²νΈ κ³μ° - ProjectID: {project_id}, NextPage: {next_page_number}")
    return next_page_number
async def _handle_pdf_upload(
    db: Session,
    file: UploadFile,
    project_id: int,
    start_page_number: int,
) -> schemas.MultiPageCreateResponse:
    """Convert an uploaded PDF to images and create one Page row per page.

    Raises HTTPException 400 on conversion/validation errors (ValueError from
    the processor) and 500 on any other failure.
    """
    logger.info(f"PDF μ λ‘λ μμ - ProjectID: {project_id}, StartPage: {start_page_number}")
    pdf_bytes = await file.read()
    try:
        # PDF -> images, numbered starting at start_page_number.
        converted_pages = pdf_processor.convert_pdf_to_images_parallel(
            pdf_bytes=pdf_bytes,
            project_id=project_id,
            start_page_number=start_page_number,
        )
        logger.info(f"PDF λ³ν μλ£ - {len(converted_pages)}κ° νμ΄μ§")
        # Persist sequentially: crud.create_page is synchronous, so wrapping it
        # in asyncio.gather gains no concurrency and would only make the
        # resulting order nondeterministic while sharing one Session across tasks.
        created_pages: List[Page] = []
        for page_info in converted_pages:
            page_create = schemas.PageCreate(
                project_id=project_id,
                page_number=page_info['page_number'],
                image_path=page_info['image_path'],
                image_width=page_info['width'],
                image_height=page_info['height'],
            )
            created_pages.append(crud.create_page(db, page_create))
            logger.debug(f"νμ΄μ§ {page_info['page_number']} μ μ₯ μλ£")
        logger.info(f"PDF μ λ‘λ μλ£ - ProjectID: {project_id}, {len(created_pages)}κ° νμ΄μ§ μμ±")
        return schemas.MultiPageCreateResponse(
            project_id=project_id,
            total_created=len(created_pages),
            source_type="pdf",
            pages=[_page_to_response(p) for p in created_pages],
        )
    except ValueError as ve:
        logger.error(f"PDF μ²λ¦¬ μ€ν¨: {str(ve)}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"PDF μ²λ¦¬ μ€ν¨: {str(ve)}"
        ) from ve
    except Exception as exc:
        logger.error(f"PDF μ λ‘λ μ€ μ€λ₯: {str(exc)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"PDF μ λ‘λ μ€ν¨: {str(exc)}"
        ) from exc


async def _handle_image_upload(
    db: Session,
    file: UploadFile,
    project_id: int,
    page_number: int,
) -> schemas.PageResponse:
    """Store a single uploaded image on disk and create its Page row.

    The file is written under UPLOAD_DIR/<project_id>/ with a UUID-based name;
    the stored DB path uses PUBLIC_UPLOAD_ROOT with forward slashes. Raises
    HTTPException 400 (and removes the file) when OpenCV cannot decode it.
    """
    logger.info(f"μ΄λ―Έμ§ μ λ‘λ - ProjectID: {project_id}, PageNumber: {page_number}")
    suffix = Path(file.filename or "").suffix or ".png"
    filename = f"page_{page_number}_{uuid4().hex}{suffix}"
    project_dir = UPLOAD_DIR / str(project_id)
    project_dir.mkdir(parents=True, exist_ok=True)
    file_path = project_dir / filename
    content = await file.read()
    file_path.write_bytes(content)
    try:
        image = cv2.imread(str(file_path))
        if image is None:
            # cv2.imread returns None (no exception) on unreadable files.
            raise ValueError("μ΄λ―Έμ§λ₯Ό μ½μ μ μμ΅λλ€.")
        height, width = image.shape[:2]
    except Exception as exc:  # pylint: disable=broad-except
        file_path.unlink(missing_ok=True)  # don't leave an orphaned file behind
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"μ΄λ―Έμ§ μ²λ¦¬ μ€ν¨: {exc}") from exc
    public_path = PUBLIC_UPLOAD_ROOT / str(project_id) / filename
    stored_path = str(public_path).replace("\\", "/")  # normalize Windows separators
    page_create = schemas.PageCreate(
        project_id=project_id,
        page_number=page_number,
        image_path=stored_path,
        image_width=width,
        image_height=height,
    )
    page = crud.create_page(db, page_create)
    logger.info(f"μ΄λ―Έμ§ μ λ‘λ μλ£ - ProjectID: {project_id}, PageID: {page.page_id}")
    return _page_to_response(page)


# NOTE(review): no @router decorator is visible in this chunk for this or the
# other endpoint functions — confirm the decorators exist in the original file.
async def upload_page(
    file: UploadFile = File(..., description="νμ΄μ§ μ΄λ―Έμ§ λλ PDF νμΌ"),
    project_id: Optional[int] = Form(None, description="μ λ‘λ λμ νλ‘μ νΈ ID (μλ΅ μ μ κ· μμ±)"),
    db: Session = Depends(get_db),
) -> Union[schemas.PageResponse, schemas.MultiPageCreateResponse]:
    """Upload a page (image or PDF).

    Behavior:
    - First upload: calling without project_id creates a temp project.
    - Subsequent uploads: passing the same project_id appends pages to it.
    - page_number auto-increments within the project.

    Dispatch:
    - image upload -> single page (PageResponse)
    - PDF upload -> multiple pages (MultiPageCreateResponse)
    """
    # 1. Resolve (or create) the target project.
    if project_id is not None:
        project = crud.get_project(db, project_id)
        if not project:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="νλ‘μ νΈλ₯Ό μ°Ύμ μ μμ΅λλ€.",
            )
    else:
        project = _create_project_for_upload(db)
        project_id = project.project_id
    # 2. Compute the next page number within the project.
    page_number = _calculate_next_page_number(db, project_id)
    # 3. Branch on upload type.
    if file.content_type == "application/pdf":
        return await _handle_pdf_upload(db, file, project_id, page_number)
    return await _handle_image_upload(db, file, project_id, page_number)
def get_page_detail(
    page_id: int,
    include_layout: bool = Query(False, description="λ μ΄μμ μμ ν¬ν¨ μ¬λΆ"),
    include_text: bool = Query(False, description="ν μ€νΈ μ½ν μΈ ν¬ν¨ μ¬λΆ"),  # noqa: ARG001
    db: Session = Depends(get_db),
) -> schemas.PageResponse:
    """Fetch a single page, optionally eager-loading its layout elements."""
    # Pick the loader up front: with-elements variant only when requested.
    loader = crud.get_page_with_elements if include_layout else crud.get_page
    page = loader(db, page_id)
    if not page:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="νμ΄μ§λ₯Ό μ°Ύμ μ μμ΅λλ€."
        )
    return _page_to_response(page)
def list_project_pages(
    project_id: int,
    db: Session = Depends(get_db),
    include_error: bool = False,
) -> List[schemas.PageResponse]:
    """List a project's pages, hiding ERROR pages unless include_error is set."""
    responses: List[schemas.PageResponse] = []
    for page in crud.get_pages_by_project(db, project_id):
        if include_error or page.analysis_status != AnalysisStatusEnum.ERROR:
            responses.append(_page_to_response(page))
    return responses
def get_page_text(
    page_id: int,
    db: Session = Depends(get_db),
) -> schemas.PageTextResponse:
    """Return the latest (is_current=True) text version of the page, or 404."""
    current = get_current_page_text(db, page_id)
    if current:
        return schemas.PageTextResponse.model_validate(current)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="ν΄λΉ νμ΄μ§μ ν μ€νΈ λ²μ μ μ°Ύμ μ μμ΅λλ€.",
    )
def save_page_text(
    page_id: int,
    payload: schemas.PageTextUpdate,
    db: Session = Depends(get_db),
) -> schemas.PageTextResponse:
    """Persist user-edited content as the new current text version.

    Maps ValueError from the service to 404; any other failure is logged
    and surfaced as 500.
    """
    try:
        saved = save_user_edited_version(
            db,
            page_id,
            payload.content,
            user_id=payload.user_id,
        )
        return schemas.PageTextResponse.model_validate(saved)
    except ValueError as value_error:
        # Service signals "page/version not found" via ValueError.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(value_error),
        ) from value_error
    except Exception as error:  # pylint: disable=broad-except
        logger.error(
            "νμ΄μ§ ν μ€νΈ μ μ₯ μ€ν¨: page_id={} / error={}",
            page_id,
            error,
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="νμ΄μ§ ν μ€νΈ μ μ₯ μ€ μ€λ₯κ° λ°μνμ΅λλ€.",
        ) from error
def get_page_stats(
    page_id: int,
    db: Session = Depends(get_db),
) -> schemas.PageStatsResponse:
    """Compute per-page layout statistics.

    Reports: total element count, anchor-element count (classes in
    ANCHOR_CLASS_NAMES), per-class element distribution, per-class mean
    confidence, and the page's processing time. 404 when the page is missing.
    """
    page = crud.get_page_with_elements(db, page_id)
    if not page:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="νμ΄μ§λ₯Ό μ°Ύμ μ μμ΅λλ€.",
        )
    elements = page.layout_elements
    distribution: Dict[str, int] = {}
    confidence_totals: Dict[str, float] = {}
    confidence_counts: Dict[str, int] = {}
    anchor_count = 0
    # Single pass: tally class counts, confidence sums, and anchors together.
    for element in elements:
        label = element.class_name or "unknown"
        distribution[label] = distribution.get(label, 0) + 1
        if element.class_name in ANCHOR_CLASS_NAMES:
            anchor_count += 1
        confidence = element.confidence
        if confidence is not None:
            confidence_totals[label] = confidence_totals.get(label, 0.0) + float(confidence)
            confidence_counts[label] = confidence_counts.get(label, 0) + 1
    confidence_scores: Dict[str, float] = {
        label: total / confidence_counts[label]
        for label, total in confidence_totals.items()
        if confidence_counts.get(label)
    }
    return schemas.PageStatsResponse(
        page_id=page.page_id,
        project_id=page.project_id,
        total_elements=len(elements),
        anchor_element_count=anchor_count,
        processing_time=page.processing_time,
        class_distribution=distribution,
        confidence_scores=confidence_scores,
    )