# ============================================
# text_utils.py
# 파일 추출, 웹 검색, 기본 텍스트 처리 함수들
# ============================================
import re, os, json, time, zipfile, tempfile, zlib
from pathlib import Path
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from xml.etree import ElementTree as ET
# Optional third-party dependencies: each import is probed once at module
# load, and the HAS_* flag records availability so callers can degrade
# gracefully instead of crashing on a missing package.
try:
    import httpx          # HTTP client used by http_get / brave_search
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False
try:
    import pdfplumber     # preferred PDF text extractor
    HAS_PDFPLUMBER = True
except ImportError:
    HAS_PDFPLUMBER = False
try:
    import PyPDF2         # fallback PDF text extractor
    HAS_PYPDF2 = True
except ImportError:
    HAS_PYPDF2 = False
try:
    from docx import Document as DocxDocument  # .docx extraction
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False
try:
    import olefile        # legacy .hwp (OLE compound file) extraction
    HAS_OLEFILE = True
except ImportError:
    HAS_OLEFILE = False
# ============================================
# 파일 추출 함수들
# ============================================
def extract_text_from_pdf(file_path):
    """Extract per-page text from a PDF.

    Tries pdfplumber first, then falls back to PyPDF2.  Pages already
    collected by a partially-failed pdfplumber pass are kept and extended
    by the fallback.  Returns (pages, None) on success — `pages` is a
    list of non-empty page strings — or (None, error_message) on failure.
    """
    collected = []
    if HAS_PDFPLUMBER:
        try:
            with pdfplumber.open(file_path) as doc:
                collected.extend(
                    txt for txt in (page.extract_text() for page in doc.pages) if txt
                )
            if collected:
                return collected, None
        except Exception as exc:
            print(f"pdfplumber: {exc}")
    if HAS_PYPDF2:
        try:
            with open(file_path, 'rb') as fh:
                for page in PyPDF2.PdfReader(fh).pages:
                    txt = page.extract_text()
                    if txt:
                        collected.append(txt)
            if collected:
                return collected, None
        except Exception as exc:
            print(f"PyPDF2: {exc}")
    return None, "PDF 추출 실패"
def extract_text_from_docx(file_path):
    """Extract text from a .docx file, grouped into sections.

    Consecutive non-blank paragraphs are joined with newlines into one
    section; blank paragraphs act as section separators.  Returns
    (sections, None) on success or (None, error_message) on failure.
    """
    if not HAS_DOCX:
        return None, "python-docx 없음"
    try:
        blocks = []
        buffer = []
        for paragraph in DocxDocument(file_path).paragraphs:
            line = paragraph.text.strip()
            if line:
                buffer.append(line)
            elif buffer:
                # Blank paragraph closes the current section.
                blocks.append('\n'.join(buffer))
                buffer = []
        if buffer:
            blocks.append('\n'.join(buffer))
        return (blocks, None) if blocks else (None, "DOCX 텍스트 없음")
    except Exception as exc:
        return None, f"DOCX 오류: {exc}"
def extract_text_from_txt(file_path):
    """Extract text from a plain-text file (.txt / .md / .csv ...).

    Tries a list of encodings in order and returns (sections, None) on
    the first successful, non-empty decode — sections are the
    blank-line-separated paragraphs — or (None, error_message) when
    every attempt fails (including an empty or unreadable file).
    """
    for enc in ['utf-8', 'euc-kr', 'cp949', 'utf-16', 'latin-1']:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                text = f.read()
        # Narrowed from a bare `except:` — only decode/IO problems should
        # move us on to the next encoding, not e.g. KeyboardInterrupt.
        except (UnicodeError, OSError):
            continue
        if text.strip():
            # Split on runs of 2+ newlines (blank lines) into sections.
            sections = [s.strip() for s in re.split(r'\n{2,}', text) if s.strip()]
            return sections if sections else [text], None
    return None, "텍스트 인코딩 실패"
