"""Gradio front end for the "Wallstreet AI" financial analysis assistant.

The module renders a two-column page: inputs (API key, persona, example
questions, free-text question) on the left and a streaming progress/answer
panel on the right.  The analysis itself is delegated to ``pipeline.pipeline``,
which is imported lazily inside :func:`stream_analyze`.

NOTE(review): the HTML fragments in this file were reconstructed from the
selectors declared in ``CSS``; confirm the markup against the deployed UI.
"""

import base64
import html as html_lib
import io
import json
import os
import re
import sys
import time
from dataclasses import asdict, is_dataclass
from pathlib import Path
from queue import Empty, Queue
from threading import Thread

import gradio as gr
from pydantic import BaseModel, ValidationError

# ─────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────
os.environ.setdefault("LLM_MODEL_NAME", "gpt-5-mini")
os.environ.setdefault("LOG_FILE", "analysis_results.jsonl")
os.environ.setdefault("PERSONA_FILE", "persona.jsonl")

PERSONA_FILE = Path(os.environ.get("PERSONA_FILE", "persona.jsonl"))
IMAGE_CACHE_DIR = Path(".persona_images")
IMAGE_CACHE_DIR.mkdir(exist_ok=True)
LOCAL_IMAGE_DIR = Path(os.environ.get("LOCAL_IMAGE_DIR", Path(__file__).parent / "persona_images"))
DEFAULT_IMAGE_FILENAME = "default.png"

# One example question per analysis type; the values are shown as buttons.
EXAMPLES_BY_TYPE = {
    "screener": "PER 낮은 대형주 추천해주세요",
    "technical": "Apple(AAPL) 차트 분석해주세요",
    "fundamental": "Microsoft 재무상태 어때요?",
    "news_summary": "Tesla 최근 뉴스 요약해 주세요",
    "comparison": "Apple vs Microsoft 비교 분석해 주세요",
    "earnings": "2025년 4분기 삼성전자 실적은 어땠나요?",
    "swot": "OpenAI 경쟁력 분석해 주세요",
    "general": "Tesla(TSLA) 어떻게 보시나요?",
    "watchlist": "내 관심종목(삼성전자, SK하이닉스, Apple, Microsoft, Tesla) 현황 봐주세요",
}
EXAMPLE_QUERIES = list(EXAMPLES_BY_TYPE.values())

# NOTE(review): the original markup/script for this constant is not
# recoverable from the reviewed source; it is kept as the blank string seen.
AUTO_SCROLL_JS = """ """


# ─────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────
def to_md(text):
    """Return *text*, or an empty string when it is falsy."""
    return text or ""


def timer_text(elapsed):
    """Format an elapsed-time string for the timer badge."""
    return f"⏱ {elapsed}"


def _make_elapsed():
    """Return a callable that reports seconds elapsed since this call."""
    t0 = time.time()
    return lambda: f"{time.time()-t0:.1f}초"


_PAREN_MD = re.compile(r'\s*\(\s*\[[^\]]*\]\([^)]*\)\s*\)')
_MD_LINK = re.compile(r'\[([^\]]*)\]\([^)]*\)')
_PAREN_URL = re.compile(r'\s*\(https?://[^\)]*\)')
_BARE_URL = re.compile(r'https?://\S+')
_PAREN_DOM = re.compile(r'\s*\([a-zA-Z0-9._-]+\.[a-zA-Z]{2,6}\)')


def _safe(text):
    """Strip links/URLs from *text*, HTML-escape it and convert newlines.

    Markdown links are reduced to their label, parenthesised or bare URLs and
    parenthesised domain names are removed, repeated blanks and doubled
    periods are collapsed, and the result is escaped for embedding in HTML.
    """
    t = text or ""
    t = _PAREN_MD.sub('', t)
    t = _MD_LINK.sub(r'\1', t)
    t = _PAREN_URL.sub('', t)
    t = _BARE_URL.sub('', t)
    t = _PAREN_DOM.sub('', t)
    t = re.sub(r'[ \t]{2,}', ' ', t).strip()
    t = re.sub(r'\.\s*\.', '.', t)
    return html_lib.escape(t).replace("\n", "<br>")


# ─────────────────────────────────────────────────────────────
# Progress-log HTML builder
# ─────────────────────────────────────────────────────────────
# Keyword (matched as a substring of the status message) -> icon.
STATUS_ICONS = {
    "요청 수신": "📡",
    "인텐트": "🧠",
    "도구": "🔧",
    "시장": "📊",
    "뉴스": "📰",
    "컨텍스트": "🗂",
    "LLM": "✨",
    "완료": "✅",
}


def _status_icon(msg):
    """Return the icon of the first ``STATUS_ICONS`` key found in *msg*."""
    for k, v in STATUS_ICONS.items():
        if k in msg:
            return v
    return "⏳"


