smarteye-backend / app /services /formatter_utils.py
KwanHak's picture
sync: Smart_Demo 브랜치의 Backend 코드 병합 & 이미지 로드를 위한 MultiFileLoader 컴포넌트 구현
82c1146
"""
Collection of formatter utility functions.
Gathers the core post-processing logic in one place: applying formatting
rules, normalizing answer choices, merging visual-material descriptions, etc.
"""
from __future__ import annotations
import re
import unicodedata
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple
from .mock_models import MockElement
from .formatter_rules import RuleConfig
# Element class names that RenderContext.format_element handles specially:
# for these classes the working text starts from the OCR text only, and the
# AI description is used as a fallback when the transformed text is empty.
AI_PRIORITY_CLASSES = {"figure", "table", "flowchart"}
# ---------------------------------------------------------------------------
# Data conversion helpers
# ---------------------------------------------------------------------------
def ocr_inputs_to_dict(ocr_texts) -> Dict[int, str]:
    """
    Convert OCR input into an ``element_id -> text`` dictionary.

    Two input shapes are accepted:

    * a ``dict``: keys are coerced with ``int()`` and values are stripped
      (``None``/falsy values become ``""``); empty values are kept;
    * any other iterable (or ``None``): each item is read through its
      ``element_id`` and ``ocr_text`` attributes. Items lacking
      ``element_id`` are skipped, and items whose stripped text is empty
      are dropped.
    """
    if isinstance(ocr_texts, dict):
        from_mapping: Dict[int, str] = {}
        for key, value in ocr_texts.items():
            from_mapping[int(key)] = (value or "").strip()
        return from_mapping

    collected: Dict[int, str] = {}
    for entry in ocr_texts or []:
        try:
            key = int(getattr(entry, "element_id"))
            raw = getattr(entry, "ocr_text", "") or ""
        except AttributeError:
            # Entry does not look like an OCR record; ignore it.
            continue
        raw = raw.strip()
        if not raw:
            continue
        collected[key] = raw
    return collected
def normalize_ai_descriptions(
    ai_descriptions: Optional[Dict[int, str]],
) -> Dict[int, str]:
    """
    Tidy up the AI description dictionary.

    Keys are coerced with ``int()``, values are stripped, and entries whose
    stripped value is empty (including ``None``) are dropped. A missing or
    empty input yields an empty dict.
    """
    result: Dict[int, str] = {}
    if not ai_descriptions:
        return result
    for key, value in ai_descriptions.items():
        stripped = (value or "").strip()
        if not stripped:
            # Empty entries are discarded before the key is converted.
            continue
        result[int(key)] = stripped
    return result
def split_first_line(text: str) -> Tuple[str, str]:
    """
    Split a string into its first line and the remainder.

    The first line is returned untouched; the remaining lines are re-joined
    with ``"\\n"`` and stripped. Empty input yields ``("", "")``.
    """
    if not text:
        return "", ""
    head, *tail = text.splitlines()
    return head, "\n".join(tail).strip()
# ---------------------------------------------------------------------------
# Content post-processing
# ---------------------------------------------------------------------------
# Leading labels recognised on a choice line:
#   - "1." / "1)" / "(1)" / "12."      one- or two-digit numbers
#   - circled digits U+2460..U+2473    (circled 1 through circled 20)
#   - "A." / "A)"                      single upper-case Latin letter
#   - Hangul syllable + "." or ")"     syllables in U+AC00..U+D558
# Non-ASCII ranges are written as \u escapes so the pattern survives any
# re-encoding of this source file.
CHOICE_PATTERN = re.compile(
    r"^(\(?\d{1,2}[\).]|[\u2460-\u2473]|[A-Z][\).]"
    r"|[\uAC00-\uD558]\.|[\uAC00-\uD558]\))\s*(.+)$"
)


