from __future__ import annotations

import os
import queue
import shutil
import threading
import time
from typing import Any

import gradio as gr

from app.config import Settings
from app.pipeline import AnalysisPipeline

# Page title and subtitle shown in the UI header.
APP_TITLE = "ASD Multi-Agent Analyzer: 群智 · 独孤九鉴"
APP_SUBTITLE = (
    "全场景、多模态、医学级孤独症(ASD)辅助筛查与评估系统 | \n"
    "浙江大学人机系统实验室 | 指导教师:琚兆杰教授 | 学生负责人:王绪娜 | 研发代表:关智友"
)

# Asset locations are resolved relative to this script, not the working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, "assets")
APP_COVER_PATH = os.path.join(ASSETS_DIR, "hy.png")

# Startup diagnostics: print the resolved paths and whether the cover image exists.
for _name, _value in (
    ("BASE_DIR", BASE_DIR),
    ("ASSETS_DIR", ASSETS_DIR),
    ("APP_COVER_PATH", APP_COVER_PATH),
    ("APP_COVER_EXISTS", os.path.exists(APP_COVER_PATH)),
):
    print(f"{_name} = {_value}")

# Let Gradio serve the assets directory as static files reachable by the frontend.
gr.set_static_paths(paths=[ASSETS_DIR])