# ─────────────────────────────────────────────────────────────
# Persona model & file IO
# ─────────────────────────────────────────────────────────────
class PersonaLine(BaseModel):
    """One persona record, i.e. one JSON object per line of ``PERSONA_FILE``."""

    name: str
    full_name: str
    summary: str
    financial_mindset: str
    data_analysis_approach: str
    response_style: str
    key_principles: list[str]
    famous_quotes: list[str] | None = None
    image_path: str | None = None


_persona_cache: list = []
_persona_cache_mtime: float = 0.0


def _parse_personas():
    """Load personas from ``PERSONA_FILE``, cached by file modification time.

    Lines that are blank, are not JSON objects, or fail validation are
    skipped.  A missing ``full_name`` falls back to ``name`` and a legacy
    ``background`` key is used as ``summary`` when ``summary`` is absent.
    An unreadable file yields whatever was parsed before the error.
    """
    global _persona_cache, _persona_cache_mtime
    try:
        mtime = PERSONA_FILE.stat().st_mtime if PERSONA_FILE.exists() else 0.0
    except OSError:
        mtime = 0.0

    # An empty cache is never trusted so that a file that appears later
    # (or was empty on the first read) is picked up.
    if mtime == _persona_cache_mtime and _persona_cache:
        return _persona_cache

    personas = []
    if not PERSONA_FILE.exists():
        _persona_cache, _persona_cache_mtime = personas, mtime
        return personas

    try:
        with PERSONA_FILE.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    if not isinstance(data, dict):
                        continue
                    if not data.get("full_name"):
                        data["full_name"] = data.get("name", "")
                    if "background" in data and "summary" not in data:
                        data["summary"] = data["background"]
                    personas.append(PersonaLine(**data))
                except (json.JSONDecodeError, TypeError, ValidationError):
                    continue
    except OSError:
        # Best effort: keep the personas parsed so far.
        pass

    _persona_cache, _persona_cache_mtime = personas, mtime
    return personas


def load_persona_names():
    """Return dropdown choices: ``"없음"`` followed by unique persona names."""
    choices = ["없음"]
    for p in _parse_personas():
        n = p.name.strip()
        if n and n not in choices:
            choices.append(n)
    return choices


def load_persona_summary(name):
    """Return a Markdown blurb (full name + summary cut at 80 chars) for *name*.

    Returns an empty string for no selection or an unknown persona.
    """
    if not name or name == "없음":
        return ""
    for p in _parse_personas():
        if p.name.strip() == name:
            summary = (p.summary[:80] + "…") if len(p.summary) > 80 else p.summary
            return f"**{p.full_name}**\n\n{summary}"
    return ""


# ─────────────────────────────────────────────────────────────
# Output panel builders
# ─────────────────────────────────────────────────────────────
def _make_log_html(log_lines):
    """Render ``(kind, text)`` log entries as HTML rows.

    *kind* is one of ``"status"``, ``"stdout"``, ``"error"`` or ``"done"``;
    entries of any other kind are ignored.  The text is HTML-escaped.
    """
    if not log_lines:
        return ''
    rows = []
    for t, text in log_lines:
        safe = html_lib.escape(text)
        if t == "status":
            icon = _status_icon(text)
            rows.append(f'<div class="log-status"><span>{icon}</span> <span>{safe}</span></div>')
        elif t == "stdout":
            rows.append(f'<div class="log-stdout"><pre>{safe}</pre></div>')
        elif t == "error":
            rows.append(f'<div class="log-error">❌ {safe}</div>')
        elif t == "done":
            rows.append('<div class="log-done">✅ 분석 완료</div>')
    return "\n".join(rows)


def _wrap_log(inner):
    """Wrap already-rendered log HTML in the progress-only output panel."""
    return (
        '<div id="output-panel">'
        '<div class="panel-header"><span class="panel-title">Progress</span></div>'
        '<div id="log-scroll">' + inner + '</div>'
        '</div>'
    )


def _wrap_answer(log_html, md_html, timer_str):
    """Build the result panel: optional collapsible log plus the answer body.

    *log_html* and *md_html* are inserted verbatim (callers must have escaped
    them); *timer_str* is escaped here.
    """
    timer_badge = f'<span class="panel-timer">{html_lib.escape(timer_str)}</span>'
    log_section = (
        '<details class="result-log-section" open>'
        '<summary class="result-log-header">'
        'Progress <span class="log-toggle-hint">Click to show/hide</span>'
        '</summary>'
        '<div class="result-log-body">' + log_html + '</div>'
        '</details>'
    ) if log_html else ''
    answer_section = (
        '<div class="result-answer-section">'
        '<div class="result-answer-header">'
        '<span class="result-answer-badge">📋 Final results</span>'
        + timer_badge +
        '</div>'
        '<div id="answer-scroll" class="md-body">' + md_html + '</div>'
        '</div>'
    )
    return (
        '<div id="output-panel">'
        '<div class="panel-header">'
        '<span class="panel-title">Analysis results</span>'
        + timer_badge +
        '</div>'
        '<div id="answer-scroll-wrap">' + log_section + answer_section + '</div>'
        '</div>'
    )


