KwanHak's picture
sync: Smart_Demo 브랜치의 Backend 코드 병합 & 이미지 로드를 위한 MultiFileLoader 컴포넌트 구현
82c1146
# -*- coding: utf-8 -*-
"""
SmartEyeSsen Layout Sorter (v.LayoutDetect.2.4 - Tie-breaker in Post-processing)
=================================================================================
Question-layout sorting algorithm (hybrid, based on Layout Type Detection).
The overall page layout type (1-column, 2-column, mixed, ...) is detected first,
and a splitting strategy (horizontal/vertical) that matches the type is applied.
When splitting fails (Base Case), grouping logic specialized per layout type is called.
- Standard 1-column/2-column: _base_case_standard_1_column
- Mixed: _base_case_mixed_layout
Global orphan-group handling is applied during the final merge.
Algorithm flow: (same as v.LayoutDetect.2.1/2.2/2.3)
0. Preprocessing
1. Layout type detection
2. Recursive processing per type
3. Base Case processing (including post-processing)
4. Final merge and order assignment
v.LayoutDetect.2.4:
- _post_process_table_figure_assignment: added a tie-breaker that prefers the later group when Y distances are equal during the best-group search.
- sort_layout_elements: assign temporary group IDs before calling post-processing to improve log readability.
- (v2.3 change kept) _post_process_table_figure_assignment: best-group search logic (Lookahead).
- (v2.2 change kept) _post_process_table_figure_assignment: the move condition uses distance-comparison logic.
- (v2.1 change kept) _post_process_table_figure_assignment: y_diff_threshold default is 150.
- (v2.1 change kept) _base_case_standard_1_column: logic that separates top orphan elements.
"""
# ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์ž„ํฌํŠธ
import math
import os
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import List, Dict, Tuple, Optional, Any, Union, TYPE_CHECKING

import numpy as np
from loguru import logger
from sklearn.cluster import KMeans

# Mock model import (kept for compatibility; scheduled for removal)
from .mock_models import MockElement

if TYPE_CHECKING:
    from sqlalchemy.orm import Session
    from ..models import LayoutElement
# ============================================================================
# Data classes and Enum definitions (unchanged from the previous version)
# ============================================================================
class LayoutType(Enum):
    """Page-level layout categories that select the sorting strategy."""

    # Too few anchors, or anchor X centers do not form two separated clusters.
    STANDARD_1_COLUMN = auto()
    # Anchor X centers form two separated clusters over the whole page.
    STANDARD_2_COLUMN = auto()
    # Upper part looks single-column, lower part looks multi-column.
    MIXED_TOP1_BOTTOM2 = auto()
    # Upper part looks multi-column, lower part looks single-column.
    MIXED_TOP2_BOTTOM1 = auto()
    # A wide "question_type" element was found near the top of the page.
    HORIZONTAL_SEP_PRESENT = auto()
    # Plain (Y, X) reading order; chosen when document_type is "reading_order".
    READING_ORDER = auto()
    # Could not be classified (also used for empty sub-zones during recursion).
    UNKNOWN = auto()
@dataclass
class Zone:
    """Axis-aligned rectangular region, half-open on its max edges."""

    x_min: int
    y_min: int
    x_max: int
    y_max: int

    @property
    def width(self) -> int:
        """Horizontal extent, clamped to 0 for inverted bounds."""
        return max(0, self.x_max - self.x_min)

    @property
    def height(self) -> int:
        """Vertical extent, clamped to 0 for inverted bounds."""
        return max(0, self.y_max - self.y_min)

    def __repr__(self) -> str:
        return f"Zone(x=[{self.x_min}, {self.x_max}), y=[{self.y_min}, {self.y_max}))"
@dataclass
class HorizontalSplit:
    """Result of splitting a zone horizontally at a separator element."""

    # Region above the separator.
    top_zone: Zone
    # Region below the separator.
    bottom_zone: Zone
    # Element that acts as the dividing line; it belongs to neither zone.
    separator_element: MockElement
@dataclass
class HorizontalSplitYGap:
    """Result of splitting a zone horizontally at a vertical gap between anchors."""

    # Region above the split line.
    top_zone: Zone
    # Region below the split line.
    bottom_zone: Zone
    # Y coordinate of the split line (not rounded, unlike the zone bounds).
    split_y: float
@dataclass
class VerticalSplit:
    """Result of splitting a zone vertically into a left and a right column."""

    # Region left of the gutter.
    left_zone: Zone
    # Region right of the gutter.
    right_zone: Zone
    # X coordinate of the column boundary (not rounded, unlike the zone bounds).
    gutter_x: float
@dataclass
class ElementGroup:
    """An optional anchor element together with the children attached to it.

    A group whose ``anchor`` is ``None`` is treated as an "orphan" group.
    """

    anchor: Optional[MockElement]
    children: List[MockElement] = field(default_factory=list)
    # Final value is assigned when groups are flattened; a temporary value is
    # assigned before post-processing (used for logging).
    group_id: int = -1

    def add_child(self, child: MockElement):
        """Append ``child`` to this group's children."""
        self.children.append(child)

    def get_all_elements_sorted(self) -> List[MockElement]:
        """Return the group's elements in output order.

        The anchor, when present, always comes first; the children follow,
        sorted by ``(y_position, x_position)``.
        """
        # Test against None (as is_empty does) so that presence of the anchor
        # never depends on the element's truthiness.
        elements = [self.anchor] if self.anchor is not None else []
        elements.extend(
            sorted(self.children, key=lambda e: (e.y_position, e.x_position))
        )
        return elements

    def is_empty(self) -> bool:
        """Return True when the group has neither an anchor nor any child."""
        return self.anchor is None and not self.children

    def __repr__(self) -> str:
        anchor_id = self.anchor.element_id if self.anchor is not None else "Orphan"
        child_ids = sorted([c.element_id for c in self.children])
        # group_id may still be the temporary value before flattening.
        return f"Group(ID:{self.group_id}, Anchor: {anchor_id}, Children: {child_ids})"
# ============================================================================
# Constant definitions (unchanged from the previous version)
# ============================================================================
# Class names treated as group anchors.
ALLOWED_ANCHORS = [
    "question type",
    "question number",
    "second_question_number",
]
# Class names treated as children of an anchor.
ALLOWED_CHILDREN = [
    "question text",
    "list",
    "choices",
    "figure",
    "table",
    "flowchart",
]
ALLOWED_CLASSES = [*ALLOWED_ANCHORS, *ALLOWED_CHILDREN]

# Minimum width ratio (element width / zone width) for a horizontal separator.
HORIZONTAL_SEP_WIDTH_THRESHOLD = 0.8
# Fraction of the page height that counts as the "top" area for separators.
HORIZONTAL_SEP_Y_POS_THRESHOLD = 0.15
# Minimum number of anchors required before any split is attempted.
MIN_ANCHORS_FOR_SPLIT = 2
# Y-gap split threshold: max(average anchor height * ratio, absolute px value).
VERTICAL_GAP_THRESHOLD_RATIO = 1.5
VERTICAL_GAP_THRESHOLD_ABS = 100
# K-Means column detection: cluster count and minimum center separation (px).
KMEANS_N_CLUSTERS = 2
KMEANS_CLUSTER_SEPARATION_MIN = 50
# Layout detection: fraction of page height separating "top" from "bottom".
LAYOUT_DETECT_Y_SPLIT_POINT = 0.4
# Layout detection: X std-dev (as a fraction of page width) above which a
# half of the page is considered multi-column.
LAYOUT_DETECT_X_STD_THRESHOLD_RATIO = 0.1
# NOTE(review): not referenced in the visible part of this file; presumably
# used by the mixed-layout base case - confirm.
HORIZONTAL_ADJACENCY_Y_CENTER_RATIO = 0.7
HORIZONTAL_ADJACENCY_X_PROXIMITY = 50
# Fraction of the zone height regarded as the top-orphan area in base cases.
BASE_CASE_TOP_ORPHAN_THRESHOLD_RATIO = 0.15
# Post-processing: a following anchor must be closer than this fraction of the
# current anchor distance, searched over this many following groups.
POST_PROCESS_CLOSENESS_RATIO = 0.5
POST_PROCESS_LOOKAHEAD = 2

# Constants for 2D-distance-based grouping
ANCHOR_VERTICAL_PROXIMITY_THRESHOLD = 250  # px - Y-distance threshold to an anchor
ANCHOR_2D_DISTANCE_WEIGHT_X = 0.2  # weight of the X distance (kept low)
ANCHOR_2D_DISTANCE_WEIGHT_Y = 1.0  # weight of the Y distance
# ============================================================================
# Main function: sort after detecting the layout type (modified)
# ============================================================================
def _v24_yx_singleton_groups(elements: List[MockElement]) -> List[ElementGroup]:
    """Wrap every element in its own anchor-less group, ordered by (Y, X)."""
    return [
        ElementGroup(anchor=None, children=[elem])
        for elem in sorted(elements, key=lambda e: (e.y_position, e.x_position))
    ]


def _v24_group_top_y(group: ElementGroup) -> float:
    """Return the smallest child Y of ``group``; +inf when it has no children."""
    if group.children:
        return min(c.y_position for c in group.children)
    return float("inf")