# Dark theme stylesheet. One rule per line; the three @media blocks at the end
# tighten spacing and font sizes for <=1024px, <=768px and <=480px viewports.
CUSTOM_CSS = """
:root { --bg: #0b1020; --panel: #121a2b; --panel-2: #172135; --soft: #94a3b8; --text: #e5e7eb; --line: #263247; --accent: #4f46e5; --accent-2: #2563eb; --success: #16a34a; --warning: #f59e0b; }
html, body { overflow-x: hidden !important; }
body, .gradio-container { background: linear-gradient(180deg, #0b1020 0%, #0f172a 100%) !important; color: var(--text) !important; font-family: "Inter", "Segoe UI", "PingFang SC", "Microsoft YaHei", sans-serif !important; }
.gradio-container { max-width: 1500px !important; margin: 0 auto !important; padding-top: 18px !important; padding-bottom: 28px !important; padding-left: 14px !important; padding-right: 14px !important; box-sizing: border-box !important; }
.hero { background: linear-gradient(135deg, rgba(79,70,229,.18), rgba(37,99,235,.12)); border: 1px solid rgba(148,163,184,.15); border-radius: 22px; padding: 24px 28px; margin-bottom: 18px; box-shadow: 0 10px 30px rgba(0,0,0,.22); overflow: hidden; }
.hero h1 { margin: 0 0 8px 0; font-size: 30px; font-weight: 800; letter-spacing: 0.2px; line-height: 1.35; word-break: break-word; }
.hero p { margin: 0; color: var(--soft); font-size: 14px; line-height: 1.7; white-space: pre-line; word-break: break-word; }
.kpi-row { display: flex; gap: 12px; margin-top: 18px; flex-wrap: wrap; }
.kpi-card { background: rgba(255,255,255,.03); border: 1px solid rgba(148,163,184,.12); border-radius: 16px; padding: 12px 16px; min-width: 170px; flex: 1 1 180px; box-sizing: border-box; }
.kpi-card .label { color: var(--soft); font-size: 12px; margin-bottom: 6px; }
.kpi-card .value { font-size: 16px; font-weight: 700; color: #ffffff; line-height: 1.4; }
.cover-wrap { margin-top: 16px; }
.cover-wrap img { width: 100%; max-width: 100%; height: auto; max-height: 320px; object-fit: contain; border-radius: 18px; border: 1px solid rgba(148,163,184,.12); box-shadow: 0 8px 24px rgba(0,0,0,.22); display: block; }
.section-card { background: rgba(18,26,43,.92) !important; border: 1px solid rgba(148,163,184,.12) !important; border-radius: 20px !important; box-shadow: 0 10px 30px rgba(0,0,0,.18) !important; }
.section-title { font-size: 15px; font-weight: 700; color: #fff; margin-bottom: 8px; }
.section-desc { color: var(--soft); font-size: 12px; margin-bottom: 10px; line-height: 1.7; }
.gr-button-primary, button.primary { background: linear-gradient(135deg, var(--accent), var(--accent-2)) !important; border: none !important; border-radius: 14px !important; font-weight: 700 !important; min-height: 44px !important; }
.gr-button-secondary, button.secondary { border-radius: 14px !important; }
.gr-box, .gr-group, .gr-form { border-radius: 18px !important; }
textarea, input, .gr-textbox, .gr-file, .gr-dataframe, .gr-markdown { border-radius: 14px !important; box-sizing: border-box !important; }
.tabitem { border-radius: 16px !important; }
.status-good { color: #86efac; font-weight: 700; }
.status-warn { color: #fcd34d; font-weight: 700; }
.status-bad { color: #fca5a5; font-weight: 700; }
.footer-note { color: var(--soft); font-size: 12px; margin-top: 14px; line-height: 1.7; }
pre, code { white-space: pre-wrap !important; word-break: break-word !important; }
.gradio-container * { max-width: 100%; }
@media (max-width: 1024px) {
  .gradio-container { max-width: 100% !important; padding-left: 12px !important; padding-right: 12px !important; }
  .hero { padding: 20px 20px; }
  .hero h1 { font-size: 26px; }
  .kpi-card { min-width: 150px; flex: 1 1 160px; }
}
@media (max-width: 768px) {
  .gradio-container { padding-top: 10px !important; padding-bottom: 18px !important; padding-left: 10px !important; padding-right: 10px !important; }
  .hero { border-radius: 18px; padding: 16px 14px; margin-bottom: 14px; }
  .hero h1 { font-size: 22px; line-height: 1.35; }
  .hero p { font-size: 13px; line-height: 1.65; }
  .kpi-row { gap: 10px; margin-top: 14px; }
  .kpi-card { min-width: 100%; flex: 1 1 100%; padding: 10px 12px; border-radius: 14px; }
  .kpi-card .label { font-size: 11px; }
  .kpi-card .value { font-size: 15px; }
  .cover-wrap { margin-top: 12px; }
  .cover-wrap img { max-height: 220px; border-radius: 14px; }
  .section-card { border-radius: 16px !important; }
  .section-title { font-size: 14px; }
  .section-desc, .footer-note { font-size: 12px; line-height: 1.6; }
  button, .gr-button, .gr-button-primary, .gr-button-secondary { width: 100% !important; min-height: 44px !important; font-size: 15px !important; }
  .gr-file, .gr-textbox, .gr-markdown, .gr-group, .gr-form { width: 100% !important; }
  textarea, input { font-size: 14px !important; }
  .tabs { overflow-x: auto !important; }
  .tab-nav { flex-wrap: nowrap !important; overflow-x: auto !important; scrollbar-width: thin; }
  .tab-nav button { width: auto !important; white-space: nowrap !important; flex: 0 0 auto !important; min-height: 38px !important; font-size: 13px !important; padding: 8px 12px !important; }
  .gradio-container .grid-wrap, .gradio-container .grid-container, .gradio-container .grid { overflow-x: auto !important; }
}
@media (max-width: 480px) {
  .hero { padding: 14px 12px; }
  .hero h1 { font-size: 19px; }
  .hero p { font-size: 12px; }
  .section-title { font-size: 13px; }
  .section-desc, .footer-note { font-size: 11px; }
  .kpi-card .value { font-size: 14px; }
  .cover-wrap img { max-height: 180px; }
  textarea, input { font-size: 13px !important; }
}
"""
def get_auth_credentials() -> tuple[str, str]:
    """Return the ``(username, password)`` pair used for the login gate.

    Both values are read from the ``APP_USERNAME`` / ``APP_PASSWORD``
    environment variables and stripped of surrounding whitespace.

    Raises:
        RuntimeError: if either variable is unset or blank after stripping.
    """
    username = os.getenv("APP_USERNAME", "").strip()
    password = os.getenv("APP_PASSWORD", "").strip()
    if not username or not password:
        raise RuntimeError(
            "缺少登录凭证。请在环境变量或 Hugging Face Space Secrets 中配置 APP_USERNAME 和 APP_PASSWORD。"
        )
    return username, password