# Patterns for the lightweight Markdown renderer.  They are compiled once at
# import time because _md_to_html runs on every streaming tick.
_MD_URL_RE = re.compile(r'https?://[^\s,,、\))\]】]+')
_MD_URL_HOST_RE = re.compile(r'https?://(?:www\.)?([^/\s]+)')
_MD_SOURCE_DASH_RE = re.compile(r'\s*[—–-]{1,2}\s*(https?://)')
_MD_BOLD_RE = re.compile(r'\*\*(.+?)\*\*')
# NOTE(review): reconstructed pattern (single-asterisk emphasis that is not
# part of a ``**`` pair); the original expression was damaged in the source.
_MD_ITALIC_RE = re.compile(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)')
_MD_CODE_RE = re.compile(r'`(.+?)`')
_MD_BULLET_RE = re.compile(
    r'^([ \t]*)'
    r'(\d+[.)]\s+|[-•·○◦▸▹*]\s+)'
    r'(.+)$'
)
_MD_HEADER_RE = re.compile(r'^(#{1,4})\s+(.+)$')
_MD_ORDINAL_RE = re.compile(r'(\d+)')


def _md_url_badge(url: str) -> str:
    """Render *url* as a link badge labelled with its host name."""
    clean = url.rstrip('.,;:')
    m = _MD_URL_HOST_RE.match(clean)
    label = m.group(1) if m else clean
    esc = html_lib.escape(clean)
    return (
        f'<a class="md-link" href="{esc}" target="_blank" rel="noopener noreferrer">'
        f'🔗 {html_lib.escape(label)}</a>'
    )


def _md_format_text(fragment: str) -> str:
    """Escape a URL-free fragment and apply bold / italic / inline-code markup."""
    fragment = html_lib.escape(fragment)
    fragment = _MD_BOLD_RE.sub(r'<strong>\1</strong>', fragment)
    fragment = _MD_ITALIC_RE.sub(r'<em>\1</em>', fragment)
    fragment = _MD_CODE_RE.sub(r'<code>\1</code>', fragment)
    return fragment


def _md_inline(s: str) -> str:
    """Render one line of inline Markdown, turning bare URLs into badges."""
    # Drop a dash separator placed directly before a URL ("Source — https://…").
    s = _MD_SOURCE_DASH_RE.sub(r' \1', s)
    parts = []
    last = 0
    for m in _MD_URL_RE.finditer(s):
        parts.append(_md_format_text(s[last:m.start()]))
        parts.append(_md_url_badge(m.group(0)))
        last = m.end()
    parts.append(_md_format_text(s[last:]))
    return ''.join(parts)


def _md_to_html(text):
    """Convert a small Markdown subset to HTML.

    Supported: ``#``–``####`` headers, ordered and unordered (nestable) lists,
    paragraphs, bold, italic, inline code and bare URLs.  All text is
    HTML-escaped before markup is applied.  A blank line ends the current
    paragraph and closes every open list.
    """
    text = text or ""
    out: list[str] = []
    list_stack: list[tuple[int, str]] = []  # (depth, tag) of currently open lists
    para_lines: list[str] = []

    def flush_para():
        if para_lines:
            out.append('<p>' + _md_inline(' '.join(para_lines)) + '</p>')
            para_lines.clear()

    def close_all_lists():
        while list_stack:
            _, t = list_stack.pop()
            out.append(f'</{t}>')

    def open_list(depth: int, tag: str):
        # Close lists nested deeper than the new item.
        while list_stack and list_stack[-1][0] > depth:
            _, t = list_stack.pop()
            out.append(f'</{t}>')
        if list_stack and list_stack[-1][0] == depth:
            # Same depth: only switch lists when the kind (ol/ul) changes.
            if list_stack[-1][1] != tag:
                _, t = list_stack.pop()
                out.append(f'</{t}>')
                out.append(f'<{tag}>')
                list_stack.append((depth, tag))
        else:
            out.append(f'<{tag}>')
            list_stack.append((depth, tag))

    for line in text.split('\n'):
        stripped = line.strip()
        if not stripped:
            flush_para()
            close_all_lists()
            continue

        mh = _MD_HEADER_RE.match(stripped)
        if mh:
            flush_para()
            close_all_lists()
            level = min(len(mh.group(1)), 4)
            out.append(f'<h{level}>{_md_inline(mh.group(2))}</h{level}>')
            continue

        # A line consisting of a single URL becomes a stand-alone badge.
        if _MD_URL_RE.fullmatch(stripped):
            flush_para()
            badge = _md_url_badge(stripped)
            if list_stack:
                out.append(f'<li class="md-link-item">{badge}</li>')
            else:
                out.append(f'<p class="md-link-p">{badge}</p>')
            continue

        mb = _MD_BULLET_RE.match(line)
        if mb:
            flush_para()
            spaces, marker = mb.group(1), mb.group(2).strip()
            content = mb.group(3).strip()
            # Two columns of indentation per nesting level; a tab counts as four.
            depth = (spaces.count('\t') * 4 + spaces.count(' ')) // 2
            ordinal = _MD_ORDINAL_RE.match(marker)
            open_list(depth, 'ol' if ordinal else 'ul')
            if ordinal:
                # Keep the author's numbering even when blank lines split the list.
                num = int(ordinal.group(1))
                out.append(f'<li value="{num}">{_md_inline(content)}</li>')
            else:
                out.append(f'<li>{_md_inline(content)}</li>')
            continue

        close_all_lists()
        para_lines.append(stripped)

    flush_para()
    close_all_lists()
    return '\n'.join(out)