def _sort_layout_elements_v24(
    elements: List[MockElement],
    document_type: str = "question_based",
    page_width: Optional[int] = None,
    page_height: Optional[int] = None,
) -> List[MockElement]:
    """Sort layout elements with a strategy chosen from the detected layout type.

    Steps (v.LayoutDetect.2.4):
      0. Filter the input through ``preprocess_elements``.
      1. Detect the layout type (or force READING_ORDER for
         ``document_type == "reading_order"``).
      2. Group elements with the matching strategy.
      3. For ``"question_based"`` documents, assign temporary group ids and
         run the table/figure post-processing on the anchored groups.
      4. Put orphan groups (no anchor) first, sorted by their top Y, followed
         by the anchored groups in the order produced by step 2/3, then
         flatten and assign the final order.

    Any exception raised during steps 1-3 is logged with its traceback and the
    function falls back to a plain (Y, X) ordering of the filtered elements.

    Args:
        elements: Elements of one page.
        document_type: ``"question_based"`` (default) or ``"reading_order"``.
        page_width: Page width; derived via ``calculate_page_width`` when None.
        page_height: Page height; derived via ``calculate_page_height`` when None.

    Returns:
        The value produced by ``flatten_groups_and_assign_order`` for the final
        group list, or ``[]`` when nothing is left to sort.
    """
    logger.info(
        f"๋งž์ถคํ˜• ์ •๋ ฌ(v.LayoutDetect.2.4) ์‹œ์ž‘: {len(elements)}๊ฐœ ์š”์†Œ, ํƒ€์ž…={document_type}"
    )
    filtered_elements = preprocess_elements(elements, document_type)
    if not filtered_elements:
        logger.warning("์ „์ฒ˜๋ฆฌ ํ›„ ์ •๋ ฌํ•  ์š”์†Œ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
        return []

    if page_width is None:
        page_width = calculate_page_width(filtered_elements)
    if page_height is None:
        page_height = calculate_page_height(filtered_elements)
    logger.info(f"ํŽ˜์ด์ง€ ํฌ๊ธฐ: {page_width} x {page_height}")

    initial_zone = Zone(x_min=0, y_min=0, x_max=page_width, y_max=page_height)
    grouped_results: List[ElementGroup] = []
    try:
        if document_type == "reading_order":
            layout_type = LayoutType.READING_ORDER
            logger.info(f"ํŒ๋ณ„๋œ ๋ ˆ์ด์•„์›ƒ ์œ ํ˜•: {layout_type.name} (๋ฌธ์„œ ํƒ€์ž… ์ง€์ •)")
            grouped_results = _v24_yx_singleton_groups(filtered_elements)
        else:
            layout_type = detect_layout_type(filtered_elements, page_width, page_height)
            logger.info(f"ํŒ๋ณ„๋œ ๋ ˆ์ด์•„์›ƒ ์œ ํ˜•: {layout_type.name}")
            if layout_type == LayoutType.STANDARD_1_COLUMN:
                logger.debug(
                    f"{layout_type.name}: ๋ถ„ํ•  ์—†์ด ์ „์ฒด ๊ตฌ์—ญ ํ‘œ์ค€ 1๋‹จ Base Case ์‹คํ–‰"
                )
                grouped_results = _base_case_standard_1_column(
                    initial_zone, filtered_elements
                )
            elif layout_type == LayoutType.STANDARD_2_COLUMN:
                grouped_results = _sort_standard_2_column(
                    initial_zone, filtered_elements
                )
            elif layout_type in [
                LayoutType.HORIZONTAL_SEP_PRESENT,
                LayoutType.MIXED_TOP1_BOTTOM2,
                LayoutType.MIXED_TOP2_BOTTOM1,
                LayoutType.UNKNOWN,
            ]:
                grouped_results = _sort_recursive_by_layout(
                    initial_zone, filtered_elements, layout_type, depth=0
                )
            else:
                logger.error(
                    f"์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†๋Š” ๋ ˆ์ด์•„์›ƒ ์œ ํ˜•: {layout_type.name}. (Y,X) ์ •๋ ฌ๋กœ ๋Œ€์ฒดํ•ฉ๋‹ˆ๋‹ค."
                )
                grouped_results = _v24_yx_singleton_groups(filtered_elements)

        # Assign temporary group ids before post-processing so that its log
        # lines can refer to groups; the final ids are assigned when flattening.
        if grouped_results and document_type == "question_based":
            logger.debug("ํ›„์ฒ˜๋ฆฌ ์ „ ์ž„์‹œ ๊ทธ๋ฃน ID ํ• ๋‹น...")
            temp_orphan_groups = [g for g in grouped_results if g.anchor is None]
            temp_non_orphan_groups = [
                g for g in grouped_results if g.anchor is not None
            ]
            # Orphans get the lowest ids (in top-Y order), anchored groups follow.
            temp_orphan_groups.sort(key=_v24_group_top_y)
            for temp_id, group in enumerate(
                temp_orphan_groups + temp_non_orphan_groups
            ):
                group.group_id = temp_id

            # Post-processing only considers groups that have an anchor.
            logger.debug(
                f"{len(temp_non_orphan_groups)}๊ฐœ ์•ต์ปค ๊ทธ๋ฃน ๋Œ€์ƒ ํ›„์ฒ˜๋ฆฌ ์‹คํ–‰..."
            )
            processed_non_orphan_groups = _post_process_table_figure_assignment(
                temp_non_orphan_groups
            )
            grouped_results = temp_orphan_groups + processed_non_orphan_groups
            logger.debug("ํ›„์ฒ˜๋ฆฌ ๋ฐ ์ž„์‹œ ๊ทธ๋ฃน ID ํ• ๋‹น ์™„๋ฃŒ.")
    except Exception as e:
        # logger.exception attaches the traceback. Passing exc_info=True to
        # loguru would instead be treated as a str.format() argument: no
        # traceback is recorded, and braces in the message could raise here.
        logger.exception(
            f"๋งž์ถคํ˜• ์ •๋ ฌ ์ค‘ ์‹ฌ๊ฐํ•œ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}. (Y,X) ์ขŒํ‘œ ์ •๋ ฌ๋กœ ๋Œ€์ฒดํ•ฉ๋‹ˆ๋‹ค."
        )
        grouped_results = _v24_yx_singleton_groups(filtered_elements)

    if not grouped_results:
        logger.warning("๊ทธ๋ฃนํ•‘ ๊ฒฐ๊ณผ๊ฐ€ ๋น„์–ด ์žˆ์Šต๋‹ˆ๋‹ค.")
        return []

    # Final merge: orphan groups first (by top Y), then anchored groups in the
    # order decided by the base case / recursion (no Y re-sorting for them).
    orphan_groups = [g for g in grouped_results if g.anchor is None]
    non_orphan_groups = [g for g in grouped_results if g.anchor is not None]
    final_ordered_groups: List[ElementGroup] = []
    if orphan_groups:
        orphan_groups.sort(key=_v24_group_top_y)
        logger.debug(
            f"์ „์—ญ ๊ณ ์•„ ๊ทธ๋ฃน {len(orphan_groups)}๊ฐœ (Y ์ขŒํ‘œ ์ •๋ ฌ๋จ) ๋ฆฌ์ŠคํŠธ ๋งจ ์•ž์œผ๋กœ ์ด๋™"
        )
        final_ordered_groups.extend(orphan_groups)
    else:
        logger.debug("์ „์—ญ ๊ณ ์•„ ๊ทธ๋ฃน ์—†์Œ")
    final_ordered_groups.extend(non_orphan_groups)

    # Final order and group ids are assigned while flattening.
    final_sorted_elements, _, _ = flatten_groups_and_assign_order(
        final_ordered_groups, start_global_order=0, start_group_id=0
    )
    logger.info(f"๋งž์ถคํ˜• ์ •๋ ฌ ์™„๋ฃŒ: {len(final_sorted_elements)}๊ฐœ ์š”์†Œ")
    return final_sorted_elements
def _use_adaptive_strategy() -> bool:
    """Return True when the ``USE_ADAPTIVE_SORTER`` environment flag is enabled.

    The flag is enabled when its value, ignoring case and surrounding
    whitespace, is one of ``"1"``, ``"true"`` or ``"yes"``. An unset variable
    counts as disabled.
    """
    # strip() tolerates values such as "true " coming from .env files.
    flag = os.getenv("USE_ADAPTIVE_SORTER", "false").strip().lower()
    return flag in {"1", "true", "yes"}
def sort_layout_elements(
    elements: List[MockElement],
    document_type: str = "question_based",
    page_width: Optional[int] = None,
    page_height: Optional[int] = None,
    page_dpi: Optional[float] = None,
) -> List[MockElement]:
    """Sort layout elements, delegating to the adaptive sorter when enabled.

    When the adaptive-strategy flag is on, the call is forwarded to
    ``sorter_strategies.sort_layout_elements_adaptive`` (with
    ``force_strategy=None``); otherwise the v2.4 core implementation is used
    as is.

    Args:
        elements: Elements of one page.
        document_type: Forwarded unchanged to the selected implementation.
        page_width: Forwarded unchanged.
        page_height: Forwarded unchanged.
        page_dpi: Only forwarded to the adaptive sorter; the v2.4 path does
            not take it.

    Returns:
        The sorted element list produced by the selected implementation.
    """
    if not _use_adaptive_strategy():
        return _sort_layout_elements_v24(
            elements=elements,
            document_type=document_type,
            page_width=page_width,
            page_height=page_height,
        )

    # Imported lazily so the adaptive module is only loaded when the flag is set.
    from .sorter_strategies import sort_layout_elements_adaptive

    return sort_layout_elements_adaptive(
        elements=elements,
        document_type=document_type,
        page_width=page_width,
        page_height=page_height,
        force_strategy=None,
        page_dpi=page_dpi,
    )
# ============================================================================
# Layout type detection function (unchanged from the previous version)
# ============================================================================
def detect_layout_type(
    elements: List[MockElement], page_width: int, page_height: int
) -> LayoutType:
    """Classify the page layout from the distribution of anchor elements.

    Decision order:
      1. Fewer than ``MIN_ANCHORS_FOR_SPLIT`` anchors -> STANDARD_1_COLUMN.
      2. A wide question-type element in the top area -> HORIZONTAL_SEP_PRESENT.
      3. 2-means clustering of anchor X centers; when the two centers are less
         than ``KMEANS_CLUSTER_SEPARATION_MIN`` apart (or clustering fails)
         -> STANDARD_1_COLUMN.
      4. Otherwise anchors are divided at ``page_height *
         LAYOUT_DETECT_Y_SPLIT_POINT`` and each half is judged multi-column
         when the std-dev of its X centers exceeds ``page_width *
         LAYOUT_DETECT_X_STD_THRESHOLD_RATIO``; the combination selects
         STANDARD_2_COLUMN, MIXED_TOP1_BOTTOM2, MIXED_TOP2_BOTTOM1 or UNKNOWN.

    NOTE(review): the thresholds derived from ``page_height`` are compared with
    raw element Y coordinates. ``_sort_recursive_by_layout`` calls this with a
    sub-zone's width/height, so for a zone that does not start at y=0 the
    thresholds are measured from the page origin, not the zone origin -
    confirm this is intended.

    Args:
        elements: Elements to analyse.
        page_width: Width used for the separator and std-dev thresholds.
        page_height: Height used for the top-area and top/bottom split points.

    Returns:
        The detected ``LayoutType``.
    """
    anchors = [e for e in elements if e.class_name in ALLOWED_ANCHORS]
    if len(anchors) < MIN_ANCHORS_FOR_SPLIT:
        logger.debug(
            f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์•ต์ปค ์ˆ˜({len(anchors)}) ๋ถ€์กฑ -> STANDARD_1_COLUMN"
        )
        return LayoutType.STANDARD_1_COLUMN

    top_zone_height = page_height * HORIZONTAL_SEP_Y_POS_THRESHOLD
    wide_q_type = find_wide_question_type(elements, page_width, top_zone_height)
    if wide_q_type:
        logger.debug(
            f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ๋„“์€ question_type(ID:{wide_q_type.element_id}) ์กด์žฌ -> HORIZONTAL_SEP_PRESENT"
        )
        return LayoutType.HORIZONTAL_SEP_PRESENT

    anchor_x_centers = np.array([[a.bbox_x + a.bbox_width / 2] for a in anchors])
    is_clearly_2_column = False
    # K-Means with 2 clusters needs at least two distinct X centers.
    if len(np.unique(anchor_x_centers)) >= 2:
        try:
            kmeans = KMeans(
                n_clusters=KMEANS_N_CLUSTERS, random_state=42, n_init="auto"
            )
            kmeans.fit(anchor_x_centers)
            centers = sorted(kmeans.cluster_centers_.flatten())
            if (
                len(centers) == 2
                and centers[1] - centers[0] >= KMEANS_CLUSTER_SEPARATION_MIN
            ):
                is_clearly_2_column = True
                logger.trace(
                    f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ „์ฒด X ๋ถ„ํฌ๋Š” 2๋‹จ ๊ตฌ์กฐ ๊ฐ€๋Šฅ์„ฑ ๋†’์Œ (Centers: {centers})"
                )
            else:
                logger.trace(f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ „์ฒด X ๋ถ„ํฌ๋Š” 1๋‹จ ๊ตฌ์กฐ ๋˜๋Š” ๋ถˆ๋ถ„๋ช…")
        except Exception as e:
            # Best effort: a clustering failure degrades to single-column.
            logger.warning(f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„ ์ค‘ K-Means ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}")

    if not is_clearly_2_column:
        logger.debug("๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ „์ฒด 1๋‹จ ๊ตฌ์กฐ -> STANDARD_1_COLUMN")
        return LayoutType.STANDARD_1_COLUMN

    # Divide anchors into an upper and a lower set by their vertical center.
    split_y = page_height * LAYOUT_DETECT_Y_SPLIT_POINT
    top_anchors = [
        a for a in anchors if (a.y_position + a.bbox_height / 2) < split_y
    ]
    bottom_anchors = [
        a for a in anchors if (a.y_position + a.bbox_height / 2) >= split_y
    ]
    if not top_anchors or not bottom_anchors:
        logger.debug("๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ƒ/ํ•˜๋‹จ ์•ต์ปค ๊ทธ๋ฃน ๋ถˆ์™„์ „ -> STANDARD_2_COLUMN")
        return LayoutType.STANDARD_2_COLUMN

    # Both lists are non-empty from here on, so no empty-array fallback is needed.
    top_x_centers = np.array([[a.bbox_x + a.bbox_width / 2] for a in top_anchors])
    bottom_x_centers = np.array(
        [[a.bbox_x + a.bbox_width / 2] for a in bottom_anchors]
    )
    x_std_threshold = page_width * LAYOUT_DETECT_X_STD_THRESHOLD_RATIO
    top_is_multi_column = (
        top_x_centers.size > 1 and np.std(top_x_centers) > x_std_threshold
    )
    bottom_is_multi_column = (
        bottom_x_centers.size > 1 and np.std(bottom_x_centers) > x_std_threshold
    )

    if not top_is_multi_column and bottom_is_multi_column:
        logger.debug(
            f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ƒ๋‹จ({len(top_anchors)}๊ฐœ) 1๋‹จ, ํ•˜๋‹จ({len(bottom_anchors)}๊ฐœ) 2๋‹จ -> MIXED_TOP1_BOTTOM2"
        )
        return LayoutType.MIXED_TOP1_BOTTOM2
    if top_is_multi_column and not bottom_is_multi_column:
        logger.debug(
            f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ƒ๋‹จ({len(top_anchors)}๊ฐœ) 2๋‹จ, ํ•˜๋‹จ({len(bottom_anchors)}๊ฐœ) 1๋‹จ -> MIXED_TOP2_BOTTOM1"
        )
        return LayoutType.MIXED_TOP2_BOTTOM1
    if top_is_multi_column and bottom_is_multi_column:
        logger.debug(
            f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ƒ๋‹จ({len(top_anchors)}๊ฐœ) 2๋‹จ, ํ•˜๋‹จ({len(bottom_anchors)}๊ฐœ) 2๋‹จ -> STANDARD_2_COLUMN"
        )
        return LayoutType.STANDARD_2_COLUMN

    # Each half looks single-column although the page as a whole clustered
    # into two columns.
    logger.warning(
        f"๋ ˆ์ด์•„์›ƒ ํŒ๋ณ„: ์ƒ/ํ•˜๋‹จ ๋ชจ๋‘ 1๋‹จ์œผ๋กœ ๋ณด์ด๋‚˜ ์ „์ฒด๋Š” 2๋‹จ ๊ตฌ์กฐ? -> UNKNOWN"
    )
    return LayoutType.UNKNOWN
# ============================================================================
# Recursive sorting function (unchanged from the previous version)
# ============================================================================
def _sort_recursive_by_layout(
    current_zone: Zone,
    elements_in_zone: List[MockElement],
    layout_type: LayoutType,
    depth: int,
) -> List[ElementGroup]:
    """Recursively split a zone and group its elements.

    The order in which split strategies are tried depends on ``layout_type``:

    * HORIZONTAL_SEP_PRESENT / UNKNOWN: separator element ("H_Type"), then
      K-Means columns ("Vertical"), then anchor Y gap ("H_YGap").
    * MIXED_TOP1_BOTTOM2 / MIXED_TOP2_BOTTOM1: "H_YGap", "H_Type", "Vertical".
    * STANDARD_2_COLUMN: handed straight to ``_sort_standard_2_column``.
    * Anything else: no split is attempted.

    After a successful split the layout type of each side is re-detected and
    both sides are processed recursively. When no split is found, the base
    case matching ``layout_type`` produces the groups.

    NOTE(review): termination relies on every split leaving each side with
    fewer elements than the zone had; this is not enforced here.

    Args:
        current_zone: Zone being processed.
        elements_in_zone: Elements assigned to that zone.
        layout_type: Layout type detected for the zone.
        depth: Recursion depth, used for log indentation only.

    Returns:
        Groups in reading order for this zone.
    """
    indent = " " * depth
    logger.debug(
        f"{indent}[Depth {depth}, Type: {layout_type.name}] ๊ตฌ์—ญ ์ฒ˜๋ฆฌ ์‹œ์ž‘: {current_zone}, ์š”์†Œ ์ˆ˜={len(elements_in_zone)}"
    )
    if not elements_in_zone:
        logger.trace(f"{indent} -> ๋นˆ ๊ตฌ์—ญ")
        return []
    if len(elements_in_zone) == 1:
        element = elements_in_zone[0]
        logger.trace(f"{indent} -> ์š”์†Œ 1๊ฐœ")
        if element.class_name in ALLOWED_ANCHORS:
            return [ElementGroup(anchor=element)]
        return [ElementGroup(anchor=None, children=[element])]
    if layout_type == LayoutType.STANDARD_2_COLUMN:
        logger.debug(f"{indent} -> {layout_type.name}: ํ‘œ์ค€ 2๋‹จ ์ฒ˜๋ฆฌ ํ•จ์ˆ˜ ์ง์ ‘ ํ˜ธ์ถœ")
        return _sort_standard_2_column(current_zone, elements_in_zone)

    # --- Choose the order in which split strategies are attempted. ---
    if layout_type in (LayoutType.MIXED_TOP1_BOTTOM2, LayoutType.MIXED_TOP2_BOTTOM1):
        attempt_order = ("H_YGap", "H_Type", "Vertical")
    elif layout_type in (LayoutType.HORIZONTAL_SEP_PRESENT, LayoutType.UNKNOWN):
        attempt_order = ("H_Type", "Vertical", "H_YGap")
    else:
        attempt_order = ()

    split_result: Optional[
        Union[HorizontalSplit, HorizontalSplitYGap, VerticalSplit]
    ] = None
    split_type = "None"
    for candidate in attempt_order:
        if candidate == "H_Type":
            found = find_horizontal_split_by_type(current_zone, elements_in_zone)
        elif candidate == "H_YGap":
            found = find_horizontal_split_by_y_gap(current_zone, elements_in_zone)
        else:
            anchors = [e for e in elements_in_zone if e.class_name in ALLOWED_ANCHORS]
            found = find_vertical_split_kmeans(current_zone, anchors)
        if found:
            split_result, split_type = found, candidate
            break

    # --- Horizontal split: top part, optional separator, bottom part. ---
    if isinstance(split_result, (HorizontalSplit, HorizontalSplitYGap)):
        if isinstance(split_result, HorizontalSplitYGap):
            separator = None
            split_y = split_result.split_y
        else:
            separator = split_result.separator_element
            split_y = separator.y_position + separator.bbox_height / 2
        # -2 can never equal the -1 default used for elements without an id,
        # so nothing is excluded when there is no separator element.
        separator_id = getattr(separator, "element_id", -2)

        top_elements = []
        bottom_elements = []
        for e in elements_in_zone:
            if getattr(e, "element_id", -1) == separator_id:
                continue  # the separator is emitted as its own group below
            y_center = e.bbox_y + e.bbox_height / 2
            if y_center < split_y:
                top_elements.append(e)
            elif y_center >= split_y:
                bottom_elements.append(e)
        logger.debug(
            f"{indent} -> {split_type} ์ˆ˜ํ‰ ๋ถ„ํ•  ์„ฑ๊ณต! Top:{len(top_elements)}, Bottom:{len(bottom_elements)}"
        )

        top_layout_type = (
            detect_layout_type(
                top_elements,
                split_result.top_zone.width,
                split_result.top_zone.height,
            )
            if top_elements
            else LayoutType.UNKNOWN
        )
        bottom_layout_type = (
            detect_layout_type(
                bottom_elements,
                split_result.bottom_zone.width,
                split_result.bottom_zone.height,
            )
            if bottom_elements
            else LayoutType.UNKNOWN
        )
        sorted_top = _sort_recursive_by_layout(
            split_result.top_zone, top_elements, top_layout_type, depth + 1
        )
        sep_group = (
            [ElementGroup(anchor=split_result.separator_element)]
            if isinstance(split_result, HorizontalSplit)
            else []
        )
        sorted_bottom = _sort_recursive_by_layout(
            split_result.bottom_zone, bottom_elements, bottom_layout_type, depth + 1
        )
        logger.debug(f"{indent} <- {split_type} ์ˆ˜ํ‰ ๋ถ„ํ•  ๊ฒฐ๊ณผ ๋ณ‘ํ•ฉ")
        return sorted_top + sep_group + sorted_bottom

    # --- Vertical split: left column first, then right column. ---
    if isinstance(split_result, VerticalSplit):
        left_elements = [
            e
            for e in elements_in_zone
            if (e.bbox_x + e.bbox_width / 2) < split_result.gutter_x
        ]
        right_elements = [
            e
            for e in elements_in_zone
            if (e.bbox_x + e.bbox_width / 2) >= split_result.gutter_x
        ]
        logger.debug(
            f"{indent} -> Vertical ์ˆ˜์ง ๋ถ„ํ•  ์„ฑ๊ณต! Left:{len(left_elements)}, Right:{len(right_elements)}"
        )
        left_layout_type = (
            detect_layout_type(
                left_elements,
                split_result.left_zone.width,
                split_result.left_zone.height,
            )
            if left_elements
            else LayoutType.UNKNOWN
        )
        right_layout_type = (
            detect_layout_type(
                right_elements,
                split_result.right_zone.width,
                split_result.right_zone.height,
            )
            if right_elements
            else LayoutType.UNKNOWN
        )
        sorted_left = _sort_recursive_by_layout(
            split_result.left_zone, left_elements, left_layout_type, depth + 1
        )
        sorted_right = _sort_recursive_by_layout(
            split_result.right_zone, right_elements, right_layout_type, depth + 1
        )
        logger.debug(f"{indent} <- Vertical ์ˆ˜์ง ๋ถ„ํ•  ๊ฒฐ๊ณผ ๋ณ‘ํ•ฉ")
        return sorted_left + sorted_right

    # --- No split possible: run the base case for this layout type. ---
    logger.debug(
        f"{indent} -> ๋ชจ๋“  ๋ถ„ํ•  ์‹คํŒจ, ๋ ˆ์ด์•„์›ƒ ์œ ํ˜•({layout_type.name})์— ๋”ฐ๋ฅธ Base Case ์‹คํ–‰"
    )
    result_groups: List[ElementGroup] = []
    if layout_type == LayoutType.STANDARD_1_COLUMN:
        result_groups = _base_case_standard_1_column(current_zone, elements_in_zone)
    elif layout_type in (LayoutType.MIXED_TOP1_BOTTOM2, LayoutType.MIXED_TOP2_BOTTOM1):
        result_groups = _base_case_mixed_layout(
            current_zone, elements_in_zone, layout_type
        )
    elif layout_type in (LayoutType.HORIZONTAL_SEP_PRESENT, LayoutType.UNKNOWN):
        logger.warning(
            f"{indent} -> {layout_type.name} ์œ ํ˜• ๋ถ„ํ•  ์‹คํŒจ. 1๋‹จ Base Case๋กœ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค."
        )
        result_groups = _base_case_standard_1_column(current_zone, elements_in_zone)
    else:
        logger.error(
            f"{indent} -> ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†๋Š” Base Case ์œ ํ˜•: {layout_type.name}. 1๋‹จ์œผ๋กœ ์ฒ˜๋ฆฌ."
        )
        result_groups = _base_case_standard_1_column(current_zone, elements_in_zone)
    logger.debug(f"{indent} <- Base Case ์ฒ˜๋ฆฌ ์™„๋ฃŒ: {len(result_groups)} ๊ทธ๋ฃน ์ƒ์„ฑ")
    return result_groups
# ============================================================================
# Standard 2-column layout handler (unchanged from the previous version)
# ============================================================================
def _sort_standard_2_column(
    zone: Zone, elements: List[MockElement]
) -> List[ElementGroup]:
    """Group a standard 2-column zone: left column first, then right column.

    The zone is split with ``find_vertical_split_kmeans`` on its anchors;
    elements are assigned to a column by the X center of their bounding box
    and each column is grouped by ``_base_case_standard_1_column``. When no
    vertical split is found, the whole zone is grouped as a single column.

    Args:
        zone: Zone to process.
        elements: Elements inside the zone.

    Returns:
        Groups of the left column followed by groups of the right column.
    """
    logger.debug("ํ‘œ์ค€ 2๋‹จ ์ฒ˜๋ฆฌ: K-Means ๋ถ„ํ•  ์‹œ๋„")
    anchors = [e for e in elements if e.class_name in ALLOWED_ANCHORS]
    vertical_split = find_vertical_split_kmeans(zone, anchors)
    if not vertical_split:
        logger.warning(
            "ํ‘œ์ค€ 2๋‹จ ์ฒ˜๋ฆฌ ์‹คํŒจ: ์ˆ˜์ง ๋ถ„ํ•  ๋ถˆ๊ฐ€. ์ „์ฒด ๊ตฌ์—ญ ํ‘œ์ค€ 1๋‹จ Base Case ์‹คํ–‰"
        )
        return _base_case_standard_1_column(zone, elements)

    logger.debug(f" -> ์ˆ˜์ง ๋ถ„ํ•  ์„ฑ๊ณต! ๋ถ„๋ฆฌ์„  X={vertical_split.gutter_x:.1f}")
    left_elements = []
    right_elements = []
    for e in elements:
        x_center = e.bbox_x + e.bbox_width / 2
        if x_center < vertical_split.gutter_x:
            left_elements.append(e)
        elif x_center >= vertical_split.gutter_x:
            right_elements.append(e)
    logger.debug(
        f" Left ์š”์†Œ ์ˆ˜: {len(left_elements)}, Right ์š”์†Œ ์ˆ˜: {len(right_elements)}"
    )
    groups_left = _base_case_standard_1_column(
        vertical_split.left_zone, left_elements
    )
    groups_right = _base_case_standard_1_column(
        vertical_split.right_zone, right_elements
    )
    logger.debug(
        f" <- ์ปฌ๋Ÿผ๋ณ„ ๊ทธ๋ฃนํ•‘ ์™„๋ฃŒ (Left: {len(groups_left)} ๊ทธ๋ฃน, Right: {len(groups_right)} ๊ทธ๋ฃน)"
    )
    return groups_left + groups_right
# ============================================================================
# Split function implementations (unchanged from the previous version)
# ============================================================================
def find_wide_question_type(
    elements: List[MockElement], page_width: int, top_y_limit: float
) -> Optional[MockElement]:
    """Find the topmost wide ``question_type`` element in the top page area.

    An element qualifies when its class name is ``"question_type"``, its
    ``y_position`` is above ``top_y_limit`` and its width is at least
    ``HORIZONTAL_SEP_WIDTH_THRESHOLD`` of ``page_width`` (a non-positive
    ``page_width`` yields a ratio of 0).

    NOTE(review): ``ALLOWED_ANCHORS`` spells this class ``"question type"``
    (with a space) while this filter matches ``"question_type"`` (with an
    underscore) - confirm which spelling the detector actually emits.

    Args:
        elements: Candidate elements.
        page_width: Width the element width is compared against.
        top_y_limit: Exclusive upper bound for ``y_position``.

    Returns:
        The qualifying element with the smallest ``y_position``, or None.
    """
    wide_types = [
        e
        for e in elements
        if e.class_name == "question_type"
        and e.y_position < top_y_limit
        and (e.bbox_width / page_width if page_width > 0 else 0)
        >= HORIZONTAL_SEP_WIDTH_THRESHOLD
    ]
    if not wide_types:
        return None
    return min(wide_types, key=lambda e: e.y_position)
def find_horizontal_split_by_type(
    zone: Zone, elements: List[MockElement]
) -> Optional[HorizontalSplit]:
    """Split a zone horizontally at a wide ``question_type`` element.

    Candidates are ``"question_type"`` elements whose width is at least
    ``HORIZONTAL_SEP_WIDTH_THRESHOLD`` of the zone width. The topmost
    candidate becomes the separator; the top zone ends at its ``y_position``
    and the bottom zone starts below its bounding box.

    NOTE(review): ``ALLOWED_ANCHORS`` spells this class ``"question type"``
    (with a space) while this filter matches ``"question_type"`` (with an
    underscore) - confirm which spelling the detector actually emits.

    Args:
        zone: Zone to split.
        elements: Elements inside the zone.

    Returns:
        A ``HorizontalSplit``, or None when there is no candidate, the
        separator lies outside the zone, or either resulting zone would have
        no height.
    """
    potential_separators = [
        element
        for element in elements
        if element.class_name == "question_type"
        and (element.bbox_width / zone.width if zone.width > 0 else 0)
        >= HORIZONTAL_SEP_WIDTH_THRESHOLD
    ]
    if not potential_separators:
        return None

    separator = min(potential_separators, key=lambda e: e.y_position)
    # The separator's top edge must lie strictly inside the zone.
    if not (zone.y_min < separator.y_position < zone.y_max):
        return None

    top_zone = Zone(zone.x_min, zone.y_min, zone.x_max, separator.y_position)
    bottom_zone = Zone(
        zone.x_min, separator.y_position + separator.bbox_height, zone.x_max, zone.y_max
    )
    if top_zone.height <= 0 or bottom_zone.height <= 0:
        return None
    return HorizontalSplit(top_zone, bottom_zone, separator)
def find_horizontal_split_by_y_gap(
    zone: Zone, elements: List[MockElement]
) -> Optional[HorizontalSplitYGap]:
    """Split a zone horizontally at the largest vertical gap between anchors.

    Anchors are sorted by ``y_position`` and the largest gap between the
    vertical centers of consecutive anchors is located (first one on ties).
    The split is accepted when that gap reaches
    ``max(mean positive anchor height * VERTICAL_GAP_THRESHOLD_RATIO,
    VERTICAL_GAP_THRESHOLD_ABS)``. Note that the gap is measured center to
    center, whereas the split line is placed halfway between the bottom edge
    of the upper anchor and the top edge of the lower anchor.

    Args:
        zone: Zone to split.
        elements: Elements inside the zone; only anchors are considered.

    Returns:
        A ``HorizontalSplitYGap`` (zone bounds truncated to int, ``split_y``
        kept as computed), or None when there are too few anchors, the gap is
        too small, or the split line falls outside the zone.
    """
    anchors = sorted(
        [e for e in elements if e.class_name in ALLOWED_ANCHORS],
        key=lambda e: e.y_position,
    )
    if len(anchors) < MIN_ANCHORS_FOR_SPLIT:
        return None

    # Average over anchors with a positive height; 30 when none has one.
    positive_heights = [a.bbox_height for a in anchors if a.bbox_height > 0]
    avg_anchor_height = np.mean(positive_heights) if positive_heights else 30

    # Largest center-to-center gap between consecutive anchors.
    y_centers = [a.y_position + a.bbox_height / 2 for a in anchors]
    max_gap = -1
    split_index = -1
    for i in range(len(anchors) - 1):
        gap = y_centers[i + 1] - y_centers[i]
        if gap > max_gap:
            max_gap = gap
            split_index = i

    threshold = max(
        avg_anchor_height * VERTICAL_GAP_THRESHOLD_RATIO, VERTICAL_GAP_THRESHOLD_ABS
    )
    if max_gap < threshold:
        logger.debug(
            f" Y Gap ๋ถ„์„: ์ตœ๋Œ€ ๊ฐ„๊ฒฉ({max_gap:.1f}) ์ž„๊ณ„๊ฐ’({threshold:.1f}) ๋ฏธ๋งŒ. ์ˆ˜ํ‰ ๋ถ„ํ•  ๋ถˆ๊ฐ€."
        )
        return None

    upper = anchors[split_index]
    lower = anchors[split_index + 1]
    split_y = (upper.y_position + upper.bbox_height + lower.y_position) / 2
    if not (zone.y_min < split_y < zone.y_max):
        logger.warning(
            f" Y Gap ๋ถ„์„: ๋ถ„ํ• ์„ ({split_y:.1f})์ด ๊ตฌ์—ญ({zone.y_min}-{zone.y_max}) ๋ฐ–์— ์œ„์น˜. ๋ถ„ํ•  ์ทจ์†Œ."
        )
        return None

    top_zone = Zone(zone.x_min, zone.y_min, zone.x_max, int(split_y))
    bottom_zone = Zone(zone.x_min, int(split_y), zone.x_max, zone.y_max)
    logger.debug(
        f" Y Gap ๋ถ„์„: ์ˆ˜ํ‰ ๋ถ„ํ•  ๊ฐ€๋Šฅ (Max Gap={max_gap:.1f} >= Threshold={threshold:.1f})"
    )
    return HorizontalSplitYGap(top_zone, bottom_zone, split_y)
def find_vertical_split_kmeans(
    zone: Zone, anchors: List[MockElement]
) -> Optional[VerticalSplit]:
    """Split a zone into two columns by clustering anchor X centers.

    The X centers of the anchors are clustered into ``KMEANS_N_CLUSTERS``
    groups. When exactly two cluster centers result and they are at least
    ``KMEANS_CLUSTER_SEPARATION_MIN`` apart, the column boundary is placed
    20 px to the left of the right-hand cluster center (rather than midway
    between the two centers).

    Args:
        zone: Zone to split.
        anchors: Anchor elements inside the zone.

    Returns:
        A ``VerticalSplit`` (zone bounds truncated to int, ``gutter_x`` kept
        as computed), or None when there are too few anchors or distinct X
        centers, the clusters are too close, the boundary falls outside the
        zone, or clustering raises.
    """
    if len(anchors) < MIN_ANCHORS_FOR_SPLIT:
        return None
    anchor_x_centers = np.array([[a.bbox_x + a.bbox_width / 2] for a in anchors])
    if len(np.unique(anchor_x_centers)) < 2:
        return None

    try:
        kmeans = KMeans(n_clusters=KMEANS_N_CLUSTERS, random_state=42, n_init="auto")
        kmeans.fit(anchor_x_centers)
        centers = sorted(kmeans.cluster_centers_.flatten())
        if not (
            len(centers) == 2
            and centers[1] - centers[0] >= KMEANS_CLUSTER_SEPARATION_MIN
        ):
            logger.debug(f" ์ˆ˜์ง ๋ถ„ํ•  ์‹คํŒจ: ์ค‘์‹ฌ๊ฐ„ ๊ฑฐ๋ฆฌ ๋ถ€์กฑ")
            return None

        # Margin (px) keeps the boundary from sitting exactly on the right
        # cluster center, which would be too tight a boundary.
        column_boundary_margin = 20
        gutter_x = centers[1] - column_boundary_margin
        if not (zone.x_min < gutter_x < zone.x_max):
            logger.warning(
                f" ์ˆ˜์ง ๋ถ„ํ• : ๊ฒฝ๊ณ„์„ ({gutter_x:.1f})์ด ๊ตฌ์—ญ ๋ฐ–. ๋ถ„ํ•  ์ทจ์†Œ."
            )
            return None

        left_zone = Zone(zone.x_min, zone.y_min, int(gutter_x), zone.y_max)
        right_zone = Zone(int(gutter_x), zone.y_min, zone.x_max, zone.y_max)
        logger.debug(
            f" ์ˆ˜์ง ๋ถ„ํ•  ์„ฑ๊ณต: ์™ผ์ชฝ ์นผ๋Ÿผ X=[{zone.x_min}, {int(gutter_x)}), "
            f"์˜ค๋ฅธ์ชฝ ์นผ๋Ÿผ X=[{int(gutter_x)}, {zone.x_max})"
        )
        return VerticalSplit(left_zone, right_zone, gutter_x)
    except Exception as e:
        # Best effort: any failure while clustering means "no split".
        logger.error(f" ์ˆ˜์ง ๋ถ„ํ•  K-Means ์˜ค๋ฅ˜: {e}")
        return None
# ============================================================================
# Post-processing function (modified)
# ============================================================================
def _post_process_table_figure_assignment(
    groups: List[ElementGroup], y_diff_threshold: int = 150
) -> List[ElementGroup]:
    """Move table/figure/flowchart children to a closer following anchor.

    For every child of class ``table``, ``figure`` or ``flowchart`` that lies
    more than ``y_diff_threshold / 2`` below its own anchor, the next
    ``POST_PROCESS_LOOKAHEAD`` groups are examined. A following group
    qualifies when the absolute Y distance between the child and that group's
    anchor is less than ``POST_PROCESS_CLOSENESS_RATIO`` times the distance
    to the current anchor. Among qualifying groups the closest wins; on equal
    distance the later group wins (tie-breaker).

    All moves are decided first and applied afterwards, so one move does not
    influence another. ``groups`` and its ``ElementGroup`` objects are
    modified in place.

    Args:
        groups: Groups in reading order; groups without an anchor are skipped.
        y_diff_threshold: Half of this value is the minimum distance (px)
            below its own anchor for a child to be considered for moving.

    Returns:
        The same list object that was passed in.
    """
    logger.debug(
        f" ํ…Œ์ด๋ธ”/๊ทธ๋ฆผ ํ• ๋‹น ํ›„์ฒ˜๋ฆฌ ์‹œ์ž‘: {len(groups)}๊ฐœ ๊ทธ๋ฃน (Threshold={y_diff_threshold}px, Closeness Ratio={POST_PROCESS_CLOSENESS_RATIO}, Lookahead={POST_PROCESS_LOOKAHEAD})"
    )
    adjusted_groups = groups  # the input list is modified directly
    # {element_id: (element, target_group_idx)}
    elements_to_move_dict: Dict[int, Tuple[MockElement, int]] = {}
    moved_elements_log = []  # human-readable summary for logging

    # --- Phase 1: decide which elements move where. ---
    for i, current_group in enumerate(adjusted_groups):
        if not current_group.anchor:
            continue
        # Iterate over a copy of the child list.
        for child in list(current_group.children):
            # Skip elements that are already scheduled for a move.
            if child.element_id in elements_to_move_dict:
                continue
            if child.class_name not in ["table", "figure", "flowchart"]:
                continue

            y_diff_current = child.y_position - current_group.anchor.y_position
            # Children close to (or above) their own anchor are never moved.
            if not y_diff_current > (y_diff_threshold / 2):
                continue

            best_target_group_idx = -1
            min_y_diff_next = float("inf")
            # Look at up to POST_PROCESS_LOOKAHEAD groups after the current one.
            for lookahead_idx in range(1, POST_PROCESS_LOOKAHEAD + 1):
                next_group_idx = i + lookahead_idx
                if next_group_idx >= len(adjusted_groups):
                    break
                next_group = adjusted_groups[next_group_idx]
                if not next_group.anchor:
                    continue
                y_diff_next = abs(child.y_position - next_group.anchor.y_position)
                if not y_diff_next < (y_diff_current * POST_PROCESS_CLOSENESS_RATIO):
                    continue
                # Tie-breaker: take a strictly closer group, or an equally
                # close group that comes later.
                if y_diff_next < min_y_diff_next or (
                    y_diff_next == min_y_diff_next
                    and next_group_idx > best_target_group_idx
                ):
                    min_y_diff_next = y_diff_next
                    best_target_group_idx = next_group_idx

            if best_target_group_idx != -1:
                elements_to_move_dict[child.element_id] = (
                    child,
                    best_target_group_idx,
                )
                moved_elements_log.append(
                    f"Elem {child.element_id} ({child.class_name}) from Grp {current_group.group_id} to Grp {adjusted_groups[best_target_group_idx].group_id}"
                )
                logger.trace(
                    f" ์ด๋™ ํ›„๋ณด ํ™•์ •: Elem {child.element_id} -> Group {adjusted_groups[best_target_group_idx].group_id} (Min Y diff next={min_y_diff_next:.0f})"
                )

    # --- Phase 2: apply the moves (after the decision loop has finished). ---
    if elements_to_move_dict:
        # 1. Remove the elements from the groups that currently hold them.
        elements_removed_count = 0
        for group in adjusted_groups:
            original_children_count = len(group.children)
            group.children = [
                child
                for child in group.children
                if child.element_id not in elements_to_move_dict
            ]
            elements_removed_count += original_children_count - len(group.children)

        # 2. Insert each element at the front of its target group.
        elements_added_count = 0
        for element_id, (element, target_group_idx) in elements_to_move_dict.items():
            if 0 <= target_group_idx < len(adjusted_groups):
                adjusted_groups[target_group_idx].children.insert(0, element)
                elements_added_count += 1
            else:
                logger.error(
                    f"ํ›„์ฒ˜๋ฆฌ ์ด๋™ ์ค‘ ์œ ํšจํ•˜์ง€ ์•Š์€ ๋Œ€์ƒ ๊ทธ๋ฃน ์ธ๋ฑ์Šค: {target_group_idx} for Elem {element_id}"
                )
        logger.debug(
            f" ํ›„์ฒ˜๋ฆฌ ์š”์†Œ ์ด๋™ ์™„๋ฃŒ: {elements_removed_count}๊ฐœ ์ œ๊ฑฐ, {elements_added_count}๊ฐœ ์ถ”๊ฐ€"
        )

    if moved_elements_log:
        logger.info(
            f" ํ…Œ์ด๋ธ”/๊ทธ๋ฆผ ํ• ๋‹น ํ›„์ฒ˜๋ฆฌ: {len(moved_elements_log)}๊ฐœ ์š”์†Œ ์ด๋™๋จ - {', '.join(moved_elements_log)}"
        )
    else:
        logger.debug(" ํ…Œ์ด๋ธ”/๊ทธ๋ฆผ ํ• ๋‹น ํ›„์ฒ˜๋ฆฌ: ์ด๋™๋œ ์š”์†Œ ์—†์Œ")
    return adjusted_groups
# ============================================================================
# Base Case functions (unchanged since v2.1)
# ============================================================================
def _assign_children_to_anchors_with_2d_proximity(
    anchors: List[MockElement],
    children: List[MockElement],
    zone: Zone,
    preserve_top_orphans: bool = True,
) -> Tuple[List[ElementGroup], List[MockElement]]:
    """
    Attach each child to its nearest eligible anchor using a weighted 2D distance.

    An anchor is eligible for a child only when the anchor's vertical centre is
    not below the child's vertical centre. The distance is the Euclidean norm of
    the centre offsets after scaling them by ANCHOR_2D_DISTANCE_WEIGHT_X and
    ANCHOR_2D_DISTANCE_WEIGHT_Y. A child is attached only when that distance is
    strictly smaller than ANCHOR_VERTICAL_PROXIMITY_THRESHOLD; ties between
    anchors go to the earliest anchor in the list.

    Args:
        anchors: Anchor elements. One group is created per anchor, in this
            order. The top-orphan check compares against anchors[0] only, so
            callers presumably pass anchors sorted top-to-bottom.
        children: Child elements to distribute.
        zone: Zone being processed; y_min and height define the top band.
        preserve_top_orphans: When True, a child whose top edge lies inside the
            top band of the zone is kept as an orphan (no distance matching) if
            there are no anchors, or if its centre is more than half the
            proximity threshold above the top edge of anchors[0].

    Returns:
        (groups, orphans): groups[i] belongs to anchors[i]; orphans holds every
        child that was not attached to an anchor.
    """
    groups: List[ElementGroup] = [ElementGroup(anchor=anchor) for anchor in anchors]
    orphans: List[MockElement] = []

    # Lower edge of the top band; only consulted when preserve_top_orphans is True.
    top_band_limit_y = zone.y_min
    if preserve_top_orphans:
        top_band_limit_y = (
            zone.y_min + zone.height * BASE_CASE_TOP_ORPHAN_THRESHOLD_RATIO
        )

    # Anchor centres do not depend on the child, so compute them once.
    anchor_centres = [
        (a.bbox_x + a.bbox_width / 2, a.bbox_y + a.bbox_height / 2) for a in anchors
    ]

    for child in children:
        cx = child.bbox_x + child.bbox_width / 2
        cy = child.bbox_y + child.bbox_height / 2

        # Optional top-orphan check: only children well above the first anchor
        # (or any top-band child when there is no anchor) stay orphaned here.
        in_top_band = preserve_top_orphans and child.bbox_y < top_band_limit_y
        if in_top_band and (
            not anchors
            or cy < (anchors[0].bbox_y - ANCHOR_VERTICAL_PROXIMITY_THRESHOLD / 2)
        ):
            orphans.append(child)
            logger.trace(
                f" Elem {child.element_id} ์ƒ๋‹จ ๊ณ ์•„ ์œ ์ง€ (Y={child.bbox_y})"
            )
            continue

        nearest_idx = None
        nearest_distance = float("inf")
        for idx, (ax, ay) in enumerate(anchor_centres):
            # A child may only be assigned to an anchor that is not below it.
            if cy < ay:
                logger.trace(
                    f" Elem {child.element_id} โ†’ Anchor {anchors[idx].element_id} ์ œ์™ธ "
                    f"(์ž์‹ Y={cy:.0f} < ์•ต์ปค Y={ay:.0f})"
                )
                continue

            # Weighted 2D distance between the two centres.
            dx = abs(cx - ax) * ANCHOR_2D_DISTANCE_WEIGHT_X
            dy = abs(cy - ay) * ANCHOR_2D_DISTANCE_WEIGHT_Y
            candidate = (dx**2 + dy**2) ** 0.5
            if candidate < nearest_distance:
                nearest_distance = candidate
                nearest_idx = idx

        # Distance threshold check.
        attached = (
            nearest_idx is not None
            and nearest_distance < ANCHOR_VERTICAL_PROXIMITY_THRESHOLD
        )
        if not attached:
            orphans.append(child)
            if nearest_idx is None:
                reason = "์œ„์ชฝ ์•ต์ปค๋งŒ ํ—ˆ์šฉ (๋ชจ๋“  ์•ต์ปค๊ฐ€ ์ž์‹๋ณด๋‹ค ์•„๋ž˜์ชฝ)"
            else:
                reason = f"์ตœ์†Œ ๊ฑฐ๋ฆฌ={nearest_distance:.1f} > {ANCHOR_VERTICAL_PROXIMITY_THRESHOLD}"
            logger.debug(f" Elem {child.element_id} ๊ณ ์•„ ({reason})")
            continue

        groups[nearest_idx].children.append(child)
        logger.trace(
            f" Elem {child.element_id} โ†’ Anchor {anchors[nearest_idx].element_id} "
            f"(2D ๊ฑฐ๋ฆฌ={nearest_distance:.1f})"
        )

    return groups, orphans
def _std1col_pair_horizontal_neighbours(
    anchors: List[MockElement],
    children: List[MockElement],
    groups: Dict[int, ElementGroup],
) -> set:
    """Attach at most one side-by-side child to each anchor; return the claimed child ids."""
    claimed_ids = set()
    for anchor in anchors:
        anchor_cy = anchor.bbox_y + anchor.bbox_height / 2
        anchor_left_x = anchor.bbox_x
        anchor_right_x = anchor.bbox_x + anchor.bbox_width

        adjacent_child = None
        min_y_diff = float("inf")
        for child in children:
            if child.element_id in claimed_ids:
                continue  # already paired with an earlier anchor

            # The allowed centre offset scales with the mean height of the pair.
            height_sum = anchor.bbox_height + child.bbox_height
            y_threshold = (
                height_sum / 2 * HORIZONTAL_ADJACENCY_Y_CENTER_RATIO
                if height_sum > 0
                else 0
            )
            y_diff = abs(anchor_cy - (child.bbox_y + child.bbox_height / 2))
            if y_diff >= y_threshold:
                continue

            # Horizontal gap on either side of the anchor.
            gap_right = child.bbox_x - anchor_right_x
            gap_left = anchor_left_x - (child.bbox_x + child.bbox_width)
            is_adjacent = (
                abs(gap_right) < HORIZONTAL_ADJACENCY_X_PROXIMITY
                or abs(gap_left) < HORIZONTAL_ADJACENCY_X_PROXIMITY
            )
            # Among adjacent candidates keep the one best aligned vertically.
            if is_adjacent and y_diff < min_y_diff:
                min_y_diff = y_diff
                adjacent_child = child

        if adjacent_child is not None:
            logger.trace(
                f" ์ˆ˜ํ‰ ์ธ์ ‘ ๋ฐฐ์ •: ์•ต์ปค ID {anchor.element_id} <- ์ž์‹ ID {adjacent_child.element_id}"
            )
            groups[anchor.element_id].add_child(adjacent_child)
            claimed_ids.add(adjacent_child.element_id)
    return claimed_ids


def _std1col_group_sequentially(
    ordered_elements: List[MockElement],
    groups: Dict[int, ElementGroup],
    top_orphan_threshold_y: float,
) -> List[ElementGroup]:
    """Walk elements in reading order, opening a group at each anchor and filling it with the elements that follow."""
    result: List[ElementGroup] = []
    current_group: Optional[ElementGroup] = None
    top_orphans: List[MockElement] = []
    bottom_orphans: List[MockElement] = []

    for element in ordered_elements:
        if element.class_name in ALLOWED_ANCHORS:
            # Elements collected above the threshold before this anchor form
            # their own anchor-less group.
            if top_orphans:
                logger.trace(
                    f" ๋…๋ฆฝ์ ์ธ ์ƒ๋‹จ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ ({len(top_orphans)}๊ฐœ ์š”์†Œ)"
                )
                result.append(ElementGroup(anchor=None, children=top_orphans))
                top_orphans = []

            # Close the previous group. Groups skipped here because
            # is_empty() is true are picked up later by the caller.
            if (
                current_group is not None
                and current_group.anchor is not None
                and not current_group.is_empty()
            ):
                result.append(current_group)

            if element.element_id in groups:
                current_group = groups[element.element_id]
                logger.trace(f" ์•ต์ปค ๊ทธ๋ฃน ์žฌ์‚ฌ์šฉ (ID: {element.element_id})")
            else:
                current_group = ElementGroup(anchor=element, children=[])
                logger.trace(f" ์ƒˆ ์•ต์ปค ๊ทธ๋ฃน ์‹œ์ž‘ (ID: {element.element_id})")

            # Pre-anchor elements below the threshold are prepended to this
            # (first) anchor's children.
            if bottom_orphans:
                logger.trace(
                    f" ์ฒซ ์•ต์ปค(ID: {element.element_id}) ๊ทธ๋ฃน์— ํ•˜๋‹จ ๊ณ ์•„ ์ž์‹ {len(bottom_orphans)}๊ฐœ ์ถ”๊ฐ€"
                )
                current_group.children = bottom_orphans + current_group.children
                bottom_orphans = []
            continue

        # Non-anchor element. A group is open exactly when an anchor has
        # already been seen, so no separate flag is needed.
        if current_group is not None:
            current_group.add_child(element)
            logger.trace(
                f" ํ˜„์žฌ ๊ทธ๋ฃน(์•ต์ปค: {current_group.anchor.element_id if current_group.anchor else 'Orphan'})์— ์ž์‹ ์ถ”๊ฐ€ (ID: {element.element_id})"
            )
        elif element.y_position < top_orphan_threshold_y:
            top_orphans.append(element)
            logger.trace(
                f" ์ƒ๋‹จ ๊ณ ์•„ ์ž์‹ ์š”์†Œ(ID: {element.element_id}) ์ž„์‹œ ์ €์žฅ (Y < {top_orphan_threshold_y:.0f})"
            )
        else:
            bottom_orphans.append(element)
            logger.trace(
                f" ํ•˜๋‹จ ๊ณ ์•„ ์ž์‹ ์š”์†Œ(ID: {element.element_id}) ์ž„์‹œ ์ €์žฅ (Y >= {top_orphan_threshold_y:.0f})"
            )

    if top_orphans:
        logger.trace(
            f" ๋งˆ์ง€๋ง‰ ๋…๋ฆฝ ์ƒ๋‹จ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ ({len(top_orphans)}๊ฐœ ์š”์†Œ)"
        )
        result.append(ElementGroup(anchor=None, children=top_orphans))

    if current_group is not None and not current_group.is_empty():
        result.append(current_group)
    elif bottom_orphans:
        logger.warning(" ๋ชจ๋“  ์š”์†Œ๊ฐ€ ํ•˜๋‹จ ์ž์‹ ์š”์†Œ์ž„. ๋‹จ์ผ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ.")
        result.append(ElementGroup(anchor=None, children=bottom_orphans))
    return result


def _std1col_finalize(groups: List[ElementGroup]) -> List[ElementGroup]:
    """Sort groups top-to-bottom, give them temporary ids and run the table/figure post-processing."""

    def reading_key(group: ElementGroup) -> float:
        # Anchor position first; anchor-less groups use their topmost child,
        # and completely empty groups sort last.
        if group.anchor:
            return group.anchor.y_position
        if group.children:
            return min(c.y_position for c in group.children)
        return float("inf")

    ordered = sorted(groups, key=reading_key)
    # Temporary ids assigned before the post-processing call.
    for idx, group in enumerate(ordered):
        group.group_id = idx
    return _post_process_table_figure_assignment(ordered)


def _base_case_standard_1_column(
    zone: Zone, elements: List[MockElement]
) -> List[ElementGroup]:
    """
    Base-case grouping for a standard single-column zone.

    Elements whose class is in ALLOWED_ANCHORS become group anchors (sorted by
    y_position); elements whose class is in ALLOWED_CHILDREN are distributed in
    three stages:

    1. Horizontal adjacency: each anchor may claim one child sitting beside it.
    2. 2D proximity: remaining children are matched to anchors through
       _assign_children_to_anchors_with_2d_proximity (top orphans preserved).
    3. Sequential pass: anchors plus still-unassigned children are walked in
       (y, x) order; each child joins the most recent anchor's group, and
       children seen before any anchor become top/bottom orphans depending on
       BASE_CASE_TOP_ORPHAN_THRESHOLD_RATIO.

    The resulting groups are sorted top-to-bottom, numbered with temporary
    group ids and passed through _post_process_table_figure_assignment.

    Args:
        zone: Zone being processed (y_min and height are read).
        elements: Elements located in the zone.

    Returns:
        The post-processed list of element groups.
    """
    logger.debug(
        f" ํ‘œ์ค€ 1๋‹จ Base Case ์‹œ์ž‘ (์ˆœ์ฐจ ์ฒ˜๋ฆฌ + ๊ณ ์•„ ๊ฐœ์„ ): {len(elements)}๊ฐœ ์š”์†Œ in {zone}"
    )
    anchors = sorted(
        (e for e in elements if e.class_name in ALLOWED_ANCHORS),
        key=lambda e: e.y_position,
    )
    children = [e for e in elements if e.class_name in ALLOWED_CHILDREN]
    groups: Dict[int, ElementGroup] = {
        anchor.element_id: ElementGroup(anchor=anchor) for anchor in anchors
    }

    # Stage 1: horizontal adjacency.
    logger.trace(" ์ˆ˜ํ‰ ์ธ์ ‘ ์ฒ˜๋ฆฌ ์‹œ์ž‘...")
    assigned_children_ids = _std1col_pair_horizontal_neighbours(
        anchors, children, groups
    )
    logger.debug(
        f" ์ˆ˜ํ‰ ์ธ์ ‘ ์ฒ˜๋ฆฌ ์™„๋ฃŒ: {len(assigned_children_ids)}๊ฐœ ์ž์‹ ์šฐ์„  ๋ฐฐ์ •๋จ"
    )

    remaining_children = [
        c for c in children if c.element_id not in assigned_children_ids
    ]
    remaining_elements = anchors + remaining_children
    if not remaining_elements:
        logger.debug(" ๋ชจ๋“  ์š”์†Œ๊ฐ€ ์ˆ˜ํ‰ ์ธ์ ‘์œผ๋กœ ๋ฐฐ์ •๋˜์–ด ๊ทธ๋ฃนํ•‘ ์™„๋ฃŒ.")
        return _std1col_finalize(list(groups.values()))

    # Stage 2: weighted 2D proximity for the children that are still free.
    if remaining_children and anchors:
        logger.trace(
            f" 2๋‹จ๊ณ„: ๋‚˜๋จธ์ง€ {len(remaining_children)}๊ฐœ ์š”์†Œ 2D ๊ฑฐ๋ฆฌ ๊ทธ๋ฃนํ•‘..."
        )
        proximity_groups, proximity_orphans = (
            _assign_children_to_anchors_with_2d_proximity(
                anchors,
                remaining_children,
                zone,
                preserve_top_orphans=True,
            )
        )
        # proximity_groups is index-aligned with anchors; merge its children
        # into the groups that already hold the stage-1 assignments.
        for anchor, proximity_group in zip(anchors, proximity_groups):
            groups[anchor.element_id].children.extend(proximity_group.children)

        # Whatever is still unassigned goes on to the sequential pass.
        remaining_elements = [
            a for a in anchors if a.element_id not in assigned_children_ids
        ] + proximity_orphans
        logger.debug(
            f" 2๋‹จ๊ณ„ ์™„๋ฃŒ: {len(remaining_children) - len(proximity_orphans)}๊ฐœ ๋ฐฐ์ •, {len(proximity_orphans)}๊ฐœ ๊ณ ์•„๋กœ ์ˆœ์ฐจ ์ฒ˜๋ฆฌ ๋Œ€๊ธฐ"
        )

    if not remaining_elements:
        logger.debug(" 2D ๊ฑฐ๋ฆฌ ๊ทธ๋ฃนํ•‘ ํ›„ ๋‚˜๋จธ์ง€ ์š”์†Œ ์—†์Œ. ๊ทธ๋ฃนํ•‘ ์™„๋ฃŒ.")
        return _std1col_finalize(list(groups.values()))

    # Stage 3: sequential grouping in (y, x) reading order.
    logger.trace(
        f" 3๋‹จ๊ณ„: ๋‚˜๋จธ์ง€ ์š”์†Œ {len(remaining_elements)}๊ฐœ (Y, X) ์ •๋ ฌ ๋ฐ ์ˆœ์ฐจ ๊ทธ๋ฃนํ•‘ ์‹œ์ž‘..."
    )
    remaining_elements.sort(key=lambda e: (e.y_position, e.x_position))
    top_orphan_threshold_y = (
        zone.y_min + zone.height * BASE_CASE_TOP_ORPHAN_THRESHOLD_RATIO
    )
    final_groups = _std1col_group_sequentially(
        remaining_elements, groups, top_orphan_threshold_y
    )

    # Anchor groups that the sequential pass did not emit still belong in the
    # result.
    processed_anchor_ids = {g.anchor.element_id for g in final_groups if g.anchor}
    for anchor_id, group in groups.items():
        if anchor_id not in processed_anchor_ids and group.anchor:
            final_groups.append(group)
            logger.trace(f" ๋ฏธํฌํ•จ ์•ต์ปค ๊ทธ๋ฃน ์ถ”๊ฐ€ (์ˆ˜ํ‰ ์ธ์ ‘๋งŒ): ID {anchor_id}")

    final_groups = _std1col_finalize(final_groups)
    logger.debug(
        f" ์ˆœ์ฐจ ์ฒ˜๋ฆฌ ๊ธฐ๋ฐ˜ ๊ทธ๋ฃนํ•‘ (+ํ›„์ฒ˜๋ฆฌ) ์™„๋ฃŒ: {len(final_groups)} ๊ทธ๋ฃน ์ƒ์„ฑ"
    )
    return final_groups
def _base_case_mixed_layout(
    zone: Zone, elements: List[MockElement], layout_type: LayoutType
) -> List[ElementGroup]:
    """
    Base-case grouping for a mixed-layout zone.

    Elements are walked in (y_position, x_position) order. Every element whose
    class is in ALLOWED_ANCHORS opens a new group; every other element joins
    the most recently opened group. Elements met before the first anchor are
    split by their vertical centre against the zone's split line
    (zone.y_min + zone.height * LAYOUT_DETECT_Y_SPLIT_POINT): those above it
    form an anchor-less group of their own, those at or below it are prepended
    to the first anchor's children (or form a single anchor-less group when the
    zone has no anchor at all).

    The groups receive temporary ids and are passed through
    _post_process_table_figure_assignment before being returned.

    Args:
        zone: Zone being processed (y_min and height are read).
        elements: Elements located in the zone.
        layout_type: Detected layout type; only used in the start log message.

    Returns:
        The post-processed list of element groups.
    """
    logger.debug(
        f" ํ˜ผํ•ฉํ˜• Base Case ์‹œ์ž‘ ({layout_type.name}): {len(elements)}๊ฐœ ์š”์†Œ in {zone}"
    )
    reading_order = sorted(elements, key=lambda e: (e.y_position, e.x_position))

    final_groups: List[ElementGroup] = []
    current_group: Optional[ElementGroup] = None
    top_orphans: List[MockElement] = []
    bottom_orphans: List[MockElement] = []

    split_y = zone.y_min + zone.height * LAYOUT_DETECT_Y_SPLIT_POINT
    logger.trace(f" ํ˜ผํ•ฉํ˜• Base Case Y ๋ถ„ํ• ์ : {split_y:.1f}")

    for element in reading_order:
        if element.class_name in ALLOWED_ANCHORS:
            # Orphans collected above the split line become their own group.
            if top_orphans:
                logger.trace(
                    f" ๋…๋ฆฝ์ ์ธ ์ƒ๋‹จ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ ({len(top_orphans)}๊ฐœ ์š”์†Œ)"
                )
                final_groups.append(ElementGroup(anchor=None, children=top_orphans))
                top_orphans = []

            # Close the previous anchor's group before opening the next one.
            if current_group is not None and not current_group.is_empty():
                final_groups.append(current_group)

            current_group = ElementGroup(anchor=element, children=[])
            logger.trace(f" ์ƒˆ ์•ต์ปค ๊ทธ๋ฃน ์‹œ์ž‘ (ID: {element.element_id})")

            # Pre-anchor elements at/below the split line go to the front of
            # this (first) anchor's children.
            if bottom_orphans:
                logger.trace(
                    f" ์ฒซ ์•ต์ปค(ID: {element.element_id}) ๊ทธ๋ฃน์— ํ•˜๋‹จ ๊ณ ์•„ ์ž์‹ {len(bottom_orphans)}๊ฐœ ์ถ”๊ฐ€"
                )
                current_group.children = bottom_orphans + current_group.children
                bottom_orphans = []
            continue

        # Non-anchor element. A group is open exactly when an anchor has been
        # seen, so the open group itself serves as the "anchor found" flag.
        if current_group is not None:
            current_group.add_child(element)
            logger.trace(
                f" ํ˜„์žฌ ๊ทธ๋ฃน(์•ต์ปค: {current_group.anchor.element_id if current_group.anchor else 'Orphan'})์— ์ž์‹ ์ถ”๊ฐ€ (ID: {element.element_id})"
            )
            continue

        # No anchor yet: classify by vertical centre against the split line.
        element_y_center = element.y_position + element.bbox_height / 2
        if element_y_center < split_y:
            top_orphans.append(element)
            logger.trace(
                f" ์ƒ๋‹จ ๊ณ ์•„ ์ž์‹ ์š”์†Œ(ID: {element.element_id}) ์ž„์‹œ ์ €์žฅ"
            )
        else:
            bottom_orphans.append(element)
            logger.trace(
                f" ํ•˜๋‹จ ๊ณ ์•„ ์ž์‹ ์š”์†Œ(ID: {element.element_id}) ์ž„์‹œ ์ €์žฅ"
            )

    if top_orphans:
        logger.trace(
            f" ๋งˆ์ง€๋ง‰ ๋…๋ฆฝ ์ƒ๋‹จ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ ({len(top_orphans)}๊ฐœ ์š”์†Œ)"
        )
        final_groups.append(ElementGroup(anchor=None, children=top_orphans))

    if current_group is not None and not current_group.is_empty():
        final_groups.append(current_group)
    elif bottom_orphans:
        logger.warning(" ๋ชจ๋“  ์š”์†Œ๊ฐ€ ํ•˜๋‹จ ์ž์‹ ์š”์†Œ์ž„. ๋‹จ์ผ ๊ณ ์•„ ๊ทธ๋ฃน ์ƒ์„ฑ.")
        final_groups.append(ElementGroup(anchor=None, children=bottom_orphans))

    # Temporary group ids assigned before the post-processing call.
    for idx, group in enumerate(final_groups):
        group.group_id = idx
    return _post_process_table_figure_assignment(final_groups)
# ============================================================================
# Final merge and order-assignment function (unchanged)
# ============================================================================
def flatten_groups_and_assign_order(
    groups: List[ElementGroup], start_global_order: int, start_group_id: int
) -> Tuple[List[MockElement], int, int]:
    """
    Flatten groups into one element list and stamp ordering attributes on it.

    Groups are processed in the given order. Each group receives the next
    consecutive group id (overwriting any temporary id it carried), and each of
    its elements, as returned by group.get_all_elements_sorted(), is tagged with
    order_in_question (running global order), group_id and order_in_group
    (position inside the group).

    Args:
        groups: Groups in their final order.
        start_global_order: First order_in_question value to hand out.
        start_group_id: First group id to hand out.

    Returns:
        (flattened elements, next free global order, next free group id).
        An element on which the attributes cannot be set is logged and left
        out of the flattened list without consuming a global order value.
    """
    ordered_elements = []
    next_order = start_global_order
    next_group_id = start_group_id
    logger.debug(
        f" ํ‰ํƒ„ํ™” ์‹œ์ž‘: {len(groups)}๊ฐœ ๊ทธ๋ฃน (์‹œ์ž‘ order={next_order}, group_id={next_group_id})"
    )

    for group in groups:
        # The id stored on the group may be a temporary one; fix the final id here.
        assigned_id = next_group_id
        next_group_id += 1
        group.group_id = assigned_id

        members = group.get_all_elements_sorted()
        anchor_label = group.anchor.element_id if group.anchor else "Orphan"
        logger.trace(
            f" ๊ทธ๋ฃน {assigned_id} ํ‰ํƒ„ํ™” (Anchor: {anchor_label}, ์š”์†Œ ์ˆ˜: {len(members)})"
        )

        for position, member in enumerate(members):
            try:
                member.order_in_question = next_order
                member.group_id = assigned_id
                member.order_in_group = position
            except AttributeError as e:
                logger.error(
                    f"์š”์†Œ (ID: {getattr(member, 'element_id', 'N/A')})์— ์ •๋ ฌ ์†์„ฑ ์ถ”๊ฐ€ ์‹คํŒจ: {e}"
                )
                continue
            ordered_elements.append(member)
            next_order += 1

    logger.debug(
        f" ํ‰ํƒ„ํ™” ์™„๋ฃŒ: {len(ordered_elements)}๊ฐœ ์š”์†Œ ์ƒ์„ฑ (๋‹ค์Œ order={next_order}, group_id={next_group_id})"
    )
    return ordered_elements, next_order, next_group_id
# ============================================================================
# ํ—ฌํผ ํ•จ์ˆ˜ (๊ธฐ์กด๊ณผ ๋™์ผ)
# ============================================================================
def preprocess_elements(
    elements: List[MockElement], document_type: str
) -> List[MockElement]:
    """
    Step 0 pre-processing: class filtering followed by an area sanity filter.

    Args:
        elements: Raw layout elements.
        document_type: "question_based" keeps only elements whose class_name is
            in ALLOWED_CLASSES; "reading_order" keeps every class; any other
            value is logged as unknown and also keeps every class.

    Returns:
        The elements that passed the class filter and expose an ``area``
        attribute greater than zero.
    """
    total = len(elements)

    if document_type == "question_based":
        candidates = [e for e in elements if e.class_name in ALLOWED_CLASSES]
        logger.info(
            f"์ „์ฒ˜๋ฆฌ (question_based): {total}๊ฐœ โ†’ {len(candidates)}๊ฐœ (ํ—ˆ์šฉ ํด๋ž˜์Šค ํ•„ํ„ฐ๋ง)"
        )
    else:
        # Both remaining cases keep every element; only the log line differs.
        candidates = elements
        if document_type == "reading_order":
            logger.info(f"์ „์ฒ˜๋ฆฌ (reading_order): {total}๊ฐœ (๋ชจ๋“  ํด๋ž˜์Šค ํ—ˆ์šฉ)")
        else:
            logger.warning(f"์•Œ ์ˆ˜ ์—†๋Š” ๋ฌธ์„œ ํƒ€์ž… '{document_type}', ๋ชจ๋“  ์š”์†Œ ๋ฐ˜ํ™˜")

    # Elements without an area attribute are dropped along with those whose
    # area is zero or negative.
    kept = [e for e in candidates if getattr(e, "area", 0) > 0]
    dropped = len(candidates) - len(kept)
    if dropped > 0:
        logger.warning(
            f"์ „์ฒ˜๋ฆฌ: ๋ฉด์ ์ด 0 ์ดํ•˜์ธ ์š”์†Œ {dropped}๊ฐœ ์ œ๊ฑฐ"
        )
    return kept
def calculate_page_width(elements: List[MockElement]) -> int:
    """
    Estimate the page width as the right-most bounding-box edge.

    Args:
        elements: Elements exposing bbox_x and bbox_width.

    Returns:
        max(bbox_x + bbox_width) over the elements, or 0 when there are none
        (None, an empty list, or an iterable that yields nothing).
    """
    if not elements:
        return 0
    # default=0 also covers truthy-but-empty iterables, for which max() would
    # otherwise raise ValueError.
    return max((e.bbox_x + e.bbox_width for e in elements), default=0)
def calculate_page_height(elements: List[MockElement]) -> int:
    """
    Estimate the page height as the bottom-most bounding-box edge.

    Args:
        elements: Elements exposing bbox_y and bbox_height.

    Returns:
        max(bbox_y + bbox_height) over the elements, or 0 when there are none
        (None, an empty list, or an iterable that yields nothing).
    """
    if not elements:
        return 0
    # default=0 also covers truthy-but-empty iterables, for which max() would
    # otherwise raise ValueError.
    return max((e.bbox_y + e.bbox_height for e in elements), default=0)
# ============================================================================
# DB persistence function (ORM integration)
# ============================================================================
def save_sorting_results_to_db(
    db: "Session", page_id: int, sorted_elements: List["LayoutElement"]
) -> Tuple[int, int]:
    """
    Persist sorted layout elements as question groups and question elements.

    Elements are bucketed by their group_id. For every bucket (in ascending
    group_id order) one group record is created through
    crud.create_question_group, followed by one record per element through
    crud.create_question_element.

    Args:
        db: SQLAlchemy session.
        page_id: ID of the page the elements belong to.
        sorted_elements: LayoutElement list already sorted by this module; each
            element must carry order_in_question and group_id attributes.

    Returns:
        (number of groups created, number of elements created).

    Raises:
        ValueError: If an element lacks order_in_question or group_id. All
            elements are validated before the first record is created.
    """
    from .. import crud
    from ..schemas import QuestionGroupCreate, QuestionElementCreate

    if not sorted_elements:
        logger.warning(f"page_id={page_id}: ์ •๋ ฌ๋œ ์š”์†Œ๊ฐ€ ์—†์–ด DB ์ €์žฅ์„ ๊ฑด๋„ˆ๋œ๋‹ˆ๋‹ค.")
        return 0, 0

    # 1. Bucket the elements by group_id, keeping their incoming order.
    elements_by_group: Dict[int, List["LayoutElement"]] = {}
    for element in sorted_elements:
        if not (
            hasattr(element, "order_in_question") and hasattr(element, "group_id")
        ):
            raise ValueError(
                f"element_id={element.element_id}: order_in_question ๋˜๋Š” group_id ์†์„ฑ์ด ์—†์Šต๋‹ˆ๋‹ค. "
                "sorter.py์˜ flatten_groups_and_assign_order() ์‹คํ–‰ ํ›„ ํ˜ธ์ถœํ•˜์„ธ์š”."
            )
        elements_by_group.setdefault(element.group_id, []).append(element)

    logger.info(
        f"page_id={page_id}: {len(elements_by_group)}๊ฐœ ๊ทธ๋ฃน, {len(sorted_elements)}๊ฐœ ์š”์†Œ๋ฅผ DB์— ์ €์žฅ ์‹œ์ž‘"
    )

    # 2. One group record per bucket.
    saved_groups = 0
    saved_elements = 0
    for group_id, members in sorted(elements_by_group.items()):
        # The element with the lowest order_in_question is recorded as the anchor.
        anchor_elem = min(members, key=lambda e: e.order_in_question)

        # Vertical extent of the group; a missing bbox_height counts as 0.
        start_y = min(e.y_position for e in members)
        end_y = max(e.y_position + getattr(e, "bbox_height", 0) for e in members)

        db_group = crud.create_question_group(
            db,
            QuestionGroupCreate(
                page_id=page_id,
                anchor_element_id=anchor_elem.element_id,
                start_y=start_y,
                end_y=end_y,
                element_count=len(members),
            ),
        )
        saved_groups += 1
        logger.debug(
            f" ๊ทธ๋ฃน {group_id} โ†’ question_group_id={db_group.question_group_id} (์•ต์ปค: {anchor_elem.element_id}, ์š”์†Œ ์ˆ˜: {len(members)})"
        )

        # 3. One element record per member; the stored order is the in-memory
        #    order_in_question shifted by +1.
        for member in members:
            crud.create_question_element(
                db,
                QuestionElementCreate(
                    question_group_id=db_group.question_group_id,
                    element_id=member.element_id,
                    order_in_question=member.order_in_question + 1,
                ),
            )
            saved_elements += 1

    logger.info(
        f"page_id={page_id}: DB ์ €์žฅ ์™„๋ฃŒ ({saved_groups}๊ฐœ ๊ทธ๋ฃน, {saved_elements}๊ฐœ ์š”์†Œ)"
    )
    return saved_groups, saved_elements