def extract_text_from_hwpx(file_path):
    """HWPX (Hangul word processor, 2007+) → text.

    An .hwpx file is a zip archive whose paragraph text lives in
    Contents/section*.xml.  Returns (list_of_section_texts, None) on
    success or (None, error_message) on failure.
    """
    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            # Primary layout: Contents/section0.xml, section1.xml, ...
            section_files = sorted([f for f in file_list if f.startswith('Contents/section') and f.endswith('.xml')])
            if not section_files:
                # Fallback: any XML whose name mentions "section".
                section_files = sorted([f for f in file_list if 'section' in f.lower() and f.endswith('.xml')])
            for sf_name in section_files:
                try:
                    with zf.open(sf_name) as sf:
                        content = sf.read().decode('utf-8', errors='ignore')
                        # Strip namespace declarations and tag prefixes so
                        # the XML parses without namespace handling.
                        content = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content)
                        content = re.sub(r'<[a-zA-Z]+:', '<', content)
                        content = re.sub(r'[a-zA-Z]+:', '', content)
                        try:
                            root = ET.fromstring(content)
                            texts = []
                            for elem in root.iter():
                                # Run text is carried by <t> elements
                                # (prefix already stripped above).
                                if elem.tag.endswith('t') or elem.tag == 't':
                                    if elem.text: texts.append(elem.text)
                                elif elem.text and elem.text.strip():
                                    # Heuristic: also keep text from other
                                    # text-bearing element names.
                                    if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
                                        texts.append(elem.text.strip())
                            if texts: text_parts.append(' '.join(texts))
                        except ET.ParseError:
                            # Malformed XML after prefix-stripping: scrape
                            # text between tags with a regex instead.
                            matches = re.findall(r'>([^<]+)<', content)
                            clean = [t.strip() for t in matches if t.strip() and len(t.strip()) > 1]
                            if clean: text_parts.append(' '.join(clean))
                except: continue
        if text_parts:
            return text_parts, None
        return None, "HWPX 텍스트 없음"
    except zipfile.BadZipFile:
        return None, "유효하지 않은 HWPX"
    except Exception as e:
        return None, f"HWPX 오류: {e}"
def _decode_hwp_para(data):
"""HWP 문단 디코딩"""
result = []
i = 0
while i < len(data) - 1:
code = int.from_bytes(data[i:i+2], 'little')
if code in (1,2,3): i += 14
elif code == 9: result.append('\t')
elif code in (10,13): result.append('\n')
elif code == 24: result.append('-')
elif code in (30,31): result.append(' ')
elif code >= 32:
try:
ch = chr(code)
if ch.isprintable() or ch in '\n\t ': result.append(ch)
except: pass
i += 2
text = ''.join(result).strip()
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r'\n{3,}', '\n\n', text)
return text if len(text) > 2 else None
def _extract_hwp_section(data):
"""HWP 섹션 추출"""
texts = []
pos = 0
while pos < len(data) - 4:
try:
header = int.from_bytes(data[pos:pos+4], 'little')
tag_id = header & 0x3FF
size = (header >> 20) & 0xFFF
pos += 4
if size == 0xFFF:
if pos + 4 > len(data): break
size = int.from_bytes(data[pos:pos+4], 'little')
pos += 4
if pos + size > len(data): break
record_data = data[pos:pos+size]
pos += size
if tag_id == 67 and size > 0:
t = _decode_hwp_para(record_data)
if t: texts.append(t)
except:
pos += 1
return '\n'.join(texts) if texts else None
def extract_text_from_hwp(file_path):
    """Legacy HWP 5.x (OLE compound file) → text.

    Reads the FileHeader stream to learn whether body streams are
    compressed, then decodes every BodyText/SectionN record stream.
    Returns (list_of_section_texts, None) or (None, error_message).
    """
    if not HAS_OLEFILE: return None, "olefile 없음"
    try:
        ole = olefile.OleFileIO(file_path)
    except Exception as e:
        return None, f"HWP 오류: {e}"
    # try/finally so the OLE handle is closed on every path — the previous
    # version leaked it when an exception escaped to the outer handler.
    try:
        if not ole.exists('FileHeader'):
            return None, "HWP 헤더 없음"
        header_data = ole.openstream('FileHeader').read()
        # Bit 0 of the attribute DWORD at offset 36 flags compression;
        # assume compressed when the header is too short to tell.
        is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
        all_texts = []
        for entry in ole.listdir():
            entry_path = '/'.join(entry)
            # FIX: HWP 5.x body text lives in streams named
            # "BodyText/Section0", "BodyText/Section1", ... — the old
            # filter also required an "_content.xml" suffix, which never
            # matches, so no text was ever extracted.
            if 'BodyText' in entry_path and 'Section' in entry_path:
                try:
                    with ole.openstream(entry) as stream:
                        content = stream.read()
                    if is_compressed:
                        try:
                            # Raw deflate stream (no zlib header).
                            content = zlib.decompress(content, -zlib.MAX_WBITS)
                        except zlib.error:
                            pass
                    t = _extract_hwp_section(content)
                    if t: all_texts.append(t)
                except Exception:
                    pass
        if all_texts:
            return all_texts, None
        return None, "HWP 텍스트 없음"
    except Exception as e:
        return None, f"HWP 오류: {e}"
    finally:
        ole.close()