IDLE_PANEL = (
    '<div id="output-panel">'
    '<div class="idle-msg">🔍 Enter your question on the left and click Ask a Question..</div>'
    '</div>'
)


# ─────────────────────────────────────────────────────────────
# stdout capture (print() inside the pipeline -> Queue)
# ─────────────────────────────────────────────────────────────
class _StdoutCapture(io.TextIOBase):
    """File-like object that forwards written text to a queue as ``("stdout", s)``."""

    def __init__(self, q: Queue):
        self._q = q

    def write(self, s: str):
        if s:
            self._q.put(("stdout", s))
        return len(s)

    def flush(self):
        pass


# ─────────────────────────────────────────────────────────────
# Streaming analysis — calls pipeline() directly
# ─────────────────────────────────────────────────────────────
def stream_analyze(query, persona_name, api_key, history=None):
    """Run the analysis pipeline and yield UI updates while it streams.

    Args:
        query: The user's question; blank input yields an error panel.
        persona_name: Persona to apply; ``"없음"`` or blank means none.
        api_key: API key, exported as ``LLM_MODEL_API_KEY`` for the pipeline.
        history: Prior chat turns forwarded to the pipeline (default: none).

    Yields:
        ``(panel_html, timer_text, result_json)`` tuples.  Until the first
        answer delta arrives the panel shows the progress log; afterwards it
        shows the log frozen at that moment plus the rendered answer.
    """
    query = (query or "").strip()
    persona_name = (persona_name or "").strip()
    api_key = (api_key or "").strip()

    if not query:
        yield (_wrap_log('<div class="log-error">❌ Please enter your question.</div>'), "", "")
        return
    if not api_key:
        yield (_wrap_log('<div class="log-error">❌ Please enter your OpenAI API key.</div>'), "", "")
        return

    # NOTE(review): the environment is process-wide, so concurrent requests
    # with different keys overwrite each other here.
    os.environ["LLM_MODEL_API_KEY"] = api_key
    try:
        from pipeline import pipeline
    except Exception as e:
        yield (
            _wrap_log(f'<div class="log-error">❌ pipeline 로드 실패: {html_lib.escape(str(e))}</div>'),
            "",
            "",
        )
        return

    text_acc = ""
    log_lines = []
    frozen_log = ""
    result_json = ""
    first_delta = False
    worker_done = False
    answer_html = ""
    answer_dirty = True  # True when text_acc changed since answer_html was built
    elapsed = _make_elapsed()
    eq: Queue = Queue()

    def status_cb(msg: str):
        eq.put(("status", msg))

    def delta_cb(delta: str):
        eq.put(("delta", delta))

    def reader():
        # NOTE(review): sys.stdout is swapped process-wide; overlapping runs
        # capture each other's output and may restore a stale stream.
        capture = _StdoutCapture(eq)
        old_stdout = sys.stdout
        sys.stdout = capture
        try:
            pname = persona_name if persona_name and persona_name != "없음" else None
            result = pipeline(
                query,
                persona_name=pname,
                status_callback=status_cb,
                stream_callback=delta_cb,
                stream=True,
                history=history or [],
            )
            eq.put(("result", result))
        except Exception as exc:
            eq.put(("exception", str(exc)))
        finally:
            sys.stdout = old_stdout
            eq.put(("worker_done", None))

    def render():
        """Build the tuple to yield from the current state."""
        nonlocal answer_html, answer_dirty
        t = timer_text(elapsed())
        if first_delta:
            # Re-render the Markdown only when new text arrived; timer-only
            # ticks reuse the cached HTML.
            if answer_dirty:
                answer_html = _md_to_html(text_acc)
                answer_dirty = False
            panel = _wrap_answer(frozen_log, answer_html, t)
        else:
            panel = _wrap_log(_make_log_html(log_lines))
        return (panel, t, result_json)

    Thread(target=reader, daemon=True).start()

    while True:
        # Wait briefly for one event, then drain whatever else is queued so a
        # burst of deltas results in a single UI update.
        try:
            events = [eq.get(timeout=0.1)]
        except Empty:
            events = []
        else:
            while True:
                try:
                    events.append(eq.get_nowait())
                except Empty:
                    break

        for kind, payload in events:
            if kind == "status":
                if not first_delta:
                    log_lines.append(("status", payload))
            elif kind == "stdout":
                if not first_delta:
                    # Merge consecutive stdout chunks into one log entry.
                    if log_lines and log_lines[-1][0] == "stdout":
                        log_lines[-1] = ("stdout", log_lines[-1][1] + payload)
                    else:
                        log_lines.append(("stdout", payload))
            elif kind == "delta":
                if payload:
                    if not first_delta:
                        first_delta = True
                        frozen_log = _make_log_html(log_lines)
                    text_acc += payload
                    answer_dirty = True
            elif kind == "result":
                result = payload
                try:
                    result_dict = asdict(result) if is_dataclass(result) else dict(result)
                    result_json = json.dumps(result_dict, ensure_ascii=False, indent=2, default=str)
                except Exception:
                    result_json = ""
                if not text_acc:
                    # Nothing was streamed: fall back to the final response.
                    llm = getattr(result, "llm_response", "") or ""
                    if llm:
                        first_delta = True
                        frozen_log = _make_log_html(log_lines)
                        text_acc = llm
                        answer_dirty = True
            elif kind == "exception":
                log_lines.append(("error", str(payload)))
                if first_delta:
                    # The log was frozen at the first delta; refresh it so a
                    # failure in the middle of streaming is still shown.
                    frozen_log = _make_log_html(log_lines)
            elif kind == "worker_done":
                worker_done = True

        yield render()
        if worker_done:
            break

    yield render()


