import gradio as gr
import subprocess, shutil, os, zipfile, datetime, sys, time, uuid, stat, re
from pathlib import Path

# =====================
# Version guard
# =====================
def _ensure_versions():
    """Ensure huggingface_hub sits in the supported version window.

    When the installed version falls outside [0.24.0, 1.0.0) (or the package
    is missing), reinstall pinned huggingface-hub/transformers via pip.
    Runs once at import time.
    """
    import importlib, subprocess, sys

    def get_version(pkg):
        # "0" compares below any real version, so a missing/broken package
        # falls into the reinstall branch below.
        try:
            m = importlib.import_module(pkg)
            return getattr(m, "__version__", "0")
        except Exception:
            return "0"

    try:
        from packaging.version import Version
    except ImportError:
        # Install 'packaging' so the version comparison below works.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "packaging"])
        from packaging.version import Version

    # Check huggingface_hub
    hub_ver = get_version("huggingface_hub")
    hv = Version(hub_ver)
    required_min = Version("0.24.0")
    required_max = Version("1.0.0")
    hub_ok = required_min <= hv < required_max
    if not hub_ok:
        print(f"[INFO] huggingface_hub=={hub_ver} not in range "
              f"[{required_min}, {required_max}), reinstalling...")
        subprocess.check_call([
            sys.executable, "-m", "pip", "install",
            "huggingface-hub==0.27.1",
            "transformers==4.48.0",
            "--force-reinstall", "--no-deps"
        ])
    else:
        print(f"[INFO] huggingface_hub version OK: {hub_ver}")

_ensure_versions()

# =====================
# Paths (read-only repo root; DO NOT write here)
# =====================
ROOT = Path(__file__).resolve().parent
RUNS_DIR = ROOT / "runs"  # all per-run workspaces live here
RUNS_DIR.mkdir(parents=True, exist_ok=True)

TIMEOUT_SECONDS = 1800  # 30 minutes
RETENTION_HOURS = 12    # auto-clean runs older than N hours

# ---------------------
# Utils
# ---------------------
def _now_str():
    """Human-readable local timestamp for log lines."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def _write_logs(log_path: Path, logs):
    """Best-effort dump of the in-memory log list to *log_path*.

    Swallows all errors deliberately: persisting the log must never
    break the pipeline itself.
    """
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with open(log_path, "w", encoding="utf-8") as f:
            f.write("\n".join(logs))
    except Exception:
        pass

def _on_rm_error(func, path, exc_info):
    # fix "PermissionError: [Errno 13] Permission denied" for readonly files
    os.chmod(path, stat.S_IWRITE)
    func(path)

def _copytree(src: Path, dst: Path, symlinks=True, ignore=None):
    """Copy *src* tree over *dst*, removing any pre-existing *dst* first."""
    if dst.exists():
        shutil.rmtree(dst, onerror=_on_rm_error)
    shutil.copytree(src, dst, symlinks=symlinks, ignore=ignore)

def _safe_copy(src: Path, dst: Path):
    """shutil.copy2 that first creates the destination's parent directories."""
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)

def _cleanup_old_runs(max_age_hours=RETENTION_HOURS):
    """Delete run workspaces older than *max_age_hours* (best effort).

    Default now references the RETENTION_HOURS constant instead of a
    duplicated literal, so the two cannot drift apart.
    """
    try:
        now = datetime.datetime.now().timestamp()
        for run_dir in RUNS_DIR.iterdir():
            try:
                if not run_dir.is_dir():
                    continue
                mtime = run_dir.stat().st_mtime
                age_h = (now - mtime) / 3600.0
                if age_h > max_age_hours:
                    shutil.rmtree(run_dir, onerror=_on_rm_error)
            except Exception:
                # Skip entries we cannot stat/remove; keep cleaning the rest.
                continue
    except Exception:
        pass

