# ============================================
# text_utils.py
# File text extraction, web search, and basic text-processing helpers.
# ============================================
import re, os, json, time, zipfile, tempfile, zlib
import urllib.parse
from pathlib import Path
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from xml.etree import ElementTree as ET

# Optional dependencies are probed one by one so the module still imports
# (with reduced functionality) when any of them is missing.
try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False
try:
    import pdfplumber
    HAS_PDFPLUMBER = True
except ImportError:
    HAS_PDFPLUMBER = False
try:
    import PyPDF2
    HAS_PYPDF2 = True
except ImportError:
    HAS_PYPDF2 = False
try:
    from docx import Document as DocxDocument
    HAS_DOCX = True
except ImportError:
    HAS_DOCX = False
try:
    import olefile
    HAS_OLEFILE = True
except ImportError:
    HAS_OLEFILE = False


# ============================================
# File extraction helpers
# ============================================

def extract_text_from_pdf(file_path):
    """Extract text from a PDF.

    Tries pdfplumber first, then falls back to PyPDF2.

    Returns:
        (pages, None) where pages is a list of per-page text strings,
        or (None, error_message) when both extractors fail.
    """
    pages = []
    if HAS_PDFPLUMBER:
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        pages.append(text)
            if pages:
                return pages, None
        except Exception as e:
            print(f"pdfplumber: {e}")
    if HAS_PYPDF2:
        try:
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page in reader.pages:
                    text = page.extract_text()
                    if text:
                        pages.append(text)
            if pages:
                return pages, None
        except Exception as e:
            print(f"PyPDF2: {e}")
    return None, "PDF 추출 실패"


def extract_text_from_docx(file_path):
    """Extract text from a DOCX file.

    Paragraphs are grouped into sections at blank-paragraph boundaries.

    Returns:
        (sections, None) on success, or (None, error_message).
    """
    if not HAS_DOCX:
        return None, "python-docx 없음"
    try:
        doc = DocxDocument(file_path)
        sections = []
        current = []
        for para in doc.paragraphs:
            txt = para.text.strip()
            if not txt:
                # A blank paragraph closes the current section.
                if current:
                    sections.append('\n'.join(current))
                current = []
            else:
                current.append(txt)
        if current:
            sections.append('\n'.join(current))
        if sections:
            return sections, None
        return None, "DOCX 텍스트 없음"
    except Exception as e:
        return None, f"DOCX 오류: {e}"


def extract_text_from_txt(file_path):
    """Read a plain-text file (txt/md/csv, ...), trying several encodings.

    The content is split into sections on blank lines.

    Returns:
        (sections, None) on success, or (None, error_message).
    """
    for enc in ['utf-8', 'euc-kr', 'cp949', 'utf-16', 'latin-1']:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                text = f.read()
            if text.strip():
                sections = [s.strip() for s in re.split(r'\n{2,}', text) if s.strip()]
                return sections if sections else [text], None
        except Exception:
            continue
    return None, "텍스트 인코딩 실패"


def extract_text_from_hwpx(file_path):
    """Extract text from an HWPX (Hangul 2007+, zipped-XML) document.

    Returns:
        (section_texts, None) on success, or (None, error_message).
    """
    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            section_files = sorted(
                f for f in file_list
                if f.startswith('Contents/section') and f.endswith('.xml'))
            if not section_files:
                # Fallback: any XML whose name mentions "section".
                section_files = sorted(
                    f for f in file_list
                    if 'section' in f.lower() and f.endswith('.xml'))
            for sf_name in section_files:
                try:
                    with zf.open(sf_name) as sf:
                        content = sf.read().decode('utf-8', errors='ignore')
                    # Strip namespace declarations and tag prefixes, then pull
                    # out the raw text between tags.  (The original had a
                    # malformed two-argument re.sub and an undefined `matches`;
                    # reconstructed as a findall of inter-tag text.)
                    content = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content)
                    content = re.sub(r'<[a-zA-Z]+:', '<', content)
                    matches = re.findall(r'>([^<]+)<', content)
                    clean = [t.strip() for t in matches
                             if t.strip() and len(t.strip()) > 1]
                    if clean:
                        text_parts.append(' '.join(clean))
                except Exception:
                    continue
        if text_parts:
            return text_parts, None
        return None, "HWPX 텍스트 없음"
    except zipfile.BadZipFile:
        return None, "유효하지 않은 HWPX"
    except Exception as e:
        return None, f"HWPX 오류: {e}"


