test1 / app.py
june-woo's picture
Update app.py
defa128 verified
import base64
import html as html_lib
import io
import json
import os
import re
import sys
import time
from dataclasses import asdict, is_dataclass
from pathlib import Path
from queue import Empty, Queue
from threading import Thread
import gradio as gr
from pydantic import BaseModel, ValidationError
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ์„ค์ •
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
os.environ.setdefault("LLM_MODEL_NAME", "gpt-5-mini")
os.environ.setdefault("LOG_FILE", "analysis_results.jsonl")
os.environ.setdefault("PERSONA_FILE", "persona.jsonl")
PERSONA_FILE = Path(os.environ.get("PERSONA_FILE", "persona.jsonl"))
IMAGE_CACHE_DIR = Path(".persona_images")
IMAGE_CACHE_DIR.mkdir(exist_ok=True)
LOCAL_IMAGE_DIR = Path(os.environ.get("LOCAL_IMAGE_DIR", Path(__file__).parent / "persona_images"))
DEFAULT_IMAGE_FILENAME = "default.png"
EXAMPLES_BY_TYPE = {
"screener": "PER ๋‚ฎ์€ ๋Œ€ํ˜•์ฃผ ์ถ”์ฒœํ•ด์ฃผ์„ธ์š”",
"technical": "Apple(AAPL) ์ฐจํŠธ ๋ถ„์„ํ•ด์ฃผ์„ธ์š”",
"fundamental": "Microsoft ์žฌ๋ฌด์ƒํƒœ ์–ด๋•Œ์š”?",
"news_summary": "Tesla ์ตœ๊ทผ ๋‰ด์Šค ์š”์•ฝํ•ด ์ฃผ์„ธ์š”",
"comparison": "Apple vs Microsoft ๋น„๊ต ๋ถ„์„ํ•ด ์ฃผ์„ธ์š”",
"earnings": "2025๋…„ 4๋ถ„๊ธฐ ์‚ผ์„ฑ์ „์ž ์‹ค์ ์€ ์–ด๋• ๋‚˜์š”?",
"swot": "OpenAI ๊ฒฝ์Ÿ๋ ฅ ๋ถ„์„ํ•ด ์ฃผ์„ธ์š”",
"general": "Tesla(TSLA) ์–ด๋–ป๊ฒŒ ๋ณด์‹œ๋‚˜์š”?",
"watchlist": "๋‚ด ๊ด€์‹ฌ์ข…๋ชฉ(์‚ผ์„ฑ์ „์ž, SKํ•˜์ด๋‹‰์Šค, Apple, Microsoft, Tesla) ํ˜„ํ™ฉ ๋ด์ฃผ์„ธ์š”",
}
EXAMPLE_QUERIES = list(EXAMPLES_BY_TYPE.values())
AUTO_SCROLL_JS = """
<script>
(function(){
function scroll(){
var el = document.getElementById('log-scroll') || document.getElementById('answer-scroll');
if(el) el.scrollTop = el.scrollHeight;
}
var mo = new MutationObserver(scroll);
function attach(){
var root = document.getElementById('output-col') || document.body;
mo.observe(root, {childList:true, subtree:true, characterData:true});
scroll();
}
attach();
setInterval(scroll, 400);
})();
</script>
"""
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ํ—ฌํผ
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def to_md(text):
return text or ""
def timer_text(elapsed):
return f"โฑ {elapsed}"
def _make_elapsed():
t0 = time.time()
return lambda: f"{time.time()-t0:.1f}์ดˆ"
_PAREN_MD = re.compile(r'\s*\(\s*\[[^\]]*\]\([^)]*\)\s*\)')
_MD_LINK = re.compile(r'\[([^\]]*)\]\([^)]*\)')
_PAREN_URL= re.compile(r'\s*\(https?://[^\)]*\)')
_BARE_URL = re.compile(r'https?://\S+')
_PAREN_DOM= re.compile(r'\s*\([a-zA-Z0-9._-]+\.[a-zA-Z]{2,6}\)')
def _safe(text):
t = text or ""
t = _PAREN_MD.sub('', t)
t = _MD_LINK.sub(r'\1', t)
t = _PAREN_URL.sub('', t)
t = _BARE_URL.sub('', t)
t = _PAREN_DOM.sub('', t)
t = re.sub(r'[ \t]{2,}', ' ', t).strip()
t = re.sub(r'\.\s*\.', '.', t)
return html_lib.escape(t).replace("\n", "<br>")
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ์ง„ํ–‰ ๋กœ๊ทธ HTML ๋นŒ๋”
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
STATUS_ICONS = {
"์š”์ฒญ ์ˆ˜์‹ ": "๐Ÿ“ก", "์ธํ…ํŠธ": "๐Ÿง ", "๋„๊ตฌ": "๐Ÿ”ง", "์‹œ์žฅ": "๐Ÿ“Š",
"๋‰ด์Šค": "๐Ÿ“ฐ", "์ปจํ…์ŠคํŠธ": "๐Ÿ—‚", "LLM": "โœจ", "์™„๋ฃŒ": "โœ…",
}
def _status_icon(msg):
for k, v in STATUS_ICONS.items():
if k in msg:
return v
return "โณ"
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ํŽ˜๋ฅด์†Œ๋‚˜ ๋ชจ๋ธ & ํŒŒ์ผ IO
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class PersonaLine(BaseModel):
name: str
full_name: str
summary: str
financial_mindset: str
data_analysis_approach: str
response_style: str
key_principles: list[str]
famous_quotes: list[str] | None = None
image_path: str | None = None
_persona_cache: list = []
_persona_cache_mtime: float = 0.0
def _parse_personas():
global _persona_cache, _persona_cache_mtime
try:
mtime = PERSONA_FILE.stat().st_mtime if PERSONA_FILE.exists() else 0.0
except OSError:
mtime = 0.0
if mtime == _persona_cache_mtime and _persona_cache:
return _persona_cache
personas = []
if not PERSONA_FILE.exists():
_persona_cache, _persona_cache_mtime = personas, mtime
return personas
try:
with PERSONA_FILE.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
if not isinstance(data, dict):
continue
if not data.get("full_name"):
data["full_name"] = data.get("name", "")
if "background" in data and "summary" not in data:
data["summary"] = data["background"]
personas.append(PersonaLine(**data))
except (json.JSONDecodeError, TypeError, ValidationError):
continue
except OSError:
pass
_persona_cache, _persona_cache_mtime = personas, mtime
return personas
def load_persona_names():
choices = ["์—†์Œ"]
for p in _parse_personas():
n = p.name.strip()
if n and n not in choices:
choices.append(n)
return choices
def load_persona_summary(name):
if not name or name == "์—†์Œ":
return ""
for p in _parse_personas():
if p.name.strip() == name:
summary = (p.summary[:80] + "โ€ฆ") if len(p.summary) > 80 else p.summary
return f"**{p.full_name}**\n\n{summary}"
return ""
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ์ถœ๋ ฅ ํŒจ๋„ ๋นŒ๋”
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _make_log_html(log_lines):
if not log_lines:
return ''
rows = []
for t, text in log_lines:
safe = html_lib.escape(text)
if t == "status":
icon = next((v for k, v in STATUS_ICONS.items() if k in text), "โณ")
rows.append(f'<div class="log-status">{icon} <span>{safe}</span></div>')
elif t == "stdout":
rows.append(f'<div class="log-stdout"><pre>{safe}</pre></div>')
elif t == "error":
rows.append(f'<div class="log-error">โŒ {safe}</div>')
elif t == "done":
rows.append('<div class="log-done">โœ… ๋ถ„์„ ์™„๋ฃŒ</div>')
return "\n".join(rows)
def _wrap_log(inner):
return (
'<div id="output-panel" class="phase-log">'
'<div class="panel-header"><span class="panel-title">Progress</span></div>'
'<div id="log-scroll">' + inner + '</div>'
'</div>'
)
def _wrap_answer(log_html, md_html, timer_str):
log_section = (
'<details class="result-log-section">'
'<summary class="result-log-header">Progress <span class="log-toggle-hint">Click to show/hide</span></summary>'
'<div class="result-log-body">' + log_html + '</div>'
'</details>'
) if log_html else ''
answer_section = (
'<div class="result-answer-section">'
'<div class="result-answer-header">'
'<span class="result-answer-badge">&#128203; Final results</span>'
f'<span class="panel-timer">{html_lib.escape(timer_str)}</span>'
'</div>'
'<div id="answer-scroll" class="md-body">' + md_html + '</div>'
'</div>'
)
return (
'<div id="output-panel" class="phase-answer">'
'<div class="panel-header">'
'<span class="panel-title">Analysis results</span>'
f'<span class="panel-timer">{html_lib.escape(timer_str)}</span>'
'</div>'
'<div id="answer-scroll-wrap">'
+ log_section
+ answer_section
+ '</div>'
'</div>'
)
def _md_to_html(text):
import re as _re
_URL_RE = _re.compile(r'https?://[^\s,๏ผŒใ€\)๏ผ‰\]ใ€‘]+')
def _url_badge(url: str) -> str:
clean = url.rstrip('.,;:')
m = _re.match(r'https?://(?:www\.)?([^/\s]+)', clean)
label = m.group(1) if m else clean
esc = html_lib.escape(clean)
return f'<a href="{esc}" target="_blank" class="md-link">๐Ÿ”— {html_lib.escape(label)}</a>'
def _clean_source_line(s: str) -> str:
s = _re.sub(r'\s*[โ€”โ€“-]{1,2}\s*(https?://)', r' \1', s)
return s
def process_inline(s: str) -> str:
s = _clean_source_line(s)
parts = []
last = 0
for m in _URL_RE.finditer(s):
before = s[last:m.start()]
before = html_lib.escape(before)
before = _re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', before)
before = _re.sub(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)', r'<em>\1</em>', before)
before = _re.sub(r'`(.+?)`', r'<code>\1</code>', before)
parts.append(before)
parts.append(_url_badge(m.group(0)))
last = m.end()
tail = html_lib.escape(s[last:])
tail = _re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', tail)
tail = _re.sub(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)', r'<em>\1</em>', tail)
tail = _re.sub(r'`(.+?)`', r'<code>\1</code>', tail)
parts.append(tail)
return ''.join(parts)
BULLET_RE = _re.compile(
r'^([ \t]*)'
r'(\d+[.)]\s+|[-โ€ขยทโ—‹โ—ฆโ–ธโ–น*]\s+)'
r'(.+)$'
)
HEADER_RE = _re.compile(r'^(#{1,4})\s+(.+)$')
def indent_level(spaces: str) -> int:
n = spaces.count('\t') * 4 + spaces.count(' ')
return n // 2
lines = text.split('\n')
out: list[str] = []
list_stack: list[tuple[int, str]] = []
para_lines: list[str] = []
def flush_para():
if para_lines:
out.append('<p>' + process_inline(' '.join(para_lines)) + '</p>')
para_lines.clear()
def open_list(depth: int, tag: str):
while list_stack and list_stack[-1][0] > depth:
_, t = list_stack.pop()
out.append(f'</{t}>')
if list_stack and list_stack[-1][0] == depth:
if list_stack[-1][1] != tag:
_, t = list_stack.pop()
out.append(f'</{t}>')
out.append(f'<{tag}>')
list_stack.append((depth, tag))
else:
out.append(f'<{tag}>')
list_stack.append((depth, tag))
def close_all_lists():
while list_stack:
_, t = list_stack.pop()
out.append(f'</{t}>')
for line in lines:
stripped = line.strip()
if not stripped:
flush_para()
close_all_lists()
continue
mh = HEADER_RE.match(stripped)
if mh:
flush_para(); close_all_lists()
level = min(len(mh.group(1)), 4)
out.append(f'<h{level}>{process_inline(mh.group(2))}</h{level}>')
continue
if _URL_RE.fullmatch(stripped):
flush_para()
badge = _url_badge(stripped)
if list_stack:
out.append(f'<li class="md-link-item">{badge}</li>')
else:
out.append(f'<p class="md-link-p">{badge}</p>')
continue
mb = BULLET_RE.match(line)
if mb:
flush_para()
spaces = mb.group(1)
marker = mb.group(2)
content = mb.group(3).strip()
depth = indent_level(spaces)
is_ordered = bool(_re.match(r'\d+', marker.strip()))
tag = 'ol' if is_ordered else 'ul'
open_list(depth, tag)
if is_ordered:
num = int(_re.match(r'(\d+)', marker.strip()).group(1))
out.append(f'<li value="{num}">{process_inline(content)}</li>')
else:
out.append(f'<li>{process_inline(content)}</li>')
continue
close_all_lists()
para_lines.append(stripped)
flush_para()
close_all_lists()
return '\n'.join(out)
IDLE_PANEL = (
'<div id="output-panel" class="phase-idle">'
'<div class="idle-msg">๐Ÿ” Enter your question on the left and click Ask a Question..</div>'
'</div>'
)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# stdout ์บก์ฒ˜ (pipeline ๋‚ด๋ถ€ print โ†’ Queue)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class _StdoutCapture(io.TextIOBase):
def __init__(self, q: Queue):
self._q = q
def write(self, s: str):
if s:
self._q.put(("stdout", s))
return len(s)
def flush(self):
pass
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ์ŠคํŠธ๋ฆผ ๋ถ„์„ โ€” pipeline() ์ง์ ‘ ํ˜ธ์ถœ
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def stream_analyze(query, persona_name, api_key, history=None):
query = (query or "").strip()
persona_name = (persona_name or "").strip()
api_key = (api_key or "").strip()
if not query:
yield (_wrap_log('<div class="log-error">โŒ Please enter your question.</div>'), "", "")
return
if not api_key:
yield (_wrap_log('<div class="log-error">โŒ Please enter your OpenAI API key.</div>'), "", "")
return
os.environ["LLM_MODEL_API_KEY"] = api_key
try:
from pipeline import pipeline
except Exception as e:
yield (_wrap_log(f'<div class="log-error">โŒ pipeline ๋กœ๋“œ ์‹คํŒจ: {html_lib.escape(str(e))}</div>'), "", "")
return
text_acc = ""
log_lines = []
frozen_log = ""
result_json = ""
first_delta = False
worker_done = False
elapsed = _make_elapsed()
eq: Queue = Queue()
def status_cb(msg: str):
eq.put(("status", msg))
def delta_cb(delta: str):
eq.put(("delta", delta))
def reader():
capture = _StdoutCapture(eq)
old_stdout = sys.stdout
sys.stdout = capture
try:
pname = persona_name if persona_name and persona_name != "์—†์Œ" else None
result = pipeline(
query,
persona_name=pname,
status_callback=status_cb,
stream_callback=delta_cb,
stream=True,
history=history or [],
)
eq.put(("result", result))
except Exception as exc:
eq.put(("exception", str(exc)))
finally:
sys.stdout = old_stdout
eq.put(("worker_done", None))
Thread(target=reader, daemon=True).start()
while True:
try:
kind, payload = eq.get(timeout=0.1)
buf = [(kind, payload)]
while True:
try: buf.append(eq.get_nowait())
except Empty: break
except Empty:
buf = []
for kind, payload in buf:
if kind == "status":
if not first_delta:
log_lines.append(("status", payload))
elif kind == "stdout":
if not first_delta:
if log_lines and log_lines[-1][0] == "stdout":
log_lines[-1] = ("stdout", log_lines[-1][1] + payload)
else:
log_lines.append(("stdout", payload))
elif kind == "delta":
if payload:
if not first_delta:
first_delta = True
frozen_log = _make_log_html(log_lines)
text_acc += payload
elif kind == "result":
result = payload
try:
result_dict = asdict(result) if is_dataclass(result) else dict(result)
result_json = json.dumps(result_dict, ensure_ascii=False, indent=2, default=str)
except Exception:
result_json = ""
if not text_acc:
llm = getattr(result, "llm_response", "") or ""
if llm:
first_delta = True
frozen_log = _make_log_html(log_lines)
text_acc = llm
elif kind == "exception":
log_lines.append(("error", str(payload)))
elif kind == "worker_done":
worker_done = True
t = timer_text(elapsed())
if first_delta:
panel = _wrap_answer(frozen_log, _md_to_html(text_acc), t)
else:
panel = _wrap_log(_make_log_html(log_lines))
yield (panel, t, result_json)
if worker_done:
break
t = timer_text(elapsed())
if first_delta:
panel = _wrap_answer(frozen_log, _md_to_html(text_acc), t)
else:
panel = _wrap_log(_make_log_html(log_lines))
yield (panel, t, result_json)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# CSS (gradio_app.py ์›๋ณธ ๊ทธ๋Œ€๋กœ)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
CSS = """
@import url("https://fonts.googleapis.com/css2?family=IBM+Plex+Sans+KR:wght@400;500;600;700&family=JetBrains+Mono:wght@400;500;600&display=swap");
:root {
--ws-bg: #f7faf9;
--ws-surface: #ffffff;
--ws-border: #dbe5e2;
--ws-text: #182022;
--ws-muted: #5d6b70;
--ws-accent: #0f766e;
--ws-accent2: #14b8a6;
--ws-code-bg: #f1f5f9;
--ws-green-bg: #f0fdfa;
--ws-green-border: #99f6e4;
--panel-height: 680px;
--pf-left-h: 130px;
--pf-right-h: calc(var(--pf-left-h) * 3 + 12px * 2);
}
.gradio-container,
.gradio-container :is(h1,h2,h3,h4,h5,h6,p,span,div,label,button,input,textarea,select) {
font-family: "IBM Plex Sans KR","Noto Sans KR","Source Sans 3",sans-serif !important;
letter-spacing: 0.005em;
}
.gradio-container {
background: radial-gradient(circle at top left,#edf9f6 0%,#f8fbfc 35%,#fdfefe 100%) !important;
}
#ws-header {
background: linear-gradient(135deg,#f0fdfa 0%,#e8faf7 55%,#f7faf9 100%);
border: 1px solid #b2e8e2;
border-radius: 14px;
padding: 20px 28px 16px;
margin-bottom: 8px;
position: relative; overflow: hidden;
}
#ws-header::before {
content:""; position:absolute; top:-70px; right:-70px;
width:260px; height:260px;
background:radial-gradient(circle,rgba(20,184,166,.13) 0%,transparent 68%);
pointer-events:none;
}
#ws-header h1 { font-size:22px !important; font-weight:700 !important; color:#0b3b39 !important; margin:0 0 4px !important; }
#ws-header p { font-size:13px !important; color:var(--ws-muted) !important; margin:0 !important; }
#ws-header .ws-badge {
display:inline-block; padding:1px 8px; border-radius:20px;
font-size:10px; font-weight:700; letter-spacing:.07em; text-transform:uppercase;
background:rgba(15,118,110,.1); border:1px solid rgba(15,118,110,.25);
color:var(--ws-accent); margin-right:7px; vertical-align:middle;
}
.tab-nav button {
background:transparent !important; color:var(--ws-muted) !important;
border:none !important; border-bottom:2px solid transparent !important;
font-size:13px !important; font-weight:600 !important;
padding:8px 18px !important; border-radius:0 !important;
transition:color .18s,border-color .18s !important;
}
.tab-nav button.selected,.tab-nav button:hover {
color:var(--ws-accent) !important; border-bottom-color:var(--ws-accent) !important;
background:transparent !important;
}
#qa-row { align-items: stretch !important; }
#input-col {
background: var(--ws-surface);
border: 1px solid var(--ws-border) !important;
border-radius: 14px !important;
padding: 18px 20px !important;
box-shadow: 0 2px 12px rgba(16,24,40,.04);
display: flex; flex-direction: column; gap: 10px;
height: auto !important;
min-height: var(--panel-height);
overflow-y: visible !important;
box-sizing: border-box;
}
.ws-label {
font-size: 10px; font-weight: 700; text-transform: uppercase;
letter-spacing: .08em; color: var(--ws-muted); margin-bottom: 4px;
}
.ws-divider { border:none; border-top:1px solid var(--ws-border); margin:10px 0; }
#persona-summary {
background: linear-gradient(180deg,#f9fefd 0%,#f3fbf9 100%) !important;
border: 1px solid #cde8e3 !important; border-radius: 10px !important;
padding: 10px 14px !important; font-size: 13px !important;
color: var(--ws-text) !important;
}
#persona-summary > .wrap,#persona-summary > div.prose { padding:0!important;border:none!important;box-shadow:none!important; }
#example-grid { display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 5px; }
#example-grid button {
width: 100% !important; text-align: left !important;
background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important;
border-radius: 8px !important; color: var(--ws-text) !important;
font-size: 11px !important; padding: 7px 10px !important;
white-space: normal !important; line-height: 1.4 !important;
transition: border-color .18s, color .18s, background .18s !important;
min-height: 44px;
}
#example-grid button:hover {
background: var(--ws-green-bg) !important; border-color: var(--ws-accent2) !important;
color: var(--ws-accent) !important;
}
#run-btn {
background: linear-gradient(135deg,#0f766e 0%,#14b8a6 100%) !important;
border: none !important; color: #fff !important; font-weight: 700 !important;
font-size: 13px !important; border-radius: 9px !important;
transition: opacity .18s, transform .12s !important;
}
#run-btn:hover { opacity:.87!important; transform:translateY(-1px)!important; }
#clear-btn {
background: var(--ws-code-bg) !important; border: 1px solid var(--ws-border) !important;
color: var(--ws-muted) !important; border-radius: 9px !important; font-size:12px!important;
}
#clear-btn:hover { border-color:var(--ws-accent)!important; color:var(--ws-accent)!important; }
#refresh-btn {
min-width:34px!important; padding:0 8px!important;
background:var(--ws-code-bg)!important; border:1px solid var(--ws-border)!important;
color:var(--ws-muted)!important; border-radius:8px!important; font-size:15px!important;
}
#refresh-btn:hover { color:var(--ws-accent)!important; border-color:var(--ws-accent)!important; background:var(--ws-green-bg)!important; }
#output-col {
border: 1px solid var(--ws-border) !important;
border-radius: 14px !important;
overflow: hidden !important;
background: var(--ws-surface);
box-shadow: 0 2px 12px rgba(16,24,40,.04);
height: var(--panel-height) !important;
display: flex !important; flex-direction: column !important;
box-sizing: border-box;
min-height: var(--panel-height) !important;
height: auto !important;
}
#output-col > .wrap, #output-col > div {
padding: 0 !important; margin: 0 !important;
border: none !important; box-shadow: none !important;
height: 100% !important;
display: flex !important; flex-direction: column !important;
overflow: hidden !important; min-height: 0 !important;
}
#output-panel { display:flex; flex-direction:column; height:100%; overflow:hidden; min-height:0; }
.panel-header {
display:flex; align-items:center; justify-content:space-between;
padding:10px 16px; background:#f7faf9;
border-bottom:1px solid var(--ws-border); flex-shrink:0;
}
.panel-title { font-size:11px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted); }
.panel-timer {
font-size:11px; font-weight:600; color:var(--ws-accent);
background:var(--ws-green-bg); border:1px solid var(--ws-green-border);
padding:2px 10px; border-radius:999px;
}
.idle-msg { display:flex; align-items:center; justify-content:center; flex:1; color:var(--ws-muted); font-size:13px; }
#log-scroll { flex:1; overflow-y:auto; padding:14px 18px; font-size:12px; line-height:1.7; }
.log-status { display:flex; align-items:flex-start; gap:7px; padding:3px 0; color:var(--ws-text); }
.log-status span { color:var(--ws-text); }
.log-stdout pre {
margin:3px 0; padding:4px 10px;
background:#f1f8f7; border-left:3px solid var(--ws-accent2); border-radius:0 5px 5px 0;
font-family:"JetBrains Mono","IBM Plex Mono",monospace !important;
font-size:11px !important; color:var(--ws-muted); white-space:pre-wrap; word-break:break-all;
}
.log-done { color:var(--ws-accent); font-weight:700; padding:4px 0; }
.log-error { color:#e53e3e; padding:4px 0; }
#answer-scroll-wrap { display:flex; flex-direction:column; height:calc(var(--panel-height) - 42px); overflow:hidden; }
.result-log-section { border-bottom:2px solid var(--ws-border); background:#f7faf9; flex-shrink:0; max-height:140px; overflow:hidden; }
.result-log-section[open] { overflow-y:auto; }
.result-log-header {
font-size:10px; font-weight:700; text-transform:uppercase; letter-spacing:.07em; color:var(--ws-muted);
padding:7px 18px; cursor:pointer; display:flex; align-items:center; justify-content:space-between;
list-style:none; user-select:none;
}
.result-log-header::-webkit-details-marker { display:none; }
.result-log-header::after { content:"โ–ฒ"; font-size:9px; color:var(--ws-muted); transition:transform 0.2s; }
.result-log-section:not([open]) .result-log-header::after { transform:rotate(180deg); }
.result-log-section:not([open]) ~ .result-answer-section #answer-scroll { height:calc(var(--panel-height) - 78px) !important; }
.log-toggle-hint { font-size:9px; font-weight:400; color:#9bb0ac; margin-left:6px; text-transform:none; letter-spacing:0; }
.result-log-body { padding:0 18px 10px; font-size:12px; line-height:1.7; }
.result-answer-section { padding:0; flex:1; display:flex; flex-direction:column; min-height:0; overflow:hidden; }
.result-answer-header {
display:flex; align-items:center; justify-content:space-between;
padding:12px 18px 8px; border-bottom:1px solid var(--ws-border);
background:linear-gradient(135deg,#f0fdfa 0%,#e8faf7 100%); flex-shrink:0;
}
.result-answer-badge { font-size:13px; font-weight:700; color:var(--ws-accent); letter-spacing:.01em; }
#answer-scroll { padding:18px 22px; overflow-y:auto !important; height:calc(var(--panel-height) - 118px) !important; box-sizing:border-box; }
.md-body { color:var(--ws-text); line-height:1.75; font-size:14.5px; }
.md-body h1,.md-body h2,.md-body h3,.md-body h4 { color:#0b3b39; margin:.85em 0 .35em; }
.md-body h2 { font-size:16px; border-bottom:1px solid var(--ws-border); padding-bottom:5px; }
.md-body h3 { font-size:14px; color:var(--ws-accent); }
.md-body h4 { font-size:13px; }
.md-body p { margin:.4em 0; }
.md-body strong { font-weight:700; }
.md-body em { font-style:italic; }
.md-body a.md-link {
display:inline-flex; align-items:center; gap:4px;
color:var(--ws-accent); font-size:12.5px; font-weight:500; text-decoration:none;
background:var(--ws-green-bg); border:1px solid var(--ws-green-border);
border-radius:6px; padding:2px 9px; transition:background .15s,border-color .15s;
}
.md-body a.md-link:hover { background:#ccfbf1; border-color:var(--ws-accent); }
.md-body li.md-link-item { list-style:none; margin:3px 0; }
.md-body p.md-link-p { margin:3px 0; }
.md-body ul,.md-body ol { margin:.4em 0; padding-left:1.5em; }
.md-body ul ul,.md-body ol ol,.md-body ul ol,.md-body ol ul { margin:.2em 0; padding-left:1.4em; }
.md-body li { margin:.2em 0; }
.md-body li > ul,.md-body li > ol { margin-top:.15em; }
.md-body code { background:var(--ws-code-bg); color:#0b3b39; border:1px solid #d9e2ec; border-radius:5px; padding:.1em .35em; font-size:.91em; font-family:"JetBrains Mono","IBM Plex Mono",monospace; }
.md-body pre { background:#0f172a; color:#e2e8f0; border-radius:10px; border:1px solid #1e293b; padding:.85em 1em; overflow-x:auto; margin:.7em 0; }
.md-body pre code { background:transparent; border:none; color:inherit; padding:0; }
.md-body blockquote { margin:.8em 0; padding:.6em .9em; border-left:4px solid var(--ws-accent2); background:var(--ws-green-bg); color:#115e59; border-radius:0 8px 8px 0; }
.md-body table { width:100%; border-collapse:collapse; margin:.7em 0; }
.md-body th { background:#eef6f4; color:#0f3f3b; font-weight:600; font-size:12px; text-transform:uppercase; letter-spacing:.04em; padding:7px 10px; border:1px solid var(--ws-border); }
.md-body td { border:1px solid var(--ws-border); padding:6px 10px; vertical-align:top; }
.md-body tr:hover td { background:#f9fefd; }
#timer-row { display:none !important; }
#result-json-wrap { border-top:2px solid var(--ws-border); background:var(--ws-surface); }
#result-json-wrap .accordion-header { padding:10px 16px !important; }
#meta-box { max-height:220px; overflow-y:auto; background:var(--ws-surface)!important; border:none!important; }
#meta-box code,#meta-box pre { font-family:"JetBrains Mono",monospace!important; font-size:11.5px!important; color:var(--ws-muted)!important; background:transparent!important; }
::-webkit-scrollbar { width:5px; height:5px; }
::-webkit-scrollbar-track { background:transparent; }
::-webkit-scrollbar-thumb { background:var(--ws-border); border-radius:3px; }
::-webkit-scrollbar-thumb:hover { background:#aec5c1; }
body:has(.options:not(.hide)) { overflow:hidden!important; }
@media (max-width: 768px) {
#example-grid { grid-template-columns:1fr 1fr!important; }
:root { --panel-height:500px; }
#output-col { max-height:none!important; }
}
"""
HEADER_HTML = """
<div id="ws-header">
<h1>๐Ÿ“ˆ Wallstreet AI</h1>
<p>
Financial analysis assistant combining legendary investor personas with
prices, fundamentals, earnings, news, and technical indicators
</p>
</div>
"""
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Gradio ์•ฑ โ€” Ask a Question ํƒญ๋งŒ
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def create_app():
theme = gr.themes.Soft(primary_hue="emerald", secondary_hue="teal",
neutral_hue="slate", radius_size="lg")
with gr.Blocks(title="Wallstreet AI", analytics_enabled=False) as demo:
gr.HTML(HEADER_HTML)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
# Ask a Question (๋‹จ์ผ ํƒญ ์—†์ด ๋ฐ”๋กœ ๋ฐฐ์น˜)
# โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
with gr.Row(equal_height=True, elem_id="qa-row"):
with gr.Column(scale=1, min_width=280, elem_id="input-col"):
# API Key
api_key_input = gr.Textbox(
label="๐Ÿ”‘ OpenAI API Key",
placeholder="sk-...",
type="password",
)
gr.HTML("<hr class='ws-divider'>")
# ํŽ˜๋ฅด์†Œ๋‚˜ ์„ ํƒ
gr.HTML("<p class='ws-label'>Persona</p>")
with gr.Row():
persona_dd = gr.Dropdown(
label="", choices=load_persona_names(),
value="์—†์Œ", interactive=True,
scale=5, show_label=False,
)
refresh_btn = gr.Button("โ†บ", size="sm", scale=1,
min_width=34, elem_id="refresh-btn")
persona_summary = gr.Markdown(value="", elem_id="persona-summary", visible=True)
gr.HTML("<hr class='ws-divider'>")
# ์˜ˆ์‹œ ์งˆ๋ฌธ
gr.HTML("<p class='ws-label'>Example Questions</p>")
with gr.Column(elem_id="example-grid"):
example_btns = []
for example_text in EXAMPLES_BY_TYPE.values():
b = gr.Button(example_text, size="sm")
example_btns.append((b, example_text))
gr.HTML("<hr class='ws-divider'>")
# ์งˆ๋ฌธ ์ž…๋ ฅ
gr.HTML("<p class='ws-label'>Ask Question</p>")
query_input = gr.Textbox(
label="",
placeholder="์ข…๋ชฉ๋ช…, ํ‹ฐ์ปค, ๋ถ„์„ ์š”์ฒญ์„ ์ž…๋ ฅํ•˜์„ธ์š”...",
lines=3, value=EXAMPLE_QUERIES[0], show_label=False,
)
with gr.Row():
run_btn = gr.Button("๐Ÿ” Ask Question", variant="primary",
scale=3, elem_id="run-btn")
clear_btn = gr.Button("Newchat", scale=1, elem_id="clear-btn")
with gr.Column(scale=1, min_width=300, elem_id="output-col"):
output_panel = gr.HTML(value=IDLE_PANEL, show_label=False)
timer = gr.Markdown(value="", visible=False)
# ์ฑ„ํŒ… ํžˆ์Šคํ† ๋ฆฌ (๋‚ด๋ถ€ state)
chat_history = gr.State([])
with gr.Row(elem_id="result-json-wrap"):
with gr.Column():
gr.HTML("<p class='ws-label'>๐Ÿ“„ Original Data(JSON)</p>")
meta = gr.Code(label="", language="json", elem_id="meta-box", show_label=False)
gr.HTML(AUTO_SCROLL_JS, visible=False)
# โ”€โ”€ ์ด๋ฒคํŠธ ํ•ธ๋“ค๋Ÿฌ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def on_run(q, persona, api_key, history):
for panel, t, rj in stream_analyze(q, persona, api_key, history):
yield panel, t, rj
def update_history(q, rj, history):
new_history = list(history)
new_history.append({"role": "user", "content": q})
try:
result = json.loads(rj)
llm_response = result.get("llm_response", "")
if llm_response:
new_history.append({"role": "assistant", "content": llm_response})
except (json.JSONDecodeError, AttributeError, TypeError):
pass
return new_history
run_click = run_btn.click(
fn=on_run,
inputs=[query_input, persona_dd, api_key_input, chat_history],
outputs=[output_panel, timer, meta],
)
run_then = run_click.then(
fn=update_history,
inputs=[query_input, meta, chat_history],
outputs=[chat_history],
)
submit_click = query_input.submit(
fn=on_run,
inputs=[query_input, persona_dd, api_key_input, chat_history],
outputs=[output_panel, timer, meta],
)
submit_then = submit_click.then(
fn=update_history,
inputs=[query_input, meta, chat_history],
outputs=[chat_history],
)
clear_btn.click(
fn=lambda: (IDLE_PANEL, "", "", [], "์—†์Œ", ""),
outputs=[output_panel, timer, meta, chat_history, persona_dd, query_input],
cancels=[run_click, run_then, submit_click, submit_then],
)
refresh_btn.click(
fn=lambda: gr.update(choices=load_persona_names(), value="์—†์Œ"),
outputs=[persona_dd],
)
def on_persona_change(name):
return gr.update(value=load_persona_summary(name))
persona_dd.change(fn=on_persona_change, inputs=[persona_dd], outputs=[persona_summary])
for _b, _text in example_btns:
_b.click(fn=lambda t=_text: t, outputs=[query_input])
return demo, theme
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ์ง„์ž…์ 
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def main():
app, theme = create_app()
app.queue(default_concurrency_limit=8, max_size=64)
app.launch(server_name="0.0.0.0", server_port=7860,
debug=True, theme=theme, css=CSS)
if __name__ == "__main__":
main()