import base64
import html as html_lib
import io
import json
import os
import re
import sys
import time
from dataclasses import asdict, is_dataclass
from pathlib import Path
from queue import Empty, Queue
from threading import Thread
import gradio as gr
from pydantic import BaseModel, ValidationError
# ─────────────────────────────────────────────────────────────
# 설정
# ─────────────────────────────────────────────────────────────
# Default environment configuration. setdefault() only fills in a value when the
# variable is not already present in the process environment.
os.environ.setdefault("LLM_MODEL_NAME", "gpt-5-mini")
os.environ.setdefault("LOG_FILE", "analysis_results.jsonl")
os.environ.setdefault("PERSONA_FILE", "persona.jsonl")
# Persona definitions file; _parse_personas() reads it one JSON object per line.
PERSONA_FILE = Path(os.environ.get("PERSONA_FILE", "persona.jsonl"))
# Image cache directory, created at import time relative to the working directory.
IMAGE_CACHE_DIR = Path(".persona_images")
IMAGE_CACHE_DIR.mkdir(exist_ok=True)
# Directory of bundled persona images; defaults to "persona_images" next to this file.
LOCAL_IMAGE_DIR = Path(os.environ.get("LOCAL_IMAGE_DIR", Path(__file__).parent / "persona_images"))
DEFAULT_IMAGE_FILENAME = "default.png"
# Sample queries shown as example buttons in create_app(). The keys look like
# analysis-type identifiers (presumably matching the pipeline's intent names;
# verify against the pipeline module).
EXAMPLES_BY_TYPE = {
"screener": "PER 낮은 대형주 추천해주세요",
"technical": "Apple(AAPL) 차트 분석해주세요",
"fundamental": "Microsoft 재무상태 어때요?",
"news_summary": "Tesla 최근 뉴스 요약해 주세요",
"comparison": "Apple vs Microsoft 비교 분석해 주세요",
"earnings": "2025년 4분기 삼성전자 실적은 어땠나요?",
"swot": "OpenAI 경쟁력 분석해 주세요",
"general": "Tesla(TSLA) 어떻게 보시나요?",
"watchlist": "내 관심종목(삼성전자, SK하이닉스, Apple, Microsoft, Tesla) 현황 봐주세요",
}
# Flat list of the example queries; the first entry pre-fills the query box.
EXAMPLE_QUERIES = list(EXAMPLES_BY_TYPE.values())
# NOTE(review): this string contains only a newline in this copy of the file; the
# script markup it presumably held appears to have been lost. Restore before use.
AUTO_SCROLL_JS = """
"""
# ─────────────────────────────────────────────────────────────
# 헬퍼
# ─────────────────────────────────────────────────────────────
def to_md(text):
    """Return *text* as-is, or an empty string when it is falsy (e.g. ``None``)."""
    if text:
        return text
    return ""
def timer_text(elapsed):
    """Prefix an elapsed-time label with the stopwatch glyph."""
    return "⏱ {}".format(elapsed)
