"""Shared utilities for the Streamlit app.""" import json import shutil import tempfile import time import zipfile from collections import defaultdict from datetime import datetime from pathlib import Path import streamlit as st def stream_markdown(text, delay=0.02): buf = "" slot = st.empty() for line in text.split("\n"): buf += line + "\n" slot.markdown(buf) time.sleep(delay) def render_intermediate_state(intermediate_state): if not intermediate_state: return by_node = defaultdict(list) for item in intermediate_state: by_node[item.get("node_name", "unknown")].append(item.get("output", "")) st.divider() st.subheader("Intermediate States") for node, outputs in by_node.items(): with st.expander(node, expanded=False): for i, content in enumerate(outputs, 1): st.markdown(f"**Step {i}**") st.markdown(content) # --- File upload helpers --- def get_upload_temp_dir() -> Path: """Return temp directory for uploaded files. Clean old dirs on startup.""" base = Path(tempfile.gettempdir()) / "scider_uploads" base.mkdir(parents=True, exist_ok=True) now = time.time() for d in base.iterdir(): if d.is_dir() and (now - d.stat().st_mtime) > 3600: try: shutil.rmtree(d) except OSError: pass return base def save_and_extract_upload(uploaded_file) -> Path | None: """Save uploaded zip to temp dir, extract it, return path to extracted dir.""" if uploaded_file is None or not uploaded_file.name.lower().endswith(".zip"): return None base = get_upload_temp_dir() dest_dir = Path(tempfile.mkdtemp(dir=base)) zip_path = dest_dir / uploaded_file.name with open(zip_path, "wb") as f: f.write(uploaded_file.getvalue()) extract_dir = dest_dir / "extracted" extract_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(zip_path, "r") as zf: zf.extractall(extract_dir) zip_path.unlink() return extract_dir.resolve() def find_data_analysis_file(extract_dir: Path) -> Path | None: """Find data_analysis.md in extracted dir (root or first subdir).""" candidates = [extract_dir / "data_analysis.md", extract_dir / "analysis.md"] for c in candidates: if c.exists(): return c for p in extract_dir.rglob("data_analysis.md"): return p for p in extract_dir.rglob("analysis.md"): return p return None def _rm_upload_root(p: Path): """Remove the scider_uploads session dir (go up to find it).""" cur = Path(p).resolve().parent if Path(p).resolve().is_file() else Path(p).resolve() while cur != cur.parent: parent = cur.parent if parent.name == "scider_uploads": try: shutil.rmtree(cur) except OSError: pass return cur = parent def cleanup_uploaded_data(): """Remove temp uploaded data and restore workspace_path to default.""" for key in ("uploaded_data_path", "uploaded_experiment_path", "uploaded_full_data_path"): path = st.session_state.get(key) if path and isinstance(path, (str, Path)): _rm_upload_root(Path(path)) if key in st.session_state: del st.session_state[key] if "default_workspace_path" in st.session_state: st.session_state.workspace_path = st.session_state.default_workspace_path # --- Chat history --- def get_next_memo_number(memory_dir: Path) -> int: if not memory_dir.exists(): return 1 existing_memos = [ d.name for d in memory_dir.iterdir() if d.is_dir() and d.name.startswith("memo_") ] if not existing_memos: return 1 numbers = [] for memo in existing_memos: try: num = int(memo.replace("memo_", "")) numbers.append(num) except ValueError: continue return max(numbers) + 1 if numbers else 1 def save_chat_history(messages: list, workflow_type: str, metadata: dict = None): base_dir = Path(__file__).parent / "saved_chats" base_dir.mkdir(parents=True, exist_ok=True) memo_number = get_next_memo_number(base_dir) memo_dir = base_dir / f"memo_{memo_number}" memo_dir.mkdir(parents=True, exist_ok=True) chat_data = { "timestamp": datetime.now().isoformat(), "workflow_type": workflow_type, "metadata": metadata or {}, "messages": messages, } chat_file = memo_dir / "chat_history.json" with open(chat_file, "w", encoding="utf-8") as f: json.dump(chat_data, f, indent=2, ensure_ascii=False) return memo_dir