Spaces:
Running
Running
| # ============================================================ | |
| # 한지(HANJI) · HWP AI Agent 서비스 — App Router | |
| # core.so (또는 core.py)에서 엔진을 import | |
| # ============================================================ | |
| import os, re, json, time, tempfile, threading | |
| import gradio as gr | |
| # ── core 엔진 import (.so 또는 .py) ── | |
| import core | |
| from core import * | |
| def build_ui(): | |
| with gr.Blocks(title="한지(HANJI) · HWP AI Agent 서비스") as app: | |
| gr.HTML(f"<style>{SOMA_CUSTOM_CSS}</style>") | |
| # ── Top Bar ── | |
| gr.HTML(""" | |
| <div class="soma-topbar"> | |
| <span class="soma-logo">한지<em>(HANJI)</em></span> | |
| <span class="soma-sep"></span> | |
| <span class="soma-desc">HWP AI Agent 서비스</span> | |
| <a class="soma-url" href="https://hanji.ginigen.ai" target="_blank">🔗 hanji.ginigen.ai</a> | |
| <span class="soma-right"> | |
| <a class="soma-contact" href="mailto:ginigenaihp@gmail.com">📧 문의 · 온프레미스 · 제휴</a> | |
| </span> | |
| </div>""") | |
| # ── States ── | |
| ref_text_state = gr.State("") | |
| ref_hwpx_path_state = gr.State("") | |
| state = gr.State({"final_doc": "", "search_count": 0}) | |
| dummy_state = gr.State("") | |
| doc_text_state = gr.State("") | |
| _transform_result_path = "" # 문서 변환 결과 경로 (run_soma에서 설정) | |
| # ══════════════════════════════════════════════════ | |
| # MAIN LAYOUT: Left 1/3 Controls | Right 2/3 Viewer | |
| # ══════════════════════════════════════════════════ | |
| with gr.Row(equal_height=False): | |
| # ── LEFT PANEL (1/3) ────────────────────────── | |
| with gr.Column(scale=1, min_width=320): | |
| # Prompt | |
| prompt_input = gr.Textbox( | |
| label="📌 프롬프트", | |
| placeholder="예: 2026년 AI 보안 유망기업 육성 지원사업 공모 안내문을 작성해주세요.", | |
| lines=3) | |
| # File upload | |
| ref_file_upload = gr.File( | |
| label="📎 레퍼런스 문서", | |
| file_types=[".hwp",".hwpx",".hml",".pdf",".docx",".txt",".md", | |
| ".csv",".json",".xml",".xlsx",".xls",".py",".html",".log"]) | |
| ref_upload_status = gr.Textbox(label="파일 상태", interactive=False, lines=2, | |
| placeholder="레퍼런스 파일을 업로드하면 여기에 상태가 표시됩니다.") | |
| # Generation Mode | |
| mode_radio = gr.Radio( | |
| choices=[ | |
| "새로 생성 — AI가 주제에 맞는 문서를 처음부터 작성", | |
| "서식 유지 · 내용 변경 — 원본 레이아웃 100% 보존, 텍스트만 교체", | |
| "구조 참고 · 새로 생성 — 원본 구조를 참고하여 새 내용으로 작성", | |
| ], | |
| value="새로 생성 — AI가 주제에 맞는 문서를 처음부터 작성", | |
| label="⚙️ 생성 모드", | |
| interactive=True) | |
| mode_state = gr.State(1) # 1=새로, 2=서식유지, 3=구조참고 | |
| # Settings (compact) | |
| with gr.Row(): | |
| max_search_slider = gr.Slider(minimum=5, maximum=100, value=20, step=5, | |
| label="🔍 검색", scale=1) | |
| temperature_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.6, step=0.05, | |
| label="🌡 Temp", scale=1) | |
| # Action buttons | |
| with gr.Row(): | |
| run_btn = gr.Button("🚀 문서 생성", variant="primary", scale=2) | |
| stop_btn = gr.Button("⛔", variant="secondary", scale=0) | |
| # Status indicator | |
| search_counter = gr.Markdown("대기 중") | |
| # HWPX Download | |
| with gr.Row(): | |
| gen_hml_btn = gr.Button("📥 HWPX 변환", variant="primary", scale=2) | |
| copy_text_btn = gr.Button("📋", variant="secondary", scale=0) | |
| hml_status = gr.Textbox(label="", interactive=False, value="", | |
| placeholder="HWPX 변환 상태", lines=1) | |
| hml_file = gr.File(label="다운로드", file_types=[".hwpx"], visible=True) | |
| # Generated text (collapsed) | |
| with gr.Accordion("📝 생성된 텍스트", open=False): | |
| final_doc_box = gr.Textbox(label="", value="", interactive=True, lines=12, | |
| placeholder="SOMA 파이프라인 실행 후 최종 문서 텍스트") | |
| # Pipeline internals (collapsed) | |
| with gr.Accordion("🧬 파이프라인 로그", open=False): | |
| agent_stream = gr.Textbox(label="Agent Stream", value="", interactive=False, lines=6) | |
| search_log_box = gr.Textbox(label="Search Log", value="", interactive=False, lines=4) | |
| agent_log_box = gr.Textbox(label="Pipeline Log", value="", interactive=False, lines=6) | |
| # Doc Chat (collapsed) | |
| with gr.Accordion("📎 문서 분석 챗", open=False): | |
| doc_upload = gr.File(label="📄 문서 업로드", | |
| file_types=[".hwp",".hwpx",".hml",".pdf",".docx",".txt",".md", | |
| ".csv",".json",".xml",".xlsx",".xls",".py",".html",".log"]) | |
| doc_upload_status = gr.Textbox(label="", interactive=False, lines=1) | |
| doc_chatbot = gr.Chatbot(label="💬 Chat", height=200) | |
| with gr.Row(): | |
| doc_msg = gr.Textbox(label="", placeholder="질문하세요...", lines=1, scale=4) | |
| doc_send_btn = gr.Button("🚀", variant="primary", scale=0) | |
| doc_clear_btn = gr.Button("🗑️ Clear", size="sm") | |
| # ── 문서 변환 (XML 직접 치환) ── | |
| with gr.Accordion("🔄 문서 변환 (서식 100% 보존)", open=False): | |
| gr.HTML('<div style="font-size:11px;color:#475569;padding:4px 0;border-bottom:1px solid #e2e8f0">' | |
| '원본 HWPX의 XML 구조를 보존하면서 LLM이 맥락을 이해하여 텍스트만 교체합니다.' | |
| '</div>') | |
| transform_file = gr.File(label="📂 원본 HWPX 업로드", file_types=[".hwpx"]) | |
| transform_instruction = gr.Textbox( | |
| label="📝 변환 지시", | |
| placeholder="예: 경기도→서울, 노인말벗서비스→청년창업지원벗서비스로 변경하되 맥락에 맞게 조정", | |
| lines=3, | |
| ) | |
| transform_temp = gr.Slider(0.0, 1.0, 0.3, step=0.1, label="Temperature (낮을수록 정확)") | |
| transform_btn = gr.Button("🔄 변환 실행", variant="primary", size="lg") | |
| transform_status = gr.Textbox(label="상태", interactive=False) | |
| transform_diff = gr.HTML(label="변경 사항") | |
| transform_output = gr.File(label="📥 변환된 HWPX 다운로드") | |
| # ── RIGHT PANEL (2/3) — DOCUMENT VIEWER ────── | |
| with gr.Column(scale=2, min_width=500, elem_classes=["viewer-panel"]): | |
| viewer_main = gr.HTML(value=_SAMPLE_PREVIEW) | |
| # ── Hidden component for ohaeng (pipeline needs it) ── | |
| ohaeng_display = gr.HTML(value="", visible=False) | |
| # ── Event Handlers | |
| def handle_ref_upload(file): | |
| if file is None: | |
| return "", "", "", _viewer_empty("파일을 선택하면 여기에 미리보기가 표시됩니다.") | |
| fpath = file.name if hasattr(file, 'name') else str(file) | |
| fname = os.path.basename(fpath) | |
| ext = Path(fpath).suffix.lower() | |
| # ── 바이너리 HWP 감지 → 변환 안내 ── | |
| if ext == '.hwp' and _is_binary_hwp(fpath): | |
| text, err = process_uploaded_file(fpath) | |
| preview = hwpx_to_html_preview(fpath) | |
| if text: | |
| status = f"📄 {fname} ({len(text):,}자 추출)\n\n{_HWP_CONVERT_GUIDE}" | |
| return text, status, "", preview | |
| return "", f"❌ {fname}: {err}", "", preview | |
| # ── HWPX 파일 → 스타일 복원 모드 ── | |
| hwpx_path = "" | |
| if ext == '.hwpx': | |
| try: | |
| with zipfile.ZipFile(fpath, 'r') as zf: | |
| if 'Contents/header.xml' in zf.namelist(): | |
| hwpx_path = fpath | |
| styles = analyze_hwpx_styles(fpath) | |
| print(f"📋 레퍼런스 HWPX 분석 완료: charPr {styles['char_count']}개, " | |
| f"paraPr {styles['para_count']}개, " | |
| f"borderFill {styles['bf_count']}개") | |
| except: | |
| pass | |
| # ── 그 외 파일 ── | |
| text, err = process_uploaded_file(fpath) | |
| # 뷰어: HWP/HWPX만 렌더링 | |
| if ext in ('.hwp', '.hwpx'): | |
| preview = hwpx_to_html_preview(fpath) | |
| else: | |
| preview = _viewer_empty(f"{fname} — HWP/HWPX 파일만 미리보기 지원됩니다.") | |
| if text: | |
| status = f"✅ {fname} ({len(text):,}자)" | |
| if hwpx_path: | |
| status += "\n🔄 '서식 유지 · 내용 변경' 및 '구조 참고 · 새로 생성' 모드 사용 가능" | |
| return text, status, hwpx_path, preview | |
| return "", f"❌ {fname}: {err}", "", preview | |
| ref_file_upload.change(fn=handle_ref_upload, inputs=[ref_file_upload], | |
| outputs=[ref_text_state, ref_upload_status, ref_hwpx_path_state, viewer_main]) | |
| def _radio_to_mode(radio_val): | |
| """라디오 레이블 → 모드 번호 변환""" | |
| if not radio_val: | |
| return 1 | |
| if "서식 유지" in radio_val: | |
| return 2 | |
| if "구조 참고" in radio_val: | |
| return 3 | |
| return 1 | |
| def run_soma(prompt, max_search, temperature, ref_text, ref_hwpx_path="", mode_val=1): | |
| mode = mode_val if isinstance(mode_val, int) else _radio_to_mode(str(mode_val)) | |
| if not prompt.strip(): | |
| yield (ohaeng_cards_html("水"), "⚠️ 프롬프트를 입력하세요.", "", "", "", "대기 중", "") | |
| return | |
| # ════════════════════════════════════════════════════════ | |
| # MODE 2: 서식 유지 · 내용 변경 (XML 직접 치환) | |
| # ════════════════════════════════════════════════════════ | |
| if mode == 2 and ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| # ── XML 직접 치환 모드: SOMA 전체 바이패스 ── | |
| yield (ohaeng_cards_html("水"), | |
| "🔄 **서식 유지 · 내용 변경** 모드 — XML 직접 치환 (서식 100% 보존)\n\n" | |
| "📖 원본 텍스트 노드 추출 중...\n", | |
| "", "🔄 Mode 2: XML 키워드 치환\n", "", "🔄 변환 중", "") | |
| try: | |
| text_list, raw_xml, orig_flags = extract_text_nodes(ref_hwpx_path) | |
| yield (ohaeng_cards_html("木"), | |
| f"📖 텍스트 노드 {len(text_list)}개 추출 완료\n\n" | |
| f"🤖 LLM 키워드 매핑 생성 중...\n", | |
| "", f"📖 {len(text_list)}개 노드 추출\n", "", "🔄 LLM 분석 중", "") | |
| mapping = generate_keyword_mapping(raw_xml, prompt, temperature) | |
| if not mapping: | |
| yield (ohaeng_cards_html("金"), | |
| "⚠️ 변경할 키워드가 없습니다. 지시를 더 구체적으로 입력하세요.", | |
| "", "❌ 매핑 0건\n", "", "⚠️ 변경 없음", "") | |
| return | |
| yield (ohaeng_cards_html("火"), | |
| f"🤖 키워드 매핑 {len(mapping)}쌍 생성\n\n" | |
| f"🔧 XML 적용 중...\n", | |
| "", f"🤖 {len(mapping)}쌍 매핑\n", "", "🔧 적용 중", "") | |
| new_xml, details = apply_keyword_mapping(raw_xml, mapping) | |
| output_path = repack_transform_hwpx(ref_hwpx_path, new_xml, orig_flags) | |
| orig_name = os.path.splitext(os.path.basename(ref_hwpx_path))[0] | |
| final_name = f"{orig_name}_변환.hwpx" | |
| final_path = os.path.join(os.path.dirname(output_path), final_name) | |
| os.rename(output_path, final_path) | |
| total_count = sum(d.get("count",0) for d in details) | |
| summary_lines = [] | |
| for d in details: | |
| summary_lines.append(f"• '{d['original']}' → '{d['replacement']}' ({d.get('count',0)}회)") | |
| summary = "\n".join(summary_lines) | |
| final_doc = ( | |
| f"## 🔄 문서 변환 완료 (서식 100% 보존)\n\n" | |
| f"**{len(details)}개 키워드 · {total_count}회 치환**\n\n" | |
| f"{summary}\n\n" | |
| f"---\n" | |
| f"✅ header.xml: 원본 그대로\n" | |
| f"✅ 이미지/스크립트: 원본 그대로\n" | |
| f"✅ charPr/paraPr: 원본 그대로\n" | |
| f"✅ 문단 구조: 원본 그대로\n" | |
| f"✅ section0.xml: 키워드만 {total_count}회 치환\n" | |
| ) | |
| preview = hwpx_to_html_preview(final_path) if 'hwpx_to_html_preview' in dir() else "" | |
| yield (ohaeng_cards_html("金"), | |
| f"🎉 **문서 변환 완료!**\n\n" | |
| f"서식 100% 보존 · {len(details)}개 키워드 · {total_count}회 치환\n\n" | |
| f"아래 'HWPX 생성' 버튼으로 다운로드하세요.\n", | |
| "", f"✅ 변환 완료: {total_count}회\n", final_doc, | |
| f"✅ 변환 완료", final_doc) | |
| nonlocal _transform_result_path | |
| _transform_result_path = final_path | |
| return | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| yield (ohaeng_cards_html("金"), | |
| f"❌ 변환 오류: {e}\n\n모드를 '새로 생성'으로 변경하여 다시 시도하세요.", | |
| "", f"❌ {e}\n", "", "❌ 오류", "") | |
| return | |
| # ════════════════════════════════════════════════════════ | |
| # MODE 1 & 3: SOMA 파이프라인 (문서 신규 생성) | |
| # ════════════════════════════════════════════════════════ | |
| full_prompt = prompt | |
| if mode == 3 and ref_text and ref_text.strip(): | |
| # MODE 3: 참조 문서의 구조 골격을 압축 추출하여 주입 | |
| structure = extract_structure_summary(ref_text) | |
| full_prompt = f"{prompt}\n\n{structure}" | |
| elif ref_text and ref_text.strip(): | |
| # MODE 1: 레퍼런스 텍스트가 있으면 참고자료로만 활용 | |
| ref_content = ref_text.strip()[:8000] | |
| full_prompt = f"{prompt}\n\n[참고자료]\n{ref_content}" | |
| stream_acc, log_acc, search_log, final_doc = "", "", "", "" | |
| active_agent, sc = "水", 0 | |
| for chunk in soma_pipeline(full_prompt, int(max_search), temperature): | |
| if chunk.get("done"): | |
| final_doc = chunk.get("final_doc", "") | |
| sc = chunk.get("search_count", sc) | |
| log_acc = chunk.get("log", "") | |
| search_log = chunk.get("search_log", "") | |
| break | |
| active_agent = chunk.get("active", active_agent) | |
| tok = chunk.get("stream", "") | |
| if tok: | |
| stream_acc += tok | |
| if len(stream_acc) > 8000: | |
| stream_acc = "...(이전 생략)...\n" + stream_acc[-6000:] | |
| if chunk.get("log"): log_acc = chunk["log"] | |
| if chunk.get("search_log"): search_log = chunk["search_log"] | |
| if chunk.get("search_count") is not None: sc = chunk["search_count"] | |
| if chunk.get("final_doc"): final_doc = chunk["final_doc"] | |
| yield (ohaeng_cards_html(active_agent), stream_acc, search_log, log_acc, | |
| final_doc if final_doc else "", f"🔍 {sc} / {int(max_search)}", "") | |
| yield (ohaeng_cards_html("金"), stream_acc + "\n\n🎉 완료!", search_log, log_acc, | |
| final_doc, f"✅ 완료: {sc}회 검색", final_doc) | |
| run_btn.click( | |
| fn=run_soma, | |
| inputs=[prompt_input, max_search_slider, temperature_slider, ref_text_state, ref_hwpx_path_state, mode_radio], | |
| outputs=[ohaeng_display, agent_stream, search_log_box, agent_log_box, final_doc_box, search_counter, dummy_state]) | |
| def make_hml(doc_text, ref_hwpx_path, mode_val=1): | |
| mode = mode_val if isinstance(mode_val, int) else _radio_to_mode(str(mode_val)) | |
| nonlocal _transform_result_path | |
| # ── MODE 2: 문서 변환 결과가 있으면 바로 반환 ── | |
| if mode == 2 and _transform_result_path and os.path.exists(_transform_result_path): | |
| path = _transform_result_path | |
| _transform_result_path = "" # 1회 사용 후 리셋 | |
| preview = hwpx_to_html_preview(path) | |
| return path, "✅ 문서 변환 완료 (서식 100% 보존) — XML 직접 치환", preview | |
| if not doc_text or not doc_text.strip(): | |
| return None, "❌ 문서를 먼저 생성하세요.", _viewer_empty("HWPX 생성 후 여기에 표시됩니다.") | |
| try: | |
| # MODE 3: 레퍼런스 HWPX 구조 참고 → SectionCloner | |
| if mode == 3 and ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| path = generate_hwpx(doc_text.strip(), ref_hwpx_path=ref_hwpx_path) | |
| gen_mode = "🧩 구조 참고 · SectionCloner" | |
| elif ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| path = generate_hwpx(doc_text.strip(), ref_hwpx_path=ref_hwpx_path) | |
| gen_mode = "🎯 레퍼런스 스타일 복원" | |
| else: | |
| path = generate_hwpx(doc_text.strip()) | |
| gen_mode = "📄 report 템플릿" | |
| title = normalize_text_for_title(doc_text.strip()) | |
| safe_title = re.sub(r'[\\/:*?"<>|]', '', title)[:40].strip() or "문서" | |
| new_path = os.path.join(os.path.dirname(path), f"{safe_title}.hwpx") | |
| os.rename(path, new_path) | |
| # page_guard 결과 표시 | |
| status = f"✅ 생성 완료 ({gen_mode})" | |
| if ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| guard = page_guard_check(ref_hwpx_path, new_path) | |
| if guard["status"] == "PASS": | |
| status += f" | 📏 page_guard PASS (ref={guard['ref_chars']}자 → out={guard['out_chars']}자)" | |
| else: | |
| status += f" | ⚠️ page_guard {len(guard['errors'])}건 경고" | |
| # 생성된 HWPX 뷰어 렌더링 | |
| preview = hwpx_to_html_preview(new_path) | |
| return new_path, status, preview | |
| except Exception as e: | |
| return None, f"❌ 오류: {e}", _viewer_empty(f"생성 오류: {e}") | |
| gen_hml_btn.click(fn=make_hml, inputs=[final_doc_box, ref_hwpx_path_state, mode_radio], | |
| outputs=[hml_file, hml_status, viewer_main]) | |
| # Doc Chat handlers | |
| def handle_doc_upload(file): | |
| if file is None: | |
| return "", "파일을 선택해주세요." | |
| fpath = file.name if hasattr(file, 'name') else str(file) | |
| fname = os.path.basename(fpath) | |
| ext = Path(fpath).suffix.lower() | |
| # 바이너리 HWP 감지 | |
| is_bin_hwp = (ext == '.hwp' and _is_binary_hwp(fpath)) | |
| text, err = process_uploaded_file(fpath) | |
| if text: | |
| status = f"✅ {fname} ({len(text):,}자)" | |
| if is_bin_hwp: | |
| status = f"📄 {fname} ({len(text):,}자 추출) — 바이너리 HWP (텍스트만 추출됨)" | |
| return text, status | |
| return "", f"❌ {fname}: {err}" | |
| doc_upload.change(fn=handle_doc_upload, inputs=[doc_upload], outputs=[doc_text_state, doc_upload_status]) | |
| doc_send_btn.click(fn=doc_chat_respond, inputs=[doc_msg, doc_chatbot, doc_text_state], outputs=[doc_chatbot]) | |
| doc_msg.submit(fn=doc_chat_respond, inputs=[doc_msg, doc_chatbot, doc_text_state], outputs=[doc_chatbot]) | |
| doc_clear_btn.click(fn=lambda: ([], ""), outputs=[doc_chatbot, doc_text_state]) | |
| # ── 문서 변환 이벤트 핸들러 ── | |
| def handle_transform(hwpx_file, instruction, temperature): | |
| if hwpx_file is None: | |
| return None, "❌ HWPX 파일을 업로드하세요.", "" | |
| if not instruction or not instruction.strip(): | |
| return None, "❌ 변환 지시를 입력하세요.", "" | |
| fpath = hwpx_file.name if hasattr(hwpx_file, 'name') else str(hwpx_file) | |
| try: | |
| output_path, replacements, diff_html = transform_hwpx( | |
| fpath, instruction.strip(), temperature) | |
| orig_name = os.path.splitext(os.path.basename(fpath))[0] | |
| new_name = f"{orig_name}_변환.hwpx" | |
| final_path = os.path.join(os.path.dirname(output_path), new_name) | |
| os.rename(output_path, final_path) | |
| return final_path, f"✅ 변환 완료: {len(replacements)}건 변경 | 서식 100% 보존", diff_html | |
| except Exception as e: | |
| return None, f"❌ 오류: {e}", f"<p style='color:red'>{e}</p>" | |
| transform_btn.click( | |
| fn=handle_transform, | |
| inputs=[transform_file, transform_instruction, transform_temp], | |
| outputs=[transform_output, transform_status, transform_diff]) | |
| return app | |
| # ============================================================ | |
| # ⑧ Entry Point — FastAPI 메인 + Gradio 서브마운트 | |
| # ============================================================ | |
| from fastapi import FastAPI, Request as _FAReq | |
| from fastapi.responses import FileResponse, JSONResponse, HTMLResponse, StreamingResponse | |
| import uvicorn | |
| # ── FastAPI 메인 앱 ── | |
| app = FastAPI() | |
| _APP_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| _index_path = os.path.join(_APP_DIR, "index.html") | |
| # ── ohah/hwpjs 백그라운드 설치 ── | |
| threading.Thread(target=_install_hwpjs, daemon=True).start() | |
| # ── "/" → index.html 서빙 ── | |
| async def _serve_index(): | |
| if os.path.exists(_index_path): | |
| return FileResponse(_index_path, media_type="text/html") | |
| return HTMLResponse("<h1>index.html not found</h1>", status_code=404) | |
| async def _serve_ui(): | |
| return await _serve_index() | |
| # ── SOMA API ── | |
| import asyncio as _asyncio | |
| import queue as _queue | |
| _file_registry = {} | |
| _doc_text_store = {} # sid → text | |
| _doc_hwpx_store = {} # sid → hwpx file path (변환 모드용) | |
| _transform_store = {} # sid → 변환 결과 hwpx path | |
| _last_transform = {"path": "", "ts": 0} # 마지막 변환 결과 (index.html용) | |
| async def _soma_run(req: _FAReq): | |
| try: | |
| body = await req.json() | |
| prompt = body.get("prompt", "").strip() | |
| max_search = int(body.get("max_search", 20)) | |
| temperature = float(body.get("temperature", 0.6)) | |
| ref_text = body.get("ref_text", "") # 직접 전달 | |
| ref_sid = body.get("ref_sid", "") # doc-upload에서 받은 sid | |
| if not ref_text and ref_sid: | |
| ref_text = _doc_text_store.get(ref_sid, "") | |
| if not prompt: | |
| return JSONResponse({"error": "prompt 없음"}, status_code=400) | |
| # ════════════════════════════════════════════════════════ | |
| # 모드 분기: 1=새로 생성, 2=서식 유지·내용 변경, 3=구조 참고·새로 생성 | |
| # ════════════════════════════════════════════════════════ | |
| mode = int(body.get("mode", 1)) | |
| ref_hwpx_path = _doc_hwpx_store.get(ref_sid, "") | |
| # ── 디버그 로그 ── | |
| print(f"[MODE] mode={mode} ref_sid='{ref_sid}' hwpx='{ref_hwpx_path}' exists={os.path.exists(ref_hwpx_path) if ref_hwpx_path else False}") | |
| print(f"[MODE] prompt[:100]='{prompt[:100]}'") | |
| # ════════════════════════════════════════════════════════ | |
| # MODE 2: 서식 유지 · 내용 변경 (XML 직접 치환) | |
| # ════════════════════════════════════════════════════════ | |
| if mode == 2 and ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| # ── XML 직접 치환 모드: SOMA 전체 바이패스 ── | |
| def _transform_in_thread(): | |
| try: | |
| q.put(json.dumps({"active": "水", | |
| "stream": "🔄 **문서 변환 모드** — 키워드 매핑 치환 (서식 100% 보존)\n\n📖 텍스트 추출 중...\n"}, ensure_ascii=False)) | |
| text_list, raw_xml, orig_flags = extract_text_nodes(ref_hwpx_path) | |
| q.put(json.dumps({"active": "木", | |
| "stream": f"📖 텍스트 노드 {len(text_list)}개 추출\n🤖 LLM 키워드 매핑 생성 중...\n"}, ensure_ascii=False)) | |
| mapping = generate_keyword_mapping(raw_xml, prompt, temperature) | |
| if not mapping: | |
| q.put(json.dumps({"active": "金", "done": True, | |
| "final_doc": "⚠️ 변경할 키워드가 없습니다.", | |
| "stream": "⚠️ 매핑 0건\n"}, ensure_ascii=False)) | |
| return | |
| q.put(json.dumps({"active": "火", | |
| "stream": f"🤖 {len(mapping)}쌍 매핑 생성\n🔧 XML 적용 중...\n"}, ensure_ascii=False)) | |
| new_xml, details = apply_keyword_mapping(raw_xml, mapping) | |
| output_path = repack_transform_hwpx(ref_hwpx_path, new_xml, orig_flags) | |
| # 파일 등록 | |
| orig_name = os.path.splitext(os.path.basename(ref_hwpx_path))[0] | |
| final_name = f"{orig_name}_변환.hwpx" | |
| final_path = os.path.join(os.path.dirname(output_path), final_name) | |
| os.rename(output_path, final_path) | |
| _file_registry[final_name] = final_path | |
| if ref_sid: | |
| _transform_store[ref_sid] = final_path | |
| _last_transform["path"] = final_path | |
| _last_transform["ts"] = time.time() | |
| # 변경 사항 요약 | |
| total_count = sum(d.get("count",0) for d in details) | |
| summary = "\n".join(f"• '{d['original']}' → '{d['replacement']}' ({d.get('count',0)}회)" for d in details) | |
| final_doc = ( | |
| f"## 🔄 문서 변환 완료 (서식 100% 보존)\n\n" | |
| f"**{len(details)}개 키워드 · {total_count}회 치환**\n\n{summary}\n\n" | |
| f"---\n✅ header.xml/이미지/스크립트/charPr/paraPr: 원본 100% 보존\n" | |
| f"✅ section0.xml: 키워드만 {total_count}회 치환\n" | |
| ) | |
| q.put(json.dumps({"active": "金", "done": True, | |
| "final_doc": final_doc, | |
| "transform_file": f"/file/{final_name}", | |
| "transform_filename": final_name, | |
| "transform_path": final_path, | |
| "stream": f"🎉 변환 완료! {len(details)}개 키워드 · {total_count}회 · 서식 100% 보존\n", | |
| "search_count": 0}, ensure_ascii=False)) | |
| except Exception as e: | |
| import traceback; traceback.print_exc() | |
| q.put(json.dumps({"error": str(e), "done": True}, ensure_ascii=False)) | |
| finally: | |
| q.put(None) # SSE 종료 신호 | |
| q = _queue.Queue() | |
| threading.Thread(target=_transform_in_thread, daemon=True).start() | |
| async def _async_gen(): | |
| while True: | |
| try: | |
| item = await _asyncio.get_event_loop().run_in_executor( | |
| None, lambda: q.get(timeout=300)) | |
| except: break | |
| if item is None: | |
| yield "data: [DONE]\n\n"; break | |
| yield f"data: {item}\n\n" | |
| if '"done": true' in item or '"done":true' in item: | |
| yield "data: [DONE]\n\n"; break | |
| return StreamingResponse(_async_gen(), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", | |
| "X-Accel-Buffering": "no"}) | |
| # ════════════════════════════════════════════════════════ | |
| # MODE 1 & 3: SOMA 파이프라인 (문서 신규 생성) | |
| # MODE 3은 구조 참고 힌트 추가 + HWPX 생성 시 SectionCloner 사용 | |
| # ════════════════════════════════════════════════════════ | |
| full_prompt = prompt | |
| if mode == 3 and ref_text and ref_text.strip(): | |
| # MODE 3: 참조 문서의 구조 골격을 압축 추출하여 주입 | |
| structure = extract_structure_summary(ref_text) | |
| full_prompt = f"{prompt}\n\n{structure}" | |
| elif mode == 1 and ref_text and ref_text.strip(): | |
| # MODE 1: 레퍼런스 텍스트가 있어도 참고자료로만 활용 | |
| ref_snippet = ref_text.strip()[:8000] | |
| full_prompt = f"{prompt}\n\n[참고자료]\n{ref_snippet}" | |
| # 동기 제너레이터를 별도 스레드에서 실행 → 이벤트 루프 블로킹 방지 | |
| q = _queue.Queue() | |
| def _run_in_thread(): | |
| try: | |
| for chunk in soma_pipeline(full_prompt, max_search, temperature): | |
| q.put(json.dumps(chunk, ensure_ascii=False)) | |
| except Exception as e: | |
| q.put(json.dumps({"error": str(e), "done": True})) | |
| finally: | |
| q.put(None) # 종료 시그널 | |
| threading.Thread(target=_run_in_thread, daemon=True).start() | |
| async def _async_generate(): | |
| while True: | |
| # 큐에서 비동기로 가져오기 (이벤트 루프 블로킹 없음) | |
| try: | |
| item = await _asyncio.get_event_loop().run_in_executor( | |
| None, lambda: q.get(timeout=300)) | |
| except: | |
| break | |
| if item is None: | |
| yield "data: [DONE]\n\n" | |
| break | |
| yield f"data: {item}\n\n" | |
| return StreamingResponse(_async_generate(), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", | |
| "X-Accel-Buffering": "no"}) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| async def _soma_hml(req: _FAReq): | |
| try: | |
| body = await req.json() | |
| content = body.get("content", "").strip() | |
| ref_sid = body.get("ref_sid", "") | |
| mode = int(body.get("mode", 1)) | |
| # ── MODE 2: 문서 변환 결과가 있으면 바로 반환 ── | |
| if mode == 2: | |
| # 1) ref_sid로 찾기 | |
| if ref_sid and ref_sid in _transform_store: | |
| path = _transform_store.pop(ref_sid) | |
| if os.path.exists(path): | |
| fname = os.path.basename(path) | |
| _file_registry[fname] = path | |
| return JSONResponse({"file_url": f"/file/{fname}", | |
| "filename": fname, | |
| "file_path": path, | |
| "mode": "transform"}) | |
| # 2) 글로벌 최근 변환 결과 | |
| if _last_transform["path"] and os.path.exists(_last_transform["path"]): | |
| if time.time() - _last_transform["ts"] < 300: | |
| path = _last_transform["path"] | |
| _last_transform["path"] = "" | |
| fname = os.path.basename(path) | |
| _file_registry[fname] = path | |
| return JSONResponse({"file_url": f"/file/{fname}", | |
| "filename": fname, | |
| "file_path": path, | |
| "mode": "transform"}) | |
| # ── MODE 3: 레퍼런스 HWPX 구조 참고 → SectionCloner ── | |
| ref_hwpx_path = "" | |
| if mode == 3 and ref_sid: | |
| ref_hwpx_path = _doc_hwpx_store.get(ref_sid, "") | |
| if not content: | |
| return JSONResponse({"error": "content 없음"}, status_code=400) | |
| def _blocking(): | |
| if ref_hwpx_path and os.path.exists(ref_hwpx_path): | |
| path = generate_hwpx(content, ref_hwpx_path=ref_hwpx_path) | |
| else: | |
| path = generate_hwpx(content) | |
| title = normalize_text_for_title(content) | |
| safe = re.sub(r'[\\/:*?"<>|]', '', title)[:40].strip() or "문서" | |
| new_path = os.path.join(os.path.dirname(path), f"{safe}.hwpx") | |
| os.rename(path, new_path) | |
| return new_path | |
| new_path = await _asyncio.get_event_loop().run_in_executor(None, _blocking) | |
| fname = os.path.basename(new_path) | |
| _file_registry[fname] = new_path | |
| return JSONResponse({"file_url": f"/file/{fname}", | |
| "filename": fname, | |
| "file_path": new_path}) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| async def _serve_file(fname: str): | |
| fpath = _file_registry.get(fname) | |
| if fpath and os.path.exists(fpath): | |
| return FileResponse(fpath, filename=fname, | |
| media_type="application/octet-stream") | |
| return JSONResponse({"error": "파일 없음"}, status_code=404) | |
| async def _soma_preview(req: _FAReq): | |
| try: | |
| body = await req.json() | |
| if "b64" in body: | |
| import base64 as _b64 | |
| ext = body.get("ext", ".hwpx").lower() | |
| tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False) | |
| tmp.write(_b64.b64decode(body["b64"])) | |
| tmp.close() | |
| fpath = tmp.name | |
| else: | |
| fpath = body.get("file_path", "") | |
| if not fpath or not os.path.exists(fpath): | |
| return HTMLResponse(_viewer_empty("파일을 찾을 수 없습니다.")) | |
| preview = await _asyncio.get_event_loop().run_in_executor( | |
| None, hwpx_to_html_preview, fpath) | |
| return HTMLResponse(preview) | |
| except Exception as e: | |
| return HTMLResponse(_viewer_empty(f"미리보기 오류: {e}")) | |
| async def _soma_status(): | |
| return JSONResponse({ | |
| "status": "ok", | |
| "hwpjs_ready": core._HWPJS_READY, | |
| "engine": "ohah/hwpjs WASM" if core._HWPJS_READY else "Python lxml" | |
| }) | |
| # HF Spaces 호환 — 헬스체크 | |
| async def _health(): | |
| return JSONResponse({"status": "ok"}) | |
| # ── 문서 업로드 (텍스트 추출) ── | |
| async def _soma_doc_upload(req: _FAReq): | |
| """업로드된 문서에서 텍스트 추출 (b64 또는 file_path)""" | |
| try: | |
| body = await req.json() | |
| fpath = body.get("file_path", "") | |
| if not (fpath and os.path.exists(fpath)): | |
| import base64 as _b64 | |
| b64 = body.get("b64", "") | |
| fname = body.get("filename", "document") | |
| ext = body.get("ext", ".txt").lower() | |
| tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False) | |
| tmp.write(_b64.b64decode(b64)) | |
| tmp.close() | |
| fpath = tmp.name | |
| text, err = await _asyncio.get_event_loop().run_in_executor( | |
| None, process_uploaded_file, fpath) | |
| if text: | |
| sid = str(id(text))[-8:] | |
| _doc_text_store[sid] = text | |
| # HWPX 파일이면 경로도 저장 (변환 모드용) | |
| is_hwpx = fpath.lower().endswith('.hwpx') | |
| if is_hwpx: | |
| _doc_hwpx_store[sid] = fpath | |
| print(f"[DOC-UPLOAD] sid={sid} fpath={fpath} is_hwpx={is_hwpx} hwpx_store_keys={list(_doc_hwpx_store.keys())}") | |
| return JSONResponse({"ok": True, "sid": sid, | |
| "chars": len(text), | |
| "is_hwpx": is_hwpx, | |
| "preview": text[:200]}) | |
| return JSONResponse({"ok": False, "error": err or "텍스트 추출 실패"}) | |
| except Exception as e: | |
| return JSONResponse({"ok": False, "error": str(e)}) | |
| # ── 문서 QnA 챗 (SSE 스트리밍) ── | |
| async def _soma_chat(req: _FAReq): | |
| """문서 기반 QnA 챗 — SSE 스트리밍""" | |
| try: | |
| body = await req.json() | |
| message = body.get("message", "").strip() | |
| sid = body.get("sid", "") | |
| history = body.get("history", []) | |
| if not message: | |
| return JSONResponse({"error": "message 없음"}, status_code=400) | |
| if not FIREWORKS_API_KEY: | |
| return JSONResponse({"error": "FIREWORKS_API_KEY 미설정"}, status_code=500) | |
| doc_text = _doc_text_store.get(sid, "") | |
| # 메시지 구성 | |
| if doc_text: | |
| user_content = f"## 📄 업로드된 문서 내용\n---\n{doc_text[:12000]}\n---\n\n## 💬 질문\n{message}\n\n위 문서 내용을 바탕으로 답변해주세요." | |
| else: | |
| user_content = message | |
| api_messages = [{"role": "system", "content": DOC_CHAT_SYSTEM}] | |
| for h in (history or [])[-6:]: | |
| if isinstance(h, (list, tuple, dict)): | |
| if isinstance(h, dict): | |
| api_messages.append({"role": h.get("role","user"), "content": h.get("content","")}) | |
| elif len(h) == 2: | |
| api_messages.append({"role": "user", "content": h[0] or ""}) | |
| api_messages.append({"role": "assistant", "content": h[1] or ""}) | |
| api_messages.append({"role": "user", "content": user_content}) | |
| q2 = _queue.Queue() | |
| def _chat_thread(): | |
| try: | |
| headers = {"Accept":"application/json","Content-Type":"application/json", | |
| "Authorization": f"Bearer {FIREWORKS_API_KEY}"} | |
| payload = {"model": FIREWORKS_MODEL, "max_tokens": 16000, | |
| "temperature": 0.6, "stream": True, "messages": api_messages} | |
| resp = requests.post(FIREWORKS_URL, headers=headers, json=payload, | |
| stream=True, timeout=180) | |
| resp.raise_for_status() | |
| for raw_line in resp.iter_lines(): | |
| if not raw_line: continue | |
| line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line | |
| if not line.startswith("data: "): continue | |
| data = line[6:] | |
| if data.strip() == "[DONE]": | |
| break | |
| try: | |
| chunk = json.loads(data) | |
| delta = chunk["choices"][0]["delta"].get("content", "") | |
| if delta: | |
| q2.put(json.dumps({"delta": delta}, ensure_ascii=False)) | |
| except: | |
| pass | |
| except Exception as e: | |
| q2.put(json.dumps({"error": str(e)})) | |
| finally: | |
| q2.put(None) | |
| threading.Thread(target=_chat_thread, daemon=True).start() | |
| async def _async_chat(): | |
| while True: | |
| try: | |
| item = await _asyncio.get_event_loop().run_in_executor( | |
| None, lambda: q2.get(timeout=300)) | |
| except: | |
| break | |
| if item is None: | |
| yield "data: [DONE]\n\n" | |
| break | |
| yield f"data: {item}\n\n" | |
| return StreamingResponse(_async_chat(), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", | |
| "X-Accel-Buffering": "no"}) | |
| except Exception as e: | |
| return JSONResponse({"error": str(e)}, status_code=500) | |
| # ── Gradio를 /gradio 서브경로에 마운트 ── | |
| demo = build_ui() | |
| app = gr.mount_gradio_app(app, demo, path="/gradio") | |
| print("✅ FastAPI 메인 서버") | |
| print(" / → index.html") | |
| print(" /gradio → Gradio UI") | |
| print(" /soma/* → API") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |