EXAM-GEN / app-backup.py
seawolf2357's picture
Create app-backup.py
edb886d verified
raw
history blame
53.1 kB
"""
HWP AI 어시스턴트 - Gradio 웹 앱
AI가 HWP 파일을 읽고, 보고, 말하며, 생각하고 기억합니다.
- Tab 1: LLM 채팅 (스트리밍, 파일 첨부 지원)
- Tab 2: HWP 변환기
"""
import gradio as gr
import tempfile
import os
import subprocess
import shutil
import sys
import re
import json
import uuid
import sqlite3
import base64
import requests
import zlib
import zipfile
from pathlib import Path
from datetime import datetime
from typing import Generator, List, Dict, Optional
from xml.etree import ElementTree as ET
# Optional dependency: the Groq SDK. GROQ_AVAILABLE records whether the import
# succeeded so that code using Groq can check the flag before calling it.
try:
    from groq import Groq
    GROQ_AVAILABLE = True
    print("✅ Groq library loaded")
except ImportError:
    GROQ_AVAILABLE = False
    print("❌ Groq library not available - pip install groq")
# ============== Comic Style CSS ==============
# Stylesheet handed to gr.Blocks(css=...). The whole string is consumed at
# runtime, so any edit inside it changes the rendered UI. Among other things it
# hides the footer elements matched by the selectors in the second rule.
COMIC_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Bangers&family=Comic+Neue:wght@400;700&display=swap');
.gradio-container {
background-color: #FEF9C3 !important;
background-image: radial-gradient(#1F2937 1px, transparent 1px) !important;
background-size: 20px 20px !important;
min-height: 100vh !important;
font-family: 'Comic Neue', cursive, sans-serif !important;
}
footer, .footer, .gradio-container footer, .built-with, [class*="footer"], .gradio-footer, a[href*="gradio.app"] {
display: none !important;
visibility: hidden !important;
height: 0 !important;
}
/* HOME Button Style */
.home-button-container {
display: flex;
justify-content: center;
align-items: center;
gap: 15px;
margin-bottom: 15px;
padding: 12px 20px;
background: linear-gradient(135deg, #10B981 0%, #059669 100%);
border: 4px solid #1F2937;
border-radius: 12px;
box-shadow: 6px 6px 0 #1F2937;
}
.home-button {
display: inline-flex;
align-items: center;
gap: 8px;
padding: 10px 25px;
background: linear-gradient(135deg, #FACC15 0%, #F59E0B 100%);
color: #1F2937;
font-family: 'Bangers', cursive;
font-size: 1.4rem;
letter-spacing: 2px;
text-decoration: none;
border: 3px solid #1F2937;
border-radius: 8px;
box-shadow: 4px 4px 0 #1F2937;
transition: all 0.2s ease;
}
.home-button:hover {
background: linear-gradient(135deg, #FDE047 0%, #FACC15 100%);
transform: translate(-2px, -2px);
box-shadow: 6px 6px 0 #1F2937;
}
.home-button:active {
transform: translate(2px, 2px);
box-shadow: 2px 2px 0 #1F2937;
}
.url-display {
font-family: 'Comic Neue', cursive;
font-size: 1.1rem;
font-weight: 700;
color: #FFF;
background: rgba(0,0,0,0.3);
padding: 8px 16px;
border-radius: 6px;
border: 2px solid rgba(255,255,255,0.3);
}
.header-container {
text-align: center;
padding: 25px 20px;
background: linear-gradient(135deg, #3B82F6 0%, #8B5CF6 100%);
border: 4px solid #1F2937;
border-radius: 12px;
margin-bottom: 20px;
box-shadow: 8px 8px 0 #1F2937;
position: relative;
}
.header-title {
font-family: 'Bangers', cursive !important;
color: #FFF !important;
font-size: 2.8rem !important;
text-shadow: 3px 3px 0 #1F2937 !important;
letter-spacing: 3px !important;
margin: 0 !important;
}
.header-subtitle {
font-family: 'Comic Neue', cursive !important;
font-size: 1.1rem !important;
color: #FEF9C3 !important;
margin-top: 8px !important;
font-weight: 700 !important;
}
.stats-badge {
display: inline-block;
background: #FACC15;
color: #1F2937;
padding: 6px 14px;
border-radius: 20px;
font-size: 0.9rem;
margin: 3px;
font-weight: 700;
border: 2px solid #1F2937;
box-shadow: 2px 2px 0 #1F2937;
}
/* 무료 서비스 안내 박스 */
.free-service-notice {
text-align: center;
padding: 10px 15px;
background: linear-gradient(135deg, #FEE2E2 0%, #FECACA 100%);
border: 3px solid #1F2937;
border-radius: 8px;
margin: 10px 0;
box-shadow: 4px 4px 0 #1F2937;
font-family: 'Comic Neue', cursive;
font-weight: 700;
color: #991B1B;
}
.free-service-notice a {
color: #1D4ED8;
text-decoration: none;
font-weight: 700;
}
.free-service-notice a:hover {
text-decoration: underline;
}
.gr-panel, .gr-box, .gr-form, .block, .gr-group {
background: #FFF !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
box-shadow: 5px 5px 0 #1F2937 !important;
}
.gr-button-primary, button.primary, .gr-button.primary {
background: linear-gradient(135deg, #EF4444 0%, #F97316 100%) !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
color: #FFF !important;
font-family: 'Bangers', cursive !important;
font-size: 1.3rem !important;
letter-spacing: 2px !important;
padding: 12px 24px !important;
box-shadow: 4px 4px 0 #1F2937 !important;
text-shadow: 1px 1px 0 #1F2937 !important;
transition: all 0.2s ease !important;
}
.gr-button-primary:hover, button.primary:hover {
background: linear-gradient(135deg, #DC2626 0%, #EA580C 100%) !important;
transform: translate(-2px, -2px) !important;
box-shadow: 6px 6px 0 #1F2937 !important;
}
.gr-button-primary:active, button.primary:active {
transform: translate(2px, 2px) !important;
box-shadow: 2px 2px 0 #1F2937 !important;
}
textarea, input[type="text"], input[type="number"] {
background: #FFF !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
color: #1F2937 !important;
font-family: 'Comic Neue', cursive !important;
font-weight: 700 !important;
}
textarea:focus, input[type="text"]:focus {
border-color: #3B82F6 !important;
box-shadow: 3px 3px 0 #3B82F6 !important;
}
.info-box {
background: linear-gradient(135deg, #FACC15 0%, #FDE047 100%) !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
padding: 12px 15px !important;
margin: 10px 0 !important;
box-shadow: 4px 4px 0 #1F2937 !important;
font-family: 'Comic Neue', cursive !important;
font-weight: 700 !important;
color: #1F2937 !important;
}
.feature-box {
background: linear-gradient(135deg, #E0F2FE 0%, #BAE6FD 100%) !important;
border: 3px solid #1F2937 !important;
border-radius: 12px !important;
padding: 20px !important;
margin: 15px 0 !important;
box-shadow: 5px 5px 0 #1F2937 !important;
}
.feature-title {
font-family: 'Bangers', cursive !important;
font-size: 1.5rem !important;
color: #1F2937 !important;
margin-bottom: 10px !important;
text-shadow: 1px 1px 0 #FFF !important;
}
.feature-item {
display: flex;
align-items: center;
gap: 10px;
padding: 8px 0;
font-family: 'Comic Neue', cursive !important;
font-weight: 700 !important;
font-size: 1rem !important;
color: #1F2937 !important;
}
.feature-icon {
font-size: 1.5rem;
}
/* Markdown 강조 박스 */
.markdown-highlight-box {
background: linear-gradient(135deg, #EC4899 0%, #F472B6 100%) !important;
border: 4px solid #1F2937 !important;
border-radius: 12px !important;
padding: 20px !important;
margin: 15px 0 !important;
box-shadow: 6px 6px 0 #1F2937 !important;
animation: pulse-glow 2s ease-in-out infinite;
}
@keyframes pulse-glow {
0%, 100% { box-shadow: 6px 6px 0 #1F2937; }
50% { box-shadow: 8px 8px 0 #1F2937, 0 0 20px rgba(236, 72, 153, 0.5); }
}
.markdown-title {
font-family: 'Bangers', cursive !important;
font-size: 2rem !important;
color: #FFF !important;
text-shadow: 3px 3px 0 #1F2937 !important;
letter-spacing: 2px !important;
margin-bottom: 15px !important;
text-align: center !important;
}
.markdown-benefits {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 12px;
margin-top: 10px;
}
.markdown-benefit-item {
background: rgba(255,255,255,0.95) !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
padding: 12px !important;
box-shadow: 3px 3px 0 #1F2937 !important;
font-family: 'Comic Neue', cursive !important;
font-weight: 700 !important;
font-size: 0.95rem !important;
color: #1F2937 !important;
text-align: center !important;
}
.markdown-benefit-icon {
font-size: 1.8rem !important;
display: block !important;
margin-bottom: 5px !important;
}
label, .gr-input-label, .gr-block-label {
color: #1F2937 !important;
font-family: 'Comic Neue', cursive !important;
font-weight: 700 !important;
}
.gr-accordion {
background: #E0F2FE !important;
border: 3px solid #1F2937 !important;
border-radius: 8px !important;
box-shadow: 4px 4px 0 #1F2937 !important;
}
.footer-comic {
text-align: center;
padding: 20px;
background: linear-gradient(135deg, #3B82F6 0%, #8B5CF6 100%);
border: 4px solid #1F2937;
border-radius: 12px;
margin-top: 20px;
box-shadow: 6px 6px 0 #1F2937;
}
.footer-comic p {
font-family: 'Comic Neue', cursive !important;
color: #FFF !important;
margin: 5px 0 !important;
font-weight: 700 !important;
}
::-webkit-scrollbar {
width: 12px;
height: 12px;
}
::-webkit-scrollbar-track {
background: #FEF9C3;
border: 2px solid #1F2937;
}
::-webkit-scrollbar-thumb {
background: #3B82F6;
border: 2px solid #1F2937;
border-radius: 6px;
}
::-webkit-scrollbar-thumb:hover {
background: #EF4444;
}
::selection {
background: #FACC15;
color: #1F2937;
}
/* Chatbot Styling */
.gr-chatbot {
border: 3px solid #1F2937 !important;
border-radius: 12px !important;
box-shadow: 5px 5px 0 #1F2937 !important;
}
/* Tab Styling */
.gr-tab-nav {
background: linear-gradient(135deg, #F59E0B 0%, #FACC15 100%) !important;
border: 3px solid #1F2937 !important;
border-radius: 8px 8px 0 0 !important;
}
.gr-tab-nav button {
font-family: 'Bangers', cursive !important;
font-size: 1.2rem !important;
letter-spacing: 1px !important;
color: #1F2937 !important;
}
.gr-tab-nav button.selected {
background: #FFF !important;
border-bottom: 3px solid #FFF !important;
}
/* File Upload Box */
.upload-box {
border: 3px dashed #3B82F6 !important;
border-radius: 12px !important;
background: linear-gradient(135deg, #EFF6FF 0%, #DBEAFE 100%) !important;
box-shadow: 4px 4px 0 #1F2937 !important;
}
.download-box {
border: 3px solid #10B981 !important;
border-radius: 12px !important;
background: linear-gradient(135deg, #ECFDF5 0%, #D1FAE5 100%) !important;
box-shadow: 4px 4px 0 #1F2937 !important;
}
"""
# ============== Environment setup ==============
# Paths are resolved relative to this script so they do not depend on the
# current working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PYHWP_PATH = os.path.join(SCRIPT_DIR, 'pyhwp')
DB_PATH = os.path.join(SCRIPT_DIR, 'chat_history.db')
# If a 'pyhwp' directory sits next to the script, put it first on sys.path.
if os.path.exists(PYHWP_PATH):
    sys.path.insert(0, PYHWP_PATH)
# ============== Optional module imports ==============
# Each third-party package below is optional. A *_AVAILABLE flag records
# whether its import succeeded; the functions that use the package check the
# flag first.
try:
    import olefile
    OLEFILE_AVAILABLE = True
    print("✅ olefile loaded")
except ImportError:
    OLEFILE_AVAILABLE = False
try:
    from markdownify import markdownify as md
    MARKDOWNIFY_AVAILABLE = True
    print("✅ markdownify loaded")
except ImportError:
    MARKDOWNIFY_AVAILABLE = False
try:
    import html2text
    HTML2TEXT_AVAILABLE = True
    print("✅ html2text loaded")
except ImportError:
    HTML2TEXT_AVAILABLE = False
try:
    from bs4 import BeautifulSoup
    BS4_AVAILABLE = True
except ImportError:
    BS4_AVAILABLE = False
try:
    import PyPDF2
    PYPDF2_AVAILABLE = True
    print("✅ PyPDF2 loaded")
except ImportError:
    PYPDF2_AVAILABLE = False
try:
    import pdfplumber
    PDFPLUMBER_AVAILABLE = True
    print("✅ pdfplumber loaded")
except ImportError:
    PDFPLUMBER_AVAILABLE = False
# ============== API keys ==============
# Read from the environment. A missing variable yields an empty string, which
# the streaming helpers treat as "not configured".
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
FIREWORKS_API_KEY = os.environ.get("FIREWORKS_API_KEY", "")
# ============== SQLite 데이터베이스 ==============
def init_database():
    """Create the ``sessions`` and ``messages`` tables in the SQLite file at DB_PATH.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so the function can be
    called repeatedly without affecting existing data.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS sessions (
                session_id TEXT PRIMARY KEY,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                title TEXT
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT,
                role TEXT,
                content TEXT,
                file_info TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (session_id) REFERENCES sessions(session_id)
            )
        ''')
        conn.commit()
    finally:
        # Close the connection even when a statement raises.
        conn.close()
def create_session() -> str:
    """Insert a new row into ``sessions`` and return its id.

    The id is a random UUID4 string; the initial title is "대화 " followed by
    the current local time formatted as ``YYYY-MM-DD HH:MM``.
    """
    session_id = str(uuid.uuid4())
    title = f"대화 {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO sessions (session_id, title) VALUES (?, ?)",
                       (session_id, title))
        conn.commit()
    finally:
        # Close the connection even when the insert fails.
        conn.close()
    return session_id
def save_message(session_id: str, role: str, content: str, file_info: Optional[str] = None):
    """Append one message to a session and bump the session's ``updated_at``.

    Args:
        session_id: Id of the session the message belongs to.
        role: Value stored in the ``role`` column.
        content: Message text.
        file_info: Optional string stored in the ``file_info`` column.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO messages (session_id, role, content, file_info) VALUES (?, ?, ?, ?)",
                       (session_id, role, content, file_info))
        cursor.execute("UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?", (session_id,))
        # Both statements are committed together.
        conn.commit()
    finally:
        # Close the connection even when a statement fails.
        conn.close()
def get_session_messages(session_id: str, limit: int = 20) -> List[Dict]:
    """Return the most recent ``limit`` messages of a session, oldest first.

    Args:
        session_id: Session whose messages are fetched.
        limit: Maximum number of messages to return.

    Returns:
        A list of dicts with keys ``role``, ``content``, ``file_info`` and
        ``created_at``, in chronological order.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # CURRENT_TIMESTAMP has one-second resolution, so messages written
        # back to back share a created_at value. The autoincrement id breaks
        # those ties and keeps the order deterministic.
        cursor.execute(
            "SELECT role, content, file_info, created_at FROM messages "
            "WHERE session_id = ? ORDER BY created_at DESC, id DESC LIMIT ?",
            (session_id, limit))
        rows = cursor.fetchall()
    finally:
        conn.close()
    # Rows arrive newest first; flip them into chronological order.
    return [{"role": r[0], "content": r[1], "file_info": r[2], "created_at": r[3]} for r in reversed(rows)]
def get_all_sessions() -> List[Dict]:
    """Return up to 50 sessions, most recently updated first.

    Returns:
        A list of dicts with keys ``session_id``, ``title``, ``created_at``
        and ``updated_at``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT session_id, title, created_at, updated_at FROM sessions ORDER BY updated_at DESC LIMIT 50")
        rows = cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return [{"session_id": r[0], "title": r[1], "created_at": r[2], "updated_at": r[3]} for r in rows]
def update_session_title(session_id: str, title: str):
    """Set the ``title`` column of the session identified by ``session_id``."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("UPDATE sessions SET title = ? WHERE session_id = ?", (title, session_id))
        conn.commit()
    finally:
        # Close the connection even when the update fails.
        conn.close()
# Runs at import time, so the tables exist before any handler queries them.
init_database()
# ============== 파일 유틸리티 ==============
def extract_text_from_pdf(file_path: str) -> Optional[str]:
    """Extract the text of a PDF, trying pdfplumber first and PyPDF2 second.

    Each backend collects pages into its own list. A backend that fails partway
    through therefore cannot leave pages behind that the next backend would
    duplicate.

    Args:
        file_path: Path of the PDF file.

    Returns:
        Page texts joined by blank lines, or None when no backend is available
        or none of them produced any text.
    """
    if PDFPLUMBER_AVAILABLE:
        try:
            with pdfplumber.open(file_path) as pdf:
                pages = [page.extract_text() for page in pdf.pages]
            pages = [text for text in pages if text]
            if pages:
                return "\n\n".join(pages)
        except Exception as e:
            # Best effort: fall through to the next backend.
            print(f"pdfplumber error: {e}")
    if PYPDF2_AVAILABLE:
        try:
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                pages = [page.extract_text() for page in reader.pages]
            pages = [text for text in pages if text]
            if pages:
                return "\n\n".join(pages)
        except Exception as e:
            print(f"PyPDF2 error: {e}")
    return None
def extract_text_from_txt(file_path: str) -> Optional[str]:
    """Read a text file, trying several encodings in order.

    The encodings are tried in the order utf-8, euc-kr, cp949, utf-16,
    latin-1; the first one that decodes the whole file wins.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded content, or None when the file cannot be opened or no
        encoding decodes it.
    """
    for encoding in ('utf-8', 'euc-kr', 'cp949', 'utf-16', 'latin-1'):
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                return f.read()
        except (UnicodeError, LookupError):
            # Wrong or unavailable codec: try the next candidate.
            continue
        except (OSError, ValueError):
            # The file itself is unreadable; another encoding will not help.
            return None
    return None
def image_to_base64(file_path: str) -> str:
    """Return the file's bytes encoded as a base64 text string."""
    with open(file_path, 'rb') as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def get_image_mime_type(file_path: str) -> str:
    """Map a file's extension to an image MIME type.

    The lookup is case-insensitive; unknown extensions fall back to
    ``image/jpeg``.
    """
    mime_by_extension = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.bmp': 'image/bmp',
    }
    extension = Path(file_path).suffix.lower()
    if extension in mime_by_extension:
        return mime_by_extension[extension]
    return 'image/jpeg'
def is_image_file(fp: str) -> bool:
    """Return True when the path has one of the listed image extensions."""
    extension = Path(fp).suffix.lower()
    return extension in ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp')
def is_hwp_file(fp: str) -> bool:
    """Return True when the path ends in ``.hwp`` (case-insensitive)."""
    extension = Path(fp).suffix
    return extension.lower() == '.hwp'
def is_hwpx_file(fp: str) -> bool:
    """Return True when the path ends in ``.hwpx`` (case-insensitive)."""
    extension = Path(fp).suffix
    return extension.lower() == '.hwpx'
def is_pdf_file(fp: str) -> bool:
    """Return True when the path ends in ``.pdf`` (case-insensitive)."""
    extension = Path(fp).suffix
    return extension.lower() == '.pdf'
def is_text_file(fp: str) -> bool:
    """Return True when the path has one of the listed plain-text extensions."""
    extension = Path(fp).suffix.lower()
    return extension in ('.txt', '.md', '.json', '.csv', '.xml', '.html', '.css', '.js', '.py')
# ============== HWPX 텍스트 추출 ==============
def extract_text_from_hwpx(file_path: str) -> tuple:
    """Extract plain text from an HWPX file (a zip archive of XML parts).

    Section files are read in numeric order (section2 before section10).
    Namespace declarations and tag prefixes are stripped before parsing; if
    the XML still does not parse, the text between tags is scraped with a
    regular expression instead.

    Args:
        file_path: Path of the ``.hwpx`` file.

    Returns:
        ``(text, None)`` on success, with sections separated by a blank line,
        or ``(None, error_message)`` on failure.
    """
    def _section_order(name):
        # re.split with a capture group alternates text / digit runs, so odd
        # positions are always the numeric pieces.
        return [int(piece) if idx % 2 else piece
                for idx, piece in enumerate(re.split(r'(\d+)', name))]

    try:
        text_parts = []
        with zipfile.ZipFile(file_path, 'r') as zf:
            file_list = zf.namelist()
            section_files = [f for f in file_list
                             if f.startswith('Contents/section') and f.endswith('.xml')]
            if not section_files:
                section_files = [f for f in file_list
                                 if 'section' in f.lower() and f.endswith('.xml')]
            section_files.sort(key=_section_order)
            for section_file in section_files:
                try:
                    content = zf.read(section_file)
                    content_str = content.decode('utf-8')
                    # Drop namespaces so tags can be matched by local name.
                    content_str = re.sub(r'\sxmlns[^"]*"[^"]*"', '', content_str)
                    content_str = re.sub(r'<[a-zA-Z]+:', '<', content_str)
                    content_str = re.sub(r'</[a-zA-Z]+:', '</', content_str)
                    try:
                        root = ET.fromstring(content_str)
                        texts = []
                        for elem in root.iter():
                            if elem.tag.endswith('t'):
                                if elem.text:
                                    texts.append(elem.text)
                            elif elem.text and elem.text.strip():
                                if any(x in elem.tag.lower() for x in ['text', 'run', 'para', 'char']):
                                    texts.append(elem.text.strip())
                        if texts:
                            text_parts.append(' '.join(texts))
                    except ET.ParseError:
                        # Fallback: scrape whatever sits between tags.
                        text_matches = re.findall(r'>([^<]+)<', content.decode('utf-8', errors='ignore'))
                        clean_texts = [t.strip() for t in text_matches if t.strip() and len(t.strip()) > 1]
                        if clean_texts:
                            text_parts.append(' '.join(clean_texts))
                except Exception as e:
                    # One unreadable section must not discard the others.
                    print(f"HWPX section error ({section_file}): {e}")
                    continue
        if text_parts:
            result = '\n\n'.join(text_parts)
            # Collapse runs of spaces/tabs but keep newlines, so the blank
            # line between sections survives.
            result = re.sub(r'[^\S\n]+', ' ', result)
            result = re.sub(r'\n{3,}', '\n\n', result)
            return result.strip(), None
        return None, "HWPX에서 텍스트를 찾을 수 없습니다"
    except zipfile.BadZipFile:
        return None, "유효하지 않은 HWPX 파일"
    except Exception as e:
        return None, f"HWPX 처리 오류: {str(e)}"
# ============== HWP 텍스트 추출 ==============
def _decode_hwp5_output(raw: bytes) -> Optional[str]:
    """Decode subprocess output, returning stripped text longer than 10 chars or None."""
    for enc in ('utf-8', 'cp949', 'euc-kr'):
        try:
            text = raw.decode(enc).strip()
        except UnicodeDecodeError:
            continue
        if len(text) > 10:
            return text
    return None


def extract_text_with_hwp5txt(file_path: str) -> tuple:
    """Extract text from an HWP file using pyhwp's hwp5txt.

    The ``hwp5txt`` command is tried first. If that yields nothing usable, a
    child Python process imports ``hwp5`` directly, with PYHWP_PATH placed
    first on its ``sys.path``.

    Args:
        file_path: Path of the ``.hwp`` file.

    Returns:
        ``(text, None)`` on success, otherwise ``(None, "hwp5txt 실패")``.
    """
    try:
        result = subprocess.run(['hwp5txt', file_path], capture_output=True, timeout=60)
        if result.returncode == 0 and result.stdout:
            text = _decode_hwp5_output(result.stdout)
            if text:
                return text, None
    except FileNotFoundError:
        # The command is not installed; use the fallback below.
        pass
    except Exception as e:
        print(f"hwp5txt error: {e}")

    # Both paths are passed through argv rather than pasted into the source
    # text, so quotes or backslashes in a path cannot break or alter the
    # generated program.
    code = '''
import sys
sys.path.insert(0, sys.argv[1])
from hwp5.filestructure import Hwp5File
from hwp5.hwp5txt import extract_text
hwp = Hwp5File(sys.argv[2])
for idx in hwp.bodytext.sections():
    section = hwp.bodytext.section(idx)
    for para in extract_text(section):
        if para.strip():
            print(para.strip())
hwp.close()
'''
    try:
        result = subprocess.run([sys.executable, '-c', code, PYHWP_PATH, file_path],
                                capture_output=True, timeout=60)
        if result.returncode == 0 and result.stdout:
            text = _decode_hwp5_output(result.stdout)
            if text:
                return text, None
    except Exception as e:
        print(f"hwp5txt subprocess error: {e}")
    return None, "hwp5txt 실패"
def _inflate_hwp_stream(stream_data: bytes) -> bytes:
    """Decompress a section stream, returning the input unchanged if both attempts fail."""
    # Try a raw deflate stream first (wbits=-15), then a zlib-wrapped one.
    for wbits in (-15, zlib.MAX_WBITS):
        try:
            return zlib.decompress(stream_data, wbits)
        except zlib.error:
            continue
    return stream_data


def extract_text_with_olefile(file_path: str) -> tuple:
    """Extract text from an HWP file by reading its OLE streams directly.

    Every stream whose path starts with ``BodyText/Section`` is decompressed
    when needed and passed to ``extract_hwp_section_text``. The compression
    flag is bit 0 of byte 36 of the ``FileHeader`` stream; a header too short
    to contain that byte is treated as compressed.

    Args:
        file_path: Path of the ``.hwp`` file.

    Returns:
        ``(text, None)`` on success, otherwise ``(None, error_message)``.
    """
    if not OLEFILE_AVAILABLE:
        return None, "olefile 모듈 없음"
    ole = None
    try:
        ole = olefile.OleFileIO(file_path)
        if not ole.exists('FileHeader'):
            return None, "HWP 파일 헤더 없음"
        header_data = ole.openstream('FileHeader').read()
        is_compressed = (header_data[36] & 1) == 1 if len(header_data) > 36 else True
        all_texts = []
        for entry in ole.listdir():
            if not '/'.join(entry).startswith('BodyText/Section'):
                continue
            try:
                stream_data = ole.openstream(entry).read()
                if is_compressed:
                    stream_data = _inflate_hwp_stream(stream_data)
                section_text = extract_hwp_section_text(stream_data)
            except Exception as e:
                # One unreadable section must not discard the others.
                print(f"olefile section error ({'/'.join(entry)}): {e}")
                continue
            if section_text:
                all_texts.append(section_text)
        if all_texts:
            return '\n\n'.join(all_texts).strip(), None
        return None, "텍스트를 찾을 수 없습니다"
    except Exception as e:
        return None, f"olefile 오류: {str(e)}"
    finally:
        # Close the OLE handle on every path, including failures.
        if ole is not None:
            ole.close()
def extract_hwp_section_text(data: bytes) -> Optional[str]:
    """Walk the record stream of a section and collect its text records.

    Each record starts with a 32-bit little-endian header: bits 0-9 hold the
    tag id and bits 20-31 the payload size. A size field of 0xFFF means the
    real size follows as a separate 32-bit value. Payloads of tag 67 are
    passed to ``decode_para_text``.

    Args:
        data: Decompressed bytes of one section stream.

    Returns:
        The decoded texts joined by newlines, or None when none were found.
    """
    para_text_tag = 67
    texts = []
    pos = 0
    total = len(data)
    while pos < total - 4:
        header = int.from_bytes(data[pos:pos + 4], 'little')
        tag_id = header & 0x3FF
        size = (header >> 20) & 0xFFF
        pos += 4
        if size == 0xFFF:
            # Extended size: the real length follows the header.
            if pos + 4 > total:
                break
            size = int.from_bytes(data[pos:pos + 4], 'little')
            pos += 4
        if pos + size > total:
            break
        record_data = data[pos:pos + size]
        pos += size
        if tag_id != para_text_tag or size == 0:
            continue
        try:
            text = decode_para_text(record_data)
        except Exception:
            # Skip only the undecodable record. pos already points at the next
            # header and must not be shifted, or every later record would be
            # read misaligned.
            continue
        if text:
            texts.append(text)
    return '\n'.join(texts) if texts else None
def decode_para_text(data: bytes) -> Optional[str]:
    """Decode the payload of a paragraph-text record into a string.

    The payload is a sequence of 16-bit little-endian code units. Codes below
    32 are control codes. Per the HWP 5.0 format, codes 0, 10, 13 and 24-31
    occupy one unit, while every other control code occupies eight units
    (16 bytes) whose payload is not text and is skipped whole.

    Args:
        data: Raw payload bytes of one paragraph-text record.

    Returns:
        The cleaned text, or None when it is two characters or shorter.
    """
    single_unit_controls = frozenset((0, 10, 13, 24, 25, 26, 27, 28, 29, 30, 31))
    # Control codes that have a plain-text stand-in.
    replacements = {9: '\t', 10: '\n', 13: '\n', 24: '-', 30: ' ', 31: ' '}
    result = []
    i = 0
    total = len(data)
    while i < total - 1:
        code = int.from_bytes(data[i:i + 2], 'little')
        if code < 32:
            if code in replacements:
                result.append(replacements[code])
            # Skipping 16 bytes keeps the control payload out of the text.
            i += 2 if code in single_unit_controls else 16
            continue
        if 0xD800 <= code <= 0xDBFF and i + 3 < total:
            low = int.from_bytes(data[i + 2:i + 4], 'little')
            if 0xDC00 <= low <= 0xDFFF:
                # Combine a UTF-16 surrogate pair into one character.
                char = chr(0x10000 + ((code - 0xD800) << 10) + (low - 0xDC00))
                if char.isprintable():
                    result.append(char)
                i += 4
                continue
        char = chr(code)
        if char.isprintable():
            result.append(char)
        i += 2
    text = ''.join(result).strip()
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text if len(text) > 2 else None
def extract_text_from_hwp(file_path: str) -> tuple:
    """Extract text from an HWP file, trying hwp5txt first and olefile second.

    A result counts as a success only when its stripped text is longer than
    20 characters.

    Returns:
        ``(text, None)`` from the first backend that succeeds, otherwise
        ``(None, "모든 추출 방법 실패")``.
    """
    print(f"\n📖 [HWP 읽기] {os.path.basename(file_path)}")
    error = None
    for backend in (extract_text_with_hwp5txt, extract_text_with_olefile):
        text, error = backend(file_path)
        usable = bool(text) and len(text.strip()) > 20
        if usable:
            print(f" ✅ 성공: {len(text)} 글자")
            return text, None
    # Only the last backend's error is reported in the log line.
    print(f" ❌ 실패: {error}")
    return None, "모든 추출 방법 실패"
def extract_text_from_hwp_or_hwpx(file_path: str) -> tuple:
    """Dispatch on the extension: ``.hwpx`` goes to the HWPX reader, anything else to the HWP reader."""
    if not is_hwpx_file(file_path):
        return extract_text_from_hwp(file_path)
    print(f"\n📖 [HWPX 읽기] {os.path.basename(file_path)}")
    return extract_text_from_hwpx(file_path)
# ============== HWP 변환 함수들 ==============
def check_hwp_version(file_path):
    """Sniff the first 32 bytes of a file and classify it.

    Returns:
        ``(label, is_valid)``. The label is "HWP v5", "HWP v5 (OLE)", "HWPX"
        or "Unknown"; when the file cannot be read it is "Error: <reason>"
        with ``is_valid`` False.
    """
    try:
        with open(file_path, 'rb') as handle:
            head = handle.read(32)
    except Exception as exc:
        return f"Error: {exc}", False
    # NOTE(review): a plain-text "HWP Document File" signature at the start of
    # the file may indicate a legacy (pre-v5) document - confirm this label.
    if b'HWP Document File' in head:
        return "HWP v5", True
    labels_by_magic = {
        b'\xd0\xcf\x11\xe0': "HWP v5 (OLE)",
        b'PK\x03\x04': "HWPX",
    }
    label = labels_by_magic.get(head[:4])
    if label is None:
        return "Unknown", False
    return label, True
def convert_to_html_subprocess(input_path, output_dir):
    """Convert an HWP file to HTML by running the ``hwp5html`` command.

    Args:
        input_path: Path of the source ``.hwp`` file.
        output_dir: Directory in which the output is created.

    Returns:
        ``(path, None)`` on success, where ``path`` is ``output.html`` or, if
        that does not exist, the first HTML file or directory found in
        ``output_dir`` (the caller checks for the directory case). On failure
        ``(None, "HTML 변환 실패")``.
    """
    output_path = os.path.join(output_dir, "output.html")
    try:
        result = subprocess.run(['hwp5html', '--output', output_path, input_path],
                                capture_output=True, timeout=120)
        if result.returncode == 0:
            if os.path.exists(output_path):
                return output_path, None
            # Sorted so the fallback pick does not depend on listing order.
            for item in sorted(os.listdir(output_dir)):
                item_path = os.path.join(output_dir, item)
                if item.lower().endswith(('.html', '.htm')):
                    return item_path, None
                if os.path.isdir(item_path):
                    return item_path, None
        else:
            print(f"HTML 변환 오류: hwp5html exited with code {result.returncode}")
    except Exception as e:
        # Missing command, timeout, etc.: log it and report a plain failure.
        print(f"HTML 변환 오류: {e}")
    return None, "HTML 변환 실패"
def html_to_markdown(html_content):
    """Convert HTML to Markdown or plain text with the first backend that works.

    Backends are tried in the order markdownify, html2text, BeautifulSoup
    (plain text only). A backend is skipped when its availability flag is
    False or when it raises.

    Returns:
        ``(text, None)`` on success, otherwise ``(None, "Markdown 변환 실패")``.
    """
    if MARKDOWNIFY_AVAILABLE:
        try:
            return md(html_content, heading_style="ATX", bullets="-"), None
        except Exception as e:
            # Log the failure, then try the next backend.
            print(f"markdownify error: {e}")
    if HTML2TEXT_AVAILABLE:
        try:
            h = html2text.HTML2Text()
            h.body_width = 0
            return h.handle(html_content), None
        except Exception as e:
            print(f"html2text error: {e}")
    if BS4_AVAILABLE:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')
            return soup.get_text(separator='\n'), None
        except Exception as e:
            print(f"BeautifulSoup error: {e}")
    return None, "Markdown 변환 실패"
def convert_hwp_to_markdown(input_path: str) -> tuple:
    """Return the extracted document text as ``(text, None)``, or ``(None, error)`` when nothing was extracted."""
    extracted, failure = extract_text_from_hwp_or_hwpx(input_path)
    return (extracted, None) if extracted else (None, failure)
# ============== LLM API (Groq 라이브러리 사용) ==============
def call_groq_api_stream(messages: List[Dict]) -> Generator[str, None, None]:
    """Stream a chat completion from Groq using the openai/gpt-oss-120b model.

    Args:
        messages: Chat messages as dicts with ``role`` and ``content``.

    Yields:
        Text fragments of the reply as they arrive. Configuration problems and
        API errors are yielded as a single message starting with "❌" rather
        than raised.
    """
    if not GROQ_AVAILABLE:
        yield "❌ Groq 라이브러리가 설치되지 않았습니다. pip install groq"
        return
    if not GROQ_API_KEY:
        yield "❌ GROQ_API_KEY 환경변수가 설정되지 않았습니다."
        return
    try:
        client = Groq(api_key=GROQ_API_KEY)
        completion = client.chat.completions.create(
            model="openai/gpt-oss-120b",
            messages=messages,
            temperature=1,
            max_completion_tokens=8192,
            top_p=1,
            reasoning_effort="medium",
            stream=True,
            stop=None
        )
        for chunk in completion:
            # A chunk with an empty choices list would raise IndexError and
            # abort the whole reply, so skip it.
            if not chunk.choices:
                continue
            piece = chunk.choices[0].delta.content
            if piece:
                yield piece
    except Exception as e:
        error_msg = str(e)
        print(f"❌ Groq API 오류: {error_msg}")
        yield f"❌ API 오류: {error_msg}"
def call_fireworks_api_stream(messages: List[Dict], image_base64: str, mime_type: str) -> Generator[str, None, None]:
    """Stream a chat completion from Fireworks with an image attached.

    The image is attached to the last message as a ``data:`` URL; earlier
    messages are forwarded as plain ``role``/``content`` pairs.

    Args:
        messages: Chat messages; the last one carries the text sent with the image.
        image_base64: Base64-encoded image bytes.
        mime_type: MIME type used in the data URL.

    Yields:
        Text fragments of the reply. Configuration problems, non-200 responses
        and request errors are yielded as a message starting with "❌".
    """
    if not FIREWORKS_API_KEY:
        yield "❌ FIREWORKS_API_KEY 환경변수가 설정되지 않았습니다."
        return
    try:
        formatted_messages = [{"role": m["role"], "content": m["content"]} for m in messages[:-1]]
        formatted_messages.append({
            "role": messages[-1]["role"],
            "content": [
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_base64}"}},
                {"type": "text", "text": messages[-1]["content"]}
            ]
        })
        # The timeout keeps a stalled connection from hanging the handler; the
        # context manager releases the connection when streaming ends or the
        # generator is closed.
        with requests.post(
            "https://api.fireworks.ai/inference/v1/chat/completions",
            headers={"Authorization": f"Bearer {FIREWORKS_API_KEY}", "Content-Type": "application/json"},
            json={
                "model": "accounts/fireworks/models/qwen3-vl-235b-a22b-thinking",
                "max_tokens": 4096,
                "temperature": 0.6,
                "messages": formatted_messages,
                "stream": True
            },
            stream=True,
            timeout=(10, 300)
        ) as response:
            if response.status_code != 200:
                yield f"❌ Fireworks API 오류: {response.status_code}"
                return
            for line in response.iter_lines():
                if not line:
                    continue
                line = line.decode('utf-8')
                if not line.startswith('data: '):
                    continue
                payload = line[6:]
                if payload == '[DONE]':
                    continue
                try:
                    data = json.loads(payload)
                except json.JSONDecodeError:
                    continue
                choices = data.get('choices') if isinstance(data, dict) else None
                if not choices:
                    continue
                delta = choices[0].get('delta') if isinstance(choices[0], dict) else None
                content = delta.get('content') if isinstance(delta, dict) else None
                if content:
                    yield content
    except Exception as e:
        yield f"❌ API 오류: {str(e)}"
# ============== 채팅 처리 ==============
def process_file(file_path: str) -> tuple:
    """Classify an uploaded file and extract what the chat needs from it.

    Returns:
        A ``(kind, payload, mime)`` tuple:
        ``(None, None, None)`` for an empty path;
        ``("image", base64_data, mime_type)`` for images;
        ``("text", extracted_text, None)`` for HWP/HWPX, PDF and text files;
        ``("error", message, None)`` when extraction fails;
        ``("unsupported", message, None)`` for any other extension.
    """
    if not file_path:
        return None, None, None

    if is_image_file(file_path):
        encoded = image_to_base64(file_path)
        return "image", encoded, get_image_mime_type(file_path)

    if is_hwp_file(file_path) or is_hwpx_file(file_path):
        doc_text, failure = extract_text_from_hwp_or_hwpx(file_path)
        # Very short results are treated as a failed extraction.
        if not (doc_text and len(doc_text.strip()) > 20):
            return "error", f"한글 문서 추출 실패: {failure}", None
        print(f"📄 [문서 내용 추출 완료] {len(doc_text)} 글자")
        print(f"📄 [문서 미리보기] {doc_text[:500]}...")
        return "text", doc_text, None

    if is_pdf_file(file_path):
        pdf_text = extract_text_from_pdf(file_path)
        if not pdf_text:
            return "error", "PDF 추출 실패", None
        print(f"📄 [PDF 내용 추출 완료] {len(pdf_text)} 글자")
        return "text", pdf_text, None

    if is_text_file(file_path):
        plain_text = extract_text_from_txt(file_path)
        if not plain_text:
            return "error", "텍스트 읽기 실패", None
        return "text", plain_text, None

    return "unsupported", f"지원하지 않는 형식: {os.path.basename(file_path)}", None
def chat_response(message: str, history: List[Dict], file: Optional[str],
                  session_id: str) -> Generator[tuple, None, None]:
    """Handle one chat turn and stream the assistant's reply.

    Steps: process the optional attachment, show the user message, build the
    API request from the system prompt plus the last 10 stored messages,
    stream the reply (Fireworks for images, Groq otherwise), then store both
    sides of the turn in the database.

    Args:
        message: Text typed by the user; None is treated as empty.
        history: Messages currently shown in the chatbot; None is treated as
            an empty list.
        file: Path of an attached file, or a falsy value when there is none.
        session_id: Current session id; a new session is created when empty.

    Yields:
        ``(history, session_id)`` after every update to the displayed chat.
    """
    # Treat None as an empty message, so .strip() below cannot raise. The
    # later `message or ...` checks already assume None is possible.
    message = message or ""
    if history is None:
        history = []
    if not message.strip() and not file:
        yield history, session_id
        return
    if not session_id:
        session_id = create_session()
    file_type, file_content, file_mime = None, None, None
    file_info = None
    filename = None
    if file:
        filename = os.path.basename(file)
        file_type, file_content, file_mime = process_file(file)
        file_info = json.dumps({"type": file_type, "filename": filename})
        if file_type == "error":
            history = history + [
                {"role": "user", "content": message or "파일 업로드"},
                {"role": "assistant", "content": f"❌ {file_content}"}
            ]
            yield history, session_id
            return
        elif file_type == "unsupported":
            history = history + [
                {"role": "user", "content": message or "파일 업로드"},
                {"role": "assistant", "content": f"⚠️ {file_content}"}
            ]
            yield history, session_id
            return
    # Show the user message, followed by an empty assistant bubble to fill in.
    user_msg = message
    if file:
        user_msg = f"📎 {filename}\n\n{message}" if message else f"📎 {filename}"
    history = history + [{"role": "user", "content": user_msg}, {"role": "assistant", "content": ""}]
    yield history, session_id
    # Load the earlier turns of this session.
    db_messages = get_session_messages(session_id, limit=10)
    # System prompt: instructs the model to answer strictly from the document.
    system_prompt = """당신은 문서 분석 전문 AI 어시스턴트입니다.
## 핵심 역할
- 사용자가 업로드한 문서의 내용을 **정확하게 분석**하고 **구체적으로 답변**합니다.
- 문서에 있는 **실제 내용**을 기반으로만 답변합니다.
- 문서에 없는 내용은 추측하지 않습니다.
## 문서 분석 방법
1. **문서가 제공되면**: 문서 전체 내용을 꼼꼼히 읽고 핵심 정보를 파악합니다.
2. **요약 요청 시**: 문서의 주제, 목적, 핵심 내용, 주요 항목을 구조화하여 요약합니다.
3. **질문 응답 시**: 문서에서 관련 내용을 찾아 **직접 인용하거나 구체적으로 설명**합니다.
## 답변 형식
- 한국어로 자연스럽고 명확하게 답변합니다.
- 문서 내용을 인용할 때는 구체적으로 언급합니다.
- 긴 문서는 섹션별로 나누어 정리합니다.
## 주의사항
- 문서에 **실제로 있는 내용만** 답변에 포함합니다.
- 불확실한 내용은 "문서에서 확인되지 않습니다"라고 명시합니다."""
    api_messages = [{"role": "system", "content": system_prompt}]
    # Append the earlier turns.
    for m in db_messages:
        api_messages.append({"role": m["role"], "content": m["content"]})
    # Build the current message, keeping document text and question apart.
    if file_type == "text" and file_content:
        if message:
            current_content = f"""## 📄 업로드된 문서 내용 ({filename})
다음은 사용자가 업로드한 문서의 전체 내용입니다:
---
{file_content}
---
## 💬 사용자 질문
{message}
위 문서 내용을 바탕으로 사용자의 질문에 **구체적이고 정확하게** 답변해주세요."""
        else:
            current_content = f"""## 📄 업로드된 문서 내용 ({filename})
다음은 사용자가 업로드한 문서의 전체 내용입니다:
---
{file_content}
---
## 📋 요청사항
위 문서의 내용을 다음 형식으로 **상세하게 요약**해주세요:
1. **문서 제목/주제**: 문서가 다루는 주요 주제
2. **문서 목적**: 이 문서의 작성 목적
3. **핵심 내용**: 가장 중요한 내용 3-5가지
4. **세부 항목**: 문서에 포함된 주요 섹션이나 항목
5. **결론/요약**: 문서의 핵심 메시지"""
    else:
        current_content = message or ""
    api_messages.append({"role": "user", "content": current_content})
    # Debug log.
    print(f"\n🤖 [API 요청]")
    print(f" - 모델: openai/gpt-oss-120b")
    print(f" - 메시지 수: {len(api_messages)}")
    print(f" - 파일 타입: {file_type}")
    print(f" - 문서 길이: {len(file_content) if file_content else 0} 글자")
    if file_content:
        print(f" - 문서 미리보기: {file_content[:200]}...")
    # Stream the reply: images go to Fireworks, everything else to Groq.
    if file_type == "image":
        stream = call_fireworks_api_stream(api_messages, file_content, file_mime)
    else:
        stream = call_groq_api_stream(api_messages)
    full_response = ""
    for chunk in stream:
        full_response += chunk
        history[-1] = {"role": "assistant", "content": full_response}
        yield history, session_id
    # Store the turn.
    save_message(session_id, "user", current_content, file_info)
    save_message(session_id, "assistant", full_response)
    # The first message of a session becomes its title.
    if len(db_messages) == 0 and message:
        update_session_title(session_id, message[:50])
def new_chat():
    """Start a fresh conversation: returns an empty history, a new session id and None."""
    empty_history = []
    fresh_session_id = create_session()
    return empty_history, fresh_session_id, None
def load_session(session_id: str) -> tuple:
    """Load up to 50 stored messages of a session for display.

    Returns:
        ``(messages, session_id)`` where each message is reduced to its
        ``role`` and ``content``; ``([], "")`` when no session id is given.
    """
    if session_id:
        stored = get_session_messages(session_id, limit=50)
        chat = [{"role": row["role"], "content": row["content"]} for row in stored]
        return chat, session_id
    return [], ""
# ============== HWP 변환기 ==============
def convert_to_odt_subprocess(input_path, output_dir):
    """Convert an HWP file to ODT by running the ``hwp5odt`` command.

    Args:
        input_path: Path of the source ``.hwp`` file.
        output_dir: Directory in which ``output.odt`` is created.

    Returns:
        ``(output_path, None)`` on success, otherwise ``(None, "ODT 변환 실패")``.
    """
    output_path = os.path.join(output_dir, "output.odt")
    try:
        result = subprocess.run(['hwp5odt', '--output', output_path, input_path],
                                capture_output=True, timeout=120)
        if result.returncode == 0 and os.path.exists(output_path):
            return output_path, None
    except Exception as e:
        # Missing command, timeout, etc.: log it and report a plain failure.
        print(f"ODT 변환 오류: {e}")
    return None, "ODT 변환 실패"
def convert_to_xml_subprocess(input_path, output_dir):
    """Convert an HWP file to XML by running ``hwp5xml`` and saving its stdout.

    Args:
        input_path: Path of the source ``.hwp`` file.
        output_dir: Directory in which ``output.xml`` is written.

    Returns:
        ``(output_path, None)`` on success, otherwise ``(None, "XML 변환 실패")``.
    """
    output_path = os.path.join(output_dir, "output.xml")
    try:
        result = subprocess.run(['hwp5xml', input_path], capture_output=True, timeout=120)
        if result.returncode == 0 and result.stdout:
            with open(output_path, 'wb') as f:
                f.write(result.stdout)
            return output_path, None
    except Exception as e:
        # Missing command, timeout, write error: log it and report failure.
        print(f"XML 변환 오류: {e}")
    return None, "XML 변환 실패"
def convert_hwp(file, output_format, progress=gr.Progress()):
    """Convert an uploaded HWP/HWPX file to the requested output format.

    Args:
        file: Uploaded file, either an object with a ``name`` attribute or a path.
        output_format: One of "HTML", "ODT (OpenDocument)", "TXT (텍스트)",
            "⭐ MARKDOWN (추천)" or "XML".
        progress: Gradio progress callback.

    Returns:
        ``(output_path, status_message, preview)``. On failure ``output_path``
        is None and ``preview`` is an empty string.

    The working directory is kept only when the conversion succeeds, because
    the returned file lives inside it. On every failure path it is removed.
    """
    if not file:
        return None, "❌ 파일을 업로드해주세요.", ""
    input_file = file.name if hasattr(file, 'name') else str(file)
    ext_lower = Path(input_file).suffix.lower()
    if ext_lower not in ['.hwp', '.hwpx']:
        return None, "❌ HWP 또는 HWPX 파일만 지원됩니다.", ""
    progress(0.1, desc="📖 파일 읽는 중...")
    version, is_valid = check_hwp_version(input_file)
    if not is_valid:
        return None, f"❌ 지원하지 않는 파일: {version}", ""
    tmp_dir = tempfile.mkdtemp()
    keep_tmp_dir = False
    try:
        input_filename = os.path.basename(input_file)
        input_path = os.path.join(tmp_dir, input_filename)
        shutil.copy(input_file, input_path)
        progress(0.3, desc=f"🔄 {output_format}로 변환 중...")
        output_path, error, ext = None, None, ""
        if output_format == "HTML":
            if ext_lower == '.hwpx':
                return None, "❌ HWPX는 HTML 변환을 지원하지 않습니다.", ""
            output_path, error = convert_to_html_subprocess(input_path, tmp_dir)
            ext = ".html"
            # The converter may return a directory; ship that as a zip archive.
            if output_path and os.path.isdir(output_path):
                zip_path = shutil.make_archive(os.path.join(tmp_dir, "html"), 'zip', output_path)
                output_path, ext = zip_path, ".zip"
        elif output_format == "ODT (OpenDocument)":
            if ext_lower == '.hwpx':
                return None, "❌ HWPX는 ODT 변환을 지원하지 않습니다.", ""
            output_path, error = convert_to_odt_subprocess(input_path, tmp_dir)
            ext = ".odt"
        elif output_format == "TXT (텍스트)":
            text, error = extract_text_from_hwp_or_hwpx(input_path)
            if text:
                output_path = os.path.join(tmp_dir, "output.txt")
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(text)
            ext = ".txt"
        elif output_format == "⭐ MARKDOWN (추천)":
            text, error = convert_hwp_to_markdown(input_path)
            if text:
                output_path = os.path.join(tmp_dir, "output.md")
                with open(output_path, 'w', encoding='utf-8') as f:
                    f.write(text)
            ext = ".md"
        elif output_format == "XML":
            if ext_lower == '.hwpx':
                # HWPX already contains XML parts; concatenate them with a
                # comment naming each part.
                try:
                    with zipfile.ZipFile(input_path, 'r') as zf:
                        xml_contents = []
                        for name in zf.namelist():
                            if name.endswith('.xml'):
                                with zf.open(name) as f:
                                    xml_contents.append(f"<!-- {name} -->\n{f.read().decode('utf-8', errors='ignore')}")
                    output_path = os.path.join(tmp_dir, "output.xml")
                    with open(output_path, 'w', encoding='utf-8') as f:
                        f.write('\n\n'.join(xml_contents))
                except Exception as e:
                    error = f"HWPX XML 추출 실패: {e}"
            else:
                output_path, error = convert_to_xml_subprocess(input_path, tmp_dir)
            ext = ".xml"
        if not output_path:
            return None, f"❌ {error or '변환 실패'}", ""
        if not os.path.exists(output_path):
            return None, "❌ 변환된 파일을 찾을 수 없습니다.", ""
        progress(0.8, desc="✅ 완료 중...")
        # Name the result after the uploaded file.
        base_name = Path(input_filename).stem
        final_output = os.path.join(tmp_dir, f"{base_name}{ext}")
        if output_path != final_output:
            shutil.copy2(output_path, final_output)
        file_size = os.path.getsize(final_output)
        size_str = f"{file_size/1024:.1f} KB" if file_size > 1024 else f"{file_size} bytes"
        preview = ""
        if ext in ['.txt', '.md', '.xml']:
            try:
                with open(final_output, 'r', encoding='utf-8', errors='ignore') as f:
                    preview = f.read(5000)
                if len(preview) >= 5000:
                    preview += "\n\n... (생략)"
            except OSError:
                # The preview is optional; the conversion itself succeeded.
                pass
        elif ext == '.zip':
            preview = "📦 HTML이 ZIP으로 압축되었습니다."
        progress(1.0, desc="🎉 완료!")
        keep_tmp_dir = True
        return final_output, f"✅ 변환 완료: {base_name}{ext} ({size_str})", preview
    except Exception as e:
        import traceback
        traceback.print_exc()
        return None, f"❌ 오류: {str(e)}", ""
    finally:
        if not keep_tmp_dir:
            # On failure, remove the copied upload and any partial output.
            shutil.rmtree(tmp_dir, ignore_errors=True)
# ============== Gradio UI ==============
# Root Blocks app, exposed as `demo`; styled with the COMIC_CSS stylesheet defined earlier in this file.
# NOTE(review): delete_cache=(60, 60) is presumably (frequency_seconds, age_seconds) per the Gradio docs —
# confirm that a 60-second age leaves users enough time to download converted files.
with gr.Blocks(title="HWPower AI 어시스턴트", css=COMIC_CSS, delete_cache=(60, 60)) as demo:
# HOME button banner linking to the project site
gr.HTML("""
<div class="home-button-container">
<a href="https://www.humangen.ai" target="_blank" class="home-button">
🏠 HOME
</a>
<span class="url-display">🌐 www.humangen.ai</span>
</div>
""")
# Header: title, subtitle and the five capability badges
gr.HTML("""
<div class="header-container">
<div class="header-title">📄 HWPower AI 어시스턴트 🤖</div>
<div class="header-subtitle">AI가 HWP 파일을 읽고, 보고, 말하며, 생각하고 기억합니다!</div>
<div style="margin-top:12px">
<span class="stats-badge">📖 읽기 READ</span>
<span class="stats-badge">👁️ 보기 SEE</span>
<span class="stats-badge">💬 말하기 SPEAK</span>
<span class="stats-badge">🧠 생각 THINK</span>
<span class="stats-badge">💾 기억 MEMORY</span>
</div>
</div>
""")
# Free-service notice with the contact e-mail address
gr.HTML("""
<div class="free-service-notice">
🆓 본 서비스는 <b>무료 버전</b>으로 일부 기능에 제약이 있습니다.<br>
📧 문의: <a href="mailto:arxivgpt@gmail.com">arxivgpt@gmail.com</a>
</div>
""")
# State value initialised to an empty string; the event wiring below passes it to
# on_submit as `sid` and overwrites it with the value returned by the chat/session handlers.
session_state = gr.State("")
# Tab container for the application
with gr.Tabs():
# Tab 1: AI chat
with gr.Tab("💬 AI 채팅"):
# Info box listing the file formats accepted as attachments
with gr.Row():
with gr.Column(scale=1):
gr.HTML("""
<div class="info-box">
📁 <b>지원 파일 형식</b><br><br>
🖼️ <b>이미지</b>: JPG, PNG, GIF, WebP<br>
📑 <b>문서</b>: PDF, TXT, MD<br>
📄 <b>한글</b>: HWP, HWPX ✨
</div>
""")
# Button wired below to start a new session and reset the chat widgets
new_btn = gr.Button("🆕 새 대화 시작", variant="primary")
# Collapsed-by-default accordion holding the read-only conversation history table
with gr.Accordion("📜 대화 기록 (Memory)", open=False):
session_list = gr.Dataframe(headers=["ID", "제목", "시간"], interactive=False)
refresh_btn = gr.Button("🔄 새로고침", size="sm")
with gr.Column(scale=3):
# Conversation display
chatbot = gr.Chatbot(label="💬 AI 대화", height=500)
with gr.Row():
# Attachment upload restricted to the listed image, document and HWP/HWPX extensions
file_upload = gr.File(
label="📎 파일 첨부 (HWP/HWPX/PDF/이미지)",
file_types=[".jpg", ".jpeg", ".png", ".gif", ".webp", ".pdf", ".txt", ".md", ".hwp", ".hwpx"],
scale=1,
elem_classes=["upload-box"]
)
# Two-line message box without a label; its submit event is wired to on_submit below
msg_input = gr.Textbox(
placeholder="💭 메시지를 입력하세요... (파일을 업로드하면 AI가 내용을 읽고 분석합니다)",
lines=2,
show_label=False,
scale=4
)
with gr.Row():
# Send and clear buttons
submit_btn = gr.Button("🚀 전송", variant="primary", scale=3)
clear_btn = gr.Button("🗑️ 지우기", scale=1)
# Tab 2: HWP converter
with gr.Tab("📄 HWP 변환기"):
# Intro box describing the converter
gr.HTML("""
<div class="feature-box">
<div class="feature-title">🔄 HWP/HWPX 파일 변환기</div>
<p style="font-family: 'Comic Neue', cursive; font-weight: 700; color: #1F2937;">
한글 문서를 다양한 형식으로 변환합니다. AI가 문서를 읽고 텍스트를 추출합니다.
</p>
</div>
""")
# Markdown highlight box: static HTML listing six reasons to pick the Markdown output
gr.HTML("""
<div class="markdown-highlight-box">
<div class="markdown-title">⭐ MARKDOWN 변환 추천! ⭐</div>
<div class="markdown-benefits">
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">🤖</span>
<b>AI/LLM 최적화</b><br>
ChatGPT, Claude 등 AI에 바로 입력 가능
</div>
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">📝</span>
<b>범용 포맷</b><br>
GitHub, Notion, 블로그 등 어디서나 사용
</div>
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">🔍</span>
<b>구조 유지</b><br>
제목, 목록, 표 등 문서 구조 보존
</div>
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">⚡</span>
<b>가볍고 빠름</b><br>
용량이 작고 처리 속도 빠름
</div>
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">🔄</span>
<b>변환 용이</b><br>
HTML, PDF, Word 등으로 재변환 가능
</div>
<div class="markdown-benefit-item">
<span class="markdown-benefit-icon">✏️</span>
<b>편집 간편</b><br>
메모장으로도 바로 수정 가능
</div>
</div>
</div>
""")
with gr.Row():
with gr.Column():
gr.HTML('<div class="info-box">📤 <b>파일 업로드</b></div>')
# Input file picker limited to .hwp / .hwpx
hwp_input = gr.File(
label="HWP/HWPX 파일 선택",
file_types=[".hwp", ".hwpx"],
elem_classes=["upload-box"]
)
# Output format choice; these labels are the exact strings that
# convert_hwp compares against, and Markdown is preselected.
format_select = gr.Radio(
["⭐ MARKDOWN (추천)", "TXT (텍스트)", "HTML", "ODT (OpenDocument)", "XML"],
value="⭐ MARKDOWN (추천)",
label="📋 변환 형식"
)
convert_btn = gr.Button("🔄 변환하기", variant="primary", size="lg")
with gr.Column():
gr.HTML('<div class="info-box">📥 <b>변환 결과</b></div>')
# Result widgets: status message, downloadable file and a collapsed text preview
status_out = gr.Textbox(label="상태", interactive=False)
file_out = gr.File(label="다운로드", elem_classes=["download-box"])
with gr.Accordion("📋 미리보기", open=False):
preview_out = gr.Textbox(lines=15, interactive=False)
# Usage notice and acknowledgement
gr.HTML("""
<div class="info-box">
ℹ️ <b>안내</b>: 변환 서비스는 개인용도로 사용시 어떠한 제약도 없습니다. * Special Thanks: june9713@gmail.com *
</div>
""")
# Footer: static HTML with the app name, tagline, free-service note, contact e-mail and site link
gr.HTML("""
<div class="footer-comic">
<p style="font-family:'Bangers',cursive;font-size:1.8rem;letter-spacing:2px">📄 HWP AI 어시스턴트 🤖</p>
<p>AI가 HWP 파일을 읽고, 보고, 말하며, 생각하고 기억합니다!</p>
<p>📖 READ • 👁️ SEE • 💬 SPEAK • 🧠 THINK • 💾 MEMORY</p>
<p style="margin-top:8px;font-size:0.9rem;">🆓 무료 서비스 (일부 기능 제한) | 📧 arxivgpt@gmail.com</p>
<p style="margin-top:10px"><a href="https://www.humangen.ai" target="_blank" style="color:#FACC15;text-decoration:none;font-weight:bold;">🏠 www.humangen.ai</a></p>
</div>
""")
# ============== Event handlers ==============
def on_submit(msg, hist, f, sid):
    """Stream chat updates to the UI while resetting the input widgets.

    Passes the message, chat history, attached file and session id on to
    ``chat_response``. For every item that generator produces, yields a
    4-tuple ``(item[0], item[1], "", None)``; the trailing ``""`` and
    ``None`` are the values sent to the message box and the file upload
    by the event wiring that uses this handler.

    Args:
        msg: Text from the message box.
        hist: Current chatbot value; ``None`` is treated as an empty list.
        f: Value of the file upload component.
        sid: Current session state value.
    """
    history = [] if hist is None else hist
    for update in chat_response(msg, history, f, sid):
        chat_value = update[0]
        session_value = update[1]
        yield chat_value, session_value, "", None
# The send button and pressing Enter in the message box both run on_submit
# with identical inputs and outputs.
submit_btn.click(on_submit, [msg_input, chatbot, file_upload, session_state],
[chatbot, session_state, msg_input, file_upload])
msg_input.submit(on_submit, [msg_input, chatbot, file_upload, session_state],
[chatbot, session_state, msg_input, file_upload])
# "New chat": empties the chatbot, stores the value returned by create_session()
# in session_state, and clears the file upload and the message box.
new_btn.click(lambda: ([], create_session(), None, ""), outputs=[chatbot, session_state, file_upload, msg_input])
# "Clear": empties the chatbot, file upload and message box; session_state is
# not among the outputs, so it is left unchanged.
clear_btn.click(lambda: ([], None, ""), outputs=[chatbot, file_upload, msg_input])
def refresh():
    """Build the rows for the conversation-history table.

    Returns:
        A list with one ``[short_id, title, timestamp]`` row per entry of
        ``get_all_sessions()``: the first 8 characters of ``session_id``,
        the title (or "제목없음" when it is falsy), and the first 16
        characters of ``updated_at`` (or "" when it is falsy).
    """
    rows = []
    for session in get_all_sessions():
        short_id = session["session_id"][:8]
        title = session["title"] or "제목없음"
        stamp = session["updated_at"][:16] if session["updated_at"] else ""
        rows.append([short_id, title, stamp])
    return rows
# Refresh button: repopulate the history table with the rows built by refresh().
refresh_btn.click(refresh, outputs=[session_list])
def select_session(evt: gr.SelectData, data):
    """Load the stored session that matches the clicked history-table row.

    The table shows only the first 8 characters of each session id (see
    ``refresh``), so the clicked row's first cell is used as a prefix to
    find the full id among ``get_all_sessions()``.

    Args:
        evt: Gradio selection event; ``evt.index[0]`` is the clicked row.
            The ``gr.SelectData`` annotation must stay, because Gradio uses
            it to inject the event data.
        data: Current value of the history table. ``gr.Dataframe`` passes a
            pandas DataFrame unless another ``type`` is configured; plain
            row sequences are accepted as well.

    Returns:
        Whatever ``load_session`` returns for the matching session, or
        ``([], "")`` when the row cannot be resolved to a session.
    """
    if data is None:
        return [], ""
    row_idx = evt.index[0]
    if not 0 <= row_idx < len(data):
        return [], ""
    if hasattr(data, "iloc"):
        # On a DataFrame, data[row_idx] is a column-label lookup and raises
        # KeyError for the "ID"/"제목"/"시간" headers; read the cell by position.
        short_id = data.iloc[row_idx, 0]
    else:
        short_id = data[row_idx][0]
    short_id = str(short_id)
    if not short_id:
        # An empty prefix would match every session id.
        return [], ""
    for session in get_all_sessions():
        if session["session_id"].startswith(short_id):
            return load_session(session["session_id"])
    return [], ""
# Selecting a cell in the history table runs select_session and writes its
# result to the chatbot and session_state.
session_list.select(select_session, [session_list], [chatbot, session_state])
# Converter tab: run convert_hwp on the uploaded file and chosen format, then
# fill the download, status and preview widgets with its three return values.
convert_btn.click(convert_hwp, [hwp_input, format_select], [file_out, status_out, preview_out])
# Populate the history table when the page loads.
demo.load(refresh, outputs=[session_list])
# Launch the app only when this file is executed as a script; ssr_mode=False is passed to launch().
if __name__ == "__main__":
demo.launch(ssr_mode=False)