def _decode_hwp_para(data):
    """Decode one HWP paragraph-text record (UTF-16LE with control codes).

    Returns the decoded text, or None when it is too short to be useful.
    """
    result = []
    i = 0
    while i < len(data) - 1:
        code = int.from_bytes(data[i:i + 2], 'little')
        if code in (1, 2, 3):
            # Presumably an extended control occupying 8 WCHARs
            # (2 + 14 bytes) — TODO confirm against the HWP 5.0 spec.
            i += 14
        elif code == 9:
            result.append('\t')
        elif code in (10, 13):
            result.append('\n')
        elif code == 24:
            result.append('-')
        elif code in (30, 31):
            result.append(' ')
        elif code >= 32:
            try:
                ch = chr(code)
                if ch.isprintable() or ch in '\n\t ':
                    result.append(ch)
            except Exception:
                pass
        i += 2
    text = ''.join(result).strip()
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text if len(text) > 2 else None


def _extract_hwp_section(data):
    """Walk the records of a (decompressed) HWP BodyText section stream.

    Record header layout: bits 0-9 tag id, bits 20-31 size; a size field of
    0xFFF means the real size follows as an extra 4-byte little-endian int.

    Returns the newline-joined paragraph texts, or None when none found.
    """
    texts = []
    pos = 0
    while pos < len(data) - 4:
        try:
            header = int.from_bytes(data[pos:pos + 4], 'little')
            tag_id = header & 0x3FF
            size = (header >> 20) & 0xFFF
            pos += 4
            if size == 0xFFF:
                # Extended-size record.
                if pos + 4 > len(data):
                    break
                size = int.from_bytes(data[pos:pos + 4], 'little')
                pos += 4
            if pos + size > len(data):
                break
            record_data = data[pos:pos + size]
            pos += size
            if tag_id == 67 and size > 0:
                # Tag 67 = paragraph text record.
                t = _decode_hwp_para(record_data)
                if t:
                    texts.append(t)
        except Exception:
            # On any parse error, resynchronize one byte forward.
            pos += 1
    return '\n'.join(texts) if texts else None


def extract_text_from_hwp(file_path):
    """Extract text from a legacy binary HWP (OLE container) document.

    Returns:
        (section_texts, None) on success, or (None, error_message).
    """
    if not HAS_OLEFILE:
        return None, "olefile 없음"
    try:
        ole = olefile.OleFileIO(file_path)
        if not ole.exists('FileHeader'):
            ole.close()
            return None, "HWP 헤더 없음"
        header_data = ole.openstream('FileHeader').read()
        # Bit 0 of the flags dword at offset 36 marks compressed body streams.
        is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
        all_texts = []
        for entry in ole.listdir():
            entry_path = '/'.join(entry)
            # Body text lives in streams named BodyText/Section0, Section1, ...
            # (the original '_content.xml' suffix check could never match an
            # OLE stream name, so nothing was ever extracted).
            if entry_path.startswith('BodyText/Section'):
                try:
                    with ole.openstream(entry) as stream:
                        content = stream.read()
                    if is_compressed:
                        try:
                            # Raw deflate (no zlib header).
                            content = zlib.decompress(content, -zlib.MAX_WBITS)
                        except Exception:
                            pass
                    t = _extract_hwp_section(content)
                    if t:
                        all_texts.append(t)
                except Exception:
                    pass
        ole.close()
        if all_texts:
            return all_texts, None
        return None, "HWP 텍스트 없음"
    except Exception as e:
        return None, f"HWP 오류: {e}"


