import gradio as gr
import subprocess, shutil, os, zipfile, datetime
from pathlib import Path

ROOT = Path(__file__).resolve().parent
OUTPUT_DIR = ROOT / "output"
ZIP_PATH = ROOT / "output.zip"
LOG_PATH = ROOT / "last_run.log"


def run_pipeline(model_name_t, model_name_v, result_dir, paper_latex_root, arxiv_url, openai_key, gemini_key):
    start_time = datetime.datetime.now()
    logs = [f"🚀 Starting pipeline at {start_time.strftime('%Y-%m-%d %H:%M:%S')}\n"]

    # 🧩 Ensure the output directory exists (avoids "No output generated")
    if not OUTPUT_DIR.exists():
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        logs.append(f"📁 Created output directory: {OUTPUT_DIR}\n")

    # 🧹 Clean previous output (but keep the empty directory itself)
    for item in OUTPUT_DIR.iterdir():
        if item.is_dir():
            shutil.rmtree(item)
        else:
            item.unlink()
    if ZIP_PATH.exists():
        ZIP_PATH.unlink()
    logs.append("🧹 Cleaned previous output and zip files.\n")

    # Build the command
    cmd = [
        "python", "pipeline.py",
        "--model_name_t", model_name_t,
        "--model_name_v", model_name_v,
        "--result_dir", result_dir,
        "--paper_latex_root", paper_latex_root,
        "--arxiv_url", arxiv_url,
    ]

    # Set API keys in the environment (used inside the pipeline)
    os.environ["OPENAI_API_KEY"] = openai_key or ""
    os.environ["GEMINI_API_KEY"] = gemini_key or ""

    logs.append(f"🧠 Running command: {' '.join(cmd)}\n")

    try:
        # Capture both stdout and stderr
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=1800
        )
        logs.append("\n======= STDOUT =======\n")
        logs.append(result.stdout)
        logs.append("\n======= STDERR =======\n")
        logs.append(result.stderr)
    except subprocess.TimeoutExpired:
        msg = "❌ Pipeline timed out (30 min limit)."
        logs.append(msg)
        _write_logs(logs)
        return msg, None
    except Exception as e:
        msg = f"❌ Pipeline error: {e}"
        logs.append(msg)
        _write_logs(logs)
        return msg, None

    # Check the output directory
    if not any(OUTPUT_DIR.iterdir()):
        msg = "❌ No output generated. Please check logs below."
        logs.append(msg)
        _write_logs(logs)
        return "\n".join(logs), None

    # Zip the output folder
    with zipfile.ZipFile(ZIP_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(OUTPUT_DIR):
            for file in files:
                file_path = Path(root) / file
                arcname = file_path.relative_to(OUTPUT_DIR)
                zipf.write(file_path, arcname=arcname)
    logs.append(f"✅ Zipped output folder to {ZIP_PATH}\n")

    end_time = datetime.datetime.now()
    logs.append(f"🏁 Completed at {end_time.strftime('%Y-%m-%d %H:%M:%S')} (Duration: {(end_time - start_time).seconds}s)\n")

    # Save logs to a file
    _write_logs(logs)
    return "\n".join(logs), ZIP_PATH


def _write_logs(logs):
    """Write logs to a file so they can be inspected in the HF Logs window."""
    with open(LOG_PATH, "w", encoding="utf-8") as f:
        f.write("\n".join(logs))


# ===================== Gradio UI =====================
iface = gr.Interface(
    fn=run_pipeline,
    inputs=[
        gr.Textbox(label="Model Name (Text)", value="gpt-4.1"),
        gr.Textbox(label="Model Name (Vision)", value="gpt-4.1"),
        gr.Textbox(label="Result Dir", value="output"),
        gr.Textbox(label="Paper LaTeX Root", value="input/latex_proj"),
        gr.Textbox(label="ArXiv URL", value="https://arxiv.org/abs/2505.21497"),
        gr.Textbox(label="OpenAI API Key", placeholder="sk-...", type="password"),
        gr.Textbox(label="Gemini API Key", placeholder="AIza...", type="password"),
    ],
    outputs=[
        gr.Textbox(label="Logs", lines=30, max_lines=50),
        gr.File(label="Download Output (.zip)")
    ],
    title="📄 PaperShow Pipeline",
    description="Enter an arXiv link and parameters; slides and a poster are generated automatically and packaged for download.",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)