def normalize_choices(text: str) -> str:
    """
    Standardize answer-choice text, one choice per output line.

    Blank lines are dropped and every line is stripped. A line that starts
    with a recognised label (see ``CHOICE_PATTERN``) is emitted as
    ``"<label> <body>"``; any other line is prefixed with a bullet
    (U+2022) and a space.

    Args:
        text: Raw multi-line choice text; ``None``/empty yields ``""``.

    Returns:
        The normalized lines joined with ``"\\n"``.
    """
    normalized: List[str] = []
    for raw_line in (text or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        match = CHOICE_PATTERN.match(line)
        if match:
            label, body = match.groups()
            normalized.append(f"{label} {body.strip()}")
        else:
            normalized.append(f"\u2022 {line}")
    return "\n".join(normalized)
# A list item starts with "-", a bullet (U+2022), or "<digits>.", optionally
# followed by whitespace. The bullet is written as a \u escape so the pattern
# survives any re-encoding of this source file.
LIST_PATTERN = re.compile(r"^([\-\u2022]|\d+\.)\s*(.+)$")


def _rewrite_list_markers(text: str, marker: str) -> str:
    """Strip any recognised list marker from each non-blank line and prefix it with ``marker``."""
    rewritten: List[str] = []
    for raw_line in (text or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        match = LIST_PATTERN.match(line)
        body = match.group(2).strip() if match else line
        rewritten.append(f"{marker} {body}")
    return "\n".join(rewritten)


def normalize_list(text: str) -> str:
    """
    Normalize generic list text so every item starts with ``"- "``.

    Blank lines are dropped; an existing ``-``, bullet (U+2022) or ``N.``
    marker is replaced, and unmarked lines simply gain the ``"- "`` prefix.
    ``None``/empty input yields ``""``.
    """
    return _rewrite_list_markers(text, "-")


def normalize_reading_list(text: str) -> str:
    """
    Normalize list text for general documents, using a bullet marker.

    Same rules as ``normalize_list`` except that every item is prefixed
    with a bullet (U+2022) and a space instead of ``"- "``.
    """
    return _rewrite_list_markers(text, "\u2022")
def merge_visual_description(text: str, ai_text: Optional[str]) -> str:
    """
    Combine the descriptions of a figure/table/flowchart.

    The AI description comes first when present; the OCR text, if any, is
    appended on the following line. When only one of the two is available,
    that one is returned on its own.
    """
    if not ai_text:
        # No AI description: fall back to whatever OCR text there is.
        return text
    if not text:
        return ai_text
    return f"{ai_text}\n{text}"
def isolate_formula(text: str) -> str:
    """
    Return formula text as given, with leading/trailing whitespace removed.
    """
    trimmed = text.strip()
    return trimmed
def uppercase_title(text: str) -> str:
    """
    Return the title text with surrounding whitespace removed.

    NOTE: despite the name, no case conversion is performed; the text is
    only stripped.
    """
    trimmed = text.strip()
    return trimmed
def normalize_question_type(text: str) -> str:
    """
    Flatten question-type OCR output onto a single line.

    The text is NFKC-normalized, then every line break (``\\r\\n``, ``\\r``
    or ``\\n``) is replaced by one space. No stripping or whitespace
    collapsing is done. ``None``/empty input yields ``""``.
    """
    flattened = unicodedata.normalize("NFKC", text or "")
    # Order matters: collapse CRLF pairs first so they become one space, not two.
    for old, new in (("\r\n", "\n"), ("\r", "\n"), ("\n", " ")):
        flattened = flattened.replace(old, new)
    return flattened
# Maps the transform name stored on a rule (``rule.transform``) to the function
# that implements it; RenderContext.apply_transform looks transforms up here.
TRANSFORM_DISPATCH = {
    "normalize_choices": normalize_choices,
    "normalize_list": normalize_list,
    "normalize_reading_list": normalize_reading_list,
    "merge_visual_description": merge_visual_description,
    "isolate_formula": isolate_formula,
    "uppercase_title": uppercase_title,
    "normalize_question_type": normalize_question_type,
}
# ---------------------------------------------------------------------------
# Rule application and output cleanup
# ---------------------------------------------------------------------------
def apply_rule(rule: RuleConfig, content: str) -> str:
    """
    Apply a rule's prefix, indentation and suffix to the content.

    * Empty content returns ``""`` unless ``rule.allow_empty`` is set.
    * When ``rule.indent`` is positive, every non-blank line is indented by
      that many spaces and whitespace-only lines become empty.
    * If the resulting body is empty and ``rule.keep_suffix_on_empty`` is
      not set, only the prefix (or ``""``) is returned.
    * Otherwise the result is ``prefix + body + suffix``.
    """
    if not (content or rule.allow_empty):
        return ""

    body = content
    if rule.indent > 0:
        pad = " " * rule.indent
        body = "\n".join(
            f"{pad}{line}" if line.strip() else ""
            for line in body.splitlines()
        )

    if body or rule.keep_suffix_on_empty:
        return f"{rule.prefix}{body}{rule.suffix}"
    # Nothing to wrap: emit the bare prefix, without the suffix.
    return rule.prefix or ""
def clean_output(text: str) -> str:
    """
    Tidy the final output string.

    Trailing whitespace is removed from every line, runs of more than two
    consecutive empty lines are reduced to two, and the joined result is
    stripped at both ends.
    """
    kept: List[str] = []
    blank_run = 0
    for raw_line in text.splitlines():
        line = raw_line.rstrip()
        blank_run = 0 if line else blank_run + 1
        if blank_run <= 2:
            kept.append(line)
    return "\n".join(kept).strip()
# ---------------------------------------------------------------------------
# Rendering context
# ---------------------------------------------------------------------------
@dataclass
class RenderContext:
    """
    Context needed while rendering elements.
    """

    # OCR text keyed by element id.
    ocr_texts: Dict[int, str]
    # AI-generated description keyed by element id.
    ai_texts: Dict[int, str]
    # Formatting rule keyed by element class name.
    rules: Dict[str, RuleConfig]

    def get_texts(self, element: MockElement) -> Tuple[str, str]:
        """
        Return ``(ocr_text, ai_text)`` for the element, both stripped.

        Missing entries (or an element without ``element_id``) yield ``""``.
        """
        key = getattr(element, "element_id", None)
        return (
            self.ocr_texts.get(key, "").strip(),
            self.ai_texts.get(key, "").strip(),
        )

    def apply_transform(
        self,
        element: MockElement,
        text: str,
        *,
        base_text: str,
        ai_text: str,
    ) -> str:
        """
        Run the transform configured for the element's class on ``text``.

        When no rule, no transform name, or no matching entry in
        ``TRANSFORM_DISPATCH`` exists, the stripped ``text`` is returned
        unchanged. The ``merge_visual_description`` transform is the one
        special case: it receives ``base_text`` and ``ai_text`` instead of
        ``text``.
        """
        rule = self.rules.get(element.class_name)
        transform_name = rule.transform if rule else None
        transform = TRANSFORM_DISPATCH.get(transform_name) if transform_name else None
        if not transform:
            return text.strip()
        if transform_name != "merge_visual_description":
            return transform(text.strip())
        return transform(base_text.strip(), ai_text.strip())

    def format_element(
        self, element: MockElement, content_override: Optional[str] = None
    ) -> str:
        """
        Convert a single element to a string according to its rule.

        ``content_override``, when given, replaces the stored OCR text.
        """
        key = getattr(element, "element_id", None)
        if content_override is None:
            raw_text = self.ocr_texts.get(key, "")
        else:
            raw_text = content_override
        base_text = raw_text.strip()
        ai_text = self.ai_texts.get(key, "").strip()

        # Figures/tables/flowcharts leave the OCR/AI merge to the
        # merge_visual_description transform; other classes fall back to the
        # AI text right away when there is no OCR text.
        is_visual = element.class_name in AI_PRIORITY_CLASSES
        seed = base_text if is_visual else (base_text or ai_text)

        working = self.apply_transform(
            element,
            seed,
            base_text=base_text,
            ai_text=ai_text,
        )

        # Still empty after the transform: use the AI description if there is one.
        if is_visual and ai_text and not working:
            working = ai_text

        rule = self.rules.get(element.class_name)
        if not rule:
            # No rule: emit the text as a single newline-terminated chunk.
            return f"{working}\n" if working else ""
        # Apply prefix, suffix and indent from the rule.
        return apply_rule(rule, working)