def extract_file_text_api(file_obj):
    """Extract text from an uploaded file object, dispatching on extension.

    Returns the extracted text joined with blank lines, a warning string on
    failure, or "" when no file was given.
    """
    if not file_obj:
        return ""
    fp = Path(file_obj.name)
    suffix = fp.suffix.lower()
    extractors = {
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.hwpx': extract_text_from_hwpx,
        '.hwp': extract_text_from_hwp,
    }
    # Anything unrecognized (including .txt/.md/.csv) is treated as plain text.
    extractor = extractors.get(suffix, extract_text_from_txt)
    texts, error = extractor(str(fp))
    if error:
        return f"⚠️ {error}"
    return '\n\n'.join(texts) if texts else "텍스트 추출 실패"


# ============================================
# Basic text processing
# ============================================

def split_sentences(text):
    """Split text into sentences on ./!/? runs followed by space or end."""
    text = re.sub(r'\s+', ' ', text).strip()
    sents = re.split(r'[.!?]+(?=\s|$)', text)
    return [s.strip() for s in sents if s.strip()]


def split_words(text):
    """Split text into Korean / alphanumeric word tokens."""
    return re.findall(r'[가-힣a-zA-Z0-9]+', text)


# ============================================
# HTTP helper
# ============================================

def http_get(url, headers=None, timeout=10):
    """HTTP GET; returns the body text on a 200 response, else None."""
    if not HAS_HTTPX:
        return None
    try:
        r = httpx.get(url, headers=headers, timeout=timeout)
        return r.text if r.status_code == 200 else None
    except Exception:
        return None


# ============================================
# Web search helpers
# ============================================

def brave_search(query, count=5):
    """Query the Brave Search API; returns [] without a BRAVE_API_KEY."""
    brave_key = os.getenv("BRAVE_API_KEY", "")
    if not brave_key or not HAS_HTTPX:
        return []
    # URL-encode the query (the original interpolated it raw, which breaks
    # on spaces and special characters).
    q = urllib.parse.quote(query)
    url = f"https://api.search.brave.com/res/v1/web/search?q={q}&count={count}"
    try:
        r = httpx.get(url,
                      headers={"X-Subscription-Token": brave_key,
                               "Accept": "application/json"},
                      timeout=10)
        if r.status_code == 200:
            data = r.json()
            return [{"title": item.get("title", ""),
                     "url": item.get("url", ""),
                     "snippet": item.get("description", ""),
                     "source": "Brave"}
                    for item in data.get("web", {}).get("results", [])]
    except Exception:
        pass
    return []


def search_kci(query):
    """Search KCI (Korea Citation Index) article titles; up to 3 results."""
    try:
        q = urllib.parse.quote(query)
        url = ("https://open.kci.go.kr/po/openapi/openApiSearch.kci"
               f"?apiCode=articleSearch&title={q}&displayCount=3")
        resp = http_get(url, timeout=8)
        if resp:
            results = []
            # NOTE(review): the original pattern was garbled (no tags, no
            # capture groups); field names reconstructed from the KCI OpenAPI
            # XML response — verify against a live response.
            pattern = r'<article-title[^>]*>(.*?)</article-title>.*?<url[^>]*>(.*?)</url>'
            for m in re.finditer(pattern, resp, re.S):
                results.append({"title": m.group(1).strip(),
                                "url": m.group(2).strip(),
                                "snippet": "",
                                "source": "KCI"})
            return results[:3]
    except Exception:
        pass
    return []