def _make_elapsed():
t0 = time.time()
return lambda: f"{time.time()-t0:.1f}초"
_PAREN_MD = re.compile(r'\s*\(\s*\[[^\]]*\]\([^)]*\)\s*\)')
_MD_LINK = re.compile(r'\[([^\]]*)\]\([^)]*\)')
_PAREN_URL= re.compile(r'\s*\(https?://[^\)]*\)')
_BARE_URL = re.compile(r'https?://\S+')
_PAREN_DOM= re.compile(r'\s*\([a-zA-Z0-9._-]+\.[a-zA-Z]{2,6}\)')
def _safe(text):
t = text or ""
t = _PAREN_MD.sub('', t)
t = _MD_LINK.sub(r'\1', t)
t = _PAREN_URL.sub('', t)
t = _BARE_URL.sub('', t)
t = _PAREN_DOM.sub('', t)
t = re.sub(r'[ \t]{2,}', ' ', t).strip()
t = re.sub(r'\.\s*\.', '.', t)
return html_lib.escape(t).replace("\n", "
")
# ─────────────────────────────────────────────────────────────
# 진행 로그 HTML 빌더
# ─────────────────────────────────────────────────────────────
STATUS_ICONS = {
"요청 수신": "📡", "인텐트": "🧠", "도구": "🔧", "시장": "📊",
"뉴스": "📰", "컨텍스트": "🗂", "LLM": "✨", "완료": "✅",
}
def _status_icon(msg):
for k, v in STATUS_ICONS.items():
if k in msg:
return v
return "⏳"
# ─────────────────────────────────────────────────────────────
# 페르소나 모델 & 파일 IO
# ─────────────────────────────────────────────────────────────
# Schema of one persona record. _parse_personas() builds one instance from each
# JSON object line of PERSONA_FILE and skips lines that fail validation.
class PersonaLine(BaseModel):
# Persona identifier; load_persona_names() uses it (stripped) as the dropdown label.
name: str
# Filled from `name` by _parse_personas() when missing or empty in the record.
full_name: str
# Filled from a `background` key by _parse_personas() when `summary` is absent.
summary: str
financial_mindset: str
data_analysis_approach: str
response_style: str
key_principles: list[str]
# Optional fields; both default to None when the record omits them.
famous_quotes: list[str] | None = None
image_path: str | None = None
# Last parsed persona list and the file mtime it was parsed at.
_persona_cache: list = []
_persona_cache_mtime: float = 0.0


def _parse_personas():
    """Load persona records from PERSONA_FILE, reusing a cache keyed on mtime.

    Each non-blank line is parsed as a JSON object. Lines that are not valid
    JSON, are not objects, or fail PersonaLine validation are skipped. An
    OSError while opening or reading the file yields whatever was parsed so far.

    Returns:
        A list of PersonaLine instances (empty when the file does not exist).
    """
    global _persona_cache, _persona_cache_mtime

    try:
        stamp = PERSONA_FILE.stat().st_mtime if PERSONA_FILE.exists() else 0.0
    except OSError:
        stamp = 0.0

    # An empty cache is never considered a hit, so an empty result is re-read.
    if _persona_cache and stamp == _persona_cache_mtime:
        return _persona_cache

    parsed = []
    if PERSONA_FILE.exists():
        try:
            with PERSONA_FILE.open("r", encoding="utf-8") as handle:
                for raw in handle:
                    entry = raw.strip()
                    if not entry:
                        continue
                    try:
                        record = json.loads(entry)
                        if not isinstance(record, dict):
                            continue
                        # Backfill fields that records may omit.
                        if not record.get("full_name"):
                            record["full_name"] = record.get("name", "")
                        if "summary" not in record and "background" in record:
                            record["summary"] = record["background"]
                        parsed.append(PersonaLine(**record))
                    except (json.JSONDecodeError, TypeError, ValidationError):
                        continue
        except OSError:
            pass

    _persona_cache = parsed
    _persona_cache_mtime = stamp
    return parsed
def load_persona_names():
    """Return the dropdown choices: "없음" followed by each distinct persona name.

    Names are stripped; blank names and duplicates are dropped, and the order
    in which names first appear is preserved.
    """
    # A dict keeps insertion order and gives duplicate-free keys.
    ordered = {"없음": None}
    for persona in _parse_personas():
        label = persona.name.strip()
        if label:
            ordered.setdefault(label, None)
    return list(ordered)
def load_persona_summary(name):
    """Return a short Markdown summary for the persona called *name*.

    The result is the bold full name followed by the summary, truncated to 80
    characters plus an ellipsis when longer. An empty string is returned for
    no selection ("없음" or a falsy name) or when no persona matches.
    """
    if not name or name == "없음":
        return ""

    match = next((p for p in _parse_personas() if p.name.strip() == name), None)
    if match is None:
        return ""

    blurb = match.summary
    if len(blurb) > 80:
        blurb = blurb[:80] + "…"
    return f"**{match.full_name}**\n\n{blurb}"
# ─────────────────────────────────────────────────────────────
# 출력 패널 빌더
# ─────────────────────────────────────────────────────────────
# Render progress-log entries as HTML rows joined by newlines.
# `log_lines` is a sequence of (kind, text) pairs where kind is one of
# "status", "stdout", "error" or "done"; other kinds produce no row.
# The text is HTML-escaped before being placed in a row. An empty input
# returns an empty string.
# The "status" branch repeats the keyword lookup that _status_icon() performs.
# Nothing in the visible stream_analyze() appends a "done" entry.
# NOTE(review): the HTML tags inside the row literals appear to be missing in
# this copy: several literals are split across lines and the "stdout" row is an
# empty f-string that drops the captured text. The CSS in this file defines
# .log-status, .log-stdout pre, .log-error and .log-done, which presumably
# belong here; restore the markup from the original file.
def _make_log_html(log_lines):
if not log_lines:
return ''
rows = []
for t, text in log_lines:
safe = html_lib.escape(text)
if t == "status":
icon = next((v for k, v in STATUS_ICONS.items() if k in text), "⏳")
rows.append(f'
{icon} {safe}
')
elif t == "stdout":
rows.append(f'')
elif t == "error":
rows.append(f'❌ {safe}
')
elif t == "done":
rows.append('✅ 분석 완료
')
return "\n".join(rows)
# Wrap pre-rendered log HTML (`inner`) for display in the output panel.
# NOTE(review): as visible here the function returns an empty string and never
# uses `inner`; the wrapper markup appears to have been lost from the literal.
# Restore it from the original file.
def _wrap_log(inner):
return (
''
)
# Build the answer view of the output panel from the frozen progress log
# (`log_html`), the rendered answer (`md_html`) and the timer label (`timer_str`).
# The log section is only included when `log_html` is non-empty.
# NOTE(review): the HTML inside these literals appears to be missing in this
# copy: literals are split across lines, and `md_html` and `timer_str` are not
# referenced anywhere in the visible text. Restore the markup from the original.
def _wrap_answer(log_html, md_html, timer_str):
log_section = (
''
''
'' + log_html + '
'
' '
) if log_html else ''
answer_section = (
''
)
return (
''
''
'
'
+ log_section
+ answer_section
+ '
'
'
'
)
# Convert a small Markdown subset to HTML: headers (# to ####), bullet and
# numbered lists with indentation-based nesting, paragraphs, inline
# bold/italic/code, and bare URLs rendered as link badges.
# Blank lines end the current paragraph and close every open list.
# NOTE(review): most HTML tags inside the string literals of this function
# appear to be missing in this copy. Examples: the bold and code substitutions
# replace with a plain `\1`, `_url_badge` computes `esc` but never uses it,
# closing tags are emitted as `{t}>`, `level` and `num` are computed but unused,
# and the italic `_re.sub(r'(?\1', ...)` calls have only two arguments.
# Restore the literals from the original file before running.
def _md_to_html(text):
# Local alias; the module already imports `re` at the top of the file.
import re as _re
_URL_RE = _re.compile(r'https?://[^\s,,、\))\]】]+')
# Render one URL as a badge labelled with its host name (without "www.").
def _url_badge(url: str) -> str:
clean = url.rstrip('.,;:')
m = _re.match(r'https?://(?:www\.)?([^/\s]+)', clean)
label = m.group(1) if m else clean
esc = html_lib.escape(clean)
return f'🔗 {html_lib.escape(label)}'
# Replace a dash separator directly before a URL with a single space.
def _clean_source_line(s: str) -> str:
s = _re.sub(r'\s*[—–-]{1,2}\s*(https?://)', r' \1', s)
return s
# Escape and format inline text, turning each URL into a badge. Text between
# URLs is escaped first so the URL badges themselves are not escaped twice.
def process_inline(s: str) -> str:
s = _clean_source_line(s)
parts = []
last = 0
for m in _URL_RE.finditer(s):
before = s[last:m.start()]
before = html_lib.escape(before)
before = _re.sub(r'\*\*(.+?)\*\*', r'\1', before)
before = _re.sub(r'(?\1', before)
before = _re.sub(r'`(.+?)`', r'\1', before)
parts.append(before)
parts.append(_url_badge(m.group(0)))
last = m.end()
tail = html_lib.escape(s[last:])
tail = _re.sub(r'\*\*(.+?)\*\*', r'\1', tail)
tail = _re.sub(r'(?\1', tail)
tail = _re.sub(r'`(.+?)`', r'\1', tail)
parts.append(tail)
return ''.join(parts)
# Groups: leading whitespace, list marker (number or bullet glyph), item text.
BULLET_RE = _re.compile(
r'^([ \t]*)'
r'(\d+[.)]\s+|[-•·○◦▸▹*]\s+)'
r'(.+)$'
)
HEADER_RE = _re.compile(r'^(#{1,4})\s+(.+)$')
# Nesting depth from leading whitespace: a tab counts as 4 columns and every
# 2 columns is one level.
def indent_level(spaces: str) -> int:
n = spaces.count('\t') * 4 + spaces.count(' ')
return n // 2
lines = text.split('\n')
out: list[str] = []
# Stack of currently open lists as (depth, tag) pairs.
list_stack: list[tuple[int, str]] = []
# Lines of the paragraph being accumulated; joined with spaces on flush.
para_lines: list[str] = []
def flush_para():
if para_lines:
out.append('' + process_inline(' '.join(para_lines)) + '
')
para_lines.clear()
# Ensure a list of type `tag` is open at `depth`: close deeper lists, swap the
# list type at the same depth if it differs, or open a new nested list.
def open_list(depth: int, tag: str):
while list_stack and list_stack[-1][0] > depth:
_, t = list_stack.pop()
out.append(f'{t}>')
if list_stack and list_stack[-1][0] == depth:
if list_stack[-1][1] != tag:
_, t = list_stack.pop()
out.append(f'{t}>')
out.append(f'<{tag}>')
list_stack.append((depth, tag))
else:
out.append(f'<{tag}>')
list_stack.append((depth, tag))
def close_all_lists():
while list_stack:
_, t = list_stack.pop()
out.append(f'{t}>')
for line in lines:
stripped = line.strip()
if not stripped:
flush_para()
close_all_lists()
continue
mh = HEADER_RE.match(stripped)
if mh:
flush_para(); close_all_lists()
level = min(len(mh.group(1)), 4)
out.append(f'{process_inline(mh.group(2))}')
continue
# A line consisting solely of a URL becomes a badge (list item when a list is open).
if _URL_RE.fullmatch(stripped):
flush_para()
badge = _url_badge(stripped)
if list_stack:
out.append(f'{badge}')
else:
out.append(f'{badge}
')
continue
# Matched against the unstripped line so the indentation is available.
mb = BULLET_RE.match(line)
if mb:
flush_para()
spaces = mb.group(1)
marker = mb.group(2)
content = mb.group(3).strip()
depth = indent_level(spaces)
is_ordered = bool(_re.match(r'\d+', marker.strip()))
tag = 'ol' if is_ordered else 'ul'
open_list(depth, tag)
if is_ordered:
num = int(_re.match(r'(\d+)', marker.strip()).group(1))
out.append(f'{process_inline(content)}')
else:
out.append(f'{process_inline(content)}')
continue
# Plain text: ends any open list and extends the current paragraph.
close_all_lists()
para_lines.append(stripped)
flush_para()
close_all_lists()
return '\n'.join(out)
# Placeholder HTML shown in the output panel before any question is asked and
# after the chat is cleared.
# NOTE(review): the surrounding markup appears to be missing in this copy (the
# literals are split across lines); restore it from the original file.
IDLE_PANEL = (
''
'
🔍 Enter your question on the left and click Ask a Question..
'
'
'
)
# ─────────────────────────────────────────────────────────────
# stdout 캡처 (pipeline 내부 print → Queue)
# ─────────────────────────────────────────────────────────────
class _StdoutCapture(io.TextIOBase):
def __init__(self, q: Queue):
self._q = q
def write(self, s: str):
if s:
self._q.put(("stdout", s))
return len(s)
def flush(self):
pass
# ─────────────────────────────────────────────────────────────
# 스트림 분석 — pipeline() 직접 호출
# ─────────────────────────────────────────────────────────────
# Run pipeline() for one question and stream UI updates.
# Generator yielding (panel_html, timer_label, result_json) tuples: the progress
# log while the pipeline is working, then the rendered answer once the first
# text delta (or a final result with `llm_response`) arrives.
# The pipeline runs in a daemon thread; its status callbacks, streamed deltas,
# captured prints, result and exceptions reach this generator through a queue.
# NOTE(review): the three early-exit error literals below are split across
# lines in this copy; their HTML markup appears to have been lost.
def stream_analyze(query, persona_name, api_key, history=None):
query = (query or "").strip()
persona_name = (persona_name or "").strip()
api_key = (api_key or "").strip()
if not query:
yield (_wrap_log('❌ Please enter your question.
'), "", "")
return
if not api_key:
yield (_wrap_log('❌ Please enter your OpenAI API key.
'), "", "")
return
# NOTE(review): the key is stored in the process-wide environment, so
# concurrent requests with different keys overwrite each other.
os.environ["LLM_MODEL_API_KEY"] = api_key
# Imported lazily so an import failure is reported in the panel.
try:
from pipeline import pipeline
except Exception as e:
yield (_wrap_log(f'❌ pipeline 로드 실패: {html_lib.escape(str(e))}
'), "", "")
return
text_acc = ""
log_lines = []
# Log HTML rendered once at the moment the answer starts; not updated afterwards.
frozen_log = ""
result_json = ""
first_delta = False
worker_done = False
elapsed = _make_elapsed()
eq: Queue = Queue()
def status_cb(msg: str):
eq.put(("status", msg))
def delta_cb(delta: str):
eq.put(("delta", delta))
# Worker thread body: runs the pipeline with sys.stdout redirected into the queue.
# NOTE(review): sys.stdout is process-global. With several requests in flight
# (main() allows 8), one worker can capture another's prints, and restoring
# `old_stdout` out of order can leave a stale capture object installed.
def reader():
capture = _StdoutCapture(eq)
old_stdout = sys.stdout
sys.stdout = capture
try:
# The "없음" (none) choice is passed to the pipeline as no persona.
pname = persona_name if persona_name and persona_name != "없음" else None
result = pipeline(
query,
persona_name=pname,
status_callback=status_cb,
stream_callback=delta_cb,
stream=True,
history=history or [],
)
eq.put(("result", result))
except Exception as exc:
eq.put(("exception", str(exc)))
finally:
sys.stdout = old_stdout
eq.put(("worker_done", None))
Thread(target=reader, daemon=True).start()
while True:
# Wait up to 0.1s for an event, then drain everything already queued so one
# UI update covers the whole batch. An empty batch still refreshes the timer.
try:
kind, payload = eq.get(timeout=0.1)
buf = [(kind, payload)]
while True:
try: buf.append(eq.get_nowait())
except Empty: break
except Empty:
buf = []
for kind, payload in buf:
# Status and stdout events are only logged until the answer starts.
if kind == "status":
if not first_delta:
log_lines.append(("status", payload))
elif kind == "stdout":
if not first_delta:
# Consecutive stdout chunks are merged into a single log entry.
if log_lines and log_lines[-1][0] == "stdout":
log_lines[-1] = ("stdout", log_lines[-1][1] + payload)
else:
log_lines.append(("stdout", payload))
elif kind == "delta":
if payload:
if not first_delta:
first_delta = True
frozen_log = _make_log_html(log_lines)
text_acc += payload
elif kind == "result":
result = payload
try:
result_dict = asdict(result) if is_dataclass(result) else dict(result)
result_json = json.dumps(result_dict, ensure_ascii=False, indent=2, default=str)
except Exception:
result_json = ""
# Nothing was streamed: fall back to the full response on the result object.
if not text_acc:
llm = getattr(result, "llm_response", "") or ""
if llm:
first_delta = True
frozen_log = _make_log_html(log_lines)
text_acc = llm
elif kind == "exception":
# NOTE(review): once the answer has started, the panel is built from
# `frozen_log`, so an error appended here is not displayed.
log_lines.append(("error", str(payload)))
elif kind == "worker_done":
worker_done = True
t = timer_text(elapsed())
if first_delta:
panel = _wrap_answer(frozen_log, _md_to_html(text_acc), t)
else:
panel = _wrap_log(_make_log_html(log_lines))
yield (panel, t, result_json)
if worker_done:
break
# Final update after the worker has finished.
t = timer_text(elapsed())
if first_delta:
panel = _wrap_answer(frozen_log, _md_to_html(text_acc), t)
else:
panel = _wrap_log(_make_log_html(log_lines))
yield (panel, t, result_json)
# ─────────────────────────────────────────────────────────────
# CSS (gradio_app.py 원본 그대로)
# ─────────────────────────────────────────────────────────────
# Stylesheet for the whole app; main() passes it to launch().
# Custom properties under :root (--ws-*, --panel-height) hold the palette and the
# fixed panel height that the scroll-area calc() rules depend on.
# NOTE(review): #output-col declares `height` twice with !important; the later
# `height: auto` wins, so `height: var(--panel-height)` has no effect.
CSS = """
@import url("https://fonts.googleapis.com/css2?family=IBM+Plex+Sans+KR:wght@400;500;600;700&family=JetBrains+Mono:wght@400;500;600&display=swap");
:root {
--ws-bg: #f7faf9;
--ws-surface: #ffffff;
--ws-border: #dbe5e2;
--ws-text: #182022;
--ws-muted: #5d6b70;
--ws-accent: #0f766e;
--ws-accent2: #14b8a6;
--ws-code-bg: #f1f5f9;
--ws-green-bg: #f0fdfa;
--ws-green-border: #99f6e4;
--panel-height: 680px;
--pf-left-h: 130px;
--pf-right-h: calc(var(--pf-left-h) * 3 + 12px * 2);
}
.gradio-container,
.gradio-container :is(h1,h2,h3,h4,h5,h6,p,span,div,label,button,input,textarea,select) {
font-family: "IBM Plex Sans KR","Noto Sans KR","Source Sans 3",sans-serif !important;
letter-spacing: 0.005em;
}
.gradio-container {
background: radial-gradient(circle at top left,#edf9f6 0%,#f8fbfc 35%,#fdfefe 100%) !important;
}
#ws-header {
background: linear-gradient(135deg,#f0fdfa 0%,#e8faf7 55%,#f7faf9 100%);
border: 1px solid #b2e8e2;
border-radius: 14px;
padding: 20px 28px 16px;
margin-bottom: 8px;
position: relative; overflow: hidden;
}
#ws-header::before {
content:""; position:absolute; top:-70px; right:-70px;
width:260px; height:260px;
background:radial-gradient(circle,rgba(20,184,166,.13) 0%,transparent 68%);
pointer-events:none;
}
#ws-header h1 { font-size:22px !important; font-weight:700 !important; color:#0b3b39 !important; margin:0 0 4px !important; }
#ws-header p { font-size:13px !important; color:var(--ws-muted) !important; margin:0 !important; }
#ws-header .ws-badge {
display:inline-block; padding:1px 8px; border-radius:20px;
font-size:10px; font-weight:700; letter-spacing:.07em; text-transform:uppercase;
background:rgba(15,118,110,.1); border:1px solid rgba(15,118,110,.25);
color:var(--ws-accent); margin-right:7px; vertical-align:middle;
}
.tab-nav button {
background:transparent !important; color:var(--ws-muted) !important;
border:none !important; border-bottom:2px solid transparent !important;
font-size:13px !important; font-weight:600 !important;
padding:8px 18px !important; border-radius:0 !important;
transition:color .18s,border-color .18s !important;
}
.tab-nav button.selected,.tab-nav button:hover {
color:var(--ws-accent) !important; border-bottom-color:var(--ws-accent) !important;
background:transparent !important;
}
#qa-row { align-items: stretch !important; }
#input-col {
background: var(--ws-surface);
border: 1px solid var(--ws-border) !important;
border-radius: 14px !important;
padding: 18px 20px !important;
box-shadow: 0 2px 12px rgba(16,24,40,.04);
display: flex; flex-direction: column; gap: 10px;
height: auto !important;
min-height: var(--panel-height);
overflow-y: visible !important;
box-sizing: border-box;
}
.ws-label {
font-size: 10px; font-weight: 700; text-transform: uppercase;
letter-spacing: .08em; color: var(--ws-muted); margin-bottom: 4px;
}
.ws-divider { border:none; border-top:1px solid var(--ws-border); margin:10px 0; }
#persona-summary {
background: linear-gradient(180deg,#f9fefd 0%,#f3fbf9 100%) !important;
border: 1px solid #cde8e3 !important; border-radius: 10px !important;
padding: 10px 14px !important; font-size: 13px !important;
color: var(--ws-text) !important;
}
#persona-summary > .wrap,#persona-summary > div.prose { padding:0!important;border:none!important;box-shadow:none!important; }
#example-grid { display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 5px; }
#example-grid button {
width: 100% !important; text-align: left !important;
background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important;
border-radius: 8px !important; color: var(--ws-text) !important;
font-size: 11px !important; padding: 7px 10px !important;
white-space: normal !important; line-height: 1.4 !important;
transition: border-color .18s, color .18s, background .18s !important;
min-height: 44px;
}
#example-grid button:hover {
background: var(--ws-green-bg) !important; border-color: var(--ws-accent2) !important;
color: var(--ws-accent) !important;
}
#run-btn {
background: linear-gradient(135deg,#0f766e 0%,#14b8a6 100%) !important;
border: none !important; color: #fff !important; font-weight: 700 !important;
font-size: 13px !important; border-radius: 9px !important;
transition: opacity .18s, transform .12s !important;
}
#run-btn:hover { opacity:.87!important; transform:translateY(-1px)!important; }
#clear-btn {
background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important;
color: var(--ws-muted) !important; border-radius: 9px !important; font-size:12px!important;
}
#clear-btn:hover { border-color:var(--ws-accent)!important; color:var(--ws-accent)!important; }
#refresh-btn {
min-width:34px!important; padding:0 8px!important;
background:var(--ws-code-bg)!important; border:1px solid var(--ws-border)!important;
color:var(--ws-muted)!important; border-radius:8px!important; font-size:15px!important;
}
#refresh-btn:hover { color:var(--ws-accent)!important; border-color:var(--ws-accent)!important; background:var(--ws-green-bg)!important; }
#output-col {
border: 1px solid var(--ws-border) !important;
border-radius: 14px !important;
overflow: hidden !important;
background: var(--ws-surface);
box-shadow: 0 2px 12px rgba(16,24,40,.04);
height: var(--panel-height) !important;
display: flex !important; flex-direction: column !important;
box-sizing: border-box;
min-height: var(--panel-height) !important;
height: auto !important;
}
#output-col > .wrap, #output-col > div {
padding: 0 !important; margin: 0 !important;
border: none !important; box-shadow: none !important;
height: 100% !important;
display: flex !important; flex-direction: column !important;
overflow: hidden !important; min-height: 0 !important;
}
#output-panel { display:flex; flex-direction:column; height:100%; overflow:hidden; min-height:0; }
.panel-header {
display:flex; align-items:center; justify-content:space-between;
padding:10px 16px; background:#f7faf9;
border-bottom:1px solid var(--ws-border); flex-shrink:0;
}
.panel-title { font-size:11px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted); }
.panel-timer {
font-size:11px; font-weight:600; color:var(--ws-accent);
background:var(--ws-green-bg); border:1px solid var(--ws-green-border);
padding:2px 10px; border-radius:999px;
}
.idle-msg { display:flex; align-items:center; justify-content:center; flex:1; color:var(--ws-muted); font-size:13px; }
#log-scroll { flex:1; overflow-y:auto; padding:14px 18px; font-size:12px; line-height:1.7; }
.log-status { display:flex; align-items:flex-start; gap:7px; padding:3px 0; color:var(--ws-text); }
.log-status span { color:var(--ws-text); }
.log-stdout pre {
margin:3px 0; padding:4px 10px;
background:#f1f8f7; border-left:3px solid var(--ws-accent2); border-radius:0 5px 5px 0;
font-family:"JetBrains Mono","IBM Plex Mono",monospace !important;
font-size:11px !important; color:var(--ws-muted); white-space:pre-wrap; word-break:break-all;
}
.log-done { color:var(--ws-accent); font-weight:700; padding:4px 0; }
.log-error { color:#e53e3e; padding:4px 0; }
#answer-scroll-wrap { display:flex; flex-direction:column; height:calc(var(--panel-height) - 42px); overflow:hidden; }
.result-log-section { border-bottom:2px solid var(--ws-border); background:#f7faf9; flex-shrink:0; max-height:140px; overflow:hidden; }
.result-log-section[open] { overflow-y:auto; }
.result-log-header {
font-size:10px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted);
padding:7px 18px; cursor:pointer; display:flex; align-items:center; justify-content:space-between;
list-style:none; user-select:none;
}
.result-log-header::-webkit-details-marker { display:none; }
.result-log-header::after { content:"▲"; font-size:9px; color:var(--ws-muted); transition:transform 0.2s; }
.result-log-section:not([open]) .result-log-header::after { transform:rotate(180deg); }
.result-log-section:not([open]) ~ .result-answer-section #answer-scroll { height:calc(var(--panel-height) - 78px) !important; }
.log-toggle-hint { font-size:9px; font-weight:400; color:#9bb0ac; margin-left:6px; text-transform:none; letter-spacing:0; }
.result-log-body { padding:0 18px 10px; font-size:12px; line-height:1.7; }
.result-answer-section { padding:0; flex:1; display:flex; flex-direction:column; min-height:0; overflow:hidden; }
.result-answer-header {
display:flex; align-items:center; justify-content:space-between;
padding:12px 18px 8px; border-bottom:1px solid var(--ws-border);
background:linear-gradient(135deg,#f0fdfa 0%,#e8faf7 100%); flex-shrink:0;
}
.result-answer-badge { font-size:13px; font-weight:700; color:var(--ws-accent); letter-spacing:.01em; }
#answer-scroll { padding:18px 22px; overflow-y:auto !important; height:calc(var(--panel-height) - 118px) !important; box-sizing:border-box; }
.md-body { color:var(--ws-text); line-height:1.75; font-size:14.5px; }
.md-body h1,.md-body h2,.md-body h3,.md-body h4 { color:#0b3b39; margin:.85em 0 .35em; }
.md-body h2 { font-size:16px; border-bottom:1px solid var(--ws-border); padding-bottom:5px; }
.md-body h3 { font-size:14px; color:var(--ws-accent); }
.md-body h4 { font-size:13px; }
.md-body p { margin:.4em 0; }
.md-body strong { font-weight:700; }
.md-body em { font-style:italic; }
.md-body a.md-link {
display:inline-flex; align-items:center; gap:4px;
color:var(--ws-accent); font-size:12.5px; font-weight:500; text-decoration:none;
background:var(--ws-green-bg); border:1px solid var(--ws-green-border);
border-radius:6px; padding:2px 9px; transition:background .15s,border-color .15s;
}
.md-body a.md-link:hover { background:#ccfbf1; border-color:var(--ws-accent); }
.md-body li.md-link-item { list-style:none; margin:3px 0; }
.md-body p.md-link-p { margin:3px 0; }
.md-body ul,.md-body ol { margin:.4em 0; padding-left:1.5em; }
.md-body ul ul,.md-body ol ol,.md-body ul ol,.md-body ol ul { margin:.2em 0; padding-left:1.4em; }
.md-body li { margin:.2em 0; }
.md-body li > ul,.md-body li > ol { margin-top:.15em; }
.md-body code { background:var(--ws-code-bg); color:#0b3b39; border:1px solid #d9e2ec; border-radius:5px; padding:.1em .35em; font-size:.91em; font-family:"JetBrains Mono","IBM Plex Mono",monospace; }
.md-body pre { background:#0f172a; color:#e2e8f0; border-radius:10px; border:1px solid #1e293b; padding:.85em 1em; overflow-x:auto; margin:.7em 0; }
.md-body pre code { background:transparent; border:none; color:inherit; padding:0; }
.md-body blockquote { margin:.8em 0; padding:.6em .9em; border-left:4px solid var(--ws-accent2); background:var(--ws-green-bg); color:#115e59; border-radius:0 8px 8px 0; }
.md-body table { width:100%; border-collapse:collapse; margin:.7em 0; }
.md-body th { background:#eef6f4; color:#0f3f3b; font-weight:600; font-size:12px; text-transform:uppercase; letter-spacing:.04em; padding:7px 10px; border:1px solid var(--ws-border); }
.md-body td { border:1px solid var(--ws-border); padding:6px 10px; vertical-align:top; }
.md-body tr:hover td { background:#f9fefd; }
#timer-row { display:none !important; }
#result-json-wrap { border-top:2px solid var(--ws-border); background:var(--ws-surface); }
#result-json-wrap .accordion-header { padding:10px 16px !important; }
#meta-box { max-height:220px; overflow-y:auto; background:var(--ws-surface)!important; border:none!important; }
#meta-box code,#meta-box pre { font-family:"JetBrains Mono",monospace!important; font-size:11.5px!important; color:var(--ws-muted)!important; background:transparent!important; }
::-webkit-scrollbar { width:5px; height:5px; }
::-webkit-scrollbar-track { background:transparent; }
::-webkit-scrollbar-thumb { background:var(--ws-border); border-radius:3px; }
::-webkit-scrollbar-thumb:hover { background:#aec5c1; }
body:has(.options:not(.hide)) { overflow:hidden!important; }
@media (max-width: 768px) {
#example-grid { grid-template-columns:1fr 1fr!important; }
:root { --panel-height:500px; }
#output-col { max-height:none!important; }
}
"""
# Page header markup rendered at the top of the app by create_app().
# NOTE(review): the string holds only a newline in this copy; the header markup
# (the CSS above styles #ws-header, its h1/p and .ws-badge) appears to have
# been lost. Restore it from the original file.
HEADER_HTML = """
"""
# ─────────────────────────────────────────────────────────────
# Gradio 앱 — Ask a Question 탭만
# ─────────────────────────────────────────────────────────────
# Build the Gradio Blocks UI and wire its events.
# Returns a (demo, theme) tuple; main() passes the theme to launch().
# Layout: header, then an input column (API key, persona picker, example
# buttons, query box, run/clear buttons) beside an output column, followed by
# a row showing the raw result JSON.
# NOTE(review): indentation is missing in this copy, so the exact nesting of
# the `with` blocks cannot be read from the text, and the gr.HTML(...) literals
# are split across lines with their markup apparently lost. Restore both from
# the original file.
def create_app():
theme = gr.themes.Soft(primary_hue="emerald", secondary_hue="teal",
neutral_hue="slate", radius_size="lg")
with gr.Blocks(title="Wallstreet AI", analytics_enabled=False) as demo:
gr.HTML(HEADER_HTML)
# ════════════════════════════════════════════
# Ask a Question (placed directly, without a single wrapping tab)
# ════════════════════════════════════════════
with gr.Row(equal_height=True, elem_id="qa-row"):
with gr.Column(scale=1, min_width=280, elem_id="input-col"):
# API Key
api_key_input = gr.Textbox(
label="🔑 OpenAI API Key",
placeholder="sk-...",
type="password",
)
gr.HTML("
")
# Persona selection
gr.HTML("Persona
")
with gr.Row():
persona_dd = gr.Dropdown(
label="", choices=load_persona_names(),
value="없음", interactive=True,
scale=5, show_label=False,
)
refresh_btn = gr.Button("↺", size="sm", scale=1,
min_width=34, elem_id="refresh-btn")
persona_summary = gr.Markdown(value="", elem_id="persona-summary", visible=True)
gr.HTML("
")
# Example questions
gr.HTML("Example Questions
")
with gr.Column(elem_id="example-grid"):
example_btns = []
for example_text in EXAMPLES_BY_TYPE.values():
b = gr.Button(example_text, size="sm")
example_btns.append((b, example_text))
gr.HTML("
")
# Question input
gr.HTML("Ask Question
")
query_input = gr.Textbox(
label="",
placeholder="종목명, 티커, 분석 요청을 입력하세요...",
lines=3, value=EXAMPLE_QUERIES[0], show_label=False,
)
with gr.Row():
run_btn = gr.Button("🔍 Ask Question", variant="primary",
scale=3, elem_id="run-btn")
clear_btn = gr.Button("Newchat", scale=1, elem_id="clear-btn")
with gr.Column(scale=1, min_width=300, elem_id="output-col"):
output_panel = gr.HTML(value=IDLE_PANEL, show_label=False)
timer = gr.Markdown(value="", visible=False)
# Chat history (internal state)
chat_history = gr.State([])
with gr.Row(elem_id="result-json-wrap"):
with gr.Column():
gr.HTML("📄 Original Data(JSON)
")
meta = gr.Code(label="", language="json", elem_id="meta-box", show_label=False)
gr.HTML(AUTO_SCROLL_JS, visible=False)
# ── Event handlers ──────────────────────────────
# Streams every update from stream_analyze() straight to the outputs.
def on_run(q, persona, api_key, history):
for panel, t, rj in stream_analyze(q, persona, api_key, history):
yield panel, t, rj
# Runs after a request finishes: returns a copy of the history extended with
# the user turn and, when the result JSON carries `llm_response`, the reply.
# NOTE(review): the user turn is appended even when the run produced no
# result (e.g. validation failure or pipeline error).
def update_history(q, rj, history):
new_history = list(history)
new_history.append({"role": "user", "content": q})
try:
result = json.loads(rj)
llm_response = result.get("llm_response", "")
if llm_response:
new_history.append({"role": "assistant", "content": llm_response})
except (json.JSONDecodeError, AttributeError, TypeError):
pass
return new_history
run_click = run_btn.click(
fn=on_run,
inputs=[query_input, persona_dd, api_key_input, chat_history],
outputs=[output_panel, timer, meta],
)
run_then = run_click.then(
fn=update_history,
inputs=[query_input, meta, chat_history],
outputs=[chat_history],
)
# Pressing Enter in the query box triggers the same chain as the run button.
submit_click = query_input.submit(
fn=on_run,
inputs=[query_input, persona_dd, api_key_input, chat_history],
outputs=[output_panel, timer, meta],
)
submit_then = submit_click.then(
fn=update_history,
inputs=[query_input, meta, chat_history],
outputs=[chat_history],
)
# Reset the panel, timer, JSON view, history, persona and query, and cancel
# any run that is still in progress.
clear_btn.click(
fn=lambda: (IDLE_PANEL, "", "", [], "없음", ""),
outputs=[output_panel, timer, meta, chat_history, persona_dd, query_input],
cancels=[run_click, run_then, submit_click, submit_then],
)
# Re-read the persona file and reset the selection to "없음".
refresh_btn.click(
fn=lambda: gr.update(choices=load_persona_names(), value="없음"),
outputs=[persona_dd],
)
def on_persona_change(name):
return gr.update(value=load_persona_summary(name))
persona_dd.change(fn=on_persona_change, inputs=[persona_dd], outputs=[persona_summary])
# The default argument binds each button's own text at definition time.
for _b, _text in example_btns:
_b.click(fn=lambda t=_text: t, outputs=[query_input])
return demo, theme
# ─────────────────────────────────────────────────────────────
# 진입점
# ─────────────────────────────────────────────────────────────
def main():
    """Build the app, enable request queuing and serve it on port 7860."""
    demo, theme = create_app()
    demo.queue(default_concurrency_limit=8, max_size=64)

    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "debug": True,
        "theme": theme,
        "css": CSS,
    }
    demo.launch(**launch_options)


if __name__ == "__main__":
    main()