""" 파일 처리 및 API 관련 함수 - 캐시 시스템 통합 버전 """ import os import re import zlib import zipfile import tempfile import requests from pathlib import Path from typing import Optional, Tuple, List, Dict, Generator from xml.etree import ElementTree as ET from datetime import datetime from utils import ( API_URL, API_KEY, GROQ_API_KEY, CATEGORY_CODES, OLEFILE_AVAILABLE, PYPDF2_AVAILABLE, PDFPLUMBER_AVAILABLE, GROQ_AVAILABLE, extract_region_from_text, extract_region_from_hashtags, classify_org_type, parse_deadline, is_ongoing ) if OLEFILE_AVAILABLE: import olefile if PYPDF2_AVAILABLE: import PyPDF2 if PDFPLUMBER_AVAILABLE: import pdfplumber if GROQ_AVAILABLE: from groq import Groq import pandas as pd from bs4 import BeautifulSoup # ============================================================ # 캐시 시스템 임포트 # ============================================================ try: from cache_db import ( get_cache, get_cached_announcements, sync_from_api, manual_sync, get_sync_status, initialize_cache_system ) CACHE_AVAILABLE = True except ImportError: CACHE_AVAILABLE = False print("Warning: cache_db module not available") # ============================================================ # 파일 텍스트 추출 함수 # ============================================================ def extract_text_from_hwpx(file_path: str) -> Tuple[Optional[str], Optional[str]]: """HWPX 파일에서 텍스트 추출""" try: text_parts = [] with zipfile.ZipFile(file_path, 'r') as zf: file_list = zf.namelist() section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')]) if not section_files: section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')]) for section_file in section_files: try: with zf.open(section_file) as sf: content = sf.read() content_str = content.decode('utf-8') content_str = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content_str) content_str = re.sub(r'<[a-zA-Z]+:', '<', content_str) content_str = re.sub(r'([^<]+)<', content.decode('utf-8', errors='ignore')) clean_texts = [t.strip() for t in text_matches if t.strip() and len(t.strip()) > 1] if clean_texts: text_parts.append(' '.join(clean_texts)) except: continue if text_parts: result = '\n\n'.join(text_parts) result = re.sub(r'\s+', ' ', result) result = re.sub(r'\n{3,}', '\n\n', result) return result.strip(), None return None, "HWPX에서 텍스트를 찾을 수 없습니다" except zipfile.BadZipFile: return None, "유효하지 않은 HWPX 파일" except Exception as e: return None, f"HWPX 처리 오류: {str(e)}" def extract_hwp_section_text(data: bytes) -> Optional[str]: """HWP 섹션 데이터에서 텍스트 추출""" texts = [] pos = 0 while pos < len(data) - 4: try: header = int.from_bytes(data[pos:pos+4], 'little') tag_id = header & 0x3FF size = (header >> 20) & 0xFFF pos += 4 if size == 0xFFF: if pos + 4 > len(data): break size = int.from_bytes(data[pos:pos+4], 'little') pos += 4 if pos + size > len(data): break record_data = data[pos:pos+size] pos += size if tag_id == 67 and size > 0: text = decode_para_text(record_data) if text: texts.append(text) except: pos += 1 continue return '\n'.join(texts) if texts else None def decode_para_text(data: bytes) -> Optional[str]: """HWP 문단 텍스트 디코딩""" result = [] i = 0 while i < len(data) - 1: code = int.from_bytes(data[i:i+2], 'little') if code == 0: pass elif code in [1, 2, 3]: i += 14 elif code == 9: result.append('\t') elif code in [10, 13]: result.append('\n') elif code == 24: result.append('-') elif code in [30, 31]: result.append(' ') elif code >= 32: try: char = chr(code) if char.isprintable() or char in '\n\t ': result.append(char) except: pass i += 2 text = ''.join(result).strip() text = re.sub(r'[ \t]+', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) return text if len(text) > 2 else None def extract_text_from_hwp(file_path: str) -> Tuple[Optional[str], Optional[str]]: """HWP 파일에서 텍스트 추출""" if not OLEFILE_AVAILABLE: return None, "olefile 모듈 없음" try: ole = olefile.OleFileIO(file_path) if not ole.exists('FileHeader'): ole.close() return None, "HWP 파일 헤더 없음" header_data = ole.openstream('FileHeader').read() is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True all_texts = [] for entry in ole.listdir(): entry_path = '/'.join(entry) if entry_path.startswith('BodyText/Section'): try: stream_data = ole.openstream(entry).read() if is_compressed: try: stream_data = zlib.decompress(stream_data, -15) except: try: stream_data = zlib.decompress(stream_data) except: pass section_text = extract_hwp_section_text(stream_data) if section_text: all_texts.append(section_text) except: continue ole.close() if all_texts: return '\n\n'.join(all_texts).strip(), None return None, "텍스트를 찾을 수 없습니다" except Exception as e: return None, f"olefile 오류: {str(e)}" def extract_text_from_pdf(file_path: str) -> Optional[str]: """PDF 파일에서 텍스트 추출""" text_parts = [] if PDFPLUMBER_AVAILABLE: try: with pdfplumber.open(file_path) as pdf: for page in pdf.pages: text = page.extract_text() if text: text_parts.append(text) if text_parts: return "\n\n".join(text_parts) except Exception as e: print(f"pdfplumber error: {e}") if PYPDF2_AVAILABLE: try: with open(file_path, 'rb') as f: reader = PyPDF2.PdfReader(f) for page in reader.pages: text = page.extract_text() if text: text_parts.append(text) if text_parts: return "\n\n".join(text_parts) except Exception as e: print(f"PyPDF2 error: {e}") return None def extract_text_from_file(file_path: str) -> Tuple[Optional[str], Optional[str]]: """파일에서 텍스트 추출 (확장자 기반 자동 선택)""" if not os.path.exists(file_path): return None, "파일을 찾을 수 없습니다" ext = Path(file_path).suffix.lower() if ext == '.hwpx': return extract_text_from_hwpx(file_path) elif ext == '.hwp': return extract_text_from_hwp(file_path) elif ext == '.pdf': text = extract_text_from_pdf(file_path) if text: return text, None return None, "PDF에서 텍스트 추출 실패" elif ext in ['.txt', '.md', '.csv']: try: with open(file_path, 'r', encoding='utf-8') as f: return f.read(), None except: try: with open(file_path, 'r', encoding='cp949') as f: return f.read(), None except Exception as e: return None, f"텍스트 파일 읽기 오류: {str(e)}" else: return None, f"지원하지 않는 파일 형식: {ext}" def extract_zip_files(zip_path: str, extract_dir: str) -> List[str]: """ZIP 파일 압축 해제""" extracted_files = [] try: with zipfile.ZipFile(zip_path, 'r') as zf: for name in zf.namelist(): if name.endswith('/'): continue ext = Path(name).suffix.lower() if ext in ['.hwp', '.hwpx', '.pdf', '.txt', '.doc', '.docx']: try: zf.extract(name, extract_dir) extracted_files.append(os.path.join(extract_dir, name)) except: continue except: pass return extracted_files # ============================================================ # API 관련 함수 # ============================================================ def fetch_all_from_api(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]: """API에서 전체 데이터를 페이지네이션으로 수집""" if not API_KEY: return [], "❌ API 키가 설정되지 않았습니다. (BIZ_API 환경변수)" all_items = [] page_size = 100 max_pages = 10 headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"} hashtags = [] if category and category != "전체": hashtags.append(category) if region and region != "전체(지역)": hashtags.append(region) if keyword and keyword.strip(): hashtags.append(keyword.strip()) for page_idx in range(1, max_pages + 1): try: params = {"crtfcKey": API_KEY, "dataType": "json", "pageUnit": page_size, "pageIndex": page_idx} if category and category != "전체" and category in CATEGORY_CODES: if CATEGORY_CODES[category]: params["searchLclasId"] = CATEGORY_CODES[category] if hashtags: params["hashtags"] = ",".join(hashtags) response = requests.get(API_URL, params=params, headers=headers, timeout=(15, 60), verify=True) response.raise_for_status() result = response.json() items = [] json_array = result.get("jsonArray", result) if isinstance(json_array, dict): items = json_array.get("item", []) if isinstance(items, dict): items = [items] elif isinstance(json_array, list): items = json_array if not items: break all_items.extend(items) if page_idx == 1 and items: total_cnt = items[0].get("totCnt", 0) if isinstance(items[0], dict) else 0 try: total_cnt = int(total_cnt) except: total_cnt = len(items) needed_pages = (total_cnt + page_size - 1) // page_size max_pages = min(max_pages, needed_pages) if len(items) < page_size: break except requests.exceptions.Timeout: return all_items, f"⏱️ 페이지 {page_idx} 요청 시간 초과" except requests.exceptions.RequestException as e: if all_items: break return [], f"❌ API 요청 오류: {str(e)[:50]}" except Exception as e: if all_items: break return [], f"❌ 오류: {str(e)[:50]}" return all_items, "" def fetch_with_cache(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]: """ 캐시를 활용한 공고 조회 (개선됨) - 가능하면 캐시에서 필터링 (빠름!) - 키워드가 있으면 캐시에서 벡터 검색 """ if not CACHE_AVAILABLE: return fetch_all_from_api(category, region, keyword) try: # ⭐ 캐시에서 전체 로드 후 필터링 (API 호출보다 훨씬 빠름!) items, status = get_cached_announcements() if not items: # 캐시가 비어있으면 API에서 가져오기 return fetch_all_from_api(category, region, keyword) # 키워드 검색 (벡터 검색) if keyword and keyword.strip(): cache = get_cache() items = cache.search(keyword.strip(), n_results=500) status = f"🔍 캐시에서 '{keyword}' 검색" # 필터 적용 (캐시 데이터에서 필터링) filtered = items filter_info = [] # 카테고리 필터 if category and category != "전체": filtered = [ i for i in filtered if category.lower() in (i.get("lcategory", "") or i.get("pldirSportRealmLclasCodeNm", "") or i.get("category", "") or "").lower() ] filter_info.append(f"분야:{category}") # 지역 필터 (hashTags, author, title에서 검색) if region and region != "전체(지역)": region_filtered = [] for i in filtered: hash_tags = i.get("hashTags", "") or "" author = i.get("author", "") or i.get("jrsdInsttNm", "") or "" title = i.get("title", "") or i.get("pblancNm", "") or "" if region in hash_tags or region in author or region in title: region_filtered.append(i) filtered = region_filtered filter_info.append(f"지역:{region}") filter_str = ", ".join(filter_info) if filter_info else "전체" result_status = f"⚡ 캐시에서 {len(filtered)}건 필터링 ({filter_str})" return filtered, result_status except Exception as e: print(f"Cache filter error: {e}") # 오류 시 API 폴백 return fetch_all_from_api(category, region, keyword) def download_file(url: str, save_dir: str, hint_filename: str = None) -> Tuple[Optional[str], Optional[str]]: """파일 다운로드""" try: headers = {"User-Agent": "Mozilla/5.0", "Accept": "*/*", "Referer": "https://www.bizinfo.go.kr/"} response = requests.get(url, headers=headers, timeout=60, stream=True, verify=False, allow_redirects=True) response.raise_for_status() cd = response.headers.get('Content-Disposition', '') filename = None if cd: match = re.search(r"filename\*=(?:UTF-8''|utf-8'')(.+)", cd, re.IGNORECASE) if match: from urllib.parse import unquote filename = unquote(match.group(1)) else: match = re.search(r'filename=(["\']?)(.+?)\1(?:;|$)', cd) if match: filename = match.group(2) if not filename and hint_filename: filename = hint_filename if not filename: from urllib.parse import urlparse parsed = urlparse(url) filename = parsed.path.split('/')[-1] if not filename or '.' not in filename: content_type = response.headers.get('Content-Type', '').lower() if 'pdf' in content_type: filename = f"document_{hash(url) % 10000}.pdf" elif 'hwp' in content_type: filename = f"document_{hash(url) % 10000}.hwp" else: filename = f"file_{hash(url) % 10000}.bin" # ⭐ 파일명 정리 (인코딩 문제 해결) # 1. 특수문자 제거 filename = re.sub(r'[<>:"/\\|?*]', '_', filename) # 2. 파일명이 너무 길면 축약 (확장자 보존) name_part, ext = os.path.splitext(filename) if not ext: # 확장자가 없으면 Content-Type에서 추측 content_type = response.headers.get('Content-Type', '').lower() if 'pdf' in content_type: ext = '.pdf' elif 'hwp' in content_type: ext = '.hwp' else: ext = '.bin' # 3. 파일명 최대 길이 제한 (100자) max_name_len = 100 - len(ext) if len(name_part) > max_name_len: # 앞 50자 + 해시 + 뒤 30자 name_hash = f"_{hash(name_part) % 10000:04d}_" name_part = name_part[:50] + name_hash + name_part[-30:] filename = name_part + ext # 4. 안전한 ASCII 파일명 생성 (인코딩 문제 방지) try: filename.encode('utf-8') except UnicodeEncodeError: # 인코딩 문제 시 해시 기반 파일명 사용 filename = f"document_{hash(url) % 100000}{ext}" file_path = os.path.join(save_dir, filename) # 5. 최종 경로 길이 확인 if len(file_path) > 250: filename = f"doc_{hash(url) % 100000}{ext}" file_path = os.path.join(save_dir, filename) with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) if os.path.getsize(file_path) == 0: os.remove(file_path) return None, "빈 파일이 다운로드됨" return file_path, None except Exception as e: return None, f"다운로드 실패: {str(e)}" def call_groq_api_stream(messages: List[Dict]) -> Generator[str, None, None]: """Groq API 스트리밍 호출""" if not GROQ_AVAILABLE: yield "❌ Groq 라이브러리가 설치되지 않았습니다." return if not GROQ_API_KEY: yield "❌ GROQ_API_KEY 환경변수가 설정되지 않았습니다." return try: client = Groq(api_key=GROQ_API_KEY) completion = client.chat.completions.create( model="llama-3.3-70b-versatile", messages=messages, temperature=0.3, max_tokens=4096, stream=True, ) for chunk in completion: if chunk.choices[0].delta.content: yield chunk.choices[0].delta.content except Exception as e: yield f"❌ API 오류: {str(e)}" def fetch_announcement_detail(url: str) -> Tuple[str, List[Dict], Optional[Dict]]: """공고 상세 페이지에서 본문, 첨부파일, 본문출력파일 정보 추출 Returns: (content_text, attachments, print_file) - content_text: 공고 본문 텍스트 - attachments: 일반 첨부파일 리스트 (서식, 양식 등) - print_file: 본문출력파일 (공고문 PDF/HWP) - AI 분석용 """ try: # URL 정규화 if url.startswith('/'): url = f"https://www.bizinfo.go.kr{url}" headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7", } response = requests.get(url, headers=headers, timeout=30, verify=False) response.raise_for_status() html_text = response.text soup = BeautifulSoup(html_text, 'html.parser') # 본문 텍스트 추출 content_text = "" tables = soup.find_all('table') for table in tables: text = table.get_text(separator='\n', strip=True) if '사업개요' in text or '지원대상' in text or '신청기간' in text: content_text += text + "\n\n" main_content = soup.find('div', {'id': 'container'}) or soup.find('main') or soup.find('article') if main_content and not content_text: content_text = main_content.get_text(separator='\n', strip=True) attachments = [] # 일반 첨부파일 print_file = None # 본문출력파일 # 파일 링크 추출 for a_tag in soup.find_all('a', href=True): href = a_tag.get('href', '') href_clean = re.sub(r';jsessionid=[^?]*', '', href) if 'getImageFile.do' in href_clean or 'fileDown' in href_clean or 'atchFileId' in href_clean: filename = a_tag.get_text(strip=True) # 파일명 추출 개선 if filename in ['다운로드', '바로보기', '내려받기', '']: parent = a_tag.parent if parent: parent_text = parent.get_text(separator='|', strip=True) parts = [p.strip() for p in parent_text.split('|') if p.strip()] for part in parts: if part not in ['다운로드', '바로보기', '내려받기'] and ('.' in part): filename = part break title = a_tag.get('title', '') if title and '첨부파일' in title: match = re.search(r'첨부파일\s+(.+?)\s+다운로드', title) if match: filename = match.group(1) if not filename or filename in ['다운로드', '바로보기', '내려받기']: filename = f"첨부파일_{len(attachments)+1}" # URL 정규화 if href_clean.startswith('/'): full_url = f"https://www.bizinfo.go.kr{href_clean}" elif href_clean.startswith('http'): full_url = href_clean else: continue ext = Path(filename).suffix.lower() if not ext: ext = '.unknown' file_info = { "filename": filename, "url": full_url, "type": ext[1:] if ext.startswith('.') else ext } # ⭐ 본문출력파일 판별 (공고문 PDF/HWP) # 패턴: "(제XXXX-XX호)", "공고", "공고문", "본문", 날짜 패턴 is_print_file = False filename_lower = filename.lower() # 1. "(제XXXX-XX호)" 패턴 - 공고번호 포함 if re.search(r'\(제\d+[-_]?\d*호\)', filename): is_print_file = True # 2. "공고", "공고문", "모집공고" 등 키워드 elif any(kw in filename for kw in ['공고문', '모집공고', '공고(안)', '공고 안', '_공고_', '_공고.']): is_print_file = True # 3. "본문출력" 키워드 elif '본문출력' in filename or '본문 출력' in filename: is_print_file = True # 4. 부모 요소에 "본문출력파일" 텍스트가 있는지 확인 parent = a_tag.parent grandparent = parent.parent if parent else None for ancestor in [parent, grandparent]: if ancestor: ancestor_text = ancestor.get_text(strip=True) if '본문출력파일' in ancestor_text or '본문출력 파일' in ancestor_text: is_print_file = True break if is_print_file and not print_file: print_file = file_info elif not any(att['url'] == full_url for att in attachments): # "(첨부" 로 시작하는 파일은 일반 첨부파일 attachments.append(file_info) return content_text, attachments, print_file except Exception as e: import traceback return f"상세 정보 조회 실패: {str(e)}\n{traceback.format_exc()}", [], None