def safe_read_text(path: str) -> str:
    """Read *path* as UTF-8 text, returning ``""`` whenever it cannot be read.

    Undecodable bytes are dropped (``errors="ignore"``). An empty path, a
    missing file, a directory, or any other OS-level failure yields ``""``
    instead of raising.
    """
    if not path:
        return ""
    try:
        # EAFP: a prior os.path.exists() check still lets directories,
        # permission errors and files deleted in between raise from open().
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except (OSError, ValueError):
        # ValueError covers paths with embedded NUL bytes.
        return ""


def zip_output_dir(output_dir: str) -> str | None:
    """Zip *output_dir* into a sibling ``<output_dir>.zip`` and return its path.

    Returns ``None`` when *output_dir* is empty, is not an existing directory,
    or consists only of path separators (no usable archive name).
    """
    if not output_dir or not os.path.isdir(output_dir):
        return None
    # Trailing separators would otherwise put the archive *inside* the directory.
    zip_base = output_dir.rstrip("/\\")
    if not zip_base:
        return None
    return shutil.make_archive(zip_base, "zip", output_dir)


def shorten_text(text: str, max_len: int = 20000) -> str:
    """Return *text* capped at *max_len* characters, with a truncation marker.

    ``None`` / empty input is treated as ``""``. Text within the limit is
    returned unchanged.
    """
    text = text or ""
    if len(text) <= max_len:
        return text
    return text[:max_len] + "\n\n...[内容已截断]..."


def build_status_markdown(
    stage: str,
    progress_value: int,
    logs: list[str],
    result: dict[str, Any] | None = None,
    error: str | None = None,
) -> str:
    """Render the run-status panel as a Markdown string.

    Args:
        stage: Label of the current stage.
        progress_value: Progress shown as ``<value>%``.
        logs: Log lines; only the last 120 are included.
        result: Optional result mapping; when truthy, its summary fields
            (key count, agent modes, document/retrieval counts, output dir)
            are listed, with missing keys rendered as empty.
        error: Optional error text; when truthy, shown in its own section.
    """
    lines = [
        "### 运行状态",
        f"- 当前阶段:{stage}",
        f"- 当前进度:**{progress_value}%**",
    ]

    if result:
        lines += [
            f"- 可用 Key 数:**{result.get('valid_key_count', '')}**",
            f"- 视频模式:**{result.get('video_agent_mode', '')}**",
            f"- 音频模式:**{result.get('audio_agent_mode', '')}**",
            f"- 文档数量:**{result.get('document_count', '')}**",
            f"- 检索命中数:**{result.get('retrieved_count', '')}**",
            f"- 输出目录:`{result.get('output_dir', '')}`",
        ]

    if error:
        lines += ["", "### 错误信息", f"```text\n{error}\n```"]

    lines += ["", "### 运行日志", "```text"]
    lines.extend(logs[-120:])  # keep the panel bounded on long runs
    lines.append("```")
    return "\n".join(lines)
# Result key of collect_outputs() -> artifact file name inside the output directory.
_OUTPUT_ARTIFACTS: dict[str, str] = {
    "ordinary_report": "final_report.md",
    "rag_report": "rag_final_report.md",
    "evidence_summary": "evidence_summary.json",
    "merged_events": "merged_events.json",
    "raw_agents": "raw_agent_results.json",
    "audio_results": "audio_agent_results.json",
    "audio_transcripts": "audio_transcripts.json",
    "doc_results": "document_agent_results.json",
    "rag_status": "rag_status.json",
    "retrieved_chunks": "retrieved_chunks.json",
}


