ASD / app_web.py
Nx-Neuralon's picture
Upload app_web.py
43234af verified
from __future__ import annotations
import os
import queue
import shutil
import threading
import time
from typing import Any
import gradio as gr
from app.config import Settings
from app.pipeline import AnalysisPipeline
APP_TITLE = "ASD Multi-Agent Analyzer: 群智 · 独孤九鉴"
APP_SUBTITLE = (
"全场景、多模态、医学级孤独症(ASD)辅助筛查与评估系统 | \n"
"浙江大学人机系统实验室 | 指导教师:琚兆杰教授 | 学生负责人:王绪娜 | 研发代表:关智友"
)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, "assets")
APP_COVER_PATH = os.path.join(ASSETS_DIR, "hy.png")
print(f"BASE_DIR = {BASE_DIR}")
print(f"ASSETS_DIR = {ASSETS_DIR}")
print(f"APP_COVER_PATH = {APP_COVER_PATH}")
print(f"APP_COVER_EXISTS = {os.path.exists(APP_COVER_PATH)}")
# 让 Gradio 将 assets 目录作为静态资源目录提供给前端访问
gr.set_static_paths(paths=[ASSETS_DIR])
CUSTOM_CSS = """
:root {
--bg: #0b1020;
--panel: #121a2b;
--panel-2: #172135;
--soft: #94a3b8;
--text: #e5e7eb;
--line: #263247;
--accent: #4f46e5;
--accent-2: #2563eb;
--success: #16a34a;
--warning: #f59e0b;
}
html, body {
overflow-x: hidden !important;
}
body, .gradio-container {
background: linear-gradient(180deg, #0b1020 0%, #0f172a 100%) !important;
color: var(--text) !important;
font-family: "Inter", "Segoe UI", "PingFang SC", "Microsoft YaHei", sans-serif !important;
}
.gradio-container {
max-width: 1500px !important;
margin: 0 auto !important;
padding-top: 18px !important;
padding-bottom: 28px !important;
padding-left: 14px !important;
padding-right: 14px !important;
box-sizing: border-box !important;
}
.hero {
background: linear-gradient(135deg, rgba(79,70,229,.18), rgba(37,99,235,.12));
border: 1px solid rgba(148,163,184,.15);
border-radius: 22px;
padding: 24px 28px;
margin-bottom: 18px;
box-shadow: 0 10px 30px rgba(0,0,0,.22);
overflow: hidden;
}
.hero h1 {
margin: 0 0 8px 0;
font-size: 30px;
font-weight: 800;
letter-spacing: 0.2px;
line-height: 1.35;
word-break: break-word;
}
.hero p {
margin: 0;
color: var(--soft);
font-size: 14px;
line-height: 1.7;
white-space: pre-line;
word-break: break-word;
}
.kpi-row {
display: flex;
gap: 12px;
margin-top: 18px;
flex-wrap: wrap;
}
.kpi-card {
background: rgba(255,255,255,.03);
border: 1px solid rgba(148,163,184,.12);
border-radius: 16px;
padding: 12px 16px;
min-width: 170px;
flex: 1 1 180px;
box-sizing: border-box;
}
.kpi-card .label {
color: var(--soft);
font-size: 12px;
margin-bottom: 6px;
}
.kpi-card .value {
font-size: 16px;
font-weight: 700;
color: #ffffff;
line-height: 1.4;
}
.cover-wrap {
margin-top: 16px;
}
.cover-wrap img {
width: 100%;
max-width: 100%;
height: auto;
max-height: 320px;
object-fit: contain;
border-radius: 18px;
border: 1px solid rgba(148,163,184,.12);
box-shadow: 0 8px 24px rgba(0,0,0,.22);
display: block;
}
.section-card {
background: rgba(18,26,43,.92) !important;
border: 1px solid rgba(148,163,184,.12) !important;
border-radius: 20px !important;
box-shadow: 0 10px 30px rgba(0,0,0,.18) !important;
}
.section-title {
font-size: 15px;
font-weight: 700;
color: #fff;
margin-bottom: 8px;
}
.section-desc {
color: var(--soft);
font-size: 12px;
margin-bottom: 10px;
line-height: 1.7;
}
.gr-button-primary, button.primary {
background: linear-gradient(135deg, var(--accent), var(--accent-2)) !important;
border: none !important;
border-radius: 14px !important;
font-weight: 700 !important;
min-height: 44px !important;
}
.gr-button-secondary, button.secondary {
border-radius: 14px !important;
}
.gr-box, .gr-group, .gr-form {
border-radius: 18px !important;
}
textarea, input, .gr-textbox, .gr-file, .gr-dataframe, .gr-markdown {
border-radius: 14px !important;
box-sizing: border-box !important;
}
.tabitem {
border-radius: 16px !important;
}
.status-good {
color: #86efac;
font-weight: 700;
}
.status-warn {
color: #fcd34d;
font-weight: 700;
}
.status-bad {
color: #fca5a5;
font-weight: 700;
}
.footer-note {
color: var(--soft);
font-size: 12px;
margin-top: 14px;
line-height: 1.7;
}
pre, code {
white-space: pre-wrap !important;
word-break: break-word !important;
}
.gradio-container * {
max-width: 100%;
}
@media (max-width: 1024px) {
.gradio-container {
max-width: 100% !important;
padding-left: 12px !important;
padding-right: 12px !important;
}
.hero {
padding: 20px 20px;
}
.hero h1 {
font-size: 26px;
}
.kpi-card {
min-width: 150px;
flex: 1 1 160px;
}
}
@media (max-width: 768px) {
.gradio-container {
padding-top: 10px !important;
padding-bottom: 18px !important;
padding-left: 10px !important;
padding-right: 10px !important;
}
.hero {
border-radius: 18px;
padding: 16px 14px;
margin-bottom: 14px;
}
.hero h1 {
font-size: 22px;
line-height: 1.35;
}
.hero p {
font-size: 13px;
line-height: 1.65;
}
.kpi-row {
gap: 10px;
margin-top: 14px;
}
.kpi-card {
min-width: 100%;
flex: 1 1 100%;
padding: 10px 12px;
border-radius: 14px;
}
.kpi-card .label {
font-size: 11px;
}
.kpi-card .value {
font-size: 15px;
}
.cover-wrap {
margin-top: 12px;
}
.cover-wrap img {
max-height: 220px;
border-radius: 14px;
}
.section-card {
border-radius: 16px !important;
}
.section-title {
font-size: 14px;
}
.section-desc,
.footer-note {
font-size: 12px;
line-height: 1.6;
}
button, .gr-button, .gr-button-primary, .gr-button-secondary {
width: 100% !important;
min-height: 44px !important;
font-size: 15px !important;
}
.gr-file, .gr-textbox, .gr-markdown, .gr-group, .gr-form {
width: 100% !important;
}
textarea, input {
font-size: 14px !important;
}
.tabs {
overflow-x: auto !important;
}
.tab-nav {
flex-wrap: nowrap !important;
overflow-x: auto !important;
scrollbar-width: thin;
}
.tab-nav button {
width: auto !important;
white-space: nowrap !important;
flex: 0 0 auto !important;
min-height: 38px !important;
font-size: 13px !important;
padding: 8px 12px !important;
}
.gradio-container .grid-wrap,
.gradio-container .grid-container,
.gradio-container .grid {
overflow-x: auto !important;
}
}
@media (max-width: 480px) {
.hero {
padding: 14px 12px;
}
.hero h1 {
font-size: 19px;
}
.hero p {
font-size: 12px;
}
.section-title {
font-size: 13px;
}
.section-desc,
.footer-note {
font-size: 11px;
}
.kpi-card .value {
font-size: 14px;
}
.cover-wrap img {
max-height: 180px;
}
textarea, input {
font-size: 13px !important;
}
}
"""
def get_auth_credentials() -> tuple[str, str]:
username = os.getenv("APP_USERNAME", "").strip()
password = os.getenv("APP_PASSWORD", "").strip()
if not username or not password:
raise RuntimeError(
"缺少登录凭证。请在环境变量或 Hugging Face Space Secrets 中配置 APP_USERNAME 和 APP_PASSWORD。"
)
return username, password
def safe_read_text(path: str) -> str:
if not path or not os.path.exists(path):
return ""
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
def zip_output_dir(output_dir: str) -> str | None:
if not output_dir or not os.path.isdir(output_dir):
return None
zip_base = output_dir.rstrip("/\\")
archive_path = shutil.make_archive(zip_base, "zip", output_dir)
return archive_path
def shorten_text(text: str, max_len: int = 20000) -> str:
text = text or ""
if len(text) <= max_len:
return text
return text[:max_len] + "\n\n...[内容已截断]..."
def build_status_markdown(
stage: str,
progress_value: int,
logs: list[str],
result: dict[str, Any] | None = None,
error: str | None = None,
) -> str:
status_cls = "status-good" if result and not error else "status-warn"
if error:
status_cls = "status-bad"
lines = []
lines.append("### 运行状态")
lines.append(f"- 当前阶段:<span class='{status_cls}'>{stage}</span>")
lines.append(f"- 当前进度:**{progress_value}%**")
if result:
lines.append(f"- 可用 Key 数:**{result.get('valid_key_count', '')}**")
lines.append(f"- 视频模式:**{result.get('video_agent_mode', '')}**")
lines.append(f"- 音频模式:**{result.get('audio_agent_mode', '')}**")
lines.append(f"- 文档数量:**{result.get('document_count', '')}**")
lines.append(f"- 检索命中数:**{result.get('retrieved_count', '')}**")
lines.append(f"- 输出目录:`{result.get('output_dir', '')}`")
if error:
lines.append("")
lines.append("### 错误信息")
lines.append(f"```text\n{error}\n```")
lines.append("")
lines.append("### 运行日志")
lines.append("```text")
lines.extend(logs[-120:])
lines.append("```")
return "\n".join(lines)
def collect_outputs(output_dir: str) -> dict[str, str]:
ordinary_report = safe_read_text(os.path.join(output_dir, "final_report.md"))
rag_report = safe_read_text(os.path.join(output_dir, "rag_final_report.md"))
evidence_summary = safe_read_text(os.path.join(output_dir, "evidence_summary.json"))
merged_events = safe_read_text(os.path.join(output_dir, "merged_events.json"))
raw_agents = safe_read_text(os.path.join(output_dir, "raw_agent_results.json"))
audio_results = safe_read_text(os.path.join(output_dir, "audio_agent_results.json"))
audio_transcripts = safe_read_text(os.path.join(output_dir, "audio_transcripts.json"))
doc_results = safe_read_text(os.path.join(output_dir, "document_agent_results.json"))
rag_status = safe_read_text(os.path.join(output_dir, "rag_status.json"))
retrieved_chunks = safe_read_text(os.path.join(output_dir, "retrieved_chunks.json"))
return {
"ordinary_report": ordinary_report,
"rag_report": rag_report,
"evidence_summary": evidence_summary,
"merged_events": merged_events,
"raw_agents": raw_agents,
"audio_results": audio_results,
"audio_transcripts": audio_transcripts,
"doc_results": doc_results,
"rag_status": rag_status,
"retrieved_chunks": retrieved_chunks,
}
def build_debug_info() -> str:
lines = [
"### 调试信息",
f"- 当前工作目录:`{os.getcwd()}`",
f"- 脚本目录 BASE_DIR:`{BASE_DIR}`",
f"- 资源目录 ASSETS_DIR:`{ASSETS_DIR}`",
f"- 图片路径 APP_COVER_PATH:`{APP_COVER_PATH}`",
f"- 图片是否存在:**{os.path.exists(APP_COVER_PATH)}**",
f"- 当前标题 APP_TITLE:**{APP_TITLE}**",
f"- 当前副标题 APP_SUBTITLE:**{APP_SUBTITLE}**",
]
return "\n".join(lines)
def analyze(video_file, doc_files, progress=gr.Progress(track_tqdm=False)):
if video_file is None:
yield (
build_status_markdown("等待上传视频", 0, ["请先上传视频文件。"]),
"",
"",
"",
"",
"",
"",
"",
None,
)
return
settings = Settings.load()
log_q: queue.Queue = queue.Queue()
state = {
"progress": 0,
"message": "准备启动分析",
"done": False,
"error": None,
"result": None,
}
video_path = video_file if isinstance(video_file, str) else getattr(video_file, "name", None)
doc_paths = []
if doc_files:
if isinstance(doc_files, list):
doc_paths = [p if isinstance(p, str) else getattr(p, "name", None) for p in doc_files]
else:
doc_paths = [doc_files if isinstance(doc_files, str) else getattr(doc_files, "name", None)]
doc_paths = [p for p in doc_paths if p]
logs: list[str] = []
def progress_cb(p: int, m: str):
state["progress"] = p
state["message"] = m
log_q.put(("progress", p, m))
def log_cb(m: str):
log_q.put(("log", m))
def worker():
try:
pipeline = AnalysisPipeline(
settings=settings,
progress_cb=progress_cb,
log_cb=log_cb,
)
result = pipeline.run(video=video_path, doc_paths=doc_paths)
state["result"] = result
state["done"] = True
log_q.put(("done", result))
except Exception as e:
state["error"] = f"{type(e).__name__}: {e}"
state["done"] = True
log_q.put(("error", state["error"]))
thread = threading.Thread(target=worker, daemon=True)
thread.start()
initial_status = build_status_markdown("准备启动分析", 0, logs)
yield (
initial_status,
"",
"",
"",
"",
"",
"",
"",
None,
)
while not state["done"] or not log_q.empty():
try:
item = log_q.get(timeout=0.5)
kind = item[0]
if kind == "progress":
_, p, m = item
progress(p / 100.0, desc=m)
logs.append(f"[{p:3d}%] {m}")
elif kind == "log":
_, m = item
logs.append(m)
elif kind == "done":
_, _result = item
logs.append("分析完成。")
elif kind == "error":
_, err = item
logs.append(f"[ERROR] {err}")
except queue.Empty:
pass
status_md = build_status_markdown(
stage=state["message"],
progress_value=state["progress"],
logs=logs,
result=state["result"],
error=state["error"],
)
if state["done"]:
break
yield (
status_md,
"",
"",
"",
"",
"",
"",
"",
None,
)
time.sleep(0.15)
if state["error"]:
status_md = build_status_markdown(
stage="分析失败",
progress_value=state["progress"],
logs=logs,
error=state["error"],
)
yield (
status_md,
"",
"",
"",
"",
"",
"",
"",
None,
)
return
result = state["result"] or {}
output_dir = result.get("output_dir", "")
outputs = collect_outputs(output_dir)
downloadable = zip_output_dir(output_dir)
status_md = build_status_markdown(
stage="分析完成",
progress_value=100,
logs=logs,
result=result,
)
preview_summary = (
"=== evidence_summary.json ===\n"
+ shorten_text(outputs["evidence_summary"], 12000)
+ "\n\n=== rag_status.json ===\n"
+ shorten_text(outputs["rag_status"], 5000)
)
preview_retrieval = (
"=== retrieved_chunks.json ===\n"
+ shorten_text(outputs["retrieved_chunks"], 12000)
+ "\n\n=== merged_events.json ===\n"
+ shorten_text(outputs["merged_events"], 12000)
)
preview_modalities = (
"=== audio_agent_results.json ===\n"
+ shorten_text(outputs["audio_results"], 8000)
+ "\n\n=== audio_transcripts.json ===\n"
+ shorten_text(outputs["audio_transcripts"], 8000)
+ "\n\n=== document_agent_results.json ===\n"
+ shorten_text(outputs["doc_results"], 8000)
)
yield (
status_md,
outputs["ordinary_report"],
outputs["rag_report"],
preview_summary,
preview_retrieval,
preview_modalities,
shorten_text(outputs["raw_agents"], 15000),
output_dir,
downloadable,
)
def build_header_html() -> str:
if APP_COVER_PATH and os.path.exists(APP_COVER_PATH):
cover_html = (
f"<div class='cover-wrap'>"
f"<img src='/gradio_api/file={APP_COVER_PATH}' alt='cover' />"
f"</div>"
)
else:
cover_html = (
"<div class='cover-wrap'>"
"<div style='color:#fca5a5;font-size:13px;'>封面图未找到:assets/hy.png</div>"
"</div>"
)
return f"""
<div class="hero">
<h1>{APP_TITLE}</h1>
<p>{APP_SUBTITLE}</p>
<div class="kpi-row">
<div class="kpi-card">
<div class="label">输入模态</div>
<div class="value">视频 / 音频 / 文档</div>
</div>
<div class="kpi-card">
<div class="label">分析引擎</div>
<div class="value">多智能体并发</div>
</div>
<div class="kpi-card">
<div class="label">知识增强</div>
<div class="value">显式 RAG</div>
</div>
<div class="kpi-card">
<div class="label">输出能力</div>
<div class="value">可解释辅助报告</div>
</div>
</div>
{cover_html}
</div>
"""
with gr.Blocks(title=APP_TITLE) as demo:
gr.HTML(build_header_html())
with gr.Row():
with gr.Column(scale=1, min_width=280):
with gr.Group(elem_classes="section-card"):
gr.Markdown("### 输入与控制", elem_classes="section-title")
gr.Markdown(
"上传一个诊断视频,以及可选的多份病历/量表/问卷文档。",
elem_classes="section-desc",
)
video_input = gr.File(
label="上传视频",
file_types=[".mp4", ".avi", ".mov", ".mkv"],
type="filepath",
)
docs_input = gr.File(
label="上传可选文档(支持多选)",
file_count="multiple",
file_types=[".txt", ".md", ".json", ".csv", ".pdf", ".docx"],
type="filepath",
)
run_btn = gr.Button("开始分析", variant="primary")
gr.Markdown(
"""
<div class="footer-note">
小建议:正式分析使用 10 分钟左右的结构化诊断对话视频,文档可包含量表、病历、问卷与观察记录。
</div>
"""
)
# debug_output = gr.Markdown(value=build_debug_info())
with gr.Column(scale=1, min_width=280):
with gr.Group(elem_classes="section-card"):
gr.Markdown("### 实时运行状态", elem_classes="section-title")
gr.Markdown(
"展示当前阶段、进度、日志和运行模式。视频分析在云端可能比音频慢很多(没钱租好的服务器 ⦁֊⦁꧞) \n 若日志有持续更新就不代表卡死。",
elem_classes="section-desc",
)
status_output = gr.Markdown(value="### 状态\n等待开始分析。")
with gr.Column(scale=2, min_width=280):
with gr.Group(elem_classes="section-card"):
gr.Markdown("### 输出与结果", elem_classes="section-title")
gr.Markdown(
"查看普通报告、RAG 报告与中间态结果,并下载完整结果文件。",
elem_classes="section-desc",
)
with gr.Tabs():
with gr.Tab("普通报告"):
ordinary_report_output = gr.Markdown()
with gr.Tab("RAG 报告"):
rag_report_output = gr.Markdown()
with gr.Tab("证据摘要预览"):
summary_preview_output = gr.Textbox(lines=24)
with gr.Tab("检索与聚合预览"):
retrieval_preview_output = gr.Textbox(lines=24)
with gr.Tab("音频 / 文档中间结果"):
modality_preview_output = gr.Textbox(lines=24)
with gr.Tab("原始智能体输出"):
raw_preview_output = gr.Textbox(lines=24)
output_dir_box = gr.Textbox(label="本次输出目录", interactive=False)
download_output = gr.File(label="下载完整结果压缩包")
run_btn.click(
fn=analyze,
inputs=[video_input, docs_input],
outputs=[
status_output,
ordinary_report_output,
rag_report_output,
summary_preview_output,
retrieval_preview_output,
modality_preview_output,
raw_preview_output,
output_dir_box,
download_output,
],
api_name="analyze",
)
if __name__ == "__main__":
auth_username, auth_password = get_auth_credentials()
demo.queue(default_concurrency_limit=1).launch(
css=CUSTOM_CSS,
theme=gr.themes.Soft(
primary_hue="indigo",
secondary_hue="blue",
neutral_hue="slate",
),
allowed_paths=[ASSETS_DIR],
auth=(auth_username, auth_password),
)