# PaperShow — app.py
# Gradio front-end for the PaperShow pipeline (slides + poster generation).
# Author: ZaynZhu ("update app.py", commit 991a405)
import datetime
import os
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path

import gradio as gr
# Anchor every path to this file's directory so the app behaves the same
# no matter which working directory it is launched from.
ROOT = Path(__file__).resolve().parent
OUTPUT_DIR = ROOT / "output"      # pipeline results are collected here
ZIP_PATH = ROOT / "output.zip"    # packaged archive served for download
LOG_PATH = ROOT / "last_run.log"  # plain-text log of the most recent run
def run_pipeline(model_name_t, model_name_v, result_dir, paper_latex_root, arxiv_url, openai_key, gemini_key):
    """Run pipeline.py as a subprocess and package its output directory.

    Parameters mirror the Gradio inputs: text/vision model names, result
    directory, LaTeX project root, arXiv URL, and the two API keys.

    Returns:
        tuple: ``(log_text, ZIP_PATH)`` on success, ``(log_text, None)``
        on timeout, subprocess error, or when no output was produced.
    """
    start_time = datetime.datetime.now()
    logs = [f"🚀 Starting pipeline at {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"]

    # Make sure the output directory exists (avoids "No output generated").
    if not OUTPUT_DIR.exists():
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        logs.append(f"📁 Created output directory: {OUTPUT_DIR}\n")

    # Clean previous outputs but keep the (now empty) directory itself.
    for item in OUTPUT_DIR.iterdir():
        if item.is_dir():
            shutil.rmtree(item)
        else:
            item.unlink()
    if ZIP_PATH.exists():
        ZIP_PATH.unlink()
    logs.append("🧹 Cleaned previous output and zip files.\n")

    # Use sys.executable so the pipeline runs under the same interpreter as
    # this app — a bare "python" may be missing or point to a different
    # environment on the host.
    cmd = [
        sys.executable, "pipeline.py",
        "--model_name_t", model_name_t,
        "--model_name_v", model_name_v,
        "--result_dir", result_dir,
        "--paper_latex_root", paper_latex_root,
        "--arxiv_url", arxiv_url,
    ]

    # Pass the API keys through the child's environment only. Mutating the
    # global os.environ would leak one request's keys into every concurrent
    # or later request handled by this process.
    child_env = {
        **os.environ,
        "OPENAI_API_KEY": openai_key or "",
        "GEMINI_API_KEY": gemini_key or "",
    }
    logs.append(f"🧠 Running command: {' '.join(cmd)}\n")
    try:
        # Capture stdout + stderr; cwd=ROOT so the relative "pipeline.py"
        # path resolves regardless of the directory the app was started from.
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=1800,
            cwd=ROOT, env=child_env,
        )
        logs.append("\n======= STDOUT =======\n")
        logs.append(result.stdout)
        logs.append("\n======= STDERR =======\n")
        logs.append(result.stderr)
    except subprocess.TimeoutExpired:
        logs.append("❌ Pipeline timed out (30 min limit).")
        _write_logs(logs)
        # Return the full log (not just the last message) so the UI shows
        # the same text on every exit path.
        return "\n".join(logs), None
    except Exception as e:
        logs.append(f"❌ Pipeline error: {e}")
        _write_logs(logs)
        return "\n".join(logs), None

    # Bail out when the pipeline produced nothing.
    if not any(OUTPUT_DIR.iterdir()):
        logs.append("❌ No output generated. Please check logs below.")
        _write_logs(logs)
        return "\n".join(logs), None

    # Zip the output folder, storing paths relative to OUTPUT_DIR.
    with zipfile.ZipFile(ZIP_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _dirs, files in os.walk(OUTPUT_DIR):
            for file in files:
                file_path = Path(root) / file
                zipf.write(file_path, arcname=file_path.relative_to(OUTPUT_DIR))
    logs.append(f"✅ Zipped output folder to {ZIP_PATH}\n")

    end_time = datetime.datetime.now()
    # total_seconds() is correct for any duration; .seconds is only the
    # sub-day remainder of the timedelta.
    duration = int((end_time - start_time).total_seconds())
    logs.append(f"🏁 Completed at {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Duration: {duration}s)\n")

    # Persist the log so it is visible in the HF Spaces Logs pane.
    _write_logs(logs)
    return "\n".join(logs), ZIP_PATH
def _write_logs(logs):
    """Persist the accumulated log lines to LOG_PATH (one entry per line),
    so they can be inspected in the HF Logs pane for debugging."""
    LOG_PATH.write_text("\n".join(logs), encoding="utf-8")
# ===================== Gradio UI =====================
# One-function interface: each pipeline argument is a textbox (API keys are
# masked), outputs are the run log plus the zipped result folder.
iface = gr.Interface(
    fn=run_pipeline,
    inputs=[
        gr.Textbox(label="Model Name (Text)", value="gpt-4.1"),
        gr.Textbox(label="Model Name (Vision)", value="gpt-4.1"),
        gr.Textbox(label="Result Dir", value="output"),
        gr.Textbox(label="Paper LaTeX Root", value="input/latex_proj"),
        gr.Textbox(label="ArXiv URL", value="https://arxiv.org/abs/2505.21497"),
        gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password"),
        gr.Textbox(label="Gemini API Key", placeholder="AIza...", type="password"),
    ],
    outputs=[
        gr.Textbox(label="Logs", lines=30, max_lines=50),
        gr.File(label="Download Output (.zip)")
    ],
    title="📄 PaperShow Pipeline",
    description="输入 arXiv 链接和参数,自动生成 slides + poster,结果打包下载。",
    # NOTE(review): allow_flagging is the legacy Gradio 3.x kwarg; newer
    # Gradio versions renamed it to flagging_mode — confirm installed version.
    allow_flagging="never",
)
if __name__ == "__main__":
    # Bind to all interfaces on port 7860, the port HF Spaces routes to.
    iface.launch(server_name="0.0.0.0", server_port=7860)