Spaces:
Sleeping
Sleeping
| """ | |
| 첨부파일(PDF / HWP / DOCX / ZIP)에서 신청기간 텍스트를 추출한다. | |
| 추출 우선순위: | |
| 1. pdfplumber 텍스트 추출 (텍스트 기반 PDF) | |
| 2. pytesseract OCR (스캔 PDF / 이미지) | |
| 3. python-docx (DOCX) | |
| 4. olefile (HWP 간이 파싱) | |
| 5. ZIP 압축 해제 후 위 순서 반복 | |
| """ | |
| import io | |
| import re | |
| import zipfile | |
| import logging | |
| from typing import Optional | |
| import requests | |
| logger = logging.getLogger(__name__) | |
| # ───────────────────────────────────────────── | |
| # 날짜 패턴 | |
| # ───────────────────────────────────────────── | |
| _DATE_PATTERNS = [ | |
| # YYYY.MM.DD ~ YYYY.MM.DD 또는 YYYY-MM-DD ~ YYYY-MM-DD | |
| r"(\d{4}[.\-]\d{2}[.\-]\d{2})\s*[~~\-~]\s*(\d{4}[.\-]\d{2}[.\-]\d{2})", | |
| # YYYY년 MM월 DD일 ~ YYYY년 MM월 DD일 | |
| r"(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일\s*[~~\-~]\s*(\d{4})년\s*(\d{1,2})월\s*(\d{1,2})일", | |
| ] | |
| _PERIOD_KEYWORDS = ["신청기간", "접수기간", "공고기간", "모집기간", "지원기간", "접수일", "신청일"] | |
| def _normalize_date(raw: str) -> Optional[str]: | |
| raw = raw.strip().replace(".", "-") | |
| parts = raw.split("-") | |
| if len(parts) == 3: | |
| y, m, d = parts | |
| if len(y) == 4: | |
| return f"{y}-{m.zfill(2)}-{d.zfill(2)}" | |
| return None | |
| def _extract_dates_from_text(text: str) -> tuple[Optional[str], Optional[str]]: | |
| """텍스트에서 신청기간 날짜 쌍을 추출한다.""" | |
| # 키워드 주변 우선 탐색 | |
| for kw in _PERIOD_KEYWORDS: | |
| idx = text.find(kw) | |
| if idx != -1: | |
| snippet = text[idx: idx + 200] | |
| result = _apply_patterns(snippet) | |
| if result[0]: | |
| return result | |
| # 키워드 없으면 전체 탐색 | |
| return _apply_patterns(text) | |
| def _apply_patterns(text: str) -> tuple[Optional[str], Optional[str]]: | |
| for pat in _DATE_PATTERNS: | |
| m = re.search(pat, text) | |
| if not m: | |
| continue | |
| groups = m.groups() | |
| if len(groups) == 2: | |
| s = _normalize_date(groups[0]) | |
| e = _normalize_date(groups[1]) | |
| if s and e: | |
| return s, e | |
| elif len(groups) == 6: | |
| try: | |
| s = f"{groups[0]}-{int(groups[1]):02d}-{int(groups[2]):02d}" | |
| e = f"{groups[3]}-{int(groups[4]):02d}-{int(groups[5]):02d}" | |
| return s, e | |
| except Exception: | |
| pass | |
| return None, None | |
| # ───────────────────────────────────────────── | |
| # 텍스트 추출 — PDF (pdfplumber + OCR 폴백) | |
| # ───────────────────────────────────────────── | |
| def _extract_text_pdf(content: bytes) -> str: | |
| """ | |
| 1단계: pdfplumber로 텍스트 추출. | |
| 텍스트가 너무 짧으면 (스캔 PDF) OCR로 폴백. | |
| """ | |
| text = "" | |
| try: | |
| import pdfplumber | |
| with pdfplumber.open(io.BytesIO(content)) as pdf: | |
| pages_text = [p.extract_text() or "" for p in pdf.pages[:10]] | |
| text = "\n".join(pages_text) | |
| except Exception as e: | |
| logger.warning(f"pdfplumber 오류: {e}") | |
| # 의미 있는 텍스트가 충분히 추출됐으면 그대로 사용 | |
| if len(text.strip()) >= 50: | |
| return text | |
| # ── OCR 폴백 ── | |
| logger.info("텍스트 부족 → OCR 시도") | |
| return _ocr_pdf(content) or text | |
| def _ocr_pdf(content: bytes) -> str: | |
| """pdf2image로 렌더링 후 pytesseract로 OCR.""" | |
| try: | |
| from pdf2image import convert_from_bytes | |
| import pytesseract | |
| images = convert_from_bytes(content, dpi=200, first_page=1, last_page=5) | |
| texts = [] | |
| for img in images: | |
| ocr_text = pytesseract.image_to_string(img, lang="kor+eng") | |
| texts.append(ocr_text) | |
| # 날짜 패턴을 찾으면 나머지 페이지 생략 | |
| if _apply_patterns("\n".join(texts))[0]: | |
| break | |
| return "\n".join(texts) | |
| except Exception as e: | |
| logger.warning(f"OCR 오류: {e}") | |
| return "" | |
| # ───────────────────────────────────────────── | |
| # 텍스트 추출 — 이미지 단독 파일 | |
| # ───────────────────────────────────────────── | |
| def _ocr_image(content: bytes) -> str: | |
| """이미지 파일(PNG/JPG 등) OCR.""" | |
| try: | |
| from PIL import Image | |
| import pytesseract | |
| img = Image.open(io.BytesIO(content)) | |
| return pytesseract.image_to_string(img, lang="kor+eng") | |
| except Exception as e: | |
| logger.warning(f"이미지 OCR 오류: {e}") | |
| return "" | |
| # ───────────────────────────────────────────── | |
| # 텍스트 추출 — DOCX / HWP | |
| # ───────────────────────────────────────────── | |
| def _extract_text_docx(content: bytes) -> str: | |
| try: | |
| from docx import Document | |
| doc = Document(io.BytesIO(content)) | |
| return "\n".join(p.text for p in doc.paragraphs) | |
| except Exception as e: | |
| logger.warning(f"DOCX 파싱 오류: {e}") | |
| return "" | |
| def _extract_text_hwp(content: bytes) -> str: | |
| """HWP olefile 간이 파싱.""" | |
| try: | |
| import olefile | |
| if not olefile.isOleFile(io.BytesIO(content)): | |
| return "" | |
| with olefile.OleFileIO(io.BytesIO(content)) as ole: | |
| streams = ["BodyText/Section0", "BodyText/Section1", "BodyText/Section2"] | |
| texts = [] | |
| for stream in streams: | |
| if ole.exists(stream): | |
| raw = ole.openstream(stream).read() | |
| try: | |
| import zlib | |
| raw = zlib.decompress(raw, -15) | |
| except Exception: | |
| pass | |
| try: | |
| texts.append(raw.decode("utf-16-le", errors="ignore")) | |
| except Exception: | |
| texts.append(raw.decode("cp949", errors="ignore")) | |
| return "\n".join(texts) | |
| except Exception as e: | |
| logger.warning(f"HWP 파싱 오류: {e}") | |
| return "" | |
| def _extract_text_zip(content: bytes) -> str: | |
| """ZIP 내 파일 순회하며 텍스트 추출.""" | |
| try: | |
| with zipfile.ZipFile(io.BytesIO(content)) as zf: | |
| for name in zf.namelist(): | |
| ext = name.rsplit(".", 1)[-1].lower() | |
| data = zf.read(name) | |
| if ext == "pdf": | |
| text = _extract_text_pdf(data) | |
| elif ext == "docx": | |
| text = _extract_text_docx(data) | |
| elif ext in ("hwp", "hwpx"): | |
| text = _extract_text_hwp(data) | |
| elif ext in ("png", "jpg", "jpeg", "tif", "tiff"): | |
| text = _ocr_image(data) | |
| else: | |
| continue | |
| if text.strip(): | |
| return text | |
| except Exception as e: | |
| logger.warning(f"ZIP 파싱 오류: {e}") | |
| return "" | |
| # ───────────────────────────────────────────── | |
| # 공개 API | |
| # ───────────────────────────────────────────── | |
| def extract_period_from_file( | |
| file_url: str, session: requests.Session | |
| ) -> tuple[Optional[str], Optional[str]]: | |
| """ | |
| URL에서 파일을 다운로드하고 신청기간(start, end)을 추출한다. | |
| 반환: (apply_start, apply_end) — YYYY-MM-DD 또는 None | |
| """ | |
| try: | |
| resp = session.get(file_url, timeout=30) | |
| resp.raise_for_status() | |
| except Exception as e: | |
| logger.warning(f"파일 다운로드 실패 {file_url}: {e}") | |
| return None, None | |
| content = resp.content | |
| fname = file_url.rsplit("/", 1)[-1].lower().split("?")[0] | |
| ext = fname.rsplit(".", 1)[-1] if "." in fname else "" | |
| content_type = resp.headers.get("Content-Type", "").lower() | |
| if ext == "pdf" or "pdf" in content_type: | |
| text = _extract_text_pdf(content) | |
| elif ext == "docx": | |
| text = _extract_text_docx(content) | |
| elif ext in ("hwp", "hwpx"): | |
| text = _extract_text_hwp(content) | |
| elif ext == "zip" or "zip" in content_type: | |
| text = _extract_text_zip(content) | |
| elif ext in ("png", "jpg", "jpeg", "tif", "tiff"): | |
| text = _ocr_image(content) | |
| else: | |
| text = content.decode("utf-8", errors="ignore") | |
| return _extract_dates_from_text(text) | |