""" 첨부파일(PDF / HWP / DOCX / ZIP)에서 신청기간 텍스트를 추출한다. 추출 우선순위: 1. pdfplumber 텍스트 추출 (텍스트 기반 PDF) 2. pytesseract OCR (스캔 PDF / 이미지) 3. python-docx (DOCX) 4. olefile (HWP 간이 파싱) 5. ZIP 압축 해제 후 위 순서 반복 """ import io import re import zipfile import logging from typing import Optional import requests logger = logging.getLogger(__name__) # ───────────────────────────────────────────── # 날짜 패턴 # ───────────────────────────────────────────── _DATE_PATTERNS = [ # YYYY.MM.DD ~ YYYY.MM.DD 또는 YYYY-MM-DD ~ YYYY-MM-DD r"(\d{4}[.\-]\d{2}[.\-]\d{2})\s*[~~\-~]\s*(\d{4}[.\-]\d{2}[.\-]\d{2})", # YYYY년 MM월 DD일 ~ YYYY년 MM월 DD일 r"(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일\s*[~~\-~]\s*(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일", ] _PERIOD_KEYWORDS = ["신청기간", "접수기간", "공고기간", "모집기간", "지원기간", "접수일", "신청일"] def _normalize_date(raw: str) -> Optional[str]: raw = raw.strip().replace(".", "-") parts = raw.split("-") if len(parts) == 3: y, m, d = parts if len(y) == 4: return f"{y}-{m.zfill(2)}-{d.zfill(2)}" return None def _extract_dates_from_text(text: str) -> tuple[Optional[str], Optional[str]]: """텍스트에서 신청기간 날짜 쌍을 추출한다.""" # 키워드 주변 우선 탐색 for kw in _PERIOD_KEYWORDS: idx = text.find(kw) if idx != -1: snippet = text[idx: idx + 200] result = _apply_patterns(snippet) if result[0]: return result # 키워드 없으면 전체 탐색 return _apply_patterns(text) def _apply_patterns(text: str) -> tuple[Optional[str], Optional[str]]: for pat in _DATE_PATTERNS: m = re.search(pat, text) if not m: continue groups = m.groups() if len(groups) == 2: s = _normalize_date(groups[0]) e = _normalize_date(groups[1]) if s and e: return s, e elif len(groups) == 6: try: s = f"{groups[0]}-{int(groups[1]):02d}-{int(groups[2]):02d}" e = f"{groups[3]}-{int(groups[4]):02d}-{int(groups[5]):02d}" return s, e except Exception: pass return None, None # ───────────────────────────────────────────── # 텍스트 추출 — PDF (pdfplumber + OCR 폴백) # ───────────────────────────────────────────── def _extract_text_pdf(content: bytes) -> str: """ 1단계: pdfplumber로 텍스트 추출. 텍스트가 너무 짧으면 (스캔 PDF) OCR로 폴백. """ text = "" try: import pdfplumber with pdfplumber.open(io.BytesIO(content)) as pdf: pages_text = [p.extract_text() or "" for p in pdf.pages[:10]] text = "\n".join(pages_text) except Exception as e: logger.warning(f"pdfplumber 오류: {e}") # 의미 있는 텍스트가 충분히 추출됐으면 그대로 사용 if len(text.strip()) >= 50: return text # ── OCR 폴백 ── logger.info("텍스트 부족 → OCR 시도") return _ocr_pdf(content) or text def _ocr_pdf(content: bytes) -> str: """pdf2image로 렌더링 후 pytesseract로 OCR.""" try: from pdf2image import convert_from_bytes import pytesseract images = convert_from_bytes(content, dpi=200, first_page=1, last_page=5) texts = [] for img in images: ocr_text = pytesseract.image_to_string(img, lang="kor+eng") texts.append(ocr_text) # 날짜 패턴을 찾으면 나머지 페이지 생략 if _apply_patterns("\n".join(texts))[0]: break return "\n".join(texts) except Exception as e: logger.warning(f"OCR 오류: {e}") return "" # ───────────────────────────────────────────── # 텍스트 추출 — 이미지 단독 파일 # ───────────────────────────────────────────── def _ocr_image(content: bytes) -> str: """이미지 파일(PNG/JPG 등) OCR.""" try: from PIL import Image import pytesseract img = Image.open(io.BytesIO(content)) return pytesseract.image_to_string(img, lang="kor+eng") except Exception as e: logger.warning(f"이미지 OCR 오류: {e}") return "" # ───────────────────────────────────────────── # 텍스트 추출 — DOCX / HWP # ───────────────────────────────────────────── def _extract_text_docx(content: bytes) -> str: try: from docx import Document doc = Document(io.BytesIO(content)) return "\n".join(p.text for p in doc.paragraphs) except Exception as e: logger.warning(f"DOCX 파싱 오류: {e}") return "" def _extract_text_hwp(content: bytes) -> str: """HWP olefile 간이 파싱.""" try: import olefile if not olefile.isOleFile(io.BytesIO(content)): return "" with olefile.OleFileIO(io.BytesIO(content)) as ole: streams = ["BodyText/Section0", "BodyText/Section1", "BodyText/Section2"] texts = [] for stream in streams: if ole.exists(stream): raw = ole.openstream(stream).read() try: import zlib raw = zlib.decompress(raw, -15) except Exception: pass try: texts.append(raw.decode("utf-16-le", errors="ignore")) except Exception: texts.append(raw.decode("cp949", errors="ignore")) return "\n".join(texts) except Exception as e: logger.warning(f"HWP 파싱 오류: {e}") return "" def _extract_text_zip(content: bytes) -> str: """ZIP 내 파일 순회하며 텍스트 추출.""" try: with zipfile.ZipFile(io.BytesIO(content)) as zf: for name in zf.namelist(): ext = name.rsplit(".", 1)[-1].lower() data = zf.read(name) if ext == "pdf": text = _extract_text_pdf(data) elif ext == "docx": text = _extract_text_docx(data) elif ext in ("hwp", "hwpx"): text = _extract_text_hwp(data) elif ext in ("png", "jpg", "jpeg", "tif", "tiff"): text = _ocr_image(data) else: continue if text.strip(): return text except Exception as e: logger.warning(f"ZIP 파싱 오류: {e}") return "" # ───────────────────────────────────────────── # 공개 API # ───────────────────────────────────────────── def extract_period_from_file( file_url: str, session: requests.Session ) -> tuple[Optional[str], Optional[str]]: """ URL에서 파일을 다운로드하고 신청기간(start, end)을 추출한다. 반환: (apply_start, apply_end) — YYYY-MM-DD 또는 None """ try: resp = session.get(file_url, timeout=30) resp.raise_for_status() except Exception as e: logger.warning(f"파일 다운로드 실패 {file_url}: {e}") return None, None content = resp.content fname = file_url.rsplit("/", 1)[-1].lower().split("?")[0] ext = fname.rsplit(".", 1)[-1] if "." in fname else "" content_type = resp.headers.get("Content-Type", "").lower() if ext == "pdf" or "pdf" in content_type: text = _extract_text_pdf(content) elif ext == "docx": text = _extract_text_docx(content) elif ext in ("hwp", "hwpx"): text = _extract_text_hwp(content) elif ext == "zip" or "zip" in content_type: text = _extract_text_zip(content) elif ext in ("png", "jpg", "jpeg", "tif", "tiff"): text = _ocr_image(content) else: text = content.decode("utf-8", errors="ignore") return _extract_dates_from_text(text)