def search_riss(query):
    """Scrape RISS search results; up to 3 results."""
    results = []
    try:
        q = urllib.parse.quote(query)
        url = ("http://www.riss.kr/search/Search.do?isDetailSearch=N"
               "&searchGubun=true&viewYn=OP&queryText="
               f"&strQuery={q}&iStartCount=0&iGroupView=5&icate=all")
        resp = http_get(url, timeout=8)
        if resp:
            # NOTE(review): the anchor tag was missing from the original
            # (garbled) pattern; reconstructed — verify against live markup.
            pattern = r'class="title"[^>]*>.*?<a[^>]*href="([^"]+)"[^>]*>(.*?)</a>'
            for m in re.finditer(pattern, resp, re.S):
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                if title:
                    results.append({"title": title,
                                    "url": "https://www.riss.kr" + m.group(1),
                                    "snippet": "",
                                    "source": "RISS"})
    except Exception:
        pass
    return results[:3]


def search_arxiv(query):
    """Search the arXiv Atom API; up to 3 results."""
    results = []
    try:
        q = urllib.parse.quote(query)
        url = (f"https://export.arxiv.org/api/query?search_query=all:{q}"
               "&start=0&max_results=3&sortBy=relevance")
        resp = http_get(url, timeout=12)
        if resp:
            # Atom entries carry <id> (abstract URL), <title> and <summary>;
            # the original pattern had the tag names stripped out.
            pattern = (r'<entry>.*?<id>(.*?)</id>.*?<title>(.*?)</title>'
                       r'.*?<summary>(.*?)</summary>')
            for m in re.finditer(pattern, resp, re.S):
                title = re.sub(r'\s+', ' ', m.group(2)).strip()
                snippet = re.sub(r'\s+', ' ', m.group(3)).strip()[:150]
                results.append({"title": title,
                                "url": m.group(1).strip(),
                                "snippet": snippet,
                                "source": "arXiv"})
    except Exception:
        pass
    return results[:3]


def duckduckgo_search(query, max_results=5):
    """Scrape the DuckDuckGo HTML endpoint for up to max_results hits."""
    results = []
    try:
        q = urllib.parse.quote(query)
        url = f"https://html.duckduckgo.com/html/?q={q}"
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = http_get(url, headers=headers, timeout=10)
        if resp:
            # NOTE(review): the leading "<a" of each tag was missing from the
            # original (garbled) pattern; reconstructed from the DDG HTML
            # result markup (result__a / result__snippet anchors).
            pattern = (r'<a[^>]+class="result__a"[^>]+href="([^"]+)"[^>]*>(.*?)</a>'
                       r'.*?<a[^>]+class="result__snippet"[^>]*>(.*?)</a>')
            for m in re.finditer(pattern, resp, re.S):
                href = m.group(1)
                title = re.sub(r'<[^>]+>', '', m.group(2)).strip()
                snippet = re.sub(r'<[^>]+>', '', m.group(3)).strip()
                real_url = href
                # DDG wraps targets in a redirect (.../l/?uddg=<encoded-url>).
                if 'uddg=' in href:
                    um = re.search(r'uddg=([^&]+)', href)
                    if um:
                        real_url = urllib.parse.unquote(um.group(1))
                if title:
                    results.append({"title": title,
                                    "url": real_url,
                                    "snippet": snippet,
                                    "source": "Web"})
                    if len(results) >= max_results:
                        break
    except Exception:
        pass
    return results


def self_crawl_search(query, max_results=3):
    """DuckDuckGo crawl; adds an academic-flavored query when appropriate."""
    all_results = []
    all_results.extend(duckduckgo_search(query, max_results))
    if '논문' not in query and 'paper' not in query.lower():
        all_results.extend(duckduckgo_search(f"{query} 논문 학술", 2))
    return all_results


def parallel_brave_search(queries, max_workers=10):
    """Run brave_search for each query in parallel.

    Returns:
        dict mapping each query to its (possibly empty) result list.
    """
    all_results = {}
    # Worker count is capped at 20 regardless of the caller's request.
    with ThreadPoolExecutor(max_workers=min(max_workers, 20)) as executor:
        futures = {executor.submit(brave_search, q, 3): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                all_results[q] = future.result()
            except Exception:
                all_results[q] = []
    return all_results