def _prepare_workspace(logs):
    """Create isolated per-run workspace and copy needed code/assets into it.

    Returns (run_id, work_dir, log_path, zip_path). Appends progress
    messages to *logs* as a side effect.
    """
    run_id = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + uuid.uuid4().hex[:8]
    work_dir = RUNS_DIR / run_id
    work_dir.mkdir(parents=True, exist_ok=True)

    # Per-run log & zip path
    log_path = work_dir / "run.log"
    zip_path = work_dir / "output.zip"

    logs.append(f"🧩 New workspace: {work_dir.relative_to(ROOT)} (run_id={run_id})")

    # Copy code/assets that do file IO so they are run-local (avoid shared writes)
    # Keep copies as cheap as possible (symlinks=True when supported)
    needed_dirs = ["posterbuilder", "Paper2Poster"]
    for d in needed_dirs:
        src = ROOT / d
        if src.exists():
            _copytree(src, work_dir / d, symlinks=True)
            logs.append(f" ↪ copied {d}/ → runs/{run_id}/{d}/ (symlink where possible)")

    # template/ optional
    tmpl = ROOT / "template"
    if tmpl.exists():
        _copytree(tmpl, work_dir / "template", symlinks=True)
        logs.append(" ↪ copied template/")

    # pipeline.py must live inside workspace so that ROOT_DIR=work_dir
    _safe_copy(ROOT / "pipeline.py", work_dir / "pipeline.py")

    # Create standard IO dirs in workspace
    (work_dir / "input" / "pdf").mkdir(parents=True, exist_ok=True)
    (work_dir / "input" / "logo").mkdir(parents=True, exist_ok=True)
    (work_dir / "posterbuilder" / "latex_proj").mkdir(parents=True, exist_ok=True)

    return run_id, work_dir, log_path, zip_path

# ---------------------
# Helpers for new features
# (post-processing)
# ---------------------
def _parse_rgb(s):
    """Parse a user-supplied RGB triple.

    Accepts '94,46,145' / '94 46 145' / '[94,46,145]' / '(94, 46, 145)'
    or an already-split 3-element tuple/list. Returns (r, g, b) as ints
    in 0..255, or None when the input cannot be parsed.
    """
    if s is None:
        return None
    if isinstance(s, (tuple, list)) and len(s) == 3:
        vals = s
    else:
        nums = re.findall(r"\d+", str(s))
        if len(nums) < 3:
            return None
        vals = nums[:3]
    try:
        r, g, b = (int(vals[0]), int(vals[1]), int(vals[2]))
        if any(v < 0 or v > 255 for v in (r, g, b)):
            return None
        return (r, g, b)
    except Exception:
        return None

def _apply_meeting_logo(OUTPUT_DIR: Path, meeting_logo_file, logs):
    """Replace output/poster_latex_proj/logos/right_logo.png if meeting_logo_file provided.

    Returns True when a logo was written (converted or raw-copied),
    False otherwise. Appends progress messages to *logs*.
    """
    if not meeting_logo_file:
        return False
    logos_dir = OUTPUT_DIR / "poster_latex_proj" / "logos"
    target = logos_dir / "right_logo.png"
    try:
        logos_dir.mkdir(parents=True, exist_ok=True)
        # Try to convert to PNG for safety
        try:
            from PIL import Image
            img = Image.open(meeting_logo_file.name)
            # preserve alpha if available
            if img.mode not in ("RGB", "RGBA"):
                img = img.convert("RGBA")
            img.save(target, format="PNG")
            logs.append(f"🖼️ Meeting logo converted to PNG and saved → {target.relative_to(OUTPUT_DIR)}")
        except Exception:
            # Fallback: raw copy with .png name (PIL missing or unreadable image)
            shutil.copy(meeting_logo_file.name, target)
            logs.append(f"🖼️ Meeting logo copied (no conversion) → {target.relative_to(OUTPUT_DIR)} (note: ensure it's a valid PNG).")
        return True
    except Exception as e:
        logs.append(f"⚠️ Failed to apply meeting logo: {e}")
        return False

