cobiz / file_api.py
seawolf2357's picture
Update file_api.py
0992c5f verified
"""
파일 처리 및 API 관련 함수
- 캐시 시스템 통합 버전
"""
import os
import re
import zlib
import zipfile
import tempfile
import requests
from pathlib import Path
from typing import Optional, Tuple, List, Dict, Generator
from xml.etree import ElementTree as ET
from datetime import datetime
from utils import (
API_URL, API_KEY, GROQ_API_KEY, CATEGORY_CODES,
OLEFILE_AVAILABLE, PYPDF2_AVAILABLE, PDFPLUMBER_AVAILABLE, GROQ_AVAILABLE,
extract_region_from_text, extract_region_from_hashtags, classify_org_type,
parse_deadline, is_ongoing
)
if OLEFILE_AVAILABLE:
import olefile
if PYPDF2_AVAILABLE:
import PyPDF2
if PDFPLUMBER_AVAILABLE:
import pdfplumber
if GROQ_AVAILABLE:
from groq import Groq
import pandas as pd
from bs4 import BeautifulSoup
# ============================================================
# 캐시 시스템 임포트
# ============================================================
try:
from cache_db import (
get_cache, get_cached_announcements, sync_from_api,
manual_sync, get_sync_status, initialize_cache_system
)
CACHE_AVAILABLE = True
except ImportError:
CACHE_AVAILABLE = False
print("Warning: cache_db module not available")
# ============================================================
# 파일 텍스트 추출 함수
# ============================================================
def extract_text_from_hwpx(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""HWPX 파일에서 텍스트 추출"""
try:
text_parts = []
with zipfile.ZipFile(file_path, 'r') as zf:
file_list = zf.namelist()
section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')])
if not section_files:
section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')])
for section_file in section_files:
try:
with zf.open(section_file) as sf:
content = sf.read()
content_str = content.decode('utf-8')
content_str = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content_str)
content_str = re.sub(r'<[a-zA-Z]+:', '<', content_str)
content_str = re.sub(r'</[a-zA-Z]+:', '</', content_str)
try:
root = ET.fromstring(content_str)
texts = []
for elem in root.iter():
if elem.tag.endswith('t') or elem.tag == 't':
if elem.text:
texts.append(elem.text)
elif elem.text and elem.text.strip():
if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
texts.append(elem.text.strip())
if texts:
text_parts.append(' '.join(texts))
except ET.ParseError:
text_matches = re.findall(r'>([^<]+)<', content.decode('utf-8', errors='ignore'))
clean_texts = [t.strip() for t in text_matches if t.strip() and len(t.strip()) > 1]
if clean_texts:
text_parts.append(' '.join(clean_texts))
except:
continue
if text_parts:
result = '\n\n'.join(text_parts)
result = re.sub(r'\s+', ' ', result)
result = re.sub(r'\n{3,}', '\n\n', result)
return result.strip(), None
return None, "HWPX에서 텍스트를 찾을 수 없습니다"
except zipfile.BadZipFile:
return None, "유효하지 않은 HWPX 파일"
except Exception as e:
return None, f"HWPX 처리 오류: {str(e)}"
def extract_hwp_section_text(data: bytes) -> Optional[str]:
"""HWP 섹션 데이터에서 텍스트 추출"""
texts = []
pos = 0
while pos < len(data) - 4:
try:
header = int.from_bytes(data[pos:pos+4], 'little')
tag_id = header & 0x3FF
size = (header >> 20) & 0xFFF
pos += 4
if size == 0xFFF:
if pos + 4 > len(data):
break
size = int.from_bytes(data[pos:pos+4], 'little')
pos += 4
if pos + size > len(data):
break
record_data = data[pos:pos+size]
pos += size
if tag_id == 67 and size > 0:
text = decode_para_text(record_data)
if text:
texts.append(text)
except:
pos += 1
continue
return '\n'.join(texts) if texts else None
def decode_para_text(data: bytes) -> Optional[str]:
"""HWP 문단 텍스트 디코딩"""
result = []
i = 0
while i < len(data) - 1:
code = int.from_bytes(data[i:i+2], 'little')
if code == 0:
pass
elif code in [1, 2, 3]:
i += 14
elif code == 9:
result.append('\t')
elif code in [10, 13]:
result.append('\n')
elif code == 24:
result.append('-')
elif code in [30, 31]:
result.append(' ')
elif code >= 32:
try:
char = chr(code)
if char.isprintable() or char in '\n\t ':
result.append(char)
except:
pass
i += 2
text = ''.join(result).strip()
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r'\n{3,}', '\n\n', text)
return text if len(text) > 2 else None
def extract_text_from_hwp(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""HWP 파일에서 텍스트 추출"""
if not OLEFILE_AVAILABLE:
return None, "olefile 모듈 없음"
try:
ole = olefile.OleFileIO(file_path)
if not ole.exists('FileHeader'):
ole.close()
return None, "HWP 파일 헤더 없음"
header_data = ole.openstream('FileHeader').read()
is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
all_texts = []
for entry in ole.listdir():
entry_path = '/'.join(entry)
if entry_path.startswith('BodyText/Section'):
try:
stream_data = ole.openstream(entry).read()
if is_compressed:
try:
stream_data = zlib.decompress(stream_data, -15)
except:
try:
stream_data = zlib.decompress(stream_data)
except:
pass
section_text = extract_hwp_section_text(stream_data)
if section_text:
all_texts.append(section_text)
except:
continue
ole.close()
if all_texts:
return '\n\n'.join(all_texts).strip(), None
return None, "텍스트를 찾을 수 없습니다"
except Exception as e:
return None, f"olefile 오류: {str(e)}"
def extract_text_from_pdf(file_path: str) -> Optional[str]:
"""PDF 파일에서 텍스트 추출"""
text_parts = []
if PDFPLUMBER_AVAILABLE:
try:
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
text = page.extract_text()
if text:
text_parts.append(text)
if text_parts:
return "\n\n".join(text_parts)
except Exception as e:
print(f"pdfplumber error: {e}")
if PYPDF2_AVAILABLE:
try:
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
text = page.extract_text()
if text:
text_parts.append(text)
if text_parts:
return "\n\n".join(text_parts)
except Exception as e:
print(f"PyPDF2 error: {e}")
return None
def extract_text_from_file(file_path: str) -> Tuple[Optional[str], Optional[str]]:
"""파일에서 텍스트 추출 (확장자 기반 자동 선택)"""
if not os.path.exists(file_path):
return None, "파일을 찾을 수 없습니다"
ext = Path(file_path).suffix.lower()
if ext == '.hwpx':
return extract_text_from_hwpx(file_path)
elif ext == '.hwp':
return extract_text_from_hwp(file_path)
elif ext == '.pdf':
text = extract_text_from_pdf(file_path)
if text:
return text, None
return None, "PDF에서 텍스트 추출 실패"
elif ext in ['.txt', '.md', '.csv']:
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read(), None
except:
try:
with open(file_path, 'r', encoding='cp949') as f:
return f.read(), None
except Exception as e:
return None, f"텍스트 파일 읽기 오류: {str(e)}"
else:
return None, f"지원하지 않는 파일 형식: {ext}"
def extract_zip_files(zip_path: str, extract_dir: str) -> List[str]:
"""ZIP 파일 압축 해제"""
extracted_files = []
try:
with zipfile.ZipFile(zip_path, 'r') as zf:
for name in zf.namelist():
if name.endswith('/'):
continue
ext = Path(name).suffix.lower()
if ext in ['.hwp', '.hwpx', '.pdf', '.txt', '.doc', '.docx']:
try:
zf.extract(name, extract_dir)
extracted_files.append(os.path.join(extract_dir, name))
except:
continue
except:
pass
return extracted_files
# ============================================================
# API 관련 함수
# ============================================================
def fetch_all_from_api(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
"""API에서 전체 데이터를 페이지네이션으로 수집"""
if not API_KEY:
return [], "❌ API 키가 설정되지 않았습니다. (BIZ_API 환경변수)"
all_items = []
page_size = 100
max_pages = 10
headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}
hashtags = []
if category and category != "전체":
hashtags.append(category)
if region and region != "전체(지역)":
hashtags.append(region)
if keyword and keyword.strip():
hashtags.append(keyword.strip())
for page_idx in range(1, max_pages + 1):
try:
params = {"crtfcKey": API_KEY, "dataType": "json", "pageUnit": page_size, "pageIndex": page_idx}
if category and category != "전체" and category in CATEGORY_CODES:
if CATEGORY_CODES[category]:
params["searchLclasId"] = CATEGORY_CODES[category]
if hashtags:
params["hashtags"] = ",".join(hashtags)
response = requests.get(API_URL, params=params, headers=headers, timeout=(15, 60), verify=True)
response.raise_for_status()
result = response.json()
items = []
json_array = result.get("jsonArray", result)
if isinstance(json_array, dict):
items = json_array.get("item", [])
if isinstance(items, dict):
items = [items]
elif isinstance(json_array, list):
items = json_array
if not items:
break
all_items.extend(items)
if page_idx == 1 and items:
total_cnt = items[0].get("totCnt", 0) if isinstance(items[0], dict) else 0
try:
total_cnt = int(total_cnt)
except:
total_cnt = len(items)
needed_pages = (total_cnt + page_size - 1) // page_size
max_pages = min(max_pages, needed_pages)
if len(items) < page_size:
break
except requests.exceptions.Timeout:
return all_items, f"⏱️ 페이지 {page_idx} 요청 시간 초과"
except requests.exceptions.RequestException as e:
if all_items:
break
return [], f"❌ API 요청 오류: {str(e)[:50]}"
except Exception as e:
if all_items:
break
return [], f"❌ 오류: {str(e)[:50]}"
return all_items, ""
def fetch_with_cache(category: str = "전체", region: str = "전체(지역)", keyword: str = "") -> Tuple[List[Dict], str]:
"""
캐시를 활용한 공고 조회 (개선됨)
- 가능하면 캐시에서 필터링 (빠름!)
- 키워드가 있으면 캐시에서 벡터 검색
"""
if not CACHE_AVAILABLE:
return fetch_all_from_api(category, region, keyword)
try:
# ⭐ 캐시에서 전체 로드 후 필터링 (API 호출보다 훨씬 빠름!)
items, status = get_cached_announcements()
if not items:
# 캐시가 비어있으면 API에서 가져오기
return fetch_all_from_api(category, region, keyword)
# 키워드 검색 (벡터 검색)
if keyword and keyword.strip():
cache = get_cache()
items = cache.search(keyword.strip(), n_results=500)
status = f"🔍 캐시에서 '{keyword}' 검색"
# 필터 적용 (캐시 데이터에서 필터링)
filtered = items
filter_info = []
# 카테고리 필터
if category and category != "전체":
filtered = [
i for i in filtered
if category.lower() in (i.get("lcategory", "") or i.get("pldirSportRealmLclasCodeNm", "") or i.get("category", "") or "").lower()
]
filter_info.append(f"분야:{category}")
# 지역 필터 (hashTags, author, title에서 검색)
if region and region != "전체(지역)":
region_filtered = []
for i in filtered:
hash_tags = i.get("hashTags", "") or ""
author = i.get("author", "") or i.get("jrsdInsttNm", "") or ""
title = i.get("title", "") or i.get("pblancNm", "") or ""
if region in hash_tags or region in author or region in title:
region_filtered.append(i)
filtered = region_filtered
filter_info.append(f"지역:{region}")
filter_str = ", ".join(filter_info) if filter_info else "전체"
result_status = f"⚡ 캐시에서 {len(filtered)}건 필터링 ({filter_str})"
return filtered, result_status
except Exception as e:
print(f"Cache filter error: {e}")
# 오류 시 API 폴백
return fetch_all_from_api(category, region, keyword)
def download_file(url: str, save_dir: str, hint_filename: str = None) -> Tuple[Optional[str], Optional[str]]:
"""파일 다운로드"""
try:
headers = {"User-Agent": "Mozilla/5.0", "Accept": "*/*", "Referer": "https://www.bizinfo.go.kr/"}
response = requests.get(url, headers=headers, timeout=60, stream=True, verify=False, allow_redirects=True)
response.raise_for_status()
cd = response.headers.get('Content-Disposition', '')
filename = None
if cd:
match = re.search(r"filename\*=(?:UTF-8''|utf-8'')(.+)", cd, re.IGNORECASE)
if match:
from urllib.parse import unquote
filename = unquote(match.group(1))
else:
match = re.search(r'filename=(["\']?)(.+?)\1(?:;|$)', cd)
if match:
filename = match.group(2)
if not filename and hint_filename:
filename = hint_filename
if not filename:
from urllib.parse import urlparse
parsed = urlparse(url)
filename = parsed.path.split('/')[-1]
if not filename or '.' not in filename:
content_type = response.headers.get('Content-Type', '').lower()
if 'pdf' in content_type:
filename = f"document_{hash(url) % 10000}.pdf"
elif 'hwp' in content_type:
filename = f"document_{hash(url) % 10000}.hwp"
else:
filename = f"file_{hash(url) % 10000}.bin"
# ⭐ 파일명 정리 (인코딩 문제 해결)
# 1. 특수문자 제거
filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
# 2. 파일명이 너무 길면 축약 (확장자 보존)
name_part, ext = os.path.splitext(filename)
if not ext:
# 확장자가 없으면 Content-Type에서 추측
content_type = response.headers.get('Content-Type', '').lower()
if 'pdf' in content_type:
ext = '.pdf'
elif 'hwp' in content_type:
ext = '.hwp'
else:
ext = '.bin'
# 3. 파일명 최대 길이 제한 (100자)
max_name_len = 100 - len(ext)
if len(name_part) > max_name_len:
# 앞 50자 + 해시 + 뒤 30자
name_hash = f"_{hash(name_part) % 10000:04d}_"
name_part = name_part[:50] + name_hash + name_part[-30:]
filename = name_part + ext
# 4. 안전한 ASCII 파일명 생성 (인코딩 문제 방지)
try:
filename.encode('utf-8')
except UnicodeEncodeError:
# 인코딩 문제 시 해시 기반 파일명 사용
filename = f"document_{hash(url) % 100000}{ext}"
file_path = os.path.join(save_dir, filename)
# 5. 최종 경로 길이 확인
if len(file_path) > 250:
filename = f"doc_{hash(url) % 100000}{ext}"
file_path = os.path.join(save_dir, filename)
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
if os.path.getsize(file_path) == 0:
os.remove(file_path)
return None, "빈 파일이 다운로드됨"
return file_path, None
except Exception as e:
return None, f"다운로드 실패: {str(e)}"
def call_groq_api_stream(messages: List[Dict]) -> Generator[str, None, None]:
"""Groq API 스트리밍 호출"""
if not GROQ_AVAILABLE:
yield "❌ Groq 라이브러리가 설치되지 않았습니다."
return
if not GROQ_API_KEY:
yield "❌ GROQ_API_KEY 환경변수가 설정되지 않았습니다."
return
try:
client = Groq(api_key=GROQ_API_KEY)
completion = client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=messages,
temperature=0.3,
max_tokens=4096,
stream=True,
)
for chunk in completion:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"❌ API 오류: {str(e)}"
def fetch_announcement_detail(url: str) -> Tuple[str, List[Dict], Optional[Dict]]:
"""공고 상세 페이지에서 본문, 첨부파일, 본문출력파일 정보 추출
Returns:
(content_text, attachments, print_file)
- content_text: 공고 본문 텍스트
- attachments: 일반 첨부파일 리스트 (서식, 양식 등)
- print_file: 본문출력파일 (공고문 PDF/HWP) - AI 분석용
"""
try:
# URL 정규화
if url.startswith('/'):
url = f"https://www.bizinfo.go.kr{url}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
}
response = requests.get(url, headers=headers, timeout=30, verify=False)
response.raise_for_status()
html_text = response.text
soup = BeautifulSoup(html_text, 'html.parser')
# 본문 텍스트 추출
content_text = ""
tables = soup.find_all('table')
for table in tables:
text = table.get_text(separator='\n', strip=True)
if '사업개요' in text or '지원대상' in text or '신청기간' in text:
content_text += text + "\n\n"
main_content = soup.find('div', {'id': 'container'}) or soup.find('main') or soup.find('article')
if main_content and not content_text:
content_text = main_content.get_text(separator='\n', strip=True)
attachments = [] # 일반 첨부파일
print_file = None # 본문출력파일
# 파일 링크 추출
for a_tag in soup.find_all('a', href=True):
href = a_tag.get('href', '')
href_clean = re.sub(r';jsessionid=[^?]*', '', href)
if 'getImageFile.do' in href_clean or 'fileDown' in href_clean or 'atchFileId' in href_clean:
filename = a_tag.get_text(strip=True)
# 파일명 추출 개선
if filename in ['다운로드', '바로보기', '내려받기', '']:
parent = a_tag.parent
if parent:
parent_text = parent.get_text(separator='|', strip=True)
parts = [p.strip() for p in parent_text.split('|') if p.strip()]
for part in parts:
if part not in ['다운로드', '바로보기', '내려받기'] and ('.' in part):
filename = part
break
title = a_tag.get('title', '')
if title and '첨부파일' in title:
match = re.search(r'첨부파일\s+(.+?)\s+다운로드', title)
if match:
filename = match.group(1)
if not filename or filename in ['다운로드', '바로보기', '내려받기']:
filename = f"첨부파일_{len(attachments)+1}"
# URL 정규화
if href_clean.startswith('/'):
full_url = f"https://www.bizinfo.go.kr{href_clean}"
elif href_clean.startswith('http'):
full_url = href_clean
else:
continue
ext = Path(filename).suffix.lower()
if not ext:
ext = '.unknown'
file_info = {
"filename": filename,
"url": full_url,
"type": ext[1:] if ext.startswith('.') else ext
}
# ⭐ 본문출력파일 판별 (공고문 PDF/HWP)
# 패턴: "(제XXXX-XX호)", "공고", "공고문", "본문", 날짜 패턴
is_print_file = False
filename_lower = filename.lower()
# 1. "(제XXXX-XX호)" 패턴 - 공고번호 포함
if re.search(r'\(제\d+[-_]?\d*호\)', filename):
is_print_file = True
# 2. "공고", "공고문", "모집공고" 등 키워드
elif any(kw in filename for kw in ['공고문', '모집공고', '공고(안)', '공고 안', '_공고_', '_공고.']):
is_print_file = True
# 3. "본문출력" 키워드
elif '본문출력' in filename or '본문 출력' in filename:
is_print_file = True
# 4. 부모 요소에 "본문출력파일" 텍스트가 있는지 확인
parent = a_tag.parent
grandparent = parent.parent if parent else None
for ancestor in [parent, grandparent]:
if ancestor:
ancestor_text = ancestor.get_text(strip=True)
if '본문출력파일' in ancestor_text or '본문출력 파일' in ancestor_text:
is_print_file = True
break
if is_print_file and not print_file:
print_file = file_info
elif not any(att['url'] == full_url for att in attachments):
# "(첨부" 로 시작하는 파일은 일반 첨부파일
attachments.append(file_info)
return content_text, attachments, print_file
except Exception as e:
import traceback
return f"상세 정보 조회 실패: {str(e)}\n{traceback.format_exc()}", [], None