# ============================================================ # 한지(HANJI) · HWP AI Agent 서비스 — App Router # core.so (또는 core.py)에서 엔진을 import # ============================================================ import os, re, json, time, tempfile, threading import gradio as gr # ── core 엔진 import (.so 또는 .py) ── import core from core import * def build_ui(): with gr.Blocks(title="한지(HANJI) · HWP AI Agent 서비스") as app: gr.HTML(f"") # ── Top Bar ── gr.HTML("""
HWP AI Agent 서비스 🔗 hanji.ginigen.ai 📧 문의 · 온프레미스 · 제휴
""") # ── States ── ref_text_state = gr.State("") ref_hwpx_path_state = gr.State("") state = gr.State({"final_doc": "", "search_count": 0}) dummy_state = gr.State("") doc_text_state = gr.State("") _transform_result_path = "" # 문서 변환 결과 경로 (run_soma에서 설정) # ══════════════════════════════════════════════════ # MAIN LAYOUT: Left 1/3 Controls | Right 2/3 Viewer # ══════════════════════════════════════════════════ with gr.Row(equal_height=False): # ── LEFT PANEL (1/3) ────────────────────────── with gr.Column(scale=1, min_width=320): # Prompt prompt_input = gr.Textbox( label="📌 프롬프트", placeholder="예: 2026년 AI 보안 유망기업 육성 지원사업 공모 안내문을 작성해주세요.", lines=3) # File upload ref_file_upload = gr.File( label="📎 레퍼런스 문서", file_types=[".hwp",".hwpx",".hml",".pdf",".docx",".txt",".md", ".csv",".json",".xml",".xlsx",".xls",".py",".html",".log"]) ref_upload_status = gr.Textbox(label="파일 상태", interactive=False, lines=2, placeholder="레퍼런스 파일을 업로드하면 여기에 상태가 표시됩니다.") # Generation Mode mode_radio = gr.Radio( choices=[ "새로 생성 — AI가 주제에 맞는 문서를 처음부터 작성", "서식 유지 · 내용 변경 — 원본 레이아웃 100% 보존, 텍스트만 교체", "구조 참고 · 새로 생성 — 원본 구조를 참고하여 새 내용으로 작성", ], value="새로 생성 — AI가 주제에 맞는 문서를 처음부터 작성", label="⚙️ 생성 모드", interactive=True) mode_state = gr.State(1) # 1=새로, 2=서식유지, 3=구조참고 # Settings (compact) with gr.Row(): max_search_slider = gr.Slider(minimum=5, maximum=100, value=20, step=5, label="🔍 검색", scale=1) temperature_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.6, step=0.05, label="🌡 Temp", scale=1) # Action buttons with gr.Row(): run_btn = gr.Button("🚀 문서 생성", variant="primary", scale=2) stop_btn = gr.Button("⛔", variant="secondary", scale=0) # Status indicator search_counter = gr.Markdown("대기 중") # HWPX Download with gr.Row(): gen_hml_btn = gr.Button("📥 HWPX 변환", variant="primary", scale=2) copy_text_btn = gr.Button("📋", variant="secondary", scale=0) hml_status = gr.Textbox(label="", interactive=False, value="", placeholder="HWPX 변환 상태", lines=1) hml_file = gr.File(label="다운로드", file_types=[".hwpx"], visible=True) # Generated text (collapsed) with gr.Accordion("📝 생성된 텍스트", open=False): final_doc_box = gr.Textbox(label="", value="", interactive=True, lines=12, placeholder="SOMA 파이프라인 실행 후 최종 문서 텍스트") # Pipeline internals (collapsed) with gr.Accordion("🧬 파이프라인 로그", open=False): agent_stream = gr.Textbox(label="Agent Stream", value="", interactive=False, lines=6) search_log_box = gr.Textbox(label="Search Log", value="", interactive=False, lines=4) agent_log_box = gr.Textbox(label="Pipeline Log", value="", interactive=False, lines=6) # Doc Chat (collapsed) with gr.Accordion("📎 문서 분석 챗", open=False): doc_upload = gr.File(label="📄 문서 업로드", file_types=[".hwp",".hwpx",".hml",".pdf",".docx",".txt",".md", ".csv",".json",".xml",".xlsx",".xls",".py",".html",".log"]) doc_upload_status = gr.Textbox(label="", interactive=False, lines=1) doc_chatbot = gr.Chatbot(label="💬 Chat", height=200) with gr.Row(): doc_msg = gr.Textbox(label="", placeholder="질문하세요...", lines=1, scale=4) doc_send_btn = gr.Button("🚀", variant="primary", scale=0) doc_clear_btn = gr.Button("🗑️ Clear", size="sm") # ── 문서 변환 (XML 직접 치환) ── with gr.Accordion("🔄 문서 변환 (서식 100% 보존)", open=False): gr.HTML('
' '원본 HWPX의 XML 구조를 보존하면서 LLM이 맥락을 이해하여 텍스트만 교체합니다.' '
') transform_file = gr.File(label="📂 원본 HWPX 업로드", file_types=[".hwpx"]) transform_instruction = gr.Textbox( label="📝 변환 지시", placeholder="예: 경기도→서울, 노인말벗서비스→청년창업지원벗서비스로 변경하되 맥락에 맞게 조정", lines=3, ) transform_temp = gr.Slider(0.0, 1.0, 0.3, step=0.1, label="Temperature (낮을수록 정확)") transform_btn = gr.Button("🔄 변환 실행", variant="primary", size="lg") transform_status = gr.Textbox(label="상태", interactive=False) transform_diff = gr.HTML(label="변경 사항") transform_output = gr.File(label="📥 변환된 HWPX 다운로드") # ── RIGHT PANEL (2/3) — DOCUMENT VIEWER ────── with gr.Column(scale=2, min_width=500, elem_classes=["viewer-panel"]): viewer_main = gr.HTML(value=_SAMPLE_PREVIEW) # ── Hidden component for ohaeng (pipeline needs it) ── ohaeng_display = gr.HTML(value="", visible=False) # ── Event Handlers def handle_ref_upload(file): if file is None: return "", "", "", _viewer_empty("파일을 선택하면 여기에 미리보기가 표시됩니다.") fpath = file.name if hasattr(file, 'name') else str(file) fname = os.path.basename(fpath) ext = Path(fpath).suffix.lower() # ── 바이너리 HWP 감지 → 변환 안내 ── if ext == '.hwp' and _is_binary_hwp(fpath): text, err = process_uploaded_file(fpath) preview = hwpx_to_html_preview(fpath) if text: status = f"📄 {fname} ({len(text):,}자 추출)\n\n{_HWP_CONVERT_GUIDE}" return text, status, "", preview return "", f"❌ {fname}: {err}", "", preview # ── HWPX 파일 → 스타일 복원 모드 ── hwpx_path = "" if ext == '.hwpx': try: with zipfile.ZipFile(fpath, 'r') as zf: if 'Contents/header.xml' in zf.namelist(): hwpx_path = fpath styles = analyze_hwpx_styles(fpath) print(f"📋 레퍼런스 HWPX 분석 완료: charPr {styles['char_count']}개, " f"paraPr {styles['para_count']}개, " f"borderFill {styles['bf_count']}개") except: pass # ── 그 외 파일 ── text, err = process_uploaded_file(fpath) # 뷰어: HWP/HWPX만 렌더링 if ext in ('.hwp', '.hwpx'): preview = hwpx_to_html_preview(fpath) else: preview = _viewer_empty(f"{fname} — HWP/HWPX 파일만 미리보기 지원됩니다.") if text: status = f"✅ {fname} ({len(text):,}자)" if hwpx_path: status += "\n🔄 '서식 유지 · 내용 변경' 및 '구조 참고 · 새로 생성' 모드 사용 가능" return text, status, hwpx_path, preview return "", f"❌ {fname}: {err}", "", preview ref_file_upload.change(fn=handle_ref_upload, inputs=[ref_file_upload], outputs=[ref_text_state, ref_upload_status, ref_hwpx_path_state, viewer_main]) def _radio_to_mode(radio_val): """라디오 레이블 → 모드 번호 변환""" if not radio_val: return 1 if "서식 유지" in radio_val: return 2 if "구조 참고" in radio_val: return 3 return 1 def run_soma(prompt, max_search, temperature, ref_text, ref_hwpx_path="", mode_val=1): mode = mode_val if isinstance(mode_val, int) else _radio_to_mode(str(mode_val)) if not prompt.strip(): yield (ohaeng_cards_html("水"), "⚠️ 프롬프트를 입력하세요.", "", "", "", "대기 중", "") return # ════════════════════════════════════════════════════════ # MODE 2: 서식 유지 · 내용 변경 (XML 직접 치환) # ════════════════════════════════════════════════════════ if mode == 2 and ref_hwpx_path and os.path.exists(ref_hwpx_path): # ── XML 직접 치환 모드: SOMA 전체 바이패스 ── yield (ohaeng_cards_html("水"), "🔄 **서식 유지 · 내용 변경** 모드 — XML 직접 치환 (서식 100% 보존)\n\n" "📖 원본 텍스트 노드 추출 중...\n", "", "🔄 Mode 2: XML 키워드 치환\n", "", "🔄 변환 중", "") try: text_list, raw_xml, orig_flags = extract_text_nodes(ref_hwpx_path) yield (ohaeng_cards_html("木"), f"📖 텍스트 노드 {len(text_list)}개 추출 완료\n\n" f"🤖 LLM 키워드 매핑 생성 중...\n", "", f"📖 {len(text_list)}개 노드 추출\n", "", "🔄 LLM 분석 중", "") mapping = generate_keyword_mapping(raw_xml, prompt, temperature) if not mapping: yield (ohaeng_cards_html("金"), "⚠️ 변경할 키워드가 없습니다. 지시를 더 구체적으로 입력하세요.", "", "❌ 매핑 0건\n", "", "⚠️ 변경 없음", "") return yield (ohaeng_cards_html("火"), f"🤖 키워드 매핑 {len(mapping)}쌍 생성\n\n" f"🔧 XML 적용 중...\n", "", f"🤖 {len(mapping)}쌍 매핑\n", "", "🔧 적용 중", "") new_xml, details = apply_keyword_mapping(raw_xml, mapping) output_path = repack_transform_hwpx(ref_hwpx_path, new_xml, orig_flags) orig_name = os.path.splitext(os.path.basename(ref_hwpx_path))[0] final_name = f"{orig_name}_변환.hwpx" final_path = os.path.join(os.path.dirname(output_path), final_name) os.rename(output_path, final_path) total_count = sum(d.get("count",0) for d in details) summary_lines = [] for d in details: summary_lines.append(f"• '{d['original']}' → '{d['replacement']}' ({d.get('count',0)}회)") summary = "\n".join(summary_lines) final_doc = ( f"## 🔄 문서 변환 완료 (서식 100% 보존)\n\n" f"**{len(details)}개 키워드 · {total_count}회 치환**\n\n" f"{summary}\n\n" f"---\n" f"✅ header.xml: 원본 그대로\n" f"✅ 이미지/스크립트: 원본 그대로\n" f"✅ charPr/paraPr: 원본 그대로\n" f"✅ 문단 구조: 원본 그대로\n" f"✅ section0.xml: 키워드만 {total_count}회 치환\n" ) preview = hwpx_to_html_preview(final_path) if 'hwpx_to_html_preview' in dir() else "" yield (ohaeng_cards_html("金"), f"🎉 **문서 변환 완료!**\n\n" f"서식 100% 보존 · {len(details)}개 키워드 · {total_count}회 치환\n\n" f"아래 'HWPX 생성' 버튼으로 다운로드하세요.\n", "", f"✅ 변환 완료: {total_count}회\n", final_doc, f"✅ 변환 완료", final_doc) nonlocal _transform_result_path _transform_result_path = final_path return except Exception as e: import traceback traceback.print_exc() yield (ohaeng_cards_html("金"), f"❌ 변환 오류: {e}\n\n모드를 '새로 생성'으로 변경하여 다시 시도하세요.", "", f"❌ {e}\n", "", "❌ 오류", "") return # ════════════════════════════════════════════════════════ # MODE 1 & 3: SOMA 파이프라인 (문서 신규 생성) # ════════════════════════════════════════════════════════ full_prompt = prompt if mode == 3 and ref_text and ref_text.strip(): # MODE 3: 참조 문서의 구조 골격을 압축 추출하여 주입 structure = extract_structure_summary(ref_text) full_prompt = f"{prompt}\n\n{structure}" elif ref_text and ref_text.strip(): # MODE 1: 레퍼런스 텍스트가 있으면 참고자료로만 활용 ref_content = ref_text.strip()[:8000] full_prompt = f"{prompt}\n\n[참고자료]\n{ref_content}" stream_acc, log_acc, search_log, final_doc = "", "", "", "" active_agent, sc = "水", 0 for chunk in soma_pipeline(full_prompt, int(max_search), temperature): if chunk.get("done"): final_doc = chunk.get("final_doc", "") sc = chunk.get("search_count", sc) log_acc = chunk.get("log", "") search_log = chunk.get("search_log", "") break active_agent = chunk.get("active", active_agent) tok = chunk.get("stream", "") if tok: stream_acc += tok if len(stream_acc) > 8000: stream_acc = "...(이전 생략)...\n" + stream_acc[-6000:] if chunk.get("log"): log_acc = chunk["log"] if chunk.get("search_log"): search_log = chunk["search_log"] if chunk.get("search_count") is not None: sc = chunk["search_count"] if chunk.get("final_doc"): final_doc = chunk["final_doc"] yield (ohaeng_cards_html(active_agent), stream_acc, search_log, log_acc, final_doc if final_doc else "", f"🔍 {sc} / {int(max_search)}", "") yield (ohaeng_cards_html("金"), stream_acc + "\n\n🎉 완료!", search_log, log_acc, final_doc, f"✅ 완료: {sc}회 검색", final_doc) run_btn.click( fn=run_soma, inputs=[prompt_input, max_search_slider, temperature_slider, ref_text_state, ref_hwpx_path_state, mode_radio], outputs=[ohaeng_display, agent_stream, search_log_box, agent_log_box, final_doc_box, search_counter, dummy_state]) def make_hml(doc_text, ref_hwpx_path, mode_val=1): mode = mode_val if isinstance(mode_val, int) else _radio_to_mode(str(mode_val)) nonlocal _transform_result_path # ── MODE 2: 문서 변환 결과가 있으면 바로 반환 ── if mode == 2 and _transform_result_path and os.path.exists(_transform_result_path): path = _transform_result_path _transform_result_path = "" # 1회 사용 후 리셋 preview = hwpx_to_html_preview(path) return path, "✅ 문서 변환 완료 (서식 100% 보존) — XML 직접 치환", preview if not doc_text or not doc_text.strip(): return None, "❌ 문서를 먼저 생성하세요.", _viewer_empty("HWPX 생성 후 여기에 표시됩니다.") try: # MODE 3: 레퍼런스 HWPX 구조 참고 → SectionCloner if mode == 3 and ref_hwpx_path and os.path.exists(ref_hwpx_path): path = generate_hwpx(doc_text.strip(), ref_hwpx_path=ref_hwpx_path) gen_mode = "🧩 구조 참고 · SectionCloner" elif ref_hwpx_path and os.path.exists(ref_hwpx_path): path = generate_hwpx(doc_text.strip(), ref_hwpx_path=ref_hwpx_path) gen_mode = "🎯 레퍼런스 스타일 복원" else: path = generate_hwpx(doc_text.strip()) gen_mode = "📄 report 템플릿" title = normalize_text_for_title(doc_text.strip()) safe_title = re.sub(r'[\\/:*?"<>|]', '', title)[:40].strip() or "문서" new_path = os.path.join(os.path.dirname(path), f"{safe_title}.hwpx") os.rename(path, new_path) # page_guard 결과 표시 status = f"✅ 생성 완료 ({gen_mode})" if ref_hwpx_path and os.path.exists(ref_hwpx_path): guard = page_guard_check(ref_hwpx_path, new_path) if guard["status"] == "PASS": status += f" | 📏 page_guard PASS (ref={guard['ref_chars']}자 → out={guard['out_chars']}자)" else: status += f" | ⚠️ page_guard {len(guard['errors'])}건 경고" # 생성된 HWPX 뷰어 렌더링 preview = hwpx_to_html_preview(new_path) return new_path, status, preview except Exception as e: return None, f"❌ 오류: {e}", _viewer_empty(f"생성 오류: {e}") gen_hml_btn.click(fn=make_hml, inputs=[final_doc_box, ref_hwpx_path_state, mode_radio], outputs=[hml_file, hml_status, viewer_main]) # Doc Chat handlers def handle_doc_upload(file): if file is None: return "", "파일을 선택해주세요." fpath = file.name if hasattr(file, 'name') else str(file) fname = os.path.basename(fpath) ext = Path(fpath).suffix.lower() # 바이너리 HWP 감지 is_bin_hwp = (ext == '.hwp' and _is_binary_hwp(fpath)) text, err = process_uploaded_file(fpath) if text: status = f"✅ {fname} ({len(text):,}자)" if is_bin_hwp: status = f"📄 {fname} ({len(text):,}자 추출) — 바이너리 HWP (텍스트만 추출됨)" return text, status return "", f"❌ {fname}: {err}" doc_upload.change(fn=handle_doc_upload, inputs=[doc_upload], outputs=[doc_text_state, doc_upload_status]) doc_send_btn.click(fn=doc_chat_respond, inputs=[doc_msg, doc_chatbot, doc_text_state], outputs=[doc_chatbot]) doc_msg.submit(fn=doc_chat_respond, inputs=[doc_msg, doc_chatbot, doc_text_state], outputs=[doc_chatbot]) doc_clear_btn.click(fn=lambda: ([], ""), outputs=[doc_chatbot, doc_text_state]) # ── 문서 변환 이벤트 핸들러 ── def handle_transform(hwpx_file, instruction, temperature): if hwpx_file is None: return None, "❌ HWPX 파일을 업로드하세요.", "" if not instruction or not instruction.strip(): return None, "❌ 변환 지시를 입력하세요.", "" fpath = hwpx_file.name if hasattr(hwpx_file, 'name') else str(hwpx_file) try: output_path, replacements, diff_html = transform_hwpx( fpath, instruction.strip(), temperature) orig_name = os.path.splitext(os.path.basename(fpath))[0] new_name = f"{orig_name}_변환.hwpx" final_path = os.path.join(os.path.dirname(output_path), new_name) os.rename(output_path, final_path) return final_path, f"✅ 변환 완료: {len(replacements)}건 변경 | 서식 100% 보존", diff_html except Exception as e: return None, f"❌ 오류: {e}", f"

{e}

" transform_btn.click( fn=handle_transform, inputs=[transform_file, transform_instruction, transform_temp], outputs=[transform_output, transform_status, transform_diff]) return app # ============================================================ # ⑧ Entry Point — FastAPI 메인 + Gradio 서브마운트 # ============================================================ from fastapi import FastAPI, Request as _FAReq from fastapi.responses import FileResponse, JSONResponse, HTMLResponse, StreamingResponse import uvicorn # ── FastAPI 메인 앱 ── app = FastAPI() _APP_DIR = os.path.dirname(os.path.abspath(__file__)) _index_path = os.path.join(_APP_DIR, "index.html") # ── ohah/hwpjs 백그라운드 설치 ── threading.Thread(target=_install_hwpjs, daemon=True).start() # ── "/" → index.html 서빙 ── @app.get("/") async def _serve_index(): if os.path.exists(_index_path): return FileResponse(_index_path, media_type="text/html") return HTMLResponse("

index.html not found

", status_code=404) @app.get("/ui") async def _serve_ui(): return await _serve_index() # ── SOMA API ── import asyncio as _asyncio import queue as _queue _file_registry = {} _doc_text_store = {} # sid → text _doc_hwpx_store = {} # sid → hwpx file path (변환 모드용) _transform_store = {} # sid → 변환 결과 hwpx path _last_transform = {"path": "", "ts": 0} # 마지막 변환 결과 (index.html용) @app.post("/soma/run") async def _soma_run(req: _FAReq): try: body = await req.json() prompt = body.get("prompt", "").strip() max_search = int(body.get("max_search", 20)) temperature = float(body.get("temperature", 0.6)) ref_text = body.get("ref_text", "") # 직접 전달 ref_sid = body.get("ref_sid", "") # doc-upload에서 받은 sid if not ref_text and ref_sid: ref_text = _doc_text_store.get(ref_sid, "") if not prompt: return JSONResponse({"error": "prompt 없음"}, status_code=400) # ════════════════════════════════════════════════════════ # 모드 분기: 1=새로 생성, 2=서식 유지·내용 변경, 3=구조 참고·새로 생성 # ════════════════════════════════════════════════════════ mode = int(body.get("mode", 1)) ref_hwpx_path = _doc_hwpx_store.get(ref_sid, "") # ── 디버그 로그 ── print(f"[MODE] mode={mode} ref_sid='{ref_sid}' hwpx='{ref_hwpx_path}' exists={os.path.exists(ref_hwpx_path) if ref_hwpx_path else False}") print(f"[MODE] prompt[:100]='{prompt[:100]}'") # ════════════════════════════════════════════════════════ # MODE 2: 서식 유지 · 내용 변경 (XML 직접 치환) # ════════════════════════════════════════════════════════ if mode == 2 and ref_hwpx_path and os.path.exists(ref_hwpx_path): # ── XML 직접 치환 모드: SOMA 전체 바이패스 ── def _transform_in_thread(): try: q.put(json.dumps({"active": "水", "stream": "🔄 **문서 변환 모드** — 키워드 매핑 치환 (서식 100% 보존)\n\n📖 텍스트 추출 중...\n"}, ensure_ascii=False)) text_list, raw_xml, orig_flags = extract_text_nodes(ref_hwpx_path) q.put(json.dumps({"active": "木", "stream": f"📖 텍스트 노드 {len(text_list)}개 추출\n🤖 LLM 키워드 매핑 생성 중...\n"}, ensure_ascii=False)) mapping = generate_keyword_mapping(raw_xml, prompt, temperature) if not mapping: q.put(json.dumps({"active": "金", "done": True, "final_doc": "⚠️ 변경할 키워드가 없습니다.", "stream": "⚠️ 매핑 0건\n"}, ensure_ascii=False)) return q.put(json.dumps({"active": "火", "stream": f"🤖 {len(mapping)}쌍 매핑 생성\n🔧 XML 적용 중...\n"}, ensure_ascii=False)) new_xml, details = apply_keyword_mapping(raw_xml, mapping) output_path = repack_transform_hwpx(ref_hwpx_path, new_xml, orig_flags) # 파일 등록 orig_name = os.path.splitext(os.path.basename(ref_hwpx_path))[0] final_name = f"{orig_name}_변환.hwpx" final_path = os.path.join(os.path.dirname(output_path), final_name) os.rename(output_path, final_path) _file_registry[final_name] = final_path if ref_sid: _transform_store[ref_sid] = final_path _last_transform["path"] = final_path _last_transform["ts"] = time.time() # 변경 사항 요약 total_count = sum(d.get("count",0) for d in details) summary = "\n".join(f"• '{d['original']}' → '{d['replacement']}' ({d.get('count',0)}회)" for d in details) final_doc = ( f"## 🔄 문서 변환 완료 (서식 100% 보존)\n\n" f"**{len(details)}개 키워드 · {total_count}회 치환**\n\n{summary}\n\n" f"---\n✅ header.xml/이미지/스크립트/charPr/paraPr: 원본 100% 보존\n" f"✅ section0.xml: 키워드만 {total_count}회 치환\n" ) q.put(json.dumps({"active": "金", "done": True, "final_doc": final_doc, "transform_file": f"/file/{final_name}", "transform_filename": final_name, "transform_path": final_path, "stream": f"🎉 변환 완료! {len(details)}개 키워드 · {total_count}회 · 서식 100% 보존\n", "search_count": 0}, ensure_ascii=False)) except Exception as e: import traceback; traceback.print_exc() q.put(json.dumps({"error": str(e), "done": True}, ensure_ascii=False)) finally: q.put(None) # SSE 종료 신호 q = _queue.Queue() threading.Thread(target=_transform_in_thread, daemon=True).start() async def _async_gen(): while True: try: item = await _asyncio.get_event_loop().run_in_executor( None, lambda: q.get(timeout=300)) except: break if item is None: yield "data: [DONE]\n\n"; break yield f"data: {item}\n\n" if '"done": true' in item or '"done":true' in item: yield "data: [DONE]\n\n"; break return StreamingResponse(_async_gen(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) # ════════════════════════════════════════════════════════ # MODE 1 & 3: SOMA 파이프라인 (문서 신규 생성) # MODE 3은 구조 참고 힌트 추가 + HWPX 생성 시 SectionCloner 사용 # ════════════════════════════════════════════════════════ full_prompt = prompt if mode == 3 and ref_text and ref_text.strip(): # MODE 3: 참조 문서의 구조 골격을 압축 추출하여 주입 structure = extract_structure_summary(ref_text) full_prompt = f"{prompt}\n\n{structure}" elif mode == 1 and ref_text and ref_text.strip(): # MODE 1: 레퍼런스 텍스트가 있어도 참고자료로만 활용 ref_snippet = ref_text.strip()[:8000] full_prompt = f"{prompt}\n\n[참고자료]\n{ref_snippet}" # 동기 제너레이터를 별도 스레드에서 실행 → 이벤트 루프 블로킹 방지 q = _queue.Queue() def _run_in_thread(): try: for chunk in soma_pipeline(full_prompt, max_search, temperature): q.put(json.dumps(chunk, ensure_ascii=False)) except Exception as e: q.put(json.dumps({"error": str(e), "done": True})) finally: q.put(None) # 종료 시그널 threading.Thread(target=_run_in_thread, daemon=True).start() async def _async_generate(): while True: # 큐에서 비동기로 가져오기 (이벤트 루프 블로킹 없음) try: item = await _asyncio.get_event_loop().run_in_executor( None, lambda: q.get(timeout=300)) except: break if item is None: yield "data: [DONE]\n\n" break yield f"data: {item}\n\n" return StreamingResponse(_async_generate(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.post("/soma/hml") async def _soma_hml(req: _FAReq): try: body = await req.json() content = body.get("content", "").strip() ref_sid = body.get("ref_sid", "") mode = int(body.get("mode", 1)) # ── MODE 2: 문서 변환 결과가 있으면 바로 반환 ── if mode == 2: # 1) ref_sid로 찾기 if ref_sid and ref_sid in _transform_store: path = _transform_store.pop(ref_sid) if os.path.exists(path): fname = os.path.basename(path) _file_registry[fname] = path return JSONResponse({"file_url": f"/file/{fname}", "filename": fname, "file_path": path, "mode": "transform"}) # 2) 글로벌 최근 변환 결과 if _last_transform["path"] and os.path.exists(_last_transform["path"]): if time.time() - _last_transform["ts"] < 300: path = _last_transform["path"] _last_transform["path"] = "" fname = os.path.basename(path) _file_registry[fname] = path return JSONResponse({"file_url": f"/file/{fname}", "filename": fname, "file_path": path, "mode": "transform"}) # ── MODE 3: 레퍼런스 HWPX 구조 참고 → SectionCloner ── ref_hwpx_path = "" if mode == 3 and ref_sid: ref_hwpx_path = _doc_hwpx_store.get(ref_sid, "") if not content: return JSONResponse({"error": "content 없음"}, status_code=400) def _blocking(): if ref_hwpx_path and os.path.exists(ref_hwpx_path): path = generate_hwpx(content, ref_hwpx_path=ref_hwpx_path) else: path = generate_hwpx(content) title = normalize_text_for_title(content) safe = re.sub(r'[\\/:*?"<>|]', '', title)[:40].strip() or "문서" new_path = os.path.join(os.path.dirname(path), f"{safe}.hwpx") os.rename(path, new_path) return new_path new_path = await _asyncio.get_event_loop().run_in_executor(None, _blocking) fname = os.path.basename(new_path) _file_registry[fname] = new_path return JSONResponse({"file_url": f"/file/{fname}", "filename": fname, "file_path": new_path}) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) @app.get("/file/{fname}") async def _serve_file(fname: str): fpath = _file_registry.get(fname) if fpath and os.path.exists(fpath): return FileResponse(fpath, filename=fname, media_type="application/octet-stream") return JSONResponse({"error": "파일 없음"}, status_code=404) @app.post("/soma/preview") async def _soma_preview(req: _FAReq): try: body = await req.json() if "b64" in body: import base64 as _b64 ext = body.get("ext", ".hwpx").lower() tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False) tmp.write(_b64.b64decode(body["b64"])) tmp.close() fpath = tmp.name else: fpath = body.get("file_path", "") if not fpath or not os.path.exists(fpath): return HTMLResponse(_viewer_empty("파일을 찾을 수 없습니다.")) preview = await _asyncio.get_event_loop().run_in_executor( None, hwpx_to_html_preview, fpath) return HTMLResponse(preview) except Exception as e: return HTMLResponse(_viewer_empty(f"미리보기 오류: {e}")) @app.get("/soma/status") async def _soma_status(): return JSONResponse({ "status": "ok", "hwpjs_ready": core._HWPJS_READY, "engine": "ohah/hwpjs WASM" if core._HWPJS_READY else "Python lxml" }) # HF Spaces 호환 — 헬스체크 @app.get("/api/health") async def _health(): return JSONResponse({"status": "ok"}) # ── 문서 업로드 (텍스트 추출) ── @app.post("/soma/doc-upload") async def _soma_doc_upload(req: _FAReq): """업로드된 문서에서 텍스트 추출 (b64 또는 file_path)""" try: body = await req.json() fpath = body.get("file_path", "") if not (fpath and os.path.exists(fpath)): import base64 as _b64 b64 = body.get("b64", "") fname = body.get("filename", "document") ext = body.get("ext", ".txt").lower() tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False) tmp.write(_b64.b64decode(b64)) tmp.close() fpath = tmp.name text, err = await _asyncio.get_event_loop().run_in_executor( None, process_uploaded_file, fpath) if text: sid = str(id(text))[-8:] _doc_text_store[sid] = text # HWPX 파일이면 경로도 저장 (변환 모드용) is_hwpx = fpath.lower().endswith('.hwpx') if is_hwpx: _doc_hwpx_store[sid] = fpath print(f"[DOC-UPLOAD] sid={sid} fpath={fpath} is_hwpx={is_hwpx} hwpx_store_keys={list(_doc_hwpx_store.keys())}") return JSONResponse({"ok": True, "sid": sid, "chars": len(text), "is_hwpx": is_hwpx, "preview": text[:200]}) return JSONResponse({"ok": False, "error": err or "텍스트 추출 실패"}) except Exception as e: return JSONResponse({"ok": False, "error": str(e)}) # ── 문서 QnA 챗 (SSE 스트리밍) ── @app.post("/soma/chat") async def _soma_chat(req: _FAReq): """문서 기반 QnA 챗 — SSE 스트리밍""" try: body = await req.json() message = body.get("message", "").strip() sid = body.get("sid", "") history = body.get("history", []) if not message: return JSONResponse({"error": "message 없음"}, status_code=400) if not FIREWORKS_API_KEY: return JSONResponse({"error": "FIREWORKS_API_KEY 미설정"}, status_code=500) doc_text = _doc_text_store.get(sid, "") # 메시지 구성 if doc_text: user_content = f"## 📄 업로드된 문서 내용\n---\n{doc_text[:12000]}\n---\n\n## 💬 질문\n{message}\n\n위 문서 내용을 바탕으로 답변해주세요." else: user_content = message api_messages = [{"role": "system", "content": DOC_CHAT_SYSTEM}] for h in (history or [])[-6:]: if isinstance(h, (list, tuple, dict)): if isinstance(h, dict): api_messages.append({"role": h.get("role","user"), "content": h.get("content","")}) elif len(h) == 2: api_messages.append({"role": "user", "content": h[0] or ""}) api_messages.append({"role": "assistant", "content": h[1] or ""}) api_messages.append({"role": "user", "content": user_content}) q2 = _queue.Queue() def _chat_thread(): try: headers = {"Accept":"application/json","Content-Type":"application/json", "Authorization": f"Bearer {FIREWORKS_API_KEY}"} payload = {"model": FIREWORKS_MODEL, "max_tokens": 16000, "temperature": 0.6, "stream": True, "messages": api_messages} resp = requests.post(FIREWORKS_URL, headers=headers, json=payload, stream=True, timeout=180) resp.raise_for_status() for raw_line in resp.iter_lines(): if not raw_line: continue line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line if not line.startswith("data: "): continue data = line[6:] if data.strip() == "[DONE]": break try: chunk = json.loads(data) delta = chunk["choices"][0]["delta"].get("content", "") if delta: q2.put(json.dumps({"delta": delta}, ensure_ascii=False)) except: pass except Exception as e: q2.put(json.dumps({"error": str(e)})) finally: q2.put(None) threading.Thread(target=_chat_thread, daemon=True).start() async def _async_chat(): while True: try: item = await _asyncio.get_event_loop().run_in_executor( None, lambda: q2.get(timeout=300)) except: break if item is None: yield "data: [DONE]\n\n" break yield f"data: {item}\n\n" return StreamingResponse(_async_chat(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}) except Exception as e: return JSONResponse({"error": str(e)}, status_code=500) # ── Gradio를 /gradio 서브경로에 마운트 ── demo = build_ui() app = gr.mount_gradio_app(app, demo, path="/gradio") print("✅ FastAPI 메인 서버") print(" / → index.html") print(" /gradio → Gradio UI") print(" /soma/* → API") uvicorn.run(app, host="0.0.0.0", port=7860)