Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| SmartEyeSsen Analysis Service (v1.1 - Duplicate Detection Filter Added) | |
| ======================================================================== | |
| 학습지 분석 서비스 - 레이아웃 분석, OCR, AI 설명 생성을 담당합니다. | |
| Refactored from api_server.py WorksheetAnalyzer class. | |
| 주요 변경사항 (DB 통합 버전): | |
| - analyze_layout: DocLayout-YOLO 결과를 layout_elements 테이블에 저장 후 ORM 객체 반환 | |
| - perform_ocr: text_contents 테이블에 OCR 결과 upsert | |
| - call_openai_api / call_openai_api_async: ai_descriptions 테이블에 설명 텍스트 upsert | |
| - 중복 탐지 필터링(IoU 기반) 로직 유지 | |
| """ | |
| import asyncio | |
| import base64 | |
| import colorsys | |
| import io | |
| import os | |
| import platform | |
| import random | |
| from dataclasses import dataclass | |
| from typing import Dict, List, Optional | |
| import cv2 | |
| import numpy as np | |
| import openai | |
| import pytesseract | |
| import torch | |
| from PIL import Image | |
| from loguru import logger | |
| from openai import AsyncOpenAI | |
| from sqlalchemy.orm import Session | |
| try: | |
| import google.generativeai as genai | |
| GENAI_AVAILABLE = True | |
| except ImportError: | |
| GENAI_AVAILABLE = False | |
| logger.warning("⚠️ google-generativeai 패키지가 설치되지 않았습니다. Tesseract OCR만 사용 가능합니다.") | |
| from .. import models | |
| from .model_registry import model_registry | |
| class GeminiOCRError(Exception): | |
| """Gemini OCR 처리 중 발생하는 예외.""" | |
| class GeminiOCRJob: | |
| element: models.LayoutElement | |
| cls_name: str | |
| pil_image: Image.Image | |
| cropped_img: np.ndarray | |
| class GeminiOCRResult: | |
| job: GeminiOCRJob | |
| text: str = "" | |
| engine: str = "Gemini-2.5-Flash-Lite" | |
| error: Optional[Exception] = None | |
| def _safe_int(value: Optional[str], default: int) -> int: | |
| try: | |
| return int(value) if value is not None else default | |
| except ValueError: | |
| logger.warning( | |
| f"⚠️ 환경 변수 값이 정수가 아닙니다. 기본값 {default} 사용: {value}" | |
| ) | |
| return default | |
| def _safe_float(value: Optional[str], default: float) -> float: | |
| try: | |
| return float(value) if value is not None else default | |
| except ValueError: | |
| logger.warning( | |
| f"⚠️ 환경 변수 값이 실수가 아닙니다. 기본값 {default} 사용: {value}" | |
| ) | |
| return default | |
| GEMINI_MAX_CONCURRENCY = _safe_int(os.getenv("GEMINI_MAX_CONCURRENCY"), 30) | |
| GEMINI_MAX_RETRIES = _safe_int(os.getenv("GEMINI_MAX_RETRIES"), 3) | |
| GEMINI_RETRY_BASE_DELAY = _safe_float(os.getenv("GEMINI_RETRY_BASE_DELAY"), 1.5) | |
| GEMINI_SAFETY_SETTINGS = [ | |
| {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, | |
| {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, | |
| ] | |
| # --- 신규: 이미지 설명을 위한 프롬프트 템플릿 추가 (규정 기반 최적화) --- | |
| figure_prompt = """ | |
| 당신은 시각장애 학생을 위한 이미지 설명 전문가입니다. | |
| 역할: | |
| - 한국 점자 도서 규정(제2절 시각 자료, p.82-91)을 정확히 따릅니다. | |
| - 초등학생(8-14세)이 이해할 수 있는 쉬운 말을 씁니다. | |
| - 점자 독자가 촉각으로 정보를 파악하도록 합니다. | |
| - 객관적 사실만 설명하고 주관적 판단은 피합니다. | |
| 설명 규칙 (한국 점자 규정 준수): | |
| 1. 제목 확인: 원본에 제목이 있으면 그대로 사용, 없으면 '제목 없음'으로 표기 | |
| 2. 그림/사진 유형: 실사, 삽화, 도형, 사진 등 구체적으로 명시 | |
| 3. 읽기 순서: 왼쪽→오른쪽, 위→아래 순서 준수 | |
| 4. 구조 설명: | |
| - 배경: 사진/그림의 배경이나 설정 | |
| - 중심 피사체: 가장 중요한 대상 | |
| - 위치 관계: 요소들 간의 공간적 배치 | |
| 5. 세부 설명: | |
| - 크기, 색상, 재질 등 물리적 특성 | |
| - 표정, 자세, 동작(인물/동물 포함 시) | |
| - 숫자나 문자가 있으면 명확하게 전달 | |
| 6. 문장 길이: 초등학생 이해도 기준, 30자 이내 권장 | |
| 7. 추측 금지: 원본에 없는 정보는 절대 추가하지 않음 | |
| [출력 형식] | |
| 자연스러운 문장형 설명 (규정 준수 + 아동 친화적): | |
| - 첫 문장: 제목과 유형을 자연스럽게 소개 | |
| 예) "제목은 '○○○'이고, 사진입니다." | |
| - 둘째 문장: 배경이나 전체 구성 | |
| 예) "뒷부분에는 △△△가 있습니다." | |
| - 셋째 문장: 중심 피사체 위치와 특징 | |
| 예) "앞쪽 가운데에는 파란색 ▲▲▲가 있습니다." | |
| - 넷째 문장: 추가 요소들 (왼쪽→오른쪽, 위→아래 순서) | |
| 예) "왼쪽에는 작은 ○○○ 2개가 있고, 모두 빨간색입니다." | |
| - 마지막 문장: 문자나 숫자가 있으면 명시 | |
| 예) "오른쪽 아래에는 '2023년'이라는 글자가 있습니다." | |
| 주의사항: | |
| - 각 문장은 30자 이내로 유지 | |
| - 위→아래, 왼쪽→오른쪽 읽기 순서 준수 | |
| - 추측이나 해석 금지, 보이는 것만 설명 | |
| """ | |
| table_prompt = """ | |
| 당신은 시각장애 학생을 위한 표 설명 전문가입니다. | |
| 역할: | |
| - 한국 점자 도서 규정(제3장 제1절 표, p.62-81)을 정확히 따릅니다. | |
| - 특히 simple_table 규정(수차/몇 개 단어 표, p.65-66)을 적용합니다. | |
| - 초등학생(8-14세)이 이해할 수 있는 쉬운 말을 씁니다. | |
| - 표의 구조를 명확하게 전달합니다. | |
| - 점자 독자가 행별로 정확하게 파악할 수 있게 합니다. | |
| 설명 규칙 (한국 점자 규정 준수): | |
| 1. 제목과 주제: | |
| - 표의 제목이 있으면 우선 표기 (없으면 '제목 없음') | |
| - 표가 무엇을 보여주는지 한 문장으로 설명 | |
| 2. 단위 표시: | |
| - 수치에 반드시 단위 포함 (예: "20명", "50%", "3kg") | |
| - 단위가 표 전체에 동일하면 처음에만 표기 | |
| 3. 행별 순차 설명: | |
| - 첫 번째 행부터 마지막 행까지 순서대로 | |
| - 각 행 내에서는 왼쪽→오른쪽 순서 | |
| - 행 제목과 데이터를 명확히 연결 | |
| [출력 형식] | |
| 표의 제목은 '[제목]'입니다. [표가 보여주는 내용 한 문장]. | |
| [행_제목1]: [데이터1], [데이터2], [데이터3] | |
| [행_제목2]: [데이터1], [데이터2], [데이터3] | |
| [행_제목3]: [데이터1], [데이터2], [데이터3] | |
| 예시: | |
| 표의 제목은 '학생 수'입니다. 3개 학년의 학생 수를 보여줍니다. | |
| 1학년: 20명, 남학생 10명, 여학생 10명 | |
| 2학년: 25명, 남학생 12명, 여학생 13명 | |
| 3학년: 22명, 남학생 11명, 여학생 11명 | |
| """ | |
| flowchart_prompt = """ | |
| 당신은 시각장애 학생을 위한 순서도 설명 전문가입니다. | |
| 역할: | |
| - 한국 점자 도서 규정(제2절, p.108-112)을 정확히 따릅니다. | |
| - 초등학생(8-14세)이 이해할 수 있는 쉬운 말을 씁니다. | |
| - 순서도의 흐름과 논리를 명확하게 전달합니다. | |
| - 점자 독자가 단계별 진행을 정확하게 파악할 수 있게 합니다. | |
| 설명 규칙 (한국 점자 규정 준수): | |
| 1. 제목 소개: | |
| - 이 순서도가 무엇에 대한 것인지 쉽게 설명 | |
| - 어린이가 쉽게 이해할 수 있는 언어 사용 | |
| 2. 노드(상자)의 내용: | |
| - 순서도의 각 상자에 있는 내용을 그대로 표기 | |
| - 숫자, 기호, 그림 설명 등을 명확하게 전달 | |
| 3. 순서대로 연결: | |
| - 각 노드를 화살표(→)로 순서대로 연결 | |
| - 읽기 순서: 위→아래, 왼쪽→오른쪽 준수 | |
| - 아이들이 따라가기 쉽게 한 줄 또는 간단한 형태로 표현 | |
| 4. 아동 친화적 표현: | |
| - 어려운 용어 피하기 (예: "노드" 대신 "상자", "단계" 등 불필요) | |
| - 직관적인 화살표 연결로 흐름을 보여주기 | |
| - 숫자나 기호가 있으면 그것들을 직접 사용 | |
| 5. 문장 규칙: | |
| - 초등학생(8-14세)이 쉽게 따라갈 수 있는 말 | |
| - 간단하고 명확한 표현 | |
| - 원본에 없는 정보는 추가하지 않음 | |
| [출력 형식] | |
| 이 순서도는 '[제목]'에 대한 것입니다. | |
| [내용1] → [내용2] → [내용3] → [필요 시 계속] | |
| 예시 1 (단순 흐름): | |
| 이 순서도는 '나비의 생활 주기'에 대한 것입니다. | |
| 알 → 애벌레 → 번데기 → 나비 | |
| 예시 2 (숫자/기호가 있는 경우): | |
| 이 순서도는 '숫자의 변화'에 대한 것입니다. | |
| ㉡, 3 → 7 | |
| 예시 3 (동작이 있는 경우): | |
| 이 순서도는 '물을 마시는 방법'에 대한 것입니다. | |
| 컵을 집는다 → 입에 대고 기울인다 → 물을 마신다 | |
| 예시 4 (이미지 설명이 있는 경우): | |
| 이 순서도는 '씨앗에서 꽃이 자라는 과정'에 대한 것입니다. | |
| 씨앗 → 싹이 난다 → 잎이 자란다 → 줄기가 자란다 → 꽃이 핀다 | |
| """ | |
| # --- 신규: IoU 계산 함수 추가 --- | |
| def calculate_iou(box1, box2): | |
| """두 바운딩 박스 간의 IoU(Intersection over Union) 계산""" | |
| # box 형식: [x1, y1, x2, y2] | |
| x1_inter = max(box1[0], box2[0]) | |
| y1_inter = max(box1[1], box2[1]) | |
| x2_inter = min(box1[2], box2[2]) | |
| y2_inter = min(box1[3], box2[3]) | |
| inter_area = max(0, x2_inter - x1_inter) * max(0, y2_inter - y1_inter) | |
| box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) | |
| box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) | |
| union_area = box1_area + box2_area - inter_area | |
| if union_area == 0: | |
| return 0.0 | |
| return inter_area / union_area | |
| # --- 신규: 중복 제거 후처리 함수 추가 --- | |
| def filter_duplicate_detections(boxes, classes, confs, class_names, iou_threshold=0.7): | |
| """ | |
| 모든 클래스 쌍에 대해 IoU 기반으로 중복 탐지를 필터링. (자동 방식) | |
| 신뢰도가 낮은 쪽을 제거. | |
| """ | |
| num_detections = len(boxes) | |
| suppressed = [False] * num_detections # 제거할 요소 표시 | |
| indices = list(range(num_detections)) | |
| # 신뢰도 높은 순으로 정렬 (높은 것을 남기기 위함) | |
| indices.sort(key=lambda i: confs[i], reverse=True) | |
| for i in range(num_detections): | |
| idx1 = indices[i] | |
| if suppressed[idx1]: | |
| continue | |
| box1 = boxes[idx1] | |
| # cls_id1 = int(classes[idx1]) # 클래스 정보는 제거 로직에 불필요 | |
| # cls_name1 = class_names.get(cls_id1, f"unknown_{cls_id1}") # 클래스 정보는 제거 로직에 불필요 | |
| for j in range(i + 1, num_detections): | |
| idx2 = indices[j] | |
| if suppressed[idx2]: | |
| continue | |
| box2 = boxes[idx2] | |
| # cls_id2 = int(classes[idx2]) # 클래스 정보는 제거 로직에 불필요 | |
| # cls_name2 = class_names.get(cls_id2, f"unknown_{cls_id2}") # 클래스 정보는 제거 로직에 불필요 | |
| # --- 👇 수정된 부분 시작 👇 --- | |
| # 특정 클래스 쌍 확인 조건 제거: 모든 쌍에 대해 IoU 계산 | |
| # if (cls_name1, cls_name2) in problematic_pairs: # 이 조건 제거 | |
| iou = calculate_iou(box1, box2) | |
| if iou > iou_threshold: | |
| # 신뢰도 낮은 쪽(idx2)을 제거 대상으로 표시 | |
| suppressed[idx2] = True | |
| # 로그 메시지에서 클래스 이름 제거 (선택 사항) | |
| logger.debug( | |
| f"중복 탐지 제거: Box {idx2}(conf={confs[idx2]:.2f}) - " | |
| f"Box {idx1}(conf={confs[idx1]:.2f})와 IoU={iou:.2f} > {iou_threshold}" | |
| ) | |
| # --- 👆 수정된 부분 끝 👆 --- | |
| # 제거되지 않은 요소들의 인덱스 반환 | |
| final_indices = [i for i, s in enumerate(suppressed) if not s] | |
| logger.info( | |
| f"자동 중복 탐지 필터링: {num_detections}개 → {len(final_indices)}개 요소 (IoU > {iou_threshold})" | |
| ) # 로그 메시지 수정 | |
| return final_indices | |
| # Windows에서 Tesseract 경로 설정 (기존과 동일) | |
| if platform.system() == "Windows": | |
| pytesseract.pytesseract.tesseract_cmd = ( | |
| r"C:\Program Files\Tesseract-OCR\tesseract.exe" | |
| ) | |
| # 디바이스 설정 (기존과 동일) | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| # Google Gemini API 초기화 | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| gemini_available = False | |
| if GENAI_AVAILABLE and GEMINI_API_KEY and GEMINI_API_KEY != "your_gemini_api_key_here": | |
| try: | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| gemini_available = True | |
| logger.info("✅ Gemini API 초기화 완료 (OCR 엔진으로 사용 가능)") | |
| except Exception as e: | |
| logger.warning(f"⚠️ Gemini API 초기화 실패: {e} - Tesseract OCR로 대체됩니다") | |
| gemini_available = False | |
| else: | |
| if not GENAI_AVAILABLE: | |
| logger.info("ℹ️ google-generativeai 패키지 미설치 - Tesseract OCR 사용") | |
| else: | |
| logger.info("ℹ️ GEMINI_API_KEY 미설정 - Tesseract OCR 사용") | |
| class AnalysisService: | |
| """학습지 분석 서비스 - 상태 없는 함수형 디자인""" | |
| def __init__(self, model_choice: str = "SmartEyeSsen", auto_load: bool = False): | |
| """ | |
| 분석 서비스 초기화 | |
| Args: | |
| model_choice: 사용할 모델 선택 (기본값: "SmartEyeSsen") | |
| auto_load: True이면 초기화 시 자동으로 모델 로드 (기본값: False, 하위 호환성 유지) | |
| """ | |
| self.device = device | |
| self.model_choice = model_choice | |
| self.model_registry = model_registry | |
| self._model_handle = None | |
| self._model_loaded = False | |
| self.gemini_max_concurrency = max(1, GEMINI_MAX_CONCURRENCY) | |
| self.gemini_max_retries = max(1, GEMINI_MAX_RETRIES) | |
| self.gemini_retry_base_delay = max(0.1, GEMINI_RETRY_BASE_DELAY) | |
| # 자동 로드 옵션이 활성화된 경우 즉시 모델 로드 | |
| if auto_load: | |
| self._ensure_model_loaded() | |
| def _ensure_model_loaded(self, model_choice: Optional[str] = None): | |
| """ | |
| Lazy Loading: 모델이 로드되지 않았으면 자동으로 로드 | |
| (다중 페이지 처리 시 모델을 한 번만 로드하도록 최적화) | |
| """ | |
| target_model = model_choice or self.model_choice | |
| if ( | |
| self._model_loaded | |
| and self._model_handle is not None | |
| and self._model_handle.name == target_model | |
| ): | |
| return self._model_handle | |
| handle = self.model_registry.get_model(target_model, device=self.device) | |
| self._model_handle = handle | |
| self.model_choice = target_model | |
| self._model_loaded = True | |
| return handle | |
| def analyze_layout( | |
| self, | |
| image: np.ndarray, | |
| *, | |
| page_id: int, | |
| db: Session, | |
| model_choice: Optional[str] = None, | |
| ) -> List[models.LayoutElement]: | |
| """ | |
| 레이아웃 분석 + 중복 탐지 필터링 후 결과를 DB에 저장한다. | |
| Args: | |
| image: 분석할 이미지 (numpy array) | |
| page_id: 결과를 저장할 pages.page_id | |
| db: SQLAlchemy Session | |
| model_choice: 사용할 모델 (미지정 시 인스턴스 기본값 사용) | |
| Returns: | |
| DB에 저장된 LayoutElement ORM 객체 리스트 | |
| """ | |
| active_model = model_choice or self.model_choice | |
| try: | |
| # 모델 선택이 변경되었으면 재로드 | |
| if active_model != self.model_choice: | |
| logger.warning(f"모델 변경 감지: {self.model_choice} -> {active_model}") | |
| self.model_choice = active_model | |
| self._model_loaded = False | |
| # Lazy Loading: 모델이 없으면 자동 로드 | |
| handle = self._ensure_model_loaded(active_model) | |
| model = handle.model | |
| model_spec = handle.spec | |
| logger.info("레이아웃 분석 시작...") | |
| temp_path = "temp_image.jpg" | |
| cv2.imwrite(temp_path, image) | |
| imgsz, conf = model_spec.imgsz, model_spec.conf | |
| results = model.predict( | |
| temp_path, imgsz=imgsz, conf=conf, iou=0.45, device=self.device | |
| ) | |
| boxes = results[0].boxes.xyxy.cpu().numpy() # [x1, y1, x2, y2] | |
| classes = results[0].boxes.cls.cpu().numpy() | |
| confs = results[0].boxes.conf.cpu().numpy() | |
| class_names = model.names # 클래스 ID → 이름 | |
| detection_records: List[Dict[str, float]] = [] | |
| if not boxes.size: | |
| logger.warning("레이아웃 분석 결과, 감지된 요소가 없습니다.") | |
| return self._create_elements_from_layout( | |
| detections=detection_records, page_id=page_id, db=db | |
| ) | |
| final_indices = filter_duplicate_detections( | |
| boxes, classes, confs, class_names, iou_threshold=0.7 | |
| ) | |
| for i in final_indices: | |
| box = boxes[i] | |
| cls_id = int(classes[i]) | |
| conf_val = float(confs[i]) | |
| x1, y1, x2, y2 = map(int, box) | |
| cls_name = ( | |
| class_names.get(cls_id, f"unknown_{cls_id}") | |
| if isinstance(class_names, dict) | |
| else None | |
| ) | |
| if cls_name is None: | |
| try: | |
| cls_name = class_names[cls_id] | |
| except (IndexError, KeyError): | |
| cls_name = f"unknown_{cls_id}" | |
| width = x2 - x1 | |
| height = y2 - y1 | |
| area = width * height | |
| if area < 100: | |
| continue | |
| detection_records.append( | |
| { | |
| "class_name": cls_name, | |
| "confidence": conf_val, | |
| "bbox_x": x1, | |
| "bbox_y": y1, | |
| "bbox_width": width, | |
| "bbox_height": height, | |
| } | |
| ) | |
| elements = self._create_elements_from_layout( | |
| detections=detection_records, page_id=page_id, db=db | |
| ) | |
| logger.info(f"레이아웃 분석 완료: 최종 {len(elements)}개 요소 저장") | |
| return elements | |
| except Exception as e: | |
| logger.error(f"레이아웃 분석 실패: {e}", exc_info=True) | |
| return [] | |
| def _create_elements_from_layout( | |
| self, *, detections: List[Dict[str, float]], page_id: int, db: Session | |
| ) -> List[models.LayoutElement]: | |
| """ | |
| 감지 결과를 layout_elements 테이블에 저장하고 ORM 객체 리스트를 반환한다. | |
| """ | |
| logger.debug(f"페이지 {page_id} 기존 레이아웃 요소 정리") | |
| existing_elements = ( | |
| db.query(models.LayoutElement) | |
| .filter(models.LayoutElement.page_id == page_id) | |
| .all() | |
| ) | |
| for element in existing_elements: | |
| db.delete(element) | |
| db.flush() # CASCADE 관계 정리 | |
| if not detections: | |
| db.commit() | |
| return [] | |
| created_elements: List[models.LayoutElement] = [] | |
| for record in detections: | |
| element = models.LayoutElement( | |
| page_id=page_id, | |
| class_name=record["class_name"], | |
| confidence=record["confidence"], | |
| bbox_x=int(record["bbox_x"]), | |
| bbox_y=int(record["bbox_y"]), | |
| bbox_width=int(record["bbox_width"]), | |
| bbox_height=int(record["bbox_height"]), | |
| ) | |
| db.add(element) | |
| created_elements.append(element) | |
| db.flush() | |
| db.commit() | |
| for element in created_elements: | |
| db.refresh(element) | |
| return created_elements | |
| def _upsert_text_content( | |
| self, | |
| *, | |
| db: Session, | |
| element_id: int, | |
| ocr_text: str, | |
| ocr_engine: str, | |
| language: str, | |
| ocr_confidence: Optional[float] = None, | |
| ) -> models.TextContent: | |
| """ | |
| 텍스트 콘텐츠를 생성하거나 업데이트한다. | |
| """ | |
| existing = ( | |
| db.query(models.TextContent) | |
| .filter(models.TextContent.element_id == element_id) | |
| .one_or_none() | |
| ) | |
| if existing: | |
| existing.ocr_text = ocr_text | |
| existing.ocr_engine = ocr_engine | |
| existing.language = language | |
| existing.ocr_confidence = ocr_confidence | |
| db.flush() | |
| return existing | |
| content = models.TextContent( | |
| element_id=element_id, | |
| ocr_text=ocr_text, | |
| ocr_engine=ocr_engine, | |
| ocr_confidence=ocr_confidence, | |
| language=language, | |
| ) | |
| db.add(content) | |
| db.flush() | |
| return content | |
| def _upsert_ai_descriptions( | |
| self, | |
| *, | |
| db: Session, | |
| descriptions: Dict[int, str], | |
| model_name: str, | |
| prompt: Optional[str], | |
| ) -> List[models.AIDescription]: | |
| """ | |
| AI 설명을 생성하거나 갱신한다. | |
| """ | |
| saved_records: List[models.AIDescription] = [] | |
| for element_id, description in descriptions.items(): | |
| existing = ( | |
| db.query(models.AIDescription) | |
| .filter(models.AIDescription.element_id == element_id) | |
| .one_or_none() | |
| ) | |
| if existing: | |
| existing.description = description | |
| existing.ai_model = model_name | |
| existing.prompt_used = prompt | |
| db.flush() | |
| saved_records.append(existing) | |
| continue | |
| record = models.AIDescription( | |
| element_id=element_id, | |
| description=description, | |
| ai_model=model_name, | |
| prompt_used=prompt, | |
| ) | |
| db.add(record) | |
| saved_records.append(record) | |
| db.flush() | |
| db.commit() | |
| for record in saved_records: | |
| db.refresh(record) | |
| return saved_records | |
| def call_openai_api( | |
| self, | |
| image: np.ndarray, | |
| layout_elements: List[models.LayoutElement], | |
| *, | |
| api_key: Optional[str], | |
| db: Session, | |
| model_name: str = "gpt-4-turbo", | |
| ) -> Dict[int, str]: | |
| """OpenAI API 호출 및 ai_descriptions 테이블 저장""" | |
| if not api_key: | |
| logger.warning("API 키가 없어 AI 설명 생성을 건너뜁니다.") | |
| return {} | |
| target_classes = ["figure", "table", "flowchart"] | |
| ai_descriptions: Dict[int, str] = {} | |
| try: | |
| client = openai.OpenAI(api_key=api_key) | |
| logger.info("OpenAI API 처리 시작...") | |
| except Exception as e: | |
| logger.error(f"OpenAI 클라이언트 초기화 실패: {e}") | |
| return {} | |
| prompts = { | |
| "figure": figure_prompt, | |
| "table": table_prompt, | |
| "flowchart": flowchart_prompt, | |
| } | |
| system_prompt = ( | |
| "당신은 시각 장애 아동 학습 AI 비서입니다. " | |
| "시각 자료 내용을 한국어로 간결하고 명확하게 설명하세요. " | |
| "음성 변환 시 이해하기 쉽도록 직접적인 문장을 사용하세요." | |
| ) | |
| for element in layout_elements: | |
| cls_name = element.class_name | |
| if cls_name not in target_classes: | |
| continue | |
| x1, y1 = element.bbox_x, element.bbox_y | |
| x2, y2 = x1 + element.bbox_width, y1 + element.bbox_height | |
| if y2 <= y1 or x2 <= x1: | |
| continue | |
| cropped_img = image[y1:y2, x1:x2] | |
| pil_img = Image.fromarray(cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)) | |
| buffered = io.BytesIO() | |
| pil_img.save(buffered, format="PNG") | |
| img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| prompt = prompts.get(cls_name, f"이 {cls_name} 내용 설명") | |
| try: | |
| response = client.chat.completions.create( | |
| model=model_name, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": prompt}, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/png;base64,{img_base64}" | |
| }, | |
| }, | |
| ], | |
| }, | |
| ], | |
| temperature=0.2, | |
| max_tokens=600, | |
| ) | |
| description = response.choices[0].message.content.strip() | |
| ai_descriptions[element.element_id] = description | |
| logger.info(f"API 응답 완료: ID {element.element_id} - {cls_name}") | |
| except Exception as e: | |
| logger.error( | |
| f"API 요청 실패: ID {element.element_id} - {e}", exc_info=True | |
| ) | |
| saved = self._upsert_ai_descriptions( | |
| db=db, descriptions=ai_descriptions, model_name=model_name, prompt=None | |
| ) | |
| logger.info(f"OpenAI API 처리 완료: {len(saved)}개 설명 생성 및 저장") | |
| return ai_descriptions | |
| async def call_openai_api_async( | |
| self, | |
| image: np.ndarray, | |
| layout_elements: List[models.LayoutElement], | |
| api_key: str, | |
| *, | |
| db: Optional[Session] = None, | |
| model_name: str = "gpt-4-turbo", | |
| max_concurrent_requests: int = 15, | |
| ) -> Dict[int, str]: | |
| """ | |
| OpenAI API 비동기 병렬 호출 (성능 최적화 버전) | |
| Args: | |
| image: 원본 이미지 (BGR 포맷) | |
| layout_elements: 레이아웃 요소 리스트 | |
| api_key: OpenAI API 키 | |
| db: SQLAlchemy Session (선택, 제공 시 DB에 설명 저장) | |
| model_name: 사용할 OpenAI 모델 이름 | |
| max_concurrent_requests: 최대 동시 요청 수 (기본값: 5) | |
| Returns: | |
| Dict[int, str]: {element_id: AI 설명} 딕셔너리 | |
| 주요 개선사항: | |
| - 비동기 병렬 처리로 처리 시간 70% 단축 | |
| - asyncio.Semaphore로 Rate Limit 대응 | |
| - 지수 백오프 재시도 로직 (exponential backoff) | |
| """ | |
| if not api_key: | |
| logger.warning("API 키가 없어 AI 설명 생성을 건너뜁니다.") | |
| return {} | |
| # 1. 대상 클래스 필터링 (figure, table, flowchart만 처리) | |
| target_classes = ["figure", "table", "flowchart"] | |
| target_elements = [ | |
| elem for elem in layout_elements if elem.class_name in target_classes | |
| ] | |
| if not target_elements: | |
| logger.info("AI 설명 대상 요소가 없습니다.") | |
| return {} | |
| logger.info( | |
| f"OpenAI API 비동기 처리 시작... (총 {len(target_elements)}개 요소)" | |
| ) | |
| # 2. AsyncOpenAI 클라이언트 초기화 | |
| try: | |
| async_client = AsyncOpenAI(api_key=api_key) | |
| except Exception as e: | |
| logger.error(f"AsyncOpenAI 클라이언트 초기화 실패: {e}") | |
| return {} | |
| # 3. Semaphore로 동시 요청 수 제한 (Rate Limit 대응) | |
| semaphore = asyncio.Semaphore(max_concurrent_requests) | |
| # 4. 모든 비동기 태스크 생성 | |
| tasks = [ | |
| self._process_single_element_async( | |
| async_client=async_client, | |
| image=image, | |
| element=elem, | |
| semaphore=semaphore, | |
| model_name=model_name, | |
| ) | |
| for elem in target_elements | |
| ] | |
| # 5. 병렬 실행 (asyncio.gather) | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # 6. 결과 매핑 및 예외 처리 | |
| ai_descriptions = {} | |
| success_count = 0 | |
| error_count = 0 | |
| for element, result in zip(target_elements, results): | |
| if isinstance(result, Exception): | |
| logger.error(f"API 실패: Element {element.element_id} - {result}") | |
| error_count += 1 | |
| elif result: # 성공 시 (빈 문자열이 아닌 경우) | |
| ai_descriptions[element.element_id] = result | |
| success_count += 1 | |
| logger.info( | |
| f"✅ API 성공: Element {element.element_id} ({element.class_name})" | |
| ) | |
| logger.info( | |
| f"OpenAI API 비동기 처리 완료: " | |
| f"성공 {success_count}건, 실패 {error_count}건 / 총 {len(target_elements)}건" | |
| ) | |
| if db and ai_descriptions: | |
| saved = self._upsert_ai_descriptions( | |
| db=db, descriptions=ai_descriptions, model_name=model_name, prompt=None | |
| ) | |
| logger.info(f"AI 설명 {len(saved)}건 저장 완료 (비동기)") | |
| return ai_descriptions | |
| async def _process_single_element_async( | |
| self, | |
| async_client: AsyncOpenAI, | |
| image: np.ndarray, | |
| element: models.LayoutElement, | |
| semaphore: asyncio.Semaphore, | |
| model_name: str, | |
| ) -> str: | |
| """ | |
| 단일 element에 대한 비동기 AI 설명 생성 (지수 백오프 재시도 포함) | |
| Args: | |
| async_client: AsyncOpenAI 클라이언트 | |
| image: 원본 이미지 | |
| element: 처리할 레이아웃 요소 | |
| semaphore: 동시 요청 수 제한용 Semaphore | |
| model_name: 사용할 OpenAI 모델 이름 | |
| Returns: | |
| str: AI 생성 설명 텍스트 | |
| 재시도 로직: | |
| - 최대 3회 재시도 | |
| - 대기 시간: 1초 → 2초 → 4초 (지수 백오프) | |
| """ | |
| # 1. 이미지 크롭 및 검증 | |
| x1, y1 = element.bbox_x, element.bbox_y | |
| x2, y2 = x1 + element.bbox_width, y1 + element.bbox_height | |
| # 크기 검증 | |
| if y2 <= y1 or x2 <= x1: | |
| logger.warning(f"유효하지 않은 BBox 크기: Element {element.element_id}") | |
| return "" | |
| # 이미지 크롭 | |
| cropped_img = image[y1:y2, x1:x2] | |
| # 2. PIL 이미지 변환 및 Base64 인코딩 | |
| pil_img = Image.fromarray(cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)) | |
| buffered = io.BytesIO() | |
| pil_img.save(buffered, format="PNG") | |
| img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| # 3. 프롬프트 선택 | |
| prompts = { | |
| "figure": figure_prompt, | |
| "table": table_prompt, | |
| "flowchart": flowchart_prompt, | |
| } | |
| prompt = prompts.get(element.class_name, f"이 {element.class_name} 내용 설명") | |
| system_prompt = ( | |
| "당신은 시각 장애 아동 학습 AI 비서입니다. " | |
| "시각 자료 내용을 한국어로 간결, 명확하게 설명하세요. " | |
| "음성 변환 가능하게 직접적이고 이해하기 쉽게 작성하세요." | |
| ) | |
| # 4. 지수 백오프 재시도 로직 | |
| max_retries = 3 | |
| base_delay = 1.0 # 초 단위 | |
| async with semaphore: # Rate Limit 제어 | |
| for attempt in range(max_retries): | |
| try: | |
| # API 호출 | |
| response = await async_client.chat.completions.create( | |
| model=model_name, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "text", "text": prompt}, | |
| { | |
| "type": "image_url", | |
| "image_url": { | |
| "url": f"data:image/png;base64,{img_base64}" | |
| }, | |
| }, | |
| ], | |
| }, | |
| ], | |
| temperature=0.2, | |
| max_tokens=600, | |
| ) | |
| # 성공 시 결과 반환 | |
| description = response.choices[0].message.content.strip() | |
| logger.debug( | |
| f"API 응답 완료 (시도 {attempt + 1}/{max_retries}): " | |
| f"Element {element.element_id}" | |
| ) | |
| return description | |
| except openai.RateLimitError as e: | |
| # Rate Limit 오류: 지수 백오프 대기 후 재시도 | |
| if attempt < max_retries - 1: | |
| delay = base_delay * (2**attempt) # 1초 → 2초 → 4초 | |
| logger.warning( | |
| f"⚠️ Rate Limit 오류 (Element {element.element_id}): " | |
| f"{delay}초 대기 후 재시도 ({attempt + 1}/{max_retries})" | |
| ) | |
| await asyncio.sleep(delay) | |
| else: | |
| logger.error( | |
| f"❌ Rate Limit 오류 최종 실패 (Element {element.element_id}): {e}" | |
| ) | |
| raise # 최종 실패 시 예외 전파 | |
| except openai.APIError as e: | |
| # API 일반 오류: 지수 백오프 대기 후 재시도 | |
| if attempt < max_retries - 1: | |
| delay = base_delay * (2**attempt) | |
| logger.warning( | |
| f"⚠️ API 오류 (Element {element.element_id}): " | |
| f"{delay}초 대기 후 재시도 ({attempt + 1}/{max_retries}) - {e}" | |
| ) | |
| await asyncio.sleep(delay) | |
| else: | |
| logger.error( | |
| f"❌ API 오류 최종 실패 (Element {element.element_id}): {e}" | |
| ) | |
| raise | |
| except Exception as e: | |
| # 기타 예외: 즉시 실패 | |
| logger.error( | |
| f"❌ 예상치 못한 오류 (Element {element.element_id}): {e}", | |
| exc_info=True, | |
| ) | |
| raise | |
| # 모든 재시도 실패 시 빈 문자열 반환 (unreachable, but for type safety) | |
| return "" | |
| def visualize_results( | |
| self, image: np.ndarray, layout_elements: List[models.LayoutElement] | |
| ) -> np.ndarray: | |
| """결과 시각화 (기존과 동일)""" | |
| img_result = image.copy() | |
| overlay = image.copy() | |
| random.seed(42) | |
| unique_classes = list({elem.class_name for elem in layout_elements}) | |
| class_colors = {} | |
| for i, cls_name in enumerate(unique_classes): | |
| h, s, v = i / max(1, len(unique_classes)), 0.8, 0.9 | |
| r, g, b = colorsys.hsv_to_rgb(h, s, v) | |
| class_colors[cls_name] = (int(b * 255), int(g * 255), int(r * 255)) | |
| for element in layout_elements: | |
| x1, y1 = element.bbox_x, element.bbox_y | |
| x2, y2 = x1 + element.bbox_width, y1 + element.bbox_height | |
| cls_name, color = element.class_name, class_colors[element.class_name] | |
| cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1) | |
| cv2.rectangle(img_result, (x1, y1), (x2, y2), color, 2) | |
| label = f"{cls_name} ({element.confidence:.2f})" | |
| labelSize, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1) | |
| y1_label = max(y1, labelSize[1] + 10) | |
| cv2.rectangle( | |
| img_result, | |
| (x1, y1_label - labelSize[1] - 10), | |
| (x1 + labelSize[0], y1_label), | |
| color, | |
| -1, | |
| ) | |
| cv2.putText( | |
| img_result, | |
| label, | |
| (x1, y1_label - 5), | |
| cv2.FONT_HERSHEY_SIMPLEX, | |
| 0.5, | |
| (255, 255, 255), | |
| 1, | |
| ) | |
| img_result = cv2.addWeighted(overlay, 0.2, img_result, 0.8, 0) | |
| return cv2.cvtColor(img_result, cv2.COLOR_BGR2RGB) | |
| async def perform_ocr_async( | |
| self, | |
| image: np.ndarray, | |
| layout_elements: List[models.LayoutElement], | |
| *, | |
| db: Session, | |
| language: str = "kor", | |
| use_gemini: bool = True, | |
| max_concurrent_requests: Optional[int] = None, | |
| ) -> List[models.TextContent]: | |
| """ | |
| Gemini 비동기 + Tesseract 동기 하이브리드 OCR 파이프라인. | |
| """ | |
| target_classes = [ | |
| "plain text", | |
| "unit", | |
| "question type", | |
| "question text", | |
| "question number", | |
| "title", | |
| "figure_caption", | |
| "table caption", | |
| "table footnote", | |
| "isolate_formula", | |
| "formula_caption", | |
| "list", | |
| "choices", | |
| "page", | |
| "second_question_number", | |
| ] | |
| tesseract_only_classes = {"question number", "second_question_number"} | |
| gemini_classes = [ | |
| cls for cls in target_classes if cls not in tesseract_only_classes | |
| ] | |
| ocr_results: List[models.TextContent] = [] | |
| tesseract_config = r"--oem 3 --psm 6" | |
| concurrency_limit = max( | |
| 1, max_concurrent_requests or self.gemini_max_concurrency | |
| ) | |
| logger.info( | |
| f"하이브리드 OCR 처리 시작... 총 {len(layout_elements)}개 레이아웃 요소 중 OCR 대상 필터링" | |
| ) | |
| logger.info(f" - Tesseract 전용 클래스 (2개): {sorted(tesseract_only_classes)}") | |
| logger.info(f" - Gemini API 사용 클래스 (13개): {gemini_classes}") | |
| logger.info(f" - Gemini API 가용 여부: {gemini_available}") | |
| if gemini_available and use_gemini: | |
| logger.info(f" - Gemini 동시 처리 제한: {concurrency_limit}") | |
| detected_classes = {elem.class_name for elem in layout_elements} | |
| logger.info(f" - 감지된 모든 클래스: {detected_classes}") | |
| gemini_jobs: List[GeminiOCRJob] = [] | |
| target_count = 0 | |
| for element in layout_elements: | |
| cls_name = element.class_name | |
| logger.debug( | |
| f"레이아웃 ID {element.element_id}: 클래스 '{cls_name}' 확인 중..." | |
| ) | |
| if cls_name not in target_classes: | |
| logger.debug(" → OCR 대상 아님") | |
| continue | |
| target_count += 1 | |
| logger.debug( | |
| f" → OCR 대상 {target_count}: ID {element.element_id} - 클래스 '{cls_name}'" | |
| ) | |
| x1, y1 = element.bbox_x, element.bbox_y | |
| x2, y2 = x1 + element.bbox_width, y1 + element.bbox_height | |
| x1, y1 = max(0, x1), max(0, y1) | |
| x2, y2 = min(image.shape[1], x2), min(image.shape[0], y2) | |
| if y2 <= y1 or x2 <= x1: | |
| logger.warning( | |
| f" → 유효하지 않은 BBox 크기: ID {element.element_id}, 건너뜀" | |
| ) | |
| continue | |
| cropped_img = image[y1:y2, x1:x2] | |
| should_use_gemini = ( | |
| use_gemini and gemini_available and cls_name in gemini_classes | |
| ) | |
| if should_use_gemini: | |
| pil_img = Image.fromarray(cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)) | |
| gemini_jobs.append( | |
| GeminiOCRJob( | |
| element=element, | |
| cls_name=cls_name, | |
| pil_image=pil_img, | |
| cropped_img=cropped_img, | |
| ) | |
| ) | |
| continue | |
| text = self._run_tesseract_ocr( | |
| cropped_img=cropped_img, | |
| language=language, | |
| config=tesseract_config, | |
| ) | |
| self._store_ocr_result( | |
| db=db, | |
| ocr_results=ocr_results, | |
| element=element, | |
| text=text, | |
| engine_name="Tesseract", | |
| language=language, | |
| cls_name=cls_name, | |
| ) | |
| if gemini_jobs: | |
| logger.info( | |
| f"Gemini OCR 비동기 처리 시작: {len(gemini_jobs)}개 요소 (동시 {concurrency_limit})" | |
| ) | |
| gemini_results = await self._execute_gemini_jobs_async( | |
| jobs=gemini_jobs, | |
| language=language, | |
| concurrency_limit=concurrency_limit, | |
| ) | |
| for result in gemini_results: | |
| element = result.job.element | |
| cls_name = result.job.cls_name | |
| text = result.text | |
| engine_name = result.engine | |
| if not text: | |
| warn_msg = ( | |
| f"⚠️ [{cls_name}] Gemini OCR 실패: ID {element.element_id}" | |
| ) | |
| if result.error: | |
| warn_msg += f" - {result.error}" | |
| logger.warning(warn_msg + " - Tesseract로 대체") | |
| text = self._run_tesseract_ocr( | |
| cropped_img=result.job.cropped_img, | |
| language=language, | |
| config=tesseract_config, | |
| ) | |
| engine_name = "Tesseract (Fallback)" | |
| else: | |
| logger.debug( | |
| f" → [{cls_name}] Gemini API 응답 성공: {len(text)}자" | |
| ) | |
| self._store_ocr_result( | |
| db=db, | |
| ocr_results=ocr_results, | |
| element=element, | |
| text=text, | |
| engine_name=engine_name, | |
| language=language, | |
| cls_name=cls_name, | |
| ) | |
| db.commit() | |
| for content in ocr_results: | |
| db.refresh(content) | |
| engine_summary = "하이브리드 OCR (Tesseract + Gemini-2.5-Flash-Lite)" | |
| logger.info( | |
| f"OCR 처리 완료 ({engine_summary}): {len(ocr_results)}개 텍스트 블록 저장" | |
| ) | |
| return ocr_results | |
| def perform_ocr( | |
| self, | |
| image: np.ndarray, | |
| layout_elements: List[models.LayoutElement], | |
| *, | |
| db: Session, | |
| language: str = "kor", | |
| use_gemini: bool = True, | |
| max_concurrent_requests: Optional[int] = None, | |
| ) -> List[models.TextContent]: | |
| """ | |
| 동기 환경 호환을 위한 래퍼. 비동기 컨텍스트에서는 `await perform_ocr_async()` 사용. | |
| """ | |
| try: | |
| asyncio.get_running_loop() | |
| except RuntimeError: | |
| return asyncio.run( | |
| self.perform_ocr_async( | |
| image, | |
| layout_elements, | |
| db=db, | |
| language=language, | |
| use_gemini=use_gemini, | |
| max_concurrent_requests=max_concurrent_requests, | |
| ) | |
| ) | |
| raise RuntimeError( | |
| "perform_ocr()는 비동기 이벤트 루프 안에서 호출할 수 없습니다. " | |
| "대신 await analysis_service.perform_ocr_async(...)를 사용하세요." | |
| ) | |
| def _run_tesseract_ocr( | |
| self, *, cropped_img: np.ndarray, language: str, config: str | |
| ) -> str: | |
| """Tesseract OCR 전처리 및 실행.""" | |
| gray_img = cv2.cvtColor(cropped_img, cv2.COLOR_BGR2GRAY) | |
| _, binary_img = cv2.threshold( | |
| gray_img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU | |
| ) | |
| _ = cv2.medianBlur(binary_img, 3) # 노이즈 제거 (추후 필요 시 활용) | |
| pil_img = Image.fromarray(cropped_img) | |
| tess_lang = language or "kor+eng" | |
| if tess_lang == "kor": | |
| tess_lang = "kor+eng" | |
| return ( | |
| pytesseract.image_to_string(pil_img, lang=tess_lang, config=config) | |
| .strip() | |
| ) | |
| def _store_ocr_result( | |
| self, | |
| *, | |
| db: Session, | |
| ocr_results: List[models.TextContent], | |
| element: models.LayoutElement, | |
| text: str, | |
| engine_name: str, | |
| language: str, | |
| cls_name: str, | |
| ) -> None: | |
| """OCR 결과 DB 저장 + 로깅.""" | |
| normalized_text = (text or "").strip() | |
| if len(normalized_text) <= 1: | |
| logger.warning( | |
| f"⚠️ OCR 결과 없음 ({engine_name}): ID {element.element_id} ({cls_name})" | |
| ) | |
| return | |
| db_text = self._upsert_text_content( | |
| db=db, | |
| element_id=element.element_id, | |
| ocr_text=normalized_text, | |
| ocr_engine=engine_name, | |
| language=language, | |
| ocr_confidence=None, | |
| ) | |
| ocr_results.append(db_text) | |
| preview = normalized_text[:50].replace("\n", " ") | |
| logger.info( | |
| f"✅ OCR 성공 ({engine_name}): ID {element.element_id} ({cls_name}) - " | |
| f"'{preview}...' ({len(normalized_text)}자)" | |
| ) | |
| async def _execute_gemini_jobs_async( | |
| self, | |
| *, | |
| jobs: List[GeminiOCRJob], | |
| language: str, | |
| concurrency_limit: int, | |
| ) -> List[GeminiOCRResult]: | |
| """Gemini OCR 작업을 비동기로 실행.""" | |
| if not jobs: | |
| return [] | |
| semaphore = asyncio.Semaphore(max(1, concurrency_limit)) | |
| tasks = [ | |
| self.call_gemini_ocr_async( | |
| job=job, | |
| language=language, | |
| semaphore=semaphore, | |
| max_retries=self.gemini_max_retries, | |
| base_delay=self.gemini_retry_base_delay, | |
| ) | |
| for job in jobs | |
| ] | |
| return await asyncio.gather(*tasks) | |
| async def call_gemini_ocr_async( | |
| self, | |
| *, | |
| job: GeminiOCRJob, | |
| language: str, | |
| semaphore: asyncio.Semaphore, | |
| max_retries: int, | |
| base_delay: float, | |
| ) -> GeminiOCRResult: | |
| """단일 Gemini OCR 호출 (세마포어 + 재시도 포함).""" | |
| if not gemini_available: | |
| return GeminiOCRResult( | |
| job=job, text="", error=GeminiOCRError("Gemini API 비활성화") | |
| ) | |
| retries = max(1, max_retries) | |
| delay = max(0.1, base_delay) | |
| async with semaphore: | |
| for attempt in range(1, retries + 1): | |
| try: | |
| text = await asyncio.to_thread( | |
| self._generate_gemini_text, | |
| job.pil_image.copy(), | |
| language, | |
| ) | |
| if not text: | |
| raise GeminiOCRError("Gemini 응답이 비어 있습니다.") | |
| return GeminiOCRResult(job=job, text=text) | |
| except Exception as exc: | |
| if attempt < retries: | |
| wait_time = delay * (2 ** (attempt - 1)) | |
| logger.warning( | |
| f"⚠️ [{job.cls_name}] Gemini OCR 실패 (시도 {attempt}/{retries}): " | |
| f"{exc} - {wait_time:.1f}s 후 재시도" | |
| ) | |
| await asyncio.sleep(wait_time) | |
| else: | |
| logger.error( | |
| f"❌ [{job.cls_name}] Gemini OCR 최종 실패: {exc}" | |
| ) | |
| return GeminiOCRResult(job=job, text="", error=exc) | |
| return GeminiOCRResult(job=job, text="", error=GeminiOCRError("알 수 없는 오류")) | |
| def _generate_gemini_text( | |
| self, pil_img: Image.Image, language: str = "kor" | |
| ) -> str: | |
| """Gemini 2.5 Flash Lite 호출.""" | |
| if not gemini_available: | |
| raise GeminiOCRError("Gemini API가 활성화되지 않았습니다.") | |
| prompt = ( | |
| "Extract all text from this image exactly as it appears, regardless of language. " | |
| "Return only the plain text without any markdown formatting, translations, explanations, or additional comments." | |
| ) | |
| model = genai.GenerativeModel("gemini-2.5-flash-lite") | |
| response = model.generate_content( | |
| [pil_img, prompt], | |
| safety_settings=GEMINI_SAFETY_SETTINGS, | |
| ) | |
| candidates = getattr(response, "candidates", None) | |
| if not candidates: | |
| feedback = getattr(response, "prompt_feedback", None) | |
| raise GeminiOCRError(f"후보 응답 없음 (prompt_feedback={feedback})") | |
| first_candidate = candidates[0] | |
| parts = getattr(first_candidate.content, "parts", None) | |
| if not parts: | |
| raise GeminiOCRError("응답 콘텐츠가 비어 있습니다.") | |
| return response.text.strip() | |
| def analyze_page( | |
| *, | |
| page_id: int, | |
| image: np.ndarray, | |
| db: Session, | |
| api_key: Optional[str] = None, | |
| model_choice: Optional[str] = None, | |
| ) -> Dict[str, object]: | |
| """단일 페이지에 대한 전체 분석 파이프라인을 실행한다.""" | |
| service = AnalysisService( | |
| model_choice=model_choice or "SmartEyeSsen", auto_load=False | |
| ) | |
| layout_elements = service.analyze_layout( | |
| image=image, page_id=page_id, db=db, model_choice=model_choice | |
| ) | |
| text_contents = service.perform_ocr( | |
| image=image, layout_elements=layout_elements, db=db | |
| ) | |
| ai_descriptions: Dict[int, str] = {} | |
| if api_key: | |
| ai_descriptions = service.call_openai_api( | |
| image=image, layout_elements=layout_elements, api_key=api_key, db=db | |
| ) | |
| return { | |
| "layout_elements": layout_elements, | |
| "text_contents": text_contents, | |
| "ai_descriptions": ai_descriptions, | |
| } | |