def _apply_theme_rgb(OUTPUT_DIR: Path, rgb_tuple, logs):
    """Replace \\definecolor{nipspurple}{RGB}{r,g,b} in poster_output.tex if rgb_tuple provided.

    Returns True when the color was rewritten, False otherwise.
    """
    if not rgb_tuple:
        return False
    tex_path = OUTPUT_DIR / "poster_latex_proj" / "poster_output.tex"
    if not tex_path.exists():
        logs.append(f"⚠️ Theme RGB skipped: {tex_path.relative_to(OUTPUT_DIR)} not found.")
        return False
    try:
        content = tex_path.read_text(encoding="utf-8")
        pattern = r"(\\definecolor\{nipspurple\}\{RGB\}\{)\s*\d+\s*,\s*\d+\s*,\s*\d+\s*(\})"
        new_vals = f"{rgb_tuple[0]},{rgb_tuple[1]},{rgb_tuple[2]}"
        new_content, n = re.subn(pattern, r"\1" + new_vals + r"\2", content, flags=re.MULTILINE)
        if n > 0:
            tex_path.write_text(new_content, encoding="utf-8")
            # Log message fixed to name 'nipspurple' — the color the regex actually targets.
            logs.append(f"🎨 Theme color updated: nipspurple = {{{new_vals}}} in {tex_path.relative_to(OUTPUT_DIR)}")
            return True
        else:
            logs.append("⚠️ Theme RGB not applied: definecolor for 'nipspurple' not found.")
            return False
    except Exception as e:
        logs.append(f"⚠️ Failed to update theme RGB: {e}")
        return False

def _apply_left_logo(OUTPUT_DIR: Path, logo_files, logs):
    """
    Use the first institutional logo uploaded by the user:
      - Copy it into output/poster_latex_proj/logos/ as left_logo.
      - Replace 'logos/left_logo.png' in poster_output.tex with the proper file extension.
    Does NOT convert formats. Simply renames and rewrites the tex reference.
    """
    if not logo_files:
        logs.append("ℹ️ No institutional logo uploaded.")
        return False
    # If multiple files component, take the first one
    f = logo_files[0] if isinstance(logo_files, (list, tuple)) else logo_files
    if not f:
        logs.append("ℹ️ No institutional logo uploaded.")
        return False

    ext = Path(f.name).suffix or ".png"  # fallback to .png if no extension
    logos_dir = OUTPUT_DIR / "poster_latex_proj" / "logos"
    tex_path = OUTPUT_DIR / "poster_latex_proj" / "poster_output.tex"

    try:
        logos_dir.mkdir(parents=True, exist_ok=True)
        dst = logos_dir / f"left_logo{ext}"
        shutil.copy(f.name, dst)
        logs.append(f"🏷️ Institutional logo copied to: {dst.relative_to(OUTPUT_DIR)}")
    except Exception as e:
        logs.append(f"⚠️ Failed to copy institutional logo: {e}")
        return False

    if not tex_path.exists():
        logs.append("⚠️ poster_output.tex not found, cannot replace left_logo path.")
        return False

    try:
        text = tex_path.read_text(encoding="utf-8")
        old = "logos/left_logo.png"
        new = f"logos/left_logo{ext}"
        if old in text:
            tex_path.write_text(text.replace(old, new), encoding="utf-8")
            logs.append(f"🛠️ Replaced left_logo.png → left_logo{ext} in poster_output.tex")
            return True
        # Fallback (covers weird spacing or macro variations). Uses a callable
        # replacement so the literal extension can never be misread as a
        # regex group reference. (Module-level `re` is already imported.)
        new_text, n = re.subn(r"(logos/left_logo)\.png", lambda m: m.group(1) + ext, text)
        if n > 0:
            tex_path.write_text(new_text, encoding="utf-8")
            logs.append(f"🛠️ Replaced left_logo.png → left_logo{ext} (regex fallback)")
            return True
        logs.append("ℹ️ No left_logo.png reference found in poster_output.tex.")
        return False
    except Exception as e:
        logs.append(f"⚠️ Failed to modify poster_output.tex: {e}")
        return False

