"""Shared utilities for the Streamlit app.""" import json import shutil import tempfile import time import zipfile from datetime import datetime from pathlib import Path import streamlit as st def stream_markdown(text, delay=0.02): buf = "" slot = st.empty() for line in text.split("\n"): buf += line + "\n" slot.markdown(buf) time.sleep(delay) def status_banner(status: str | None, label: str) -> str: """Return an HTML status banner for a workflow result. ``status`` is the workflow's ``final_status`` ("success" / "failed" / "max_revisions_reached" / ...). The returned string is a standalone HTML block — prepend it to a markdown report with a blank line in between. """ s = (status or "").lower() if s in ("success", "complete", "completed"): cls, icon = "status-success", "✅" elif s in ("failed", "error"): cls, icon = "status-fail", "❌" else: cls, icon = "status-warn", "⚠️" return f"
" # --- File upload helpers --- def get_upload_temp_dir() -> Path: """Return temp directory for uploaded files. Clean old dirs on startup.""" base = Path(tempfile.gettempdir()) / "scider_uploads" base.mkdir(parents=True, exist_ok=True) now = time.time() for d in base.iterdir(): if d.is_dir() and (now - d.stat().st_mtime) > 3600: try: shutil.rmtree(d) except OSError: pass return base def save_and_extract_upload(uploaded_file) -> Path | None: """Save uploaded zip to temp dir, extract it, return path to extracted dir.""" if uploaded_file is None or not uploaded_file.name.lower().endswith(".zip"): return None base = get_upload_temp_dir() dest_dir = Path(tempfile.mkdtemp(dir=base)) zip_path = dest_dir / uploaded_file.name with open(zip_path, "wb") as f: f.write(uploaded_file.getvalue()) extract_dir = dest_dir / "extracted" extract_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_path, "r") as zf: zf.extractall(extract_dir) zip_path.unlink() return extract_dir.resolve() def find_data_analysis_file(extract_dir: Path) -> Path | None: """Find data_analysis.md in extracted dir (root or first subdir).""" candidates = [extract_dir / "data_analysis.md", extract_dir / "analysis.md"] for c in candidates: if c.exists(): return c for p in extract_dir.rglob("data_analysis.md"): return p for p in extract_dir.rglob("analysis.md"): return p return None def _rm_upload_root(p: Path): """Remove the scider_uploads session dir (go up to find it).""" cur = Path(p).resolve().parent if Path(p).resolve().is_file() else Path(p).resolve() while cur != cur.parent: parent = cur.parent if parent.name == "scider_uploads": try: shutil.rmtree(cur) except OSError: pass return cur = parent def cleanup_uploaded_data(): """Remove temp uploaded data and restore workspace_path to default.""" for key in ("uploaded_data_path", "uploaded_experiment_path", "uploaded_full_data_path"): path = st.session_state.get(key) if path and isinstance(path, (str, Path)): _rm_upload_root(Path(path)) if key in st.session_state: del st.session_state[key] if "default_workspace_path" in st.session_state: st.session_state.workspace_path = st.session_state.default_workspace_path # --- Chat history --- def get_next_memo_number(memory_dir: Path) -> int: if not memory_dir.exists(): return 1 existing_memos = [ d.name for d in memory_dir.iterdir() if d.is_dir() and d.name.startswith("memo_") ] if not existing_memos: return 1 numbers = [] for memo in existing_memos: try: num = int(memo.replace("memo_", "")) numbers.append(num) except ValueError: continue return max(numbers) + 1 if numbers else 1 def save_chat_history(messages: list, workflow_type: str, metadata: dict = None): base_dir = Path(__file__).parent / "saved_chats" base_dir.mkdir(parents=True, exist_ok=True) memo_number = get_next_memo_number(base_dir) memo_dir = base_dir / f"memo_{memo_number}" memo_dir.mkdir(parents=True, exist_ok=True) chat_data = { "timestamp": datetime.now().isoformat(), "workflow_type": workflow_type, "metadata": metadata or {}, "messages": messages, } chat_file = memo_dir / "chat_history.json" with open(chat_file, "w", encoding="utf-8") as f: json.dump(chat_data, f, indent=2, ensure_ascii=False) return memo_dir