def extract_file_text_api(file_obj):
    """Dispatch an uploaded file object to the matching extractor.

    Selects the extractor by file suffix; unknown suffixes (and
    .txt/.md/.csv) are read as plain text.  Returns the extracted
    sections joined by blank lines, a "⚠️ ..." warning string on
    extractor error, or "" when no file was given.
    """
    if not file_obj:
        return ""
    path = Path(file_obj.name)
    dispatch = {
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.hwpx': extract_text_from_hwpx,
        '.hwp': extract_text_from_hwp,
    }
    extractor = dispatch.get(path.suffix.lower(), extract_text_from_txt)
    texts, error = extractor(str(path))
    if error:
        return f"⚠️ {error}"
    return '\n\n'.join(texts) if texts else "텍스트 추출 실패"
# ============================================
# 기본 텍스트 처리
# ============================================
def split_sentences(text):
    """Split text into sentences on terminal punctuation (., !, ?).

    Whitespace runs are collapsed to single spaces first; the
    terminators are not kept and empty fragments are dropped.
    """
    normalized = re.sub(r'\s+', ' ', text).strip()
    fragments = re.split(r'[.!?]+(?=\s|$)', normalized)
    return [frag.strip() for frag in fragments if frag.strip()]
def split_words(text):
    """Tokenize into runs of Hangul syllables, ASCII letters, and digits."""
    # findall with a `[...]+` pattern never yields empty strings, so no
    # extra filtering is needed.
    return re.findall(r'[가-힣a-zA-Z0-9]+', text)
# ============================================
# HTTP 헬퍼
# ============================================
def http_get(url, headers=None, timeout=10):
    """HTTP GET returning the response body on a 200, otherwise None.

    Requires httpx; returns None when it is unavailable or the request
    fails — callers treat None as "no result".
    """
    if not HAS_HTTPX:
        return None
    try:
        resp = httpx.get(url, headers=headers, timeout=timeout)
    # Narrowed from a bare `except:` so Ctrl-C and SystemExit propagate.
    except Exception:
        return None
    return resp.text if resp.status_code == 200 else None
# ============================================
# 웹 검색 함수들
# ============================================
def brave_search(query, count=5):
    """Query the Brave Search Web API.

    Returns a list of {"title", "url", "snippet", "source"} dicts, or []
    when no API key is configured, httpx is missing, or the call fails.
    """
    import urllib.parse
    brave_key = os.getenv("BRAVE_API_KEY", "")
    if not brave_key:
        return []
    # FIX: URL-encode the query — raw spaces/punctuation previously
    # produced a malformed request URL (siblings already quote theirs).
    url = (f"https://api.search.brave.com/res/v1/web/search"
           f"?q={urllib.parse.quote(query)}&count={count}")
    try:
        if HAS_HTTPX:
            r = httpx.get(url, headers={"X-Subscription-Token": brave_key, "Accept": "application/json"}, timeout=10)
            if r.status_code == 200:
                results = []
                for item in r.json().get("web", {}).get("results", []):
                    results.append({"title": item.get("title",""), "url": item.get("url",""), "snippet": item.get("description",""), "source": "Brave"})
                return results
    # Narrowed from a bare `except:`; network/parse failures yield [].
    except Exception:
        pass
    return []
def search_kci(query):
    """Search KCI (Korea Citation Index) via its open API.

    Returns up to 3 {"title", "url", "snippet", "source"} dicts; [] on
    any failure.
    """
    import urllib.parse
    try:
        url = (f"https://open.kci.go.kr/po/openapi/openApiSearch.kci"
               f"?apiCode=articleSearch&title={urllib.parse.quote(query)}&displayCount=3")
        resp = http_get(url, timeout=8)
        if resp:
            results = []
            # NOTE(review): the previous pattern (r'.*?') had lost its XML
            # tags and capture groups, so m.group(1) always raised and the
            # function silently returned [].  Reconstructed for the KCI
            # XML response; verify the field names against a live call.
            pattern = r'<article-title[^>]*>(.*?)</article-title>.*?<url[^>]*>(.*?)</url>'
            for m in re.finditer(pattern, resp, re.S):
                results.append({"title": m.group(1).strip(), "url": m.group(2).strip(), "snippet": "", "source": "KCI"})
            return results[:3]
    except Exception:
        pass
    return []