def collect_outputs(output_dir: str) -> dict[str, str]:
    """Load the text of every known artifact file found in *output_dir*.

    Returns a dict with one entry per key of ``_OUTPUT_ARTIFACTS``; an
    artifact that is missing maps to ``""``. When *output_dir* is empty or is
    not an existing directory, every value is ``""``.
    """
    # Guard first: joining "" with a file name yields a bare relative path,
    # which would read stale artifacts from the current working directory.
    if not output_dir or not os.path.isdir(output_dir):
        return dict.fromkeys(_OUTPUT_ARTIFACTS, "")
    return {
        key: safe_read_text(os.path.join(output_dir, filename))
        for key, filename in _OUTPUT_ARTIFACTS.items()
    }


def build_debug_info() -> str:
    """Return a Markdown bullet list of runtime paths and header settings.

    Covers the working directory, the module's base/assets directories, the
    cover image path and whether it exists, and the current title/subtitle.
    """
    return "\n".join(
        (
            "### 调试信息",
            f"- 当前工作目录:`{os.getcwd()}`",
            f"- 脚本目录 BASE_DIR:`{BASE_DIR}`",
            f"- 资源目录 ASSETS_DIR:`{ASSETS_DIR}`",
            f"- 图片路径 APP_COVER_PATH:`{APP_COVER_PATH}`",
            f"- 图片是否存在:**{os.path.exists(APP_COVER_PATH)}**",
            f"- 当前标题 APP_TITLE:**{APP_TITLE}**",
            f"- 当前副标题 APP_SUBTITLE:**{APP_SUBTITLE}**",
        )
    )
def _upload_path(upload: Any) -> str | None:
    """Return the filesystem path for one upload value.

    Accepts either a plain path string or an object exposing the path as
    ``.name``; anything else yields ``None``.
    """
    if isinstance(upload, str):
        return upload
    return getattr(upload, "name", None)


def _status_only(status_md: str) -> tuple:
    """Build the 9-slot output tuple with only the status slot populated."""
    return (status_md, "", "", "", "", "", "", "", None)


