Spaces:
Sleeping
Sleeping
File size: 4,742 Bytes
978fed5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """Shared utilities for the Streamlit app."""
import json
import shutil
import tempfile
import time
import zipfile
from collections import defaultdict
from datetime import datetime
from pathlib import Path
import streamlit as st
def stream_markdown(text, delay=0.02):
buf = ""
slot = st.empty()
for line in text.split("\n"):
buf += line + "\n"
slot.markdown(buf)
time.sleep(delay)
def render_intermediate_state(intermediate_state):
if not intermediate_state:
return
by_node = defaultdict(list)
for item in intermediate_state:
by_node[item.get("node_name", "unknown")].append(item.get("output", ""))
st.divider()
st.subheader("Intermediate States")
for node, outputs in by_node.items():
with st.expander(node, expanded=False):
for i, content in enumerate(outputs, 1):
st.markdown(f"**Step {i}**")
st.markdown(content)
# --- File upload helpers ---
def get_upload_temp_dir() -> Path:
"""Return temp directory for uploaded files. Clean old dirs on startup."""
base = Path(tempfile.gettempdir()) / "scider_uploads"
base.mkdir(parents=True, exist_ok=True)
now = time.time()
for d in base.iterdir():
if d.is_dir() and (now - d.stat().st_mtime) > 3600:
try:
shutil.rmtree(d)
except OSError:
pass
return base
def save_and_extract_upload(uploaded_file) -> Path | None:
"""Save uploaded zip to temp dir, extract it, return path to extracted dir."""
if uploaded_file is None or not uploaded_file.name.lower().endswith(".zip"):
return None
base = get_upload_temp_dir()
dest_dir = Path(tempfile.mkdtemp(dir=base))
zip_path = dest_dir / uploaded_file.name
with open(zip_path, "wb") as f:
f.write(uploaded_file.getvalue())
extract_dir = dest_dir / "extracted"
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
zip_path.unlink()
return extract_dir.resolve()
def find_data_analysis_file(extract_dir: Path) -> Path | None:
"""Find data_analysis.md in extracted dir (root or first subdir)."""
candidates = [extract_dir / "data_analysis.md", extract_dir / "analysis.md"]
for c in candidates:
if c.exists():
return c
for p in extract_dir.rglob("data_analysis.md"):
return p
for p in extract_dir.rglob("analysis.md"):
return p
return None
def _rm_upload_root(p: Path):
"""Remove the scider_uploads session dir (go up to find it)."""
cur = Path(p).resolve().parent if Path(p).resolve().is_file() else Path(p).resolve()
while cur != cur.parent:
parent = cur.parent
if parent.name == "scider_uploads":
try:
shutil.rmtree(cur)
except OSError:
pass
return
cur = parent
def cleanup_uploaded_data():
"""Remove temp uploaded data and restore workspace_path to default."""
for key in ("uploaded_data_path", "uploaded_experiment_path", "uploaded_full_data_path"):
path = st.session_state.get(key)
if path and isinstance(path, (str, Path)):
_rm_upload_root(Path(path))
if key in st.session_state:
del st.session_state[key]
if "default_workspace_path" in st.session_state:
st.session_state.workspace_path = st.session_state.default_workspace_path
# --- Chat history ---
def get_next_memo_number(memory_dir: Path) -> int:
if not memory_dir.exists():
return 1
existing_memos = [
d.name for d in memory_dir.iterdir() if d.is_dir() and d.name.startswith("memo_")
]
if not existing_memos:
return 1
numbers = []
for memo in existing_memos:
try:
num = int(memo.replace("memo_", ""))
numbers.append(num)
except ValueError:
continue
return max(numbers) + 1 if numbers else 1
def save_chat_history(messages: list, workflow_type: str, metadata: dict = None):
base_dir = Path(__file__).parent / "saved_chats"
base_dir.mkdir(parents=True, exist_ok=True)
memo_number = get_next_memo_number(base_dir)
memo_dir = base_dir / f"memo_{memo_number}"
memo_dir.mkdir(parents=True, exist_ok=True)
chat_data = {
"timestamp": datetime.now().isoformat(),
"workflow_type": workflow_type,
"metadata": metadata or {},
"messages": messages,
}
chat_file = memo_dir / "chat_history.json"
with open(chat_file, "w", encoding="utf-8") as f:
json.dump(chat_data, f, indent=2, ensure_ascii=False)
return memo_dir
|