# leonardklin's picture
# Upload 328 files
# 978fed5 verified
"""Shared utilities for the Streamlit app."""
import json
import shutil
import tempfile
import time
import zipfile
from collections import defaultdict
from datetime import datetime
from pathlib import Path
import streamlit as st
def stream_markdown(text, delay=0.02):
    """Render *text* into one placeholder line by line, sleeping *delay*
    seconds between lines to simulate streamed output."""
    placeholder = st.empty()
    rendered = ""
    for chunk in text.split("\n"):
        rendered = f"{rendered}{chunk}\n"
        placeholder.markdown(rendered)
        time.sleep(delay)
def render_intermediate_state(intermediate_state):
    """Group intermediate items by node name and render each group of
    outputs inside a collapsed expander, one numbered step per output."""
    if not intermediate_state:
        return
    grouped = defaultdict(list)
    for entry in intermediate_state:
        grouped[entry.get("node_name", "unknown")].append(entry.get("output", ""))
    st.divider()
    st.subheader("Intermediate States")
    for node_name, node_outputs in grouped.items():
        with st.expander(node_name, expanded=False):
            for step, body in enumerate(node_outputs, 1):
                st.markdown(f"**Step {step}**")
                st.markdown(body)
# --- File upload helpers ---
def get_upload_temp_dir() -> Path:
    """Return the shared temp directory for uploads, creating it if needed
    and best-effort deleting session subdirectories older than one hour."""
    root = Path(tempfile.gettempdir()) / "scider_uploads"
    root.mkdir(parents=True, exist_ok=True)
    cutoff = time.time() - 3600
    stale_dirs = (
        child for child in root.iterdir()
        if child.is_dir() and child.stat().st_mtime < cutoff
    )
    for stale in stale_dirs:
        try:
            shutil.rmtree(stale)
        except OSError:
            # Best-effort cleanup; a locked/busy dir is simply left behind.
            pass
    return root
def save_and_extract_upload(uploaded_file) -> Path | None:
    """Save an uploaded zip into a fresh session temp dir, extract it, and
    return the resolved path of the extracted directory.

    Args:
        uploaded_file: Streamlit UploadedFile-like object exposing ``.name``
            and ``.getvalue()``; may be None.

    Returns:
        Path to the ``extracted`` directory, or None when nothing was
        uploaded or the name does not end in ``.zip``.

    Raises:
        zipfile.BadZipFile: if the uploaded bytes are not a valid archive.
    """
    if uploaded_file is None or not uploaded_file.name.lower().endswith(".zip"):
        return None
    base = get_upload_temp_dir()
    dest_dir = Path(tempfile.mkdtemp(dir=base))
    # Use only the basename: an upload name containing path separators
    # (e.g. "../x.zip") must not escape dest_dir.
    zip_path = dest_dir / Path(uploaded_file.name).name
    zip_path.write_bytes(uploaded_file.getvalue())
    extract_dir = dest_dir / "extracted"
    extract_dir.mkdir(parents=True, exist_ok=True)
    # NOTE: ZipFile.extractall sanitizes member names (strips absolute
    # paths and ".." components), so entries cannot escape extract_dir.
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(extract_dir)
    zip_path.unlink()
    return extract_dir.resolve()
def find_data_analysis_file(extract_dir: Path) -> Path | None:
"""Find data_analysis.md in extracted dir (root or first subdir)."""
candidates = [extract_dir / "data_analysis.md", extract_dir / "analysis.md"]
for c in candidates:
if c.exists():
return c
for p in extract_dir.rglob("data_analysis.md"):
return p
for p in extract_dir.rglob("analysis.md"):
return p
return None
def _rm_upload_root(p: Path):
"""Remove the scider_uploads session dir (go up to find it)."""
cur = Path(p).resolve().parent if Path(p).resolve().is_file() else Path(p).resolve()
while cur != cur.parent:
parent = cur.parent
if parent.name == "scider_uploads":
try:
shutil.rmtree(cur)
except OSError:
pass
return
cur = parent
def cleanup_uploaded_data():
    """Delete any temp-uploaded data dirs referenced in session state, drop
    their session keys, and restore workspace_path to the saved default."""
    upload_keys = (
        "uploaded_data_path",
        "uploaded_experiment_path",
        "uploaded_full_data_path",
    )
    for key in upload_keys:
        stored = st.session_state.get(key)
        if stored and isinstance(stored, (str, Path)):
            _rm_upload_root(Path(stored))
        st.session_state.pop(key, None)
    if "default_workspace_path" in st.session_state:
        st.session_state.workspace_path = st.session_state.default_workspace_path
# --- Chat history ---
def get_next_memo_number(memory_dir: Path) -> int:
    """Return 1 + the highest numbered ``memo_<n>`` subdirectory of
    *memory_dir*, or 1 when the dir is missing or holds no numbered memos."""
    if not memory_dir.exists():
        return 1
    numbers = []
    for child in memory_dir.iterdir():
        if not (child.is_dir() and child.name.startswith("memo_")):
            continue
        try:
            numbers.append(int(child.name.replace("memo_", "")))
        except ValueError:
            # Non-numeric memo dir names are ignored.
            continue
    return max(numbers, default=0) + 1
def save_chat_history(messages: list, workflow_type: str, metadata: dict | None = None) -> Path:
    """Persist a chat transcript under ``saved_chats/memo_<n>/chat_history.json``.

    Args:
        messages: JSON-serializable chat messages to store.
        workflow_type: Label for the workflow that produced the chat.
        metadata: Optional extra metadata; stored as ``{}`` when omitted.

    Returns:
        The memo directory the history was written into.
    """
    base_dir = Path(__file__).parent / "saved_chats"
    base_dir.mkdir(parents=True, exist_ok=True)
    memo_number = get_next_memo_number(base_dir)
    memo_dir = base_dir / f"memo_{memo_number}"
    memo_dir.mkdir(parents=True, exist_ok=True)
    chat_data = {
        "timestamp": datetime.now().isoformat(),
        "workflow_type": workflow_type,
        "metadata": metadata or {},
        "messages": messages,
    }
    chat_file = memo_dir / "chat_history.json"
    # ensure_ascii=False keeps non-ASCII chat content human-readable on disk.
    with open(chat_file, "w", encoding="utf-8") as f:
        json.dump(chat_data, f, indent=2, ensure_ascii=False)
    return memo_dir