# =====================
# Gradio pipeline function (ISOLATED)
# =====================
def run_pipeline(arxiv_url, pdf_file, openai_key, logo_files, meeting_logo_file, theme_rgb):
    """Generator driving one poster run; yields (log_text, zip_path_or_None).

    Creates an isolated workspace under runs/, streams the subprocess
    output into the UI, applies optional post-processing (conference
    logo, theme color, institutional logo), then zips the output dir.
    """
    _cleanup_old_runs(RETENTION_HOURS)

    start_time = datetime.datetime.now()
    logs = [f"🚀 Starting pipeline at {_now_str()}"]

    # --- Prepare per-run workspace ---
    run_id, WORK_DIR, LOG_PATH, ZIP_PATH = _prepare_workspace(logs)
    INPUT_DIR = WORK_DIR / "input"
    OUTPUT_DIR = WORK_DIR / "output"
    LOGO_DIR = INPUT_DIR / "logo"
    POSTER_LATEX_DIR = WORK_DIR / "posterbuilder" / "latex_proj"

    _write_logs(LOG_PATH, logs)
    yield "\n".join(logs), None

    # ====== Validation: must upload LOGO ======
    if logo_files is None:
        logo_files = []
    if not isinstance(logo_files, (list, tuple)):
        logo_files = [logo_files]
    logo_files = [f for f in logo_files if f]
    if len(logo_files) == 0:
        msg = "❌ You must upload at least one institutional logo (multiple allowed)."
        logs.append(msg)
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None
        return

    # Save logos into run-local dir
    for item in LOGO_DIR.iterdir():
        if item.is_file():
            item.unlink()
    saved_logo_paths = []
    for lf in logo_files:
        p = LOGO_DIR / Path(lf.name).name
        shutil.copy(lf.name, p)
        saved_logo_paths.append(p)
    logs.append(f"🏷️ Saved {len(saved_logo_paths)} logo file(s) → {LOGO_DIR.relative_to(WORK_DIR)}")
    _write_logs(LOG_PATH, logs)
    yield "\n".join(logs), None

    # ====== Handle uploaded PDF (optional) ======
    pdf_path = None
    if pdf_file:
        pdf_dir = INPUT_DIR / "pdf"
        pdf_dir.mkdir(parents=True, exist_ok=True)
        pdf_path = pdf_dir / Path(pdf_file.name).name
        shutil.copy(pdf_file.name, pdf_path)
        logs.append(f"📄 Uploaded PDF → {pdf_path.relative_to(WORK_DIR)}")
        # For pipeline Step 1.5 compatibility: also copy to input/paper.pdf
        canonical_pdf = INPUT_DIR / "paper.pdf"
        shutil.copy(pdf_file.name, canonical_pdf)
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None

    # ====== Validate input source ======
    if not arxiv_url and not pdf_file:
        msg = "❌ Please provide either an arXiv link or upload a PDF file (choose one)."
        logs.append(msg)
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None
        return

    # ====== Build command (run INSIDE workspace) ======
    cmd = [
        sys.executable, "pipeline.py",
        "--model_name_t", "gpt-5",
        "--model_name_v", "gpt-5",
        "--result_dir", "output",
        "--paper_latex_root", "input/latex_proj",
        "--openai_key", openai_key,
        "--gemini_key", "##",
        "--logo_dir", str(LOGO_DIR)  # run-local logo dir
    ]
    if arxiv_url:
        cmd += ["--arxiv_url", arxiv_url]
    # (Keep pdf via input/paper.pdf; pipeline will read it if exists)

    logs.append("\n======= REAL-TIME LOG =======")
    logs.append(f"cwd = runs/{WORK_DIR.name}")
    # SECURITY: mask the API key before logging — the previous version wrote
    # the raw key into run.log and the UI log box.
    safe_cmd = list(cmd)
    try:
        safe_cmd[safe_cmd.index("--openai_key") + 1] = "***"
    except ValueError:
        pass
    logs.append(f"cmd = {' '.join(safe_cmd)}")
    _write_logs(LOG_PATH, logs)
    yield "\n".join(logs), None

    # ====== Run with REAL-TIME streaming, inside workspace ======
    try:
        process = subprocess.Popen(
            cmd,
            cwd=str(WORK_DIR),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            universal_newlines=True,
        )
    except Exception as e:
        msg = f"❌ Pipeline failed to start: {e}"
        logs.append(msg)
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None
        return

    last_yield = time.time()
    try:
        while True:
            # Timeout guard. NOTE(review): readline() below can block while
            # the child is silent, so a quiet child may overrun the limit
            # until its next output — confirm acceptable for this workload.
            if (datetime.datetime.now() - start_time).total_seconds() > TIMEOUT_SECONDS:
                logs.append("❌ Pipeline timed out (30 min limit). Killing process…")
                try:
                    process.kill()
                except Exception:
                    pass
                _write_logs(LOG_PATH, logs)
                yield "\n".join(logs), None
                return

            line = process.stdout.readline()
            if line:
                print(line, end="")  # echo to Space logs
                logs.append(line.rstrip("\n"))
                _write_logs(LOG_PATH, logs)
                now = time.time()
                # Throttle UI updates to one every 0.3 s.
                if now - last_yield >= 0.3:
                    last_yield = now
                    yield "\n".join(logs), None
            elif process.poll() is not None:
                break
            else:
                time.sleep(0.05)

        return_code = process.wait()
        logs.append(f"\nProcess finished with code {return_code}")
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None

        if return_code != 0:
            logs.append("❌ Process exited with non-zero status. See logs above.")
            _write_logs(LOG_PATH, logs)
            yield "\n".join(logs), None
            return
    except Exception as e:
        logs.append(f"❌ Error during streaming: {e}")
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None
        return
    finally:
        try:
            if process.stdout:
                process.stdout.close()
        except Exception:
            pass

    # ====== Check output ======
    has_output = False
    try:
        if OUTPUT_DIR.exists():
            for _ in OUTPUT_DIR.iterdir():
                has_output = True
                break
    except FileNotFoundError:
        has_output = False

    if not has_output:
        msg = "❌ No output generated. Please check logs above."
        logs.append(msg)
        _write_logs(LOG_PATH, logs)
        yield "\n".join(logs), None
        return

    # ====== Post-processing (optional features) ======
    # 1) Optional meeting logo replacement
    _apply_meeting_logo(OUTPUT_DIR, meeting_logo_file, logs)

    # 2) Optional theme color update
    rgb_tuple = _parse_rgb(theme_rgb)
    if theme_rgb and not rgb_tuple:
        logs.append(f"⚠️ Ignored Theme RGB input '{theme_rgb}': expected like '94,46,145'.")
    if rgb_tuple:
        _apply_theme_rgb(OUTPUT_DIR, rgb_tuple, logs)

    # 3) Optional institutional logo -> left_logo.
    _apply_left_logo(OUTPUT_DIR, logo_files, logs)

    # (Previously this wrote/yielded twice in a row; once is enough.)
    _write_logs(LOG_PATH, logs)
    yield "\n".join(logs), None

    # ====== Zip output (run-local) ======
    try:
        with zipfile.ZipFile(ZIP_PATH, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(OUTPUT_DIR):
                for file in files:
                    file_path = Path(root) / file
                    arcname = file_path.relative_to(OUTPUT_DIR)
                    zipf.write(file_path, arcname=arcname)
        logs.append(f"✅ Zipped output → {ZIP_PATH.relative_to(WORK_DIR)}")
    except Exception as e:
        logs.append(f"❌ Failed to create zip: {e}")

    end_time = datetime.datetime.now()
    dur = (end_time - start_time).seconds
    logs.append(f"🏁 Completed at {_now_str()} (Duration: {dur}s)")
    logs.append(f"🆔 run_id = {WORK_DIR.name}")
    _write_logs(LOG_PATH, logs)
    yield "\n".join(logs), (str(ZIP_PATH) if ZIP_PATH.exists() else None)

# =====================
# Gradio UI
# =====================
iface = gr.Interface(
    fn=run_pipeline,
    inputs=[
        gr.Textbox(label="📘 ArXiv URL (choose one)", placeholder="https://arxiv.org/abs/2505.xxxxx"),
        gr.File(label="📄 Upload PDF (choose one)"),
        gr.Textbox(label="🔑 OpenAI API Key", placeholder="sk-...", type="password"),
        gr.File(
            label="🏷️ Institutional Logo (optional, multiple allowed)",
            file_count="multiple",
            file_types=["image"],
        ),
        gr.File(label="🧩 Optional: Conference Logo (replaces right_logo.png)",
                file_count="single", file_types=["image"]),
        gr.Textbox(label="🎨 Optional: Theme RGB (e.g., 94,46,145)", placeholder="94,46,145"),
    ],
    outputs=[
        gr.Textbox(label="🧾 Logs (8~10 minutes)", lines=30, max_lines=50),
        gr.File(label="📦 Download Results (.zip)")
    ],
    title="🎓 Paper2Poster",
    description="""
paper(https://arxiv.org/abs/2505.21497) | [GitHub](https://github.com/Paper2Poster/Paper2Poster) | project page (https://paper2poster.github.io/)

# Paper2Poster
Upload a paper, generate a poster for you. Each paper takes approximately **8–10 minutes**.

This work is based on the **[CAMEL-ai](https://camel-ai.org/)** framework.
""",
    allow_flagging="never",
)

if __name__ == "__main__":
    iface.launch(server_name="0.0.0.0", server_port=7860)