# ─────────────────────────────────────────────────────────────
# CSS (kept as in the original gradio_app.py)
# ─────────────────────────────────────────────────────────────
CSS = """
@import url("https://fonts.googleapis.com/css2?family=IBM+Plex+Sans+KR:wght@400;500;600;700&family=JetBrains+Mono:wght@400;500;600&display=swap");
:root { --ws-bg: #f7faf9; --ws-surface: #ffffff; --ws-border: #dbe5e2; --ws-text: #182022; --ws-muted: #5d6b70; --ws-accent: #0f766e; --ws-accent2: #14b8a6; --ws-code-bg: #f1f5f9; --ws-green-bg: #f0fdfa; --ws-green-border: #99f6e4; --panel-height: 680px; --pf-left-h: 130px; --pf-right-h: calc(var(--pf-left-h) * 3 + 12px * 2); }
.gradio-container, .gradio-container :is(h1,h2,h3,h4,h5,h6,p,span,div,label,button,input,textarea,select) { font-family: "IBM Plex Sans KR","Noto Sans KR","Source Sans 3",sans-serif !important; letter-spacing: 0.005em; }
.gradio-container { background: radial-gradient(circle at top left,#edf9f6 0%,#f8fbfc 35%,#fdfefe 100%) !important; }
#ws-header { background: linear-gradient(135deg,#f0fdfa 0%,#e8faf7 55%,#f7faf9 100%); border: 1px solid #b2e8e2; border-radius: 14px; padding: 20px 28px 16px; margin-bottom: 8px; position: relative; overflow: hidden; }
#ws-header::before { content:""; position:absolute; top:-70px; right:-70px; width:260px; height:260px; background:radial-gradient(circle,rgba(20,184,166,.13) 0%,transparent 68%); pointer-events:none; }
#ws-header h1 { font-size:22px !important; font-weight:700 !important; color:#0b3b39 !important; margin:0 0 4px !important; }
#ws-header p { font-size:13px !important; color:var(--ws-muted) !important; margin:0 !important; }
#ws-header .ws-badge { display:inline-block; padding:1px 8px; border-radius:20px; font-size:10px; font-weight:700; letter-spacing:.07em; text-transform:uppercase; background:rgba(15,118,110,.1); border:1px solid rgba(15,118,110,.25); color:var(--ws-accent); margin-right:7px; vertical-align:middle; }
.tab-nav button { background:transparent !important; color:var(--ws-muted) !important; border:none !important; border-bottom:2px solid transparent !important; font-size:13px !important; font-weight:600 !important; padding:8px 18px !important; border-radius:0 !important; transition:color .18s,border-color .18s !important; }
.tab-nav button.selected,.tab-nav button:hover { color:var(--ws-accent) !important; border-bottom-color:var(--ws-accent) !important; background:transparent !important; }
#qa-row { align-items: stretch !important; }
#input-col { background: var(--ws-surface); border: 1px solid var(--ws-border) !important; border-radius: 14px !important; padding: 18px 20px !important; box-shadow: 0 2px 12px rgba(16,24,40,.04); display: flex; flex-direction: column; gap: 10px; height: auto !important; min-height: var(--panel-height); overflow-y: visible !important; box-sizing: border-box; }
.ws-label { font-size: 10px; font-weight: 700; text-transform: uppercase; letter-spacing: .08em; color: var(--ws-muted); margin-bottom: 4px; }
.ws-divider { border:none; border-top:1px solid var(--ws-border); margin:10px 0; }
#persona-summary { background: linear-gradient(180deg,#f9fefd 0%,#f3fbf9 100%) !important; border: 1px solid #cde8e3 !important; border-radius: 10px !important; padding: 10px 14px !important; font-size: 13px !important; color: var(--ws-text) !important; }
#persona-summary > .wrap,#persona-summary > div.prose { padding:0!important;border:none!important;box-shadow:none!important; }
#example-grid { display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 5px; }
#example-grid button { width: 100% !important; text-align: left !important; background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important; border-radius: 8px !important; color: var(--ws-text) !important; font-size: 11px !important; padding: 7px 10px !important; white-space: normal !important; line-height: 1.4 !important; transition: border-color .18s, color .18s, background .18s !important; min-height: 44px; }
#example-grid button:hover { background: var(--ws-green-bg) !important; border-color: var(--ws-accent2) !important; color: var(--ws-accent) !important; }
#run-btn { background: linear-gradient(135deg,#0f766e 0%,#14b8a6 100%) !important; border: none !important; color: #fff !important; font-weight: 700 !important; font-size: 13px !important; border-radius: 9px !important; transition: opacity .18s, transform .12s !important; }
#run-btn:hover { opacity:.87!important; transform:translateY(-1px)!important; }
#clear-btn { background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important; color: var(--ws-muted) !important; border-radius: 9px !important; font-size:12px!important; }
#clear-btn:hover { border-color:var(--ws-accent)!important; color:var(--ws-accent)!important; }
#refresh-btn { min-width:34px!important; padding:0 8px!important; background:var(--ws-code-bg)!important; border:1px solid var(--ws-border)!important; color:var(--ws-muted)!important; border-radius:8px!important; font-size:15px!important; }
#refresh-btn:hover { color:var(--ws-accent)!important; border-color:var(--ws-accent)!important; background:var(--ws-green-bg)!important; }
#output-col { border: 1px solid var(--ws-border) !important; border-radius: 14px !important; overflow: hidden !important; background: var(--ws-surface); box-shadow: 0 2px 12px rgba(16,24,40,.04); height: var(--panel-height) !important; display: flex !important; flex-direction: column !important; box-sizing: border-box; min-height: var(--panel-height) !important; height: auto !important; }
#output-col > .wrap, #output-col > div { padding: 0 !important; margin: 0 !important; border: none !important; box-shadow: none !important; height: 100% !important; display: flex !important; flex-direction: column !important; overflow: hidden !important; min-height: 0 !important; }
#output-panel { display:flex; flex-direction:column; height:100%; overflow:hidden; min-height:0; }
.panel-header { display:flex; align-items:center; justify-content:space-between; padding:10px 16px; background:#f7faf9; border-bottom:1px solid var(--ws-border); flex-shrink:0; }
.panel-title { font-size:11px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted); }
.panel-timer { font-size:11px; font-weight:600; color:var(--ws-accent); background:var(--ws-green-bg); border:1px solid var(--ws-green-border); padding:2px 10px; border-radius:999px; }
.idle-msg { display:flex; align-items:center; justify-content:center; flex:1; color:var(--ws-muted); font-size:13px; }
#log-scroll { flex:1; overflow-y:auto; padding:14px 18px; font-size:12px; line-height:1.7; }
.log-status { display:flex; align-items:flex-start; gap:7px; padding:3px 0; color:var(--ws-text); }
.log-status span { color:var(--ws-text); }
.log-stdout pre { margin:3px 0; padding:4px 10px; background:#f1f8f7; border-left:3px solid var(--ws-accent2); border-radius:0 5px 5px 0; font-family:"JetBrains Mono","IBM Plex Mono",monospace !important; font-size:11px !important; color:var(--ws-muted); white-space:pre-wrap; word-break:break-all; }
.log-done { color:var(--ws-accent); font-weight:700; padding:4px 0; }
.log-error { color:#e53e3e; padding:4px 0; }
#answer-scroll-wrap { display:flex; flex-direction:column; height:calc(var(--panel-height) - 42px); overflow:hidden; }
.result-log-section { border-bottom:2px solid var(--ws-border); background:#f7faf9; flex-shrink:0; max-height:140px; overflow:hidden; }
.result-log-section[open] { overflow-y:auto; }
.result-log-header { font-size:10px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted); padding:7px 18px; cursor:pointer; display:flex; align-items:center; justify-content:space-between; list-style:none; user-select:none; }
.result-log-header::-webkit-details-marker { display:none; }
.result-log-header::after { content:"▲"; font-size:9px; color:var(--ws-muted); transition:transform 0.2s; }
.result-log-section:not([open]) .result-log-header::after { transform:rotate(180deg); }
.result-log-section:not([open]) ~ .result-answer-section #answer-scroll { height:calc(var(--panel-height) - 78px) !important; }
.log-toggle-hint { font-size:9px; font-weight:400; color:#9bb0ac; margin-left:6px; text-transform:none; letter-spacing:0; }
.result-log-body { padding:0 18px 10px; font-size:12px; line-height:1.7; }
.result-answer-section { padding:0; flex:1; display:flex; flex-direction:column; min-height:0; overflow:hidden; }
.result-answer-header { display:flex; align-items:center; justify-content:space-between; padding:12px 18px 8px; border-bottom:1px solid var(--ws-border); background:linear-gradient(135deg,#f0fdfa 0%,#e8faf7 100%); flex-shrink:0; }
.result-answer-badge { font-size:13px; font-weight:700; color:var(--ws-accent); letter-spacing:.01em; }
#answer-scroll { padding:18px 22px; overflow-y:auto !important; height:calc(var(--panel-height) - 118px) !important; box-sizing:border-box; }
.md-body { color:var(--ws-text); line-height:1.75; font-size:14.5px; }
.md-body h1,.md-body h2,.md-body h3,.md-body h4 { color:#0b3b39; margin:.85em 0 .35em; }
.md-body h2 { font-size:16px; border-bottom:1px solid var(--ws-border); padding-bottom:5px; }
.md-body h3 { font-size:14px; color:var(--ws-accent); }
.md-body h4 { font-size:13px; }
.md-body p { margin:.4em 0; }
.md-body strong { font-weight:700; }
.md-body em { font-style:italic; }
.md-body a.md-link { display:inline-flex; align-items:center; gap:4px; color:var(--ws-accent); font-size:12.5px; font-weight:500; text-decoration:none; background:var(--ws-green-bg); border:1px solid var(--ws-green-border); border-radius:6px; padding:2px 9px; transition:background .15s,border-color .15s; }
.md-body a.md-link:hover { background:#ccfbf1; border-color:var(--ws-accent); }
.md-body li.md-link-item { list-style:none; margin:3px 0; }
.md-body p.md-link-p { margin:3px 0; }
.md-body ul,.md-body ol { margin:.4em 0; padding-left:1.5em; }
.md-body ul ul,.md-body ol ol,.md-body ul ol,.md-body ol ul { margin:.2em 0; padding-left:1.4em; }
.md-body li { margin:.2em 0; }
.md-body li > ul,.md-body li > ol { margin-top:.15em; }
.md-body code { background:var(--ws-code-bg); color:#0b3b39; border:1px solid #d9e2ec; border-radius:5px; padding:.1em .35em; font-size:.91em; font-family:"JetBrains Mono","IBM Plex Mono",monospace; }
.md-body pre { background:#0f172a; color:#e2e8f0; border-radius:10px; border:1px solid #1e293b; padding:.85em 1em; overflow-x:auto; margin:.7em 0; }
.md-body pre code { background:transparent; border:none; color:inherit; padding:0; }
.md-body blockquote { margin:.8em 0; padding:.6em .9em; border-left:4px solid var(--ws-accent2); background:var(--ws-green-bg); color:#115e59; border-radius:0 8px 8px 0; }
.md-body table { width:100%; border-collapse:collapse; margin:.7em 0; }
.md-body th { background:#eef6f4; color:#0f3f3b; font-weight:600; font-size:12px; text-transform:uppercase; letter-spacing:.04em; padding:7px 10px; border:1px solid var(--ws-border); }
.md-body td { border:1px solid var(--ws-border); padding:6px 10px; vertical-align:top; }
.md-body tr:hover td { background:#f9fefd; }
#timer-row { display:none !important; }
#result-json-wrap { border-top:2px solid var(--ws-border); background:var(--ws-surface); }
#result-json-wrap .accordion-header { padding:10px 16px !important; }
#meta-box { max-height:220px; overflow-y:auto; background:var(--ws-surface)!important; border:none!important; }
#meta-box code,#meta-box pre { font-family:"JetBrains Mono",monospace!important; font-size:11.5px!important; color:var(--ws-muted)!important; background:transparent!important; }
::-webkit-scrollbar { width:5px; height:5px; }
::-webkit-scrollbar-track { background:transparent; }
::-webkit-scrollbar-thumb { background:var(--ws-border); border-radius:3px; }
::-webkit-scrollbar-thumb:hover { background:#aec5c1; }
body:has(.options:not(.hide)) { overflow:hidden!important; }
@media (max-width: 768px) { #example-grid { grid-template-columns:1fr 1fr!important; } :root { --panel-height:500px; } #output-col { max-height:none!important; } }
"""

