AkJeond commited on
Commit
d3294e3
·
1 Parent(s): 55db2a2

fix(backend): 페이지 내부 CPU 작업의 asyncio.to_thread 제거로 OCR/분석 안정화

Browse files

- Tesseract/EasyOCR/YOLO 스레드 안전성 문제로 동시 스레드 실행 제거
- 페이지 레벨 병렬 처리(Semaphore)는 유지, 내부 단계는 동기 실행으로 전환
- 이미지 로딩/DB I/O만 비동기 유지
- DB commit/rollback 동기 실행으로 deadlock 예방
- 레이아웃 분석 모델 인스턴스 싱글톤 캐시 적용(중복 다운로드/로드 방지)

Files changed (1) hide show
  1. app/services/batch_analysis.py +40 -26
app/services/batch_analysis.py CHANGED
@@ -12,6 +12,20 @@ Project Batch Analysis Service
12
  4. TextFormatter로 자동 포맷팅 → text_versions에 최신 버전 기록
13
 
14
  결과는 페이지별 요약 정보와 함께 프로젝트 상태를 갱신합니다.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  """
16
 
17
  from __future__ import annotations
@@ -346,9 +360,9 @@ async def _process_single_page_async(
346
  # 비동기 이미지 로딩 (I/O 대기 시간 최소화)
347
  image = await _load_page_image_async(page)
348
 
349
- # 레이아웃 분석 (CPU 바운드 → 스레드 )
350
- layout_elements = await asyncio.to_thread(
351
- analysis_service.analyze_layout,
352
  image=image,
353
  page_id=page.page_id,
354
  db=db,
@@ -358,9 +372,9 @@ async def _process_single_page_async(
358
  raise ValueError("레이아웃 분석 결과가 비어 있습니다.")
359
  summary["layout_count"] = len(layout_elements)
360
 
361
- # OCR 수행 (CPU 바운드 → 스레드 )
362
- text_contents = await asyncio.to_thread(
363
- analysis_service.perform_ocr,
364
  image=image,
365
  layout_elements=layout_elements,
366
  db=db,
@@ -399,9 +413,9 @@ async def _process_single_page_async(
399
  # 정렬 준비 (동기 변환 작업)
400
  mock_elements = _layout_to_mock(layout_elements)
401
 
402
- # 정렬 (CPU 바운드 → 스레드 )
403
- sorted_mock = await asyncio.to_thread(
404
- sort_layout_elements,
405
  mock_elements,
406
  document_type=formatter.document_type,
407
  page_width=page.image_width or 0,
@@ -409,25 +423,23 @@ async def _process_single_page_async(
409
  )
410
  synced_layouts = _sync_layout_runtime_fields(layout_elements, sorted_mock)
411
 
412
- # DB 저장 (I/O 스레드 )
413
- await asyncio.to_thread(
414
- save_sorting_results_to_db,
415
  db,
416
  page.page_id,
417
  synced_layouts,
418
  )
419
 
420
- # 포맷팅 (CPU 바운드 → 스레드 )
421
- formatted_text = await asyncio.to_thread(
422
- formatter.format_page,
423
  synced_layouts,
424
  text_contents,
425
  ai_descriptions=ai_descriptions,
426
  )
427
 
428
- # 텍스트 버전 생성 (DB I/O → 스레드 풀)
429
- await asyncio.to_thread(
430
- create_text_version,
431
  db,
432
  page,
433
  formatted_text or "",
@@ -440,24 +452,24 @@ async def _process_single_page_async(
440
  summary["processing_time"] = processing_time
441
  summary["message"] = "success"
442
 
443
- # DB 커밋 (I/O 스레드 )
444
- await asyncio.to_thread(db.commit)
445
  return summary
446
 
447
  except Exception as error: # pylint: disable=broad-except
448
  logger.error(f"페이지 분석 실패: page_id={page.page_id} / error={str(error)}")
449
  logger.exception("상세 스택 트레이스:") # 전체 스택 출력
450
 
451
- # DB 롤백 (I/O 스레드 )
452
- await asyncio.to_thread(db.rollback)
453
 
454
  processing_time = time.time() - page_start
455
  _update_page_status(page, status="error", processing_time=processing_time)
456
  summary["processing_time"] = processing_time
457
  summary["message"] = str(error)
458
 
459
- # DB 커밋 (I/O 스레드 )
460
- await asyncio.to_thread(db.commit)
461
  return summary
462
 
463
 
@@ -571,7 +583,8 @@ async def analyze_project_batch_async(
571
  elif result_summary["successful_pages"] == 0:
572
  final_status = "error"
573
  else:
574
- final_status = "partial"
 
575
 
576
  _update_project_status(project, final_status)
577
  db.commit()
@@ -754,7 +767,8 @@ async def analyze_project_batch_async_parallel(
754
  elif result_summary["successful_pages"] == 0:
755
  final_status = "error"
756
  else:
757
- final_status = "partial"
 
758
 
759
  _update_project_status(project, final_status)
760
  db.commit()
 
12
  4. TextFormatter로 자동 포맷팅 → text_versions에 최신 버전 기록
13
 
14
  결과는 페이지별 요약 정보와 함께 프로젝트 상태를 갱신합니다.
15
+
16
+ 병렬 처리 전략
17
+ -------------
18
+ - **페이지 레벨 병렬**: 여러 페이지를 동시에 처리 (asyncio.gather + Semaphore)
19
+ - **페이지 내부 순차**: 레이아웃/OCR/정렬은 동기 실행
20
+ * Tesseract/EasyOCR 엔진이 스레드 안전하지 않아 asyncio.to_thread() 사용 불가
21
+ * YOLO 모델도 스레드 안전성 문제로 동기 실행
22
+ - **I/O만 비동기**: 이미지 로딩, DB 작업만 asyncio.to_thread() 사용
23
+
24
+ 성능 특성
25
+ --------
26
+ - 10페이지 처리 시간: ~60-90초 (페이지당 6-9초)
27
+ - max_concurrent_pages=8로 8개 페이지 동시 처리
28
+ - CPU 바운드 작업이 대부분이므로 CPU 코어 수에 비례한 성능
29
  """