def analyze(video_file, doc_files, progress=gr.Progress(track_tqdm=False)):
    """Run the analysis pipeline on the uploads, streaming status as it goes.

    This is a generator. Every yielded value is a 9-tuple:
    ``(status_md, ordinary_report, rag_report, summary_preview,
    retrieval_preview, modalities_preview, raw_agents_text, output_dir,
    zip_path)``. While the pipeline runs (and on failure) only ``status_md``
    is filled; the final yield of a successful run fills every slot.

    Args:
        video_file: Uploaded video (path string or object with ``.name``);
            ``None`` produces a single "please upload" status and stops.
        doc_files: Optional upload or list of uploads passed to the pipeline
            as ``doc_paths``; entries without a resolvable path are dropped.
        progress: Gradio progress tracker, updated on each progress event.
    """
    if video_file is None:
        yield _status_only(
            build_status_markdown("等待上传视频", 0, ["请先上传视频文件。"])
        )
        return

    settings = Settings.load()
    log_q: queue.Queue = queue.Queue()
    # Shared with the worker thread; "done" is always written last.
    state = {
        "progress": 0,
        "message": "准备启动分析",
        "done": False,
        "error": None,
        "result": None,
    }

    video_path = _upload_path(video_file)
    if not doc_files:
        uploads = []
    elif isinstance(doc_files, list):
        uploads = doc_files
    else:
        uploads = [doc_files]
    doc_paths = [p for p in map(_upload_path, uploads) if p]

    logs: list[str] = []

    def progress_cb(p: int, m: str):
        state["progress"] = p
        state["message"] = m
        log_q.put(("progress", p, m))

    def log_cb(m: str):
        log_q.put(("log", m))

    def worker():
        try:
            pipeline = AnalysisPipeline(
                settings=settings,
                progress_cb=progress_cb,
                log_cb=log_cb,
            )
            result = pipeline.run(video=video_path, doc_paths=doc_paths)
            state["result"] = result
            log_q.put(("done", result))
        except Exception as e:  # thread boundary: report the failure in the UI
            state["error"] = f"{type(e).__name__}: {e}"
            log_q.put(("error", state["error"]))
        finally:
            # Flip the flag only after the final message is queued, so a
            # consumer that sees done=True can drain the queue and miss nothing.
            state["done"] = True

    def consume(item):
        """Fold one queued worker message into the progress bar and log list."""
        kind = item[0]
        if kind == "progress":
            _, p, m = item
            progress(p / 100.0, desc=m)
            logs.append(f"[{p:3d}%] {m}")
        elif kind == "log":
            logs.append(item[1])
        elif kind == "done":
            logs.append("分析完成。")
        elif kind == "error":
            logs.append(f"[ERROR] {item[1]}")

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    yield _status_only(build_status_markdown("准备启动分析", 0, logs))

    while True:
        # Sample the flag BEFORE draining: everything the worker queued before
        # setting it is then guaranteed to be picked up by this pass.
        finished = state["done"]
        try:
            # While running, block briefly for the first message to avoid a
            # busy loop; then take everything else that is already waiting.
            item = log_q.get_nowait() if finished else log_q.get(timeout=0.5)
            while True:
                consume(item)
                item = log_q.get_nowait()
        except queue.Empty:
            pass

        if finished:
            break

        yield _status_only(
            build_status_markdown(
                stage=state["message"],
                progress_value=state["progress"],
                logs=logs,
                result=state["result"],
                error=state["error"],
            )
        )
        time.sleep(0.15)  # throttle UI refreshes

    if state["error"]:
        yield _status_only(
            build_status_markdown(
                stage="分析失败",
                progress_value=state["progress"],
                logs=logs,
                error=state["error"],
            )
        )
        return

    result = state["result"] or {}
    output_dir = result.get("output_dir", "")
    outputs = collect_outputs(output_dir)
    downloadable = zip_output_dir(output_dir)

    status_md = build_status_markdown(
        stage="分析完成",
        progress_value=100,
        logs=logs,
        result=result,
    )

    preview_summary = (
        "=== evidence_summary.json ===\n"
        + shorten_text(outputs["evidence_summary"], 12000)
        + "\n\n=== rag_status.json ===\n"
        + shorten_text(outputs["rag_status"], 5000)
    )

    preview_retrieval = (
        "=== retrieved_chunks.json ===\n"
        + shorten_text(outputs["retrieved_chunks"], 12000)
        + "\n\n=== merged_events.json ===\n"
        + shorten_text(outputs["merged_events"], 12000)
    )

    preview_modalities = (
        "=== audio_agent_results.json ===\n"
        + shorten_text(outputs["audio_results"], 8000)
        + "\n\n=== audio_transcripts.json ===\n"
        + shorten_text(outputs["audio_transcripts"], 8000)
        + "\n\n=== document_agent_results.json ===\n"
        + shorten_text(outputs["doc_results"], 8000)
    )

    yield (
        status_md,
        outputs["ordinary_report"],
        outputs["rag_report"],
        preview_summary,
        preview_retrieval,
        preview_modalities,
        shorten_text(outputs["raw_agents"], 15000),
        output_dir,
        downloadable,
    )


# NOTE(review): build_header_html() began at this point in the original file,
# but its body is cut off in the reviewed text (the HTML template is missing),
# so it is not reproduced here. Restore it from the upstream file.
{APP_SUBTITLE}