HEADER_HTML = """
<div id="ws-header">
  <h1>📈 Wallstreet AI</h1>
  <p>Financial analysis assistant combining legendary investor personas with prices, fundamentals, earnings, news, and technical indicators</p>
</div>
"""


# ─────────────────────────────────────────────────────────────
# Gradio app — "Ask a Question" view only
# ─────────────────────────────────────────────────────────────
def create_app():
    """Build the Gradio Blocks UI and wire its events.

    Returns:
        ``(demo, theme)`` — the Blocks app and the theme to pass to ``launch``.
    """
    theme = gr.themes.Soft(
        primary_hue="emerald",
        secondary_hue="teal",
        neutral_hue="slate",
        radius_size="lg",
    )

    with gr.Blocks(title="Wallstreet AI", analytics_enabled=False) as demo:
        gr.HTML(HEADER_HTML)

        # Ask a Question (placed directly on the page, without a tab)
        with gr.Row(equal_height=True, elem_id="qa-row"):
            with gr.Column(scale=1, min_width=280, elem_id="input-col"):
                # API key
                api_key_input = gr.Textbox(
                    label="🔑 OpenAI API Key",
                    placeholder="sk-...",
                    type="password",
                )
                gr.HTML("<hr class='ws-divider'>")

                # Persona selection
                gr.HTML("<div class='ws-label'>Persona</div>")
                with gr.Row():
                    persona_dd = gr.Dropdown(
                        label="",
                        choices=load_persona_names(),
                        value="없음",
                        interactive=True,
                        scale=5,
                        show_label=False,
                    )
                    refresh_btn = gr.Button("↺", size="sm", scale=1, min_width=34, elem_id="refresh-btn")
                persona_summary = gr.Markdown(value="", elem_id="persona-summary", visible=True)
                gr.HTML("<hr class='ws-divider'>")

                # Example questions
                gr.HTML("<div class='ws-label'>Example Questions</div>")
                with gr.Column(elem_id="example-grid"):
                    example_btns = []
                    for example_text in EXAMPLES_BY_TYPE.values():
                        b = gr.Button(example_text, size="sm")
                        example_btns.append((b, example_text))
                gr.HTML("<hr class='ws-divider'>")

                # Question input
                gr.HTML("<div class='ws-label'>Ask Question</div>")
                query_input = gr.Textbox(
                    label="",
                    placeholder="종목명, 티커, 분석 요청을 입력하세요...",
                    lines=3,
                    value=EXAMPLE_QUERIES[0],
                    show_label=False,
                )
                with gr.Row():
                    run_btn = gr.Button("🔍 Ask Question", variant="primary", scale=3, elem_id="run-btn")
                    clear_btn = gr.Button("Newchat", scale=1, elem_id="clear-btn")

            with gr.Column(scale=1, min_width=300, elem_id="output-col"):
                output_panel = gr.HTML(value=IDLE_PANEL, show_label=False)
                timer = gr.Markdown(value="", visible=False)

        # Chat history (internal state)
        chat_history = gr.State([])

        with gr.Row(elem_id="result-json-wrap"):
            with gr.Column():
                gr.HTML("<div class='ws-label'>📄 Original Data(JSON)</div>")
                meta = gr.Code(label="", language="json", elem_id="meta-box", show_label=False)

        gr.HTML(AUTO_SCROLL_JS, visible=False)

        # ── Event handlers ─────────────────────────────
        def on_run(q, persona, api_key, history):
            yield from stream_analyze(q, persona, api_key, history)

        def update_history(q, rj, history):
            """Append the finished turn to the chat history.

            Blank questions and runs that produced no result JSON (validation
            error or pipeline failure) are not recorded, so the history never
            accumulates unanswered user turns from failed attempts.
            """
            new_history = list(history or [])
            if not (q or "").strip() or not rj:
                return new_history
            new_history.append({"role": "user", "content": q})
            try:
                result = json.loads(rj)
                llm_response = result.get("llm_response", "")
                if llm_response:
                    new_history.append({"role": "assistant", "content": llm_response})
            except (json.JSONDecodeError, AttributeError, TypeError):
                pass
            return new_history

        run_inputs = [query_input, persona_dd, api_key_input, chat_history]
        run_outputs = [output_panel, timer, meta]
        history_inputs = [query_input, meta, chat_history]

        run_click = run_btn.click(fn=on_run, inputs=run_inputs, outputs=run_outputs)
        run_then = run_click.then(fn=update_history, inputs=history_inputs, outputs=[chat_history])

        submit_click = query_input.submit(fn=on_run, inputs=run_inputs, outputs=run_outputs)
        submit_then = submit_click.then(fn=update_history, inputs=history_inputs, outputs=[chat_history])

        # "Newchat" resets every output and cancels any run still in flight.
        clear_btn.click(
            fn=lambda: (IDLE_PANEL, "", "", [], "없음", ""),
            outputs=[output_panel, timer, meta, chat_history, persona_dd, query_input],
            cancels=[run_click, run_then, submit_click, submit_then],
        )

        refresh_btn.click(
            fn=lambda: gr.update(choices=load_persona_names(), value="없음"),
            outputs=[persona_dd],
        )

        def on_persona_change(name):
            return gr.update(value=load_persona_summary(name))

        persona_dd.change(fn=on_persona_change, inputs=[persona_dd], outputs=[persona_summary])

        for _b, _text in example_btns:
            # Bind the text as a default argument to avoid late binding.
            _b.click(fn=lambda t=_text: t, outputs=[query_input])

    return demo, theme


# ─────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────
def main():
    """Create the app, enable the request queue and start the server."""
    app, theme = create_app()
    app.queue(default_concurrency_limit=8, max_size=64)
    app.launch(server_name="0.0.0.0", server_port=7860, debug=True, theme=theme, css=CSS)


if __name__ == "__main__":
    main()