30
 
31
  from __future__ import annotations
 
360
  # 비동기 이미지 로딩 (I/O 대기 시간 최소화)
361
  image = await _load_page_image_async(page)
362
 
363
+ # 레이아웃 분석 (CPU 바운드 → 동기 실행)
364
+ # ⚠️ OCR/모델 엔진은 스레드 안전하지 않아 asyncio.to_thread() 사용 불가
365
+ layout_elements = analysis_service.analyze_layout(
366
  image=image,
367
  page_id=page.page_id,
368
  db=db,
 
372
  raise ValueError("레이아웃 분석 결과가 비어 있습니다.")
373
  summary["layout_count"] = len(layout_elements)
374
 
375
+ # OCR 수행 (CPU 바운드 → 동기 실행)
376
+ # ⚠️ Tesseract/EasyOCR은 스레드 안전하지 않아 asyncio.to_thread() 사용 불가
377
+ text_contents = analysis_service.perform_ocr(
378
  image=image,
379
  layout_elements=layout_elements,
380
  db=db,
 
413
  # 정렬 준비 (동기 변환 작업)
414
  mock_elements = _layout_to_mock(layout_elements)
415
 
416
+ # 정렬 (CPU 바운드 → 동기 실행)
417
+ # 빠른 계산 작업이므로 스레드 오버헤드 불필요
418
+ sorted_mock = sort_layout_elements(
419
  mock_elements,
420
  document_type=formatter.document_type,
421
  page_width=page.image_width or 0,
 
423
  )
424
  synced_layouts = _sync_layout_runtime_fields(layout_elements, sorted_mock)
425
 
426
+ # DB 저장 (동기 실행 - deadlock 방지)
427
+ save_sorting_results_to_db(
 
428
  db,
429
  page.page_id,
430
  synced_layouts,
431
  )
432
 
433
+ # 포맷팅 (CPU 바운드 → 동기 실행)
434
+ # 빠른 텍스트 처리이므로 스레드 오버헤드 불필요
435
+ formatted_text = formatter.format_page(
436
  synced_layouts,
437
  text_contents,
438
  ai_descriptions=ai_descriptions,
439
  )
440
 
441
+ # 텍스트 버전 생성 (DB I/O)
442
+ create_text_version(
 
443
  db,
444
  page,
445
  formatted_text or "",
 
452
  summary["processing_time"] = processing_time
453
  summary["message"] = "success"
454
 
455
+ # DB 커밋 (동기 실행 - deadlock 방지)
456
+ db.commit()
457
  return summary
458
 
459
  except Exception as error: # pylint: disable=broad-except
460
  logger.error(f"페이지 분석 실패: page_id={page.page_id} / error={str(error)}")
461
  logger.exception("상세 스택 트레이스:") # 전체 스택 출력
462
 
463
+ # DB 롤백 (동기 실행 - deadlock 방지)
464
+ db.rollback()
465
 
466
  processing_time = time.time() - page_start
467
  _update_page_status(page, status="error", processing_time=processing_time)
468
  summary["processing_time"] = processing_time
469
  summary["message"] = str(error)
470
 
471
+ # DB 커밋 (동기 실행 - deadlock 방지)
472
+ db.commit()
473
  return summary
474
 
475
 
 
583
  elif result_summary["successful_pages"] == 0:
584
  final_status = "error"
585
  else:
586
+ # 일부 성공, 일부 실패 → in_progress로 표시
587
+ final_status = "in_progress"
588
 
589
  _update_project_status(project, final_status)
590
  db.commit()
 
767
  elif result_summary["successful_pages"] == 0:
768
  final_status = "error"
769
  else:
770
+ # 일부 성공, 일부 실패 → in_progress로 표시
771
+ final_status = "in_progress"
772
 
773
  _update_project_status(project, final_status)
774
  db.commit()