def search_riss(query):
    """Scrape RISS (Korean academic search) web results.

    Returns up to 3 {"title", "url", "snippet", "source"} dicts; [] on
    any failure.
    """
    import urllib.parse
    results = []
    try:
        url = (f"http://www.riss.kr/search/Search.do?isDetailSearch=N&searchGubun=true"
               f"&viewYn=OP&queryText=&strQuery={urllib.parse.quote(query)}"
               f"&iStartCount=0&iGroupView=5&icate=all")
        resp = http_get(url, timeout=8)
        if resp:
            # NOTE(review): the previous pattern had its HTML tags stripped
            # (the `]*href` residue of `<a[^>]*href`), so it could never
            # match.  Reconstructed to capture the first link inside a
            # title element; verify against current RISS markup.
            pattern = r'class="title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>'
            for m in re.finditer(pattern, resp, re.S):
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                if title:
                    results.append({"title": title, "url": "https://www.riss.kr" + m.group(1), "snippet": "", "source": "RISS"})
    except Exception:
        pass
    return results[:3]
def search_arxiv(query):
    """Search arXiv via its Atom export API.

    Returns up to 3 {"title", "url", "snippet", "source"} dicts; [] on
    any failure.  Snippets are truncated to 150 characters.
    """
    results = []
    try:
        import urllib.parse
        q = urllib.parse.quote(query)
        url = f"https://export.arxiv.org/api/query?search_query=all:{q}&start=0&max_results=3&sortBy=relevance"
        resp = http_get(url, timeout=12)
        if resp:
            # NOTE(review): the previous pattern had its XML tags stripped
            # and could not match.  Reconstructed against the arXiv Atom
            # feed, where <id> (the abs URL) precedes <title> and
            # <summary> inside each <entry>.
            pattern = r'<entry>.*?<id>(.*?)</id>.*?<title>(.*?)</title>.*?<summary>(.*?)</summary>'
            for m in re.finditer(pattern, resp, re.S):
                title = re.sub(r'\s+', ' ', m.group(2)).strip()
                results.append({"title": title, "url": m.group(1).strip(), "snippet": re.sub(r'\s+', ' ', m.group(3)).strip()[:150], "source": "arXiv"})
    except Exception:
        pass
    return results[:3]
def duckduckgo_search(query, max_results=5):
    """Scrape the DuckDuckGo HTML endpoint.

    Returns up to max_results {"title", "url", "snippet", "source"}
    dicts; redirect links (uddg=) are unwrapped to the real target URL.
    [] on any failure.
    """
    results = []
    try:
        import urllib.parse
        q = urllib.parse.quote(query)
        url = f"https://html.duckduckgo.com/html/?q={q}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = http_get(url, headers=headers, timeout=10)
        if resp:
            # NOTE(review): the previous pattern had its HTML tags stripped
            # and could not match.  Reconstructed for the result__a /
            # result__snippet anchors of html.duckduckgo.com.
            pattern = (r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>'
                       r'.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>')
            for m in re.finditer(pattern, resp, re.S):
                href = m.group(1)
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                snippet = re.sub(r'<[^>]+>', '', m.group(3)).strip()
                real_url = href
                if 'uddg=' in href:
                    # DDG wraps targets in a redirect URL; unwrap it.
                    um = re.search(r'uddg=([^&]+)', href)
                    if um: real_url = urllib.parse.unquote(um.group(1))
                if title:
                    results.append({"title": title, "url": real_url, "snippet": snippet, "source": "Web"})
                    if len(results) >= max_results: break
    except Exception:
        pass
    return results
def self_crawl_search(query, max_results=3):
    """Run a DuckDuckGo search, plus an academic follow-up query.

    When the query does not already mention papers ('논문' / 'paper'),
    a second search with academic keywords appended is merged in.
    """
    combined = list(duckduckgo_search(query, max_results))
    mentions_papers = '논문' in query or 'paper' in query.lower()
    if not mentions_papers:
        combined.extend(duckduckgo_search(f"{query} 논문 학술", 2))
    return combined
def parallel_brave_search(queries, max_workers=10):
    """Run brave_search over many queries concurrently.

    Returns {query: results_list}; a failed query maps to [].  The
    worker pool is capped at 20 threads.
    """
    all_results = {}
    with ThreadPoolExecutor(max_workers=min(max_workers, 20)) as executor:
        futures = {executor.submit(brave_search, q, 3): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                all_results[q] = future.result()
            # Narrowed from a bare `except:` — a Ctrl-C should not be
            # silently recorded as an empty result set.
            except Exception:
                all_results[q] = []
    return all_results