|
|
""" |
|
|
파일 처리 및 API 관련 함수 |
|
|
- 캐시 시스템 통합 버전 |
|
|
""" |
|
|
import os |
|
|
import re |
|
|
import zlib |
|
|
import zipfile |
|
|
import tempfile |
|
|
import requests |
|
|
from pathlib import Path |
|
|
from typing import Optional, Tuple, List, Dict, Generator |
|
|
from xml.etree import ElementTree as ET |
|
|
from datetime import datetime |
|
|
|
|
|
from utils import ( |
|
|
API_URL, API_KEY, GROQ_API_KEY, CATEGORY_CODES, |
|
|
OLEFILE_AVAILABLE, PYPDF2_AVAILABLE, PDFPLUMBER_AVAILABLE, GROQ_AVAILABLE, |
|
|
extract_region_from_text, extract_region_from_hashtags, classify_org_type, |
|
|
parse_deadline, is_ongoing |
|
|
) |
|
|
|
|
|
if OLEFILE_AVAILABLE: |
|
|
import olefile |
|
|
if PYPDF2_AVAILABLE: |
|
|
import PyPDF2 |
|
|
if PDFPLUMBER_AVAILABLE: |
|
|
import pdfplumber |
|
|
if GROQ_AVAILABLE: |
|
|
from groq import Groq |
|
|
|
|
|
import pandas as pd |
|
|
from bs4 import BeautifulSoup |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
from cache_db import ( |
|
|
get_cache, get_cached_announcements, sync_from_api, |
|
|
manual_sync, get_sync_status, initialize_cache_system |
|
|
) |
|
|
CACHE_AVAILABLE = True |
|
|
except ImportError: |
|
|
CACHE_AVAILABLE = False |
|
|
print("Warning: cache_db module not available") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_hwpx(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract plain text from an HWPX document (a zip of XML parts).

    Args:
        file_path: Path to the .hwpx file.

    Returns:
        (text, None) on success, (None, error_message) on failure.
    """
    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            # Body text normally lives in Contents/section*.xml; fall back to
            # any XML whose name mentions "section" if the layout differs.
            section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')])
            if not section_files:
                section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')])
            for section_file in section_files:
                try:
                    with zf.open(section_file) as sf:
                        content = sf.read()
                    content_str = content.decode('utf-8')
                    # Strip namespace declarations and prefixes so element
                    # tags can be matched by their bare names below.
                    content_str = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content_str)
                    content_str = re.sub(r'<[a-zA-Z]+:', '<', content_str)
                    content_str = re.sub(r'</[a-zA-Z]+:', '</', content_str)
                    try:
                        root = ET.fromstring(content_str)
                        texts = []
                        for elem in root.iter():
                            # HWPX text runs use a <t> element (after prefix
                            # stripping); accept any tag ending in 't'.
                            if elem.tag.endswith('t') or elem.tag == 't':
                                if elem.text:
                                    texts.append(elem.text)
                            elif elem.text and elem.text.strip():
                                if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
                                    texts.append(elem.text.strip())
                        if texts:
                            text_parts.append(' '.join(texts))
                    except ET.ParseError:
                        # Malformed XML: scrape whatever sits between tags.
                        text_matches = re.findall(r'>([^<]+)<', content.decode('utf-8', errors='ignore'))
                        clean_texts = [t.strip() for t in text_matches if t.strip() and len(t.strip()) > 1]
                        if clean_texts:
                            text_parts.append(' '.join(clean_texts))
                except Exception:
                    # One broken section must not abort the whole document.
                    continue
        if text_parts:
            result = '\n\n'.join(text_parts)
            result = re.sub(r'\s+', ' ', result)
            result = re.sub(r'\n{3,}', '\n\n', result)
            return result.strip(), None
        return None, "HWPX에서 텍스트를 찾을 수 없습니다"
    except zipfile.BadZipFile:
        return None, "유효하지 않은 HWPX 파일"
    except Exception as e:
        return None, f"HWPX 처리 오류: {str(e)}"
|
|
|
|
|
|
|
|
def extract_hwp_section_text(data: bytes) -> Optional[str]:
    """Walk an HWP BodyText section record stream and collect paragraph text.

    Each record starts with a 4-byte little-endian header: bits 0-9 hold the
    tag id, bits 20-31 the payload size (0xFFF means the real size follows in
    an extra 4-byte field).  Tag id 67 (HWPTAG_PARA_TEXT) records carry the
    paragraph characters, decoded via decode_para_text().

    Args:
        data: Decompressed bytes of a BodyText/Section* stream.

    Returns:
        Joined paragraph text, or None if no text records were found.
    """
    texts = []
    pos = 0
    while pos < len(data) - 4:
        try:
            header = int.from_bytes(data[pos:pos+4], 'little')
            tag_id = header & 0x3FF
            size = (header >> 20) & 0xFFF
            pos += 4
            if size == 0xFFF:
                # Extended size: actual length stored in the next 4 bytes.
                if pos + 4 > len(data):
                    break
                size = int.from_bytes(data[pos:pos+4], 'little')
                pos += 4
            if pos + size > len(data):
                break
            record_data = data[pos:pos+size]
            pos += size
            if tag_id == 67 and size > 0:
                text = decode_para_text(record_data)
                if text:
                    texts.append(text)
        except Exception:
            # Corrupt record: resynchronize one byte further on.
            pos += 1
            continue
    return '\n'.join(texts) if texts else None
|
|
|
|
|
|
|
|
def decode_para_text(data: bytes) -> Optional[str]:
    """Decode an HWP paragraph-text record (UTF-16LE with inline controls).

    Control codes 1-3 carry a 14-byte payload that is skipped; tab, line
    break, hyphen and space controls are mapped to their characters; all
    codes >= 32 are taken as printable characters.

    Returns the cleaned text, or None when fewer than 3 characters remain.
    """
    chars = []
    idx = 0
    end = len(data) - 1
    while idx < end:
        code = data[idx] | (data[idx + 1] << 8)
        if code == 0:
            pass
        elif code <= 3:
            # Extended control character: skip its 14-byte payload.
            idx += 14
        elif code == 9:
            chars.append('\t')
        elif code == 10 or code == 13:
            chars.append('\n')
        elif code == 24:
            chars.append('-')
        elif code == 30 or code == 31:
            chars.append(' ')
        elif code >= 32:
            ch = chr(code)
            if ch.isprintable() or ch in '\n\t ':
                chars.append(ch)
        idx += 2
    out = ''.join(chars).strip()
    out = re.sub(r'[ \t]+', ' ', out)
    out = re.sub(r'\n{3,}', '\n\n', out)
    return out if len(out) > 2 else None
|
|
|
|
|
|
|
|
def extract_text_from_hwp(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract plain text from a binary HWP (OLE compound) document.

    Reads the FileHeader stream to learn whether the body streams are
    zlib-compressed, then decodes every BodyText/Section* stream.

    Returns:
        (text, None) on success, (None, error_message) on failure.
    """
    if not OLEFILE_AVAILABLE:
        return None, "olefile 모듈 없음"
    try:
        ole = olefile.OleFileIO(file_path)
        try:
            if not ole.exists('FileHeader'):
                return None, "HWP 파일 헤더 없음"
            header_data = ole.openstream('FileHeader').read()
            # Byte 36 bit 0 of the header flags marks compressed body text;
            # assume compressed when the header is too short to tell.
            is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
            all_texts = []
            for entry in ole.listdir():
                entry_path = '/'.join(entry)
                if not entry_path.startswith('BodyText/Section'):
                    continue
                try:
                    stream_data = ole.openstream(entry).read()
                    if is_compressed:
                        try:
                            # HWP stores a raw deflate stream (no zlib header).
                            stream_data = zlib.decompress(stream_data, -15)
                        except zlib.error:
                            try:
                                stream_data = zlib.decompress(stream_data)
                            except zlib.error:
                                pass  # not actually compressed; use as-is
                    section_text = extract_hwp_section_text(stream_data)
                    if section_text:
                        all_texts.append(section_text)
                except Exception:
                    continue
            if all_texts:
                return '\n\n'.join(all_texts).strip(), None
            return None, "텍스트를 찾을 수 없습니다"
        finally:
            # Ensure the OLE handle is released on every path (the original
            # leaked it when an exception fired before the explicit close).
            ole.close()
    except Exception as e:
        return None, f"olefile 오류: {str(e)}"
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """Extract text from a PDF, preferring pdfplumber with a PyPDF2 fallback.

    Args:
        file_path: Path to the .pdf file.

    Returns:
        Concatenated page text, or None when both backends fail or yield
        no text.
    """
    if PDFPLUMBER_AVAILABLE:
        try:
            text_parts = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            if text_parts:
                return "\n\n".join(text_parts)
        except Exception as e:
            print(f"pdfplumber error: {e}")
    if PYPDF2_AVAILABLE:
        try:
            # Start from an empty list so pages gathered by a partially
            # failed pdfplumber run are not duplicated in the fallback.
            text_parts = []
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        text_parts.append(text)
            if text_parts:
                return "\n\n".join(text_parts)
        except Exception as e:
            print(f"PyPDF2 error: {e}")
    return None
|
|
|
|
|
|
|
|
def extract_text_from_file(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract text from a file, dispatching on its extension.

    Supports .hwpx, .hwp, .pdf, and plain-text (.txt/.md/.csv) files.

    Returns:
        (text, None) on success, (None, error_message) on failure.
    """
    if not os.path.exists(file_path):
        return None, "파일을 찾을 수 없습니다"
    ext = Path(file_path).suffix.lower()
    if ext == '.hwpx':
        return extract_text_from_hwpx(file_path)
    elif ext == '.hwp':
        return extract_text_from_hwp(file_path)
    elif ext == '.pdf':
        text = extract_text_from_pdf(file_path)
        if text:
            return text, None
        return None, "PDF에서 텍스트 추출 실패"
    elif ext in ['.txt', '.md', '.csv']:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read(), None
        except Exception:
            # Korean documents are often CP949-encoded; retry before failing.
            try:
                with open(file_path, 'r', encoding='cp949') as f:
                    return f.read(), None
            except Exception as e:
                return None, f"텍스트 파일 읽기 오류: {str(e)}"
    else:
        return None, f"지원하지 않는 파일 형식: {ext}"
|
|
|
|
|
|
|
|
def extract_zip_files(zip_path: str, extract_dir: str) -> List[str]:
    """Extract supported document files from a ZIP archive.

    Only .hwp/.hwpx/.pdf/.txt/.doc/.docx entries are extracted; directory
    entries and other file types are skipped.

    Args:
        zip_path: Path to the archive.
        extract_dir: Destination directory.

    Returns:
        Paths of the files actually extracted (empty on any archive error).
    """
    extracted_files = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            for name in zf.namelist():
                if name.endswith('/'):
                    continue  # directory entry
                ext = Path(name).suffix.lower()
                if ext in ['.hwp', '.hwpx', '.pdf', '.txt', '.doc', '.docx']:
                    try:
                        zf.extract(name, extract_dir)
                        extracted_files.append(os.path.join(extract_dir, name))
                    except Exception:
                        continue  # best effort: skip unextractable entries
    except Exception:
        pass  # unreadable archive: return whatever was extracted so far
    return extracted_files
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_all_from_api(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
    """Collect announcements from the bizinfo API via pagination.

    Fetches up to 10 pages of 100 items, capping the page count once the
    first response reveals the total item count (totCnt).

    Returns:
        (items, error_message) -- error_message is "" on full success; when a
        later page fails, the items fetched so far are still returned.
    """
    if not API_KEY:
        return [], "❌ API 키가 설정되지 않았습니다. (BIZ_API 환경변수)"
    all_items = []
    page_size = 100
    max_pages = 10
    headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}

    # Category/region/keyword are all passed to the API as hashtag filters.
    hashtags = []
    if category and category != "전체":
        hashtags.append(category)
    if region and region != "전체(지역)":
        hashtags.append(region)
    if keyword and keyword.strip():
        hashtags.append(keyword.strip())

    page_idx = 0
    # A while-loop (not `for ... in range(...)`) so that lowering max_pages
    # below, once totCnt is known, actually stops the pagination early --
    # mutating max_pages inside a range() loop has no effect.
    while page_idx < max_pages:
        page_idx += 1
        try:
            params = {"crtfcKey": API_KEY, "dataType": "json", "pageUnit": page_size, "pageIndex": page_idx}
            if category and category != "전체" and category in CATEGORY_CODES:
                if CATEGORY_CODES[category]:
                    params["searchLclasId"] = CATEGORY_CODES[category]
            if hashtags:
                params["hashtags"] = ",".join(hashtags)
            response = requests.get(API_URL, params=params, headers=headers, timeout=(15, 60), verify=True)
            response.raise_for_status()
            result = response.json()

            # The payload nests items under jsonArray.item, but either level
            # may be a dict or a list depending on result size.
            items = []
            json_array = result.get("jsonArray", result)
            if isinstance(json_array, dict):
                items = json_array.get("item", [])
                if isinstance(items, dict):
                    items = [items]
            elif isinstance(json_array, list):
                items = json_array
            if not items:
                break
            all_items.extend(items)

            if page_idx == 1 and items:
                # Use the advertised total count to avoid fetching pages that
                # cannot contain data.
                total_cnt = items[0].get("totCnt", 0) if isinstance(items[0], dict) else 0
                try:
                    total_cnt = int(total_cnt)
                except (TypeError, ValueError):
                    total_cnt = len(items)
                if total_cnt > 0:
                    needed_pages = (total_cnt + page_size - 1) // page_size
                    max_pages = min(max_pages, needed_pages)
            if len(items) < page_size:
                break
        except requests.exceptions.Timeout:
            return all_items, f"⏱️ 페이지 {page_idx} 요청 시간 초과"
        except requests.exceptions.RequestException as e:
            if all_items:
                break
            return [], f"❌ API 요청 오류: {str(e)[:50]}"
        except Exception as e:
            if all_items:
                break
            return [], f"❌ 오류: {str(e)[:50]}"
    return all_items, ""
|
|
|
|
|
|
|
|
def fetch_with_cache(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
    """Look up announcements, serving from the local cache when possible.

    Category and region are filtered in memory; a keyword triggers a vector
    search against the cache. Any cache failure (or an empty cache) falls
    back to the live API.
    """
    if not CACHE_AVAILABLE:
        return fetch_all_from_api(category, region, keyword)

    try:
        items, status = get_cached_announcements()
        if not items:
            # Nothing cached yet: go straight to the API.
            return fetch_all_from_api(category, region, keyword)

        if keyword and keyword.strip():
            items = get_cache().search(keyword.strip(), n_results=500)
            status = f"🔍 캐시에서 '{keyword}' 검색"

        applied = []
        results = items

        if category and category != "전체":
            needle = category.lower()
            results = [
                entry for entry in results
                if needle in (entry.get("lcategory", "") or entry.get("pldirSportRealmLclasCodeNm", "") or entry.get("category", "") or "").lower()
            ]
            applied.append(f"분야:{category}")

        if region and region != "전체(지역)":
            # Region may appear in the hashtags, the issuing organization,
            # or the announcement title.
            kept = []
            for entry in results:
                tags = entry.get("hashTags", "") or ""
                org = entry.get("author", "") or entry.get("jrsdInsttNm", "") or ""
                name = entry.get("title", "") or entry.get("pblancNm", "") or ""
                if region in tags or region in org or region in name:
                    kept.append(entry)
            results = kept
            applied.append(f"지역:{region}")

        summary = ", ".join(applied) if applied else "전체"
        return results, f"⚡ 캐시에서 {len(results)}건 필터링 ({summary})"

    except Exception as e:
        print(f"Cache filter error: {e}")
        return fetch_all_from_api(category, region, keyword)
|
|
|
|
|
|
|
|
def download_file(url: str, save_dir: str, hint_filename: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:
    """Download a file from *url* into *save_dir*.

    The local filename is resolved, in order, from the Content-Disposition
    header, *hint_filename*, the URL path, and finally the Content-Type;
    it is then sanitized and length-limited for the filesystem.

    Returns:
        (saved_path, None) on success, (None, error_message) on failure.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0", "Accept": "*/*", "Referer": "https://www.bizinfo.go.kr/"}
        # NOTE(review): verify=False disables TLS certificate validation --
        # presumably needed for this host's certificate chain; confirm
        # before changing.
        response = requests.get(url, headers=headers, timeout=60, stream=True, verify=False, allow_redirects=True)
        response.raise_for_status()
        # 1) Filename from Content-Disposition (RFC 5987 filename*= first).
        cd = response.headers.get('Content-Disposition', '')
        filename = None
        if cd:
            match = re.search(r"filename\*=(?:UTF-8''|utf-8'')(.+)", cd, re.IGNORECASE)
            if match:
                from urllib.parse import unquote
                filename = unquote(match.group(1))
            else:
                match = re.search(r'filename=(["\']?)(.+?)\1(?:;|$)', cd)
                if match:
                    filename = match.group(2)
        # 2) Caller-supplied hint.
        if not filename and hint_filename:
            filename = hint_filename
        # 3) Last path segment of the URL.
        if not filename:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            filename = parsed.path.split('/')[-1]
            # 4) Synthesize a name from the Content-Type when all else fails.
            if not filename or '.' not in filename:
                content_type = response.headers.get('Content-Type', '').lower()
                if 'pdf' in content_type:
                    filename = f"document_{hash(url) % 10000}.pdf"
                elif 'hwp' in content_type:
                    filename = f"document_{hash(url) % 10000}.hwp"
                else:
                    filename = f"file_{hash(url) % 10000}.bin"

        # Replace characters that are illegal in Windows filenames.
        filename = re.sub(r'[<>:"/\\|?*]', '_', filename)

        # Ensure there is an extension, inferring from Content-Type if missing.
        name_part, ext = os.path.splitext(filename)
        if not ext:
            content_type = response.headers.get('Content-Type', '').lower()
            if 'pdf' in content_type:
                ext = '.pdf'
            elif 'hwp' in content_type:
                ext = '.hwp'
            else:
                ext = '.bin'

        # Keep the base name under ~100 chars, preserving head and tail
        # segments plus a short hash so truncated names stay distinct.
        max_name_len = 100 - len(ext)
        if len(name_part) > max_name_len:
            name_hash = f"_{hash(name_part) % 10000:04d}_"
            name_part = name_part[:50] + name_hash + name_part[-30:]

        filename = name_part + ext

        # Fall back to a synthetic name if it cannot be UTF-8 encoded.
        try:
            filename.encode('utf-8')
        except UnicodeEncodeError:
            filename = f"document_{hash(url) % 100000}{ext}"

        file_path = os.path.join(save_dir, filename)

        # Guard against OS path-length limits (~260 chars on Windows).
        if len(file_path) > 250:
            filename = f"doc_{hash(url) % 100000}{ext}"
            file_path = os.path.join(save_dir, filename)

        # Stream the body to disk in 8 KiB chunks.
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        # An empty body usually means an error page or an expired link.
        if os.path.getsize(file_path) == 0:
            os.remove(file_path)
            return None, "빈 파일이 다운로드됨"
        return file_path, None
    except Exception as e:
        return None, f"다운로드 실패: {str(e)}"
|
|
|
|
|
|
|
|
def call_groq_api_stream(messages: List[Dict]) -> Generator[str, None, None]:
    """Stream a chat completion from the Groq API, yielding text fragments.

    Yields a single error string when the library or API key is missing,
    or when the API call itself fails.
    """
    if not GROQ_AVAILABLE:
        yield "❌ Groq 라이브러리가 설치되지 않았습니다."
        return
    if not GROQ_API_KEY:
        yield "❌ GROQ_API_KEY 환경변수가 설정되지 않았습니다."
        return
    try:
        stream = Groq(api_key=GROQ_API_KEY).chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=messages,
            temperature=0.3,
            max_tokens=4096,
            stream=True,
        )
        for event in stream:
            piece = event.choices[0].delta.content
            if piece:
                yield piece
    except Exception as e:
        yield f"❌ API 오류: {str(e)}"
|
|
|
|
|
|
|
|
def fetch_announcement_detail(url: str) -> Tuple[str, List[Dict], Optional[Dict]]:
    """Scrape an announcement detail page for body text and file links.

    Returns:
        (content_text, attachments, print_file)
        - content_text: body text of the announcement
        - attachments: regular attachment list (forms, templates, ...)
        - print_file: the "body print file" (announcement PDF/HWP) used for
          AI analysis, or None if not identified
    """
    try:
        # Relative links come from the bizinfo site.
        if url.startswith('/'):
            url = f"https://www.bizinfo.go.kr{url}"

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
        }
        # NOTE(review): verify=False skips TLS validation -- confirm whether
        # the host's certificate chain still requires it.
        response = requests.get(url, headers=headers, timeout=30, verify=False)
        response.raise_for_status()
        html_text = response.text
        soup = BeautifulSoup(html_text, 'html.parser')

        # Body text: prefer tables that mention the standard announcement
        # section headings (overview / eligibility / application period).
        content_text = ""
        tables = soup.find_all('table')
        for table in tables:
            text = table.get_text(separator='\n', strip=True)
            if '사업개요' in text or '지원대상' in text or '신청기간' in text:
                content_text += text + "\n\n"
        # Fallback: the page's main container, if no table matched.
        main_content = soup.find('div', {'id': 'container'}) or soup.find('main') or soup.find('article')
        if main_content and not content_text:
            content_text = main_content.get_text(separator='\n', strip=True)

        attachments = []
        print_file = None

        # Scan every link for file-download endpoints.
        for a_tag in soup.find_all('a', href=True):
            href = a_tag.get('href', '')
            # Drop session ids so URLs compare and dedupe cleanly.
            href_clean = re.sub(r';jsessionid=[^?]*', '', href)

            if 'getImageFile.do' in href_clean or 'fileDown' in href_clean or 'atchFileId' in href_clean:
                filename = a_tag.get_text(strip=True)

                # Link text is often just a generic action label
                # ("download" / "view" / "save"); look for a dotted filename
                # in the parent element's text instead.
                if filename in ['다운로드', '바로보기', '내려받기', '']:
                    parent = a_tag.parent
                    if parent:
                        parent_text = parent.get_text(separator='|', strip=True)
                        parts = [p.strip() for p in parent_text.split('|') if p.strip()]
                        for part in parts:
                            if part not in ['다운로드', '바로보기', '내려받기'] and ('.' in part):
                                filename = part
                                break
                # The title attribute often embeds the real attachment name.
                title = a_tag.get('title', '')
                if title and '첨부파일' in title:
                    match = re.search(r'첨부파일\s+(.+?)\s+다운로드', title)
                    if match:
                        filename = match.group(1)

                # Still nothing usable: number the attachment generically.
                if not filename or filename in ['다운로드', '바로보기', '내려받기']:
                    filename = f"첨부파일_{len(attachments)+1}"

                # Resolve to an absolute URL; skip javascript:/anchor links.
                if href_clean.startswith('/'):
                    full_url = f"https://www.bizinfo.go.kr{href_clean}"
                elif href_clean.startswith('http'):
                    full_url = href_clean
                else:
                    continue

                ext = Path(filename).suffix.lower()
                if not ext:
                    ext = '.unknown'

                file_info = {
                    "filename": filename,
                    "url": full_url,
                    "type": ext[1:] if ext.startswith('.') else ext
                }

                # Heuristics for spotting the official announcement document
                # (the "body print file") among the attachments.
                is_print_file = False
                filename_lower = filename.lower()  # NOTE(review): currently unused

                # e.g. "(제2024-15호)" style official notice numbering.
                if re.search(r'\(제\d+[-_]?\d*호\)', filename):
                    is_print_file = True
                # Common announcement-document naming patterns.
                elif any(kw in filename for kw in ['공고문', '모집공고', '공고(안)', '공고 안', '_공고_', '_공고.']):
                    is_print_file = True
                # Explicit "body print" label in the filename itself.
                elif '본문출력' in filename or '본문 출력' in filename:
                    is_print_file = True

                # Also accept a "body print file" label on an enclosing element.
                parent = a_tag.parent
                grandparent = parent.parent if parent else None
                for ancestor in [parent, grandparent]:
                    if ancestor:
                        ancestor_text = ancestor.get_text(strip=True)
                        if '본문출력파일' in ancestor_text or '본문출력 파일' in ancestor_text:
                            is_print_file = True
                            break

                # First matching print file wins; everything else becomes a
                # regular attachment, deduplicated by URL.
                if is_print_file and not print_file:
                    print_file = file_info
                elif not any(att['url'] == full_url for att in attachments):
                    attachments.append(file_info)

        return content_text, attachments, print_file

    except Exception as e:
        import traceback
        return f"상세 정보 조회 실패: {str(e)}\n{traceback.format_exc()}", [], None
|
|
|