"""
SpindleFlow RL β€” HuggingFace Spaces Training App
=================================================
Upload this file + requirements.txt to a NEW HF Space.
Space settings:
SDK : Gradio
Hardware : A100 (large) ← select when creating the Space
Secrets : HF_TOKEN (write token β€” huggingface.co β†’ Settings β†’ Tokens)
OPENAI_API_KEY (optional β€” enables finetuner + spawn self-learning)
HF_MODEL_REPO (optional β€” defaults to <your-username>/spindleflow-rl)
Training starts automatically when the Space boots.
Refresh the page or click "Refresh" to see live progress.
"""
import sys, os, subprocess
print("=== PYTHON STARTED ===", flush=True)
# ── Force CUDA torch before any `import torch` happens in this process ─────────
# requirements.txt installs CPU torch as a transitive dep of sentence-transformers.
# --force-reinstall overrides "already satisfied" and also installs nvidia-cudnn-cu12
# and other CUDA runtime packages needed by the LSTM kernel (cuDNN).
# This subprocess runs before gradio (and therefore before any torch import).
print("Installing CUDA torch + cuDNN...", flush=True)
_cuda_r = subprocess.run(
[sys.executable, "-m", "pip", "install", "-q",
"--force-reinstall",
"--index-url", "https://download.pytorch.org/whl/cu121",
"--extra-index-url", "https://pypi.org/simple",
"torch"],
capture_output=True, text=True,
timeout=600,
)
if _cuda_r.returncode == 0:
print("CUDA torch + cuDNN installed OK.", flush=True)
else:
print("CUDA torch install FAILED:", _cuda_r.stderr[-400:], flush=True)
import gradio as gr
print("=== GRADIO IMPORTED ===", flush=True)
import threading
import json, time
import numpy as np
# ── Persistent state (file-based so Gradio worker processes can read it) ─────
_LOG_FILE = "/home/user/app/assets/training_log.txt"
_STATUS_FILE = "/home/user/app/assets/training_status.json"
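# Status payload written by _write_status, e.g.:
#   {"phase": "training", "done": false, "error": null}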
def _write_status(phase, done=False, error=None):
try:
os.makedirs("/home/user/app/assets", exist_ok=True)
with open(_STATUS_FILE, "w", encoding="utf-8") as f:
json.dump({"phase": phase, "done": done, "error": error}, f)
except Exception:
pass
def _log(msg: str):
ts = time.strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
print(line, flush=True)
try:
os.makedirs("/home/user/app/assets", exist_ok=True)
with open(_LOG_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception:
pass
# ── Training thread ───────────────────────────────────────────
def _training_thread():
try:
# ── Tokens ──────────────────────────────────────────
HF_TOKEN = os.environ.get("HF_TOKEN", "")
OPENAI_KEY = os.environ.get("OPENAI_API_KEY", "")
HF_REPO = os.environ.get("HF_MODEL_REPO", "")
if not HF_TOKEN:
raise RuntimeError(
"HF_TOKEN secret not set. "
"Go to Space Settings β†’ Variables and secrets β†’ add HF_TOKEN."
)
        if OPENAI_KEY:
            _log("OpenAI key found — finetuner + spawn self-learning enabled.")
        else:
            _log("No OPENAI_API_KEY — running in simulation mode (fast training).")
if not HF_REPO:
from huggingface_hub import whoami
username = whoami(token=HF_TOKEN)["name"].strip()
HF_REPO = f"{username}/spindleflow-rl"
else:
HF_REPO = HF_REPO.strip()
_log(f"Model will be pushed to: https://huggingface.co/{HF_REPO}")
REPO_DIR = "/home/user/app"
os.chdir(REPO_DIR)
sys.path.insert(0, REPO_DIR)
_log(f"Working directory: {REPO_DIR}")
os.makedirs("/home/user/app/data", exist_ok=True)
os.makedirs("/home/user/app/checkpoints", exist_ok=True)
os.makedirs("/home/user/app/assets", exist_ok=True)
# ── Create HF repo early so periodic pushes can start ──
from huggingface_hub import HfApi, CommitOperationAdd
api = HfApi(token=HF_TOKEN)
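        # exist_ok=True makes this idempotent, so Space restarts reuse the repo.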
api.create_repo(repo_id=HF_REPO, repo_type="model", exist_ok=True)
# ── Force SentenceTransformer onto CUDA ─────────────
# encode() is called every step (scratchpad) + per specialist call.
        # On CPU this costs ~250 ms/call → ~1 s/step. On CUDA it's ~10 ms.
_log("Patching SentenceTransformer to CUDA...")
import torch as _torch_st
if _torch_st.cuda.is_available():
try:
from sentence_transformers import SentenceTransformer as _ST
_orig_st_init = _ST.__init__
def _fast_st_init(self, *args, **kwargs):
kwargs.setdefault("device", "cuda")
_orig_st_init(self, *args, **kwargs)
_ST.__init__ = _fast_st_init
_log("SentenceTransformer β†’ cuda βœ“")
except Exception as _ep:
_log(f"ST patch skipped: {_ep}")
else:
_log("WARNING: CUDA not available for SentenceTransformer β€” CPU mode (slow)")
# ── Patch env for simulate_specialists ──────────────
_log("Loading environment...")
from env.spindleflow_env import SpindleFlowEnv
import os as _os
if not getattr(SpindleFlowEnv, "_simulate_patched", False):
_orig_init = SpindleFlowEnv.__init__
def _new_init(self, *args, simulate_specialists=False, **kwargs):
_orig_init(self, *args, **kwargs)
self.simulate_specialists = simulate_specialists
SpindleFlowEnv.__init__ = _new_init
_orig_call = SpindleFlowEnv._call_specialist
def _new_call(self, specialist_id, task, elapsed_ms, context=None):
if getattr(self, "simulate_specialists", False):
_key = _os.environ.pop("OPENAI_API_KEY", None)
try:
return _orig_call(self, specialist_id, task, elapsed_ms, context=context)
finally:
                    if _key is not None:
_os.environ["OPENAI_API_KEY"] = _key
return _orig_call(self, specialist_id, task, elapsed_ms, context=context)
SpindleFlowEnv._call_specialist = _new_call
SpindleFlowEnv._simulate_patched = True
# ── Disable Tier-2 LLM scoring during training ───────
# TieredRewardScorer._tier2_score calls OpenAI API (>1000ms per episode).
        # Returning None forces it to fall back to Tier-1 embedding scoring (fast),
# preserving a meaningful reward signal without API latency.
from reward.tiered_reward import TieredRewardScorer
TieredRewardScorer._get_openai_client = lambda self: None
_log("TieredRewardScorer β†’ Tier-1 only (LLM judge disabled for speed) βœ“")
# ── Patch generalist baseline β†’ static (0 API calls per episode) ─────
from env.spindleflow_env import SpindleFlowEnv as _SFEnv
_STATIC_BASELINE = (
"General problem-solving approach:\n"
"1. Gather and clarify requirements\n"
"2. Research common solution patterns\n"
"3. Draft a high-level architecture\n"
"4. Implement in small, testable increments\n"
"5. Validate against acceptance criteria and deploy\n"
"No specialist domain expertise applied."
)
_SFEnv._generate_generalist_baseline = lambda self, task: _STATIC_BASELINE
_log("Generalist baseline β†’ static simulation (0 API calls per episode) βœ“")
# ── Smoke test ──────────────────────────────────────
_log("Running smoke test...")
env = SpindleFlowEnv(
config_path="configs/training_config.yaml",
catalog_path="configs/specialist_catalog.yaml",
use_real_spindleflow=False,
phase=1,
simulate_specialists=True,
)
obs, info = env.reset()
env.step(env.action_space.sample())
_log(f"Smoke test OK β€” obs shape {obs.shape}")
# ── Benchmark: encode speed and full step speed ──────
_log("Benchmarking SentenceTransformer encode speed...")
_N_enc = 50
_t0 = time.perf_counter()
for _ in range(_N_enc):
env.registry.embed_query("Software engineering task requiring specialist delegation")
_enc_ms = (time.perf_counter() - _t0) / _N_enc * 1000
_enc_device = "CUDA βœ“ fast" if _enc_ms < 50 else "CPU β€” slow, patch may have failed"
_log(f"Encode speed : {_enc_ms:.1f} ms/call [{_enc_device}]")
_log("Benchmarking full env.step() speed...")
_N_steps = 30
obs_b, _ = env.reset()
_t0 = time.perf_counter()
for _ in range(_N_steps):
obs_b, _, _d, _, _ = env.step(env.action_space.sample())
if _d:
obs_b, _ = env.reset()
_step_ms = (time.perf_counter() - _t0) / _N_steps * 1000
_step_ok = "fast βœ“" if _step_ms < 100 else "slow β€” check logs"
_log(f"Step speed : {_step_ms:.1f} ms/step [{_step_ok}]")
_log(f"Projected 100k steps: {100_000 * _step_ms / 1000 / 60:.0f} min")
env.close()
# ── Training ────────────────────────────────────────
import torch, yaml
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.callbacks import CheckpointCallback, BaseCallback
from policy.lstm_policy import build_policy_kwargs
from training.curriculum import CurriculumManager
from training.specialist_improvement_callback import SpecialistImprovementCallback
with open("configs/training_config.yaml") as f:
cfg = yaml.safe_load(f)
curriculum = CurriculumManager(config_path="configs/training_config.yaml")
        class RewardLogger(BaseCallback):
            """Tracks one running return per env (correct for n_envs > 1) and drives the curriculum."""
            def __init__(self, curriculum):
                super().__init__()
                self.episode_rewards = []
                self._running = None  # per-env running returns, sized on first step
                self._curriculum = curriculum
            def _on_step(self):
                rewards = self.locals.get("rewards", [])
                dones = self.locals.get("dones", [])
                if self._running is None or len(self._running) != len(rewards):
                    self._running = [0.0] * len(rewards)
                for i, (r, d) in enumerate(zip(rewards, dones)):
                    self._running[i] += float(r)
                    if d:
                        ep = self._running[i]
                        self.episode_rewards.append(ep)
                        self._running[i] = 0.0
                        advanced = self._curriculum.on_episode_end(ep)
                        n = len(self.episode_rewards)
                        if advanced or n % 25 == 0:
                            _log(
                                f"Ep {n:5d} | reward {ep:+.3f} | "
                                f"{self._curriculum.progress_str()}"
                            )
                return True
class PeriodicHubPush(BaseCallback):
"""Pushes checkpoint + log + reward curve to HF Hub every N steps."""
def __init__(self, api, hf_repo, hf_token, vec_env,
reward_logger_ref, push_every=10_000):
super().__init__()
self._api = api
self._repo = hf_repo
self._token = hf_token
self._vec_env = vec_env
self._rl_ref = reward_logger_ref
self._push_every = push_every
self._last_push = 0
def _save_curve(self):
ep = self._rl_ref.episode_rewards
if len(ep) < 2:
return
window = max(10, len(ep) // 20)
smoothed = [
float(np.mean(ep[max(0, i - window):i + 1]))
for i in range(len(ep))
]
step = max(1, len(ep) // 200)
with open("/home/user/app/assets/reward_curve.json", "w") as f:
json.dump({
"episodes": list(range(len(ep)))[::step],
"mean_rewards": smoothed[::step],
"raw_rewards": ep[::step],
"step": self.num_timesteps,
}, f)
                import matplotlib
                matplotlib.use("Agg")  # select the headless backend before pyplot makes figures
                import matplotlib.pyplot as plt
plt.figure(figsize=(10, 4))
every = max(1, len(ep) // 500)
plt.plot(range(0, len(ep), every), ep[::every],
"o", markersize=2, alpha=0.2, color="#00d4ff",
label="Episode reward")
plt.plot(range(0, len(ep), every), smoothed[::every],
linewidth=2.5, color="#ff6b35",
label=f"Smoothed ({window}-ep mean)")
if len(ep) >= 5:
plt.axhline(float(np.mean(ep[:5])),
color="#94a3b8", linestyle="--", alpha=0.8,
label="Early baseline")
plt.axhline(float(np.mean(ep[-min(200, len(ep)):])),
color="#34d399", linestyle="--", alpha=0.8,
label="Current mean")
plt.xlabel("Episode"); plt.ylabel("Reward")
plt.title(f"SpindleFlow RL β€” Learning Curve (step {self.num_timesteps:,})")
plt.legend(); plt.grid(alpha=0.2); plt.tight_layout()
plt.savefig("/home/user/app/assets/reward_curve.png", dpi=150)
plt.close()
def _on_step(self):
if self.num_timesteps - self._last_push < self._push_every:
return True
self._last_push = self.num_timesteps
try:
_log(f"Periodic save at step {self.num_timesteps:,} ...")
self.model.save("/home/user/app/spindleflow_model_latest")
self._vec_env.save("/home/user/app/vec_normalize_latest.pkl")
self._save_curve()
candidates = [
("/home/user/app/spindleflow_model_latest.zip", "spindleflow_model_latest.zip"),
("/home/user/app/vec_normalize_latest.pkl", "vec_normalize_latest.pkl"),
("/home/user/app/assets/training_log.txt", "training_log.txt"),
("/home/user/app/assets/reward_curve.json", "reward_curve.json"),
("/home/user/app/assets/reward_curve.png", "reward_curve.png"),
]
ops = [
CommitOperationAdd(path_in_repo=dst, path_or_fileobj=src)
for src, dst in candidates if os.path.exists(src)
]
if ops:
self._api.create_commit(
repo_id=self._repo, repo_type="model",
operations=ops,
commit_message=f"Checkpoint at step {self.num_timesteps:,}",
token=self._token,
)
_log(f"Periodic push done β€” {len(ops)} files at step {self.num_timesteps:,}")
except Exception as e:
_log(f"Periodic push failed (non-fatal): {e}")
return True
def make_env():
return SpindleFlowEnv(
config_path="configs/training_config.yaml",
catalog_path="configs/specialist_catalog.yaml",
use_real_spindleflow=False,
phase=1,
simulate_specialists=True,
)
_n_envs = int(cfg.get("training", {}).get("n_envs", 1))
vec_env = DummyVecEnv([make_env] * _n_envs)
vec_env = VecNormalize(vec_env, norm_obs=True, norm_reward=True, clip_obs=10.0)
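        # VecNormalize keeps running obs/reward statistics; they are saved as
        # vec_normalize*.pkl below and must be re-applied at inference time.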
_ppo = cfg.get("ppo", {})
_lstm = cfg.get("lstm", {})
model = RecurrentPPO(
policy="MlpLstmPolicy",
env=vec_env,
learning_rate=float(_ppo.get("learning_rate", 3e-4)),
n_steps=int(_ppo.get("n_steps", 512)),
batch_size=int(_ppo.get("batch_size", 64)),
n_epochs=int(_ppo.get("n_epochs", 10)),
gamma=float(_ppo.get("gamma", 0.99)),
gae_lambda=float(_ppo.get("gae_lambda", 0.95)),
clip_range=float(_ppo.get("clip_range", 0.2)),
ent_coef=float(_ppo.get("ent_coef", 0.01)),
vf_coef=float(_ppo.get("vf_coef", 0.5)),
max_grad_norm=float(_ppo.get("max_grad_norm", 0.5)),
policy_kwargs=build_policy_kwargs(
hidden_size=int(_lstm.get("hidden_size", 256))
),
verbose=0,
seed=int(cfg.get("training", {}).get("seed", 42)),
device="cuda" if torch.cuda.is_available() else "cpu",
)
_log(f"Training on : {model.device}")
_log(f"Curriculum : Phase {curriculum.current_phase} β€” {curriculum.progress_str()}")
total_steps = 30_000 # ~45 min on A100 with simulation, produces clean reward curve
_log(f"Total steps : {total_steps:,}")
_log("Training started...\n")
_write_status("training")
reward_logger = RewardLogger(curriculum=curriculum)
checkpoint_cb = CheckpointCallback(
save_freq=10_000, save_path="/home/user/app/checkpoints/"
)
improvement_cb = SpecialistImprovementCallback(
improve_every_n_episodes=cfg.get("specialist_improvement", {}).get(
"improve_every_n_episodes", 100
),
verbose=1,
)
periodic_push = PeriodicHubPush(
api=api, hf_repo=HF_REPO, hf_token=HF_TOKEN,
vec_env=vec_env, reward_logger_ref=reward_logger, push_every=5_000,
)
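        # SB3 wraps the callback list in a CallbackList and calls each in order
        # every step, so reward_logger's stats are fresh when periodic_push runs.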
model.learn(
total_timesteps=total_steps,
callback=[reward_logger, checkpoint_cb, improvement_cb, periodic_push],
)
MODEL_PATH = "/home/user/app/spindleflow_model"
STATS_PATH = "/home/user/app/vec_normalize.pkl"
model.save(MODEL_PATH)
vec_env.save(STATS_PATH)
_log(f"Model saved β€” {len(reward_logger.episode_rewards)} episodes completed.")
_log(f"Final curriculum: {curriculum.progress_str()}")
# ── Reward curve ────────────────────────────────────
_write_status("saving")
ep_rewards = reward_logger.episode_rewards or [0.0]
episodes = list(range(len(ep_rewards)))
window = max(50, len(ep_rewards) // 20)
smoothed = [
float(np.mean(ep_rewards[max(0, i - window):i + 1]))
for i in range(len(ep_rewards))
]
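        # Downsample to at most ~200 points to keep the JSON payload small.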
step = max(1, len(episodes) // 200)
with open("/home/user/app/assets/reward_curve.json", "w") as f:
json.dump({
"episodes": episodes[::step],
"mean_rewards": smoothed[::step],
}, f)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 4))
plot_every = max(1, len(ep_rewards) // 500)
plt.plot(episodes[::plot_every], ep_rewards[::plot_every],
"o", markersize=2, alpha=0.2, color="#00d4ff", label="Episode reward")
plt.plot(episodes[::plot_every], smoothed[::plot_every],
linewidth=2.5, color="#ff6b35", label=f"Smoothed ({window}-ep mean)")
plt.axhline(y=float(np.mean(ep_rewards[:5])),
color="#94a3b8", linestyle="--", alpha=0.8, label="Early baseline")
plt.axhline(y=float(np.mean(ep_rewards[-200:])),
color="#34d399", linestyle="--", alpha=0.8, label="Final mean")
plt.xlabel("Episode"); plt.ylabel("Reward")
plt.title("SpindleFlow RL β€” Delegation Policy Learning Curve")
plt.legend(); plt.grid(alpha=0.2); plt.tight_layout()
plt.savefig("/home/user/app/assets/reward_curve.png", dpi=150)
plt.close()
_log("Reward curve saved.")
# ── Push everything to HF Hub ────────────────────────
_write_status("uploading")
_log(f"Pushing to https://huggingface.co/{HF_REPO} ...")
ep = reward_logger.episode_rewards
f5 = float(np.mean(ep[:5])) if len(ep) >= 5 else 0.0
l5 = float(np.mean(ep[-5:])) if len(ep) >= 5 else 0.0
readme = f"""---
license: mit
tags:
- reinforcement-learning
- stable-baselines3
- sb3-contrib
- gymnasium
- multi-agent
- openenv
library_name: stable-baselines3
---
# SpindleFlow RL — Delegation Policy
LSTM PPO agent trained on SpindleFlow-v0 (OpenEnv).
## Training summary
| Metric | Value |
|---|---|
| Algorithm | RecurrentPPO (SB3 + sb3-contrib) |
| Total timesteps | {total_steps:,} |
| Episodes completed | {len(ep)} |
| First-5 mean reward | {f5:.4f} |
| Last-5 mean reward | {l5:.4f} |
| Improvement | {l5 - f5:+.4f} |
| Device | {str(model.device)} |
![Reward Curve](reward_curve.png)
## Load
```python
from sb3_contrib import RecurrentPPO
from huggingface_hub import hf_hub_download
model = RecurrentPPO.load(hf_hub_download("{HF_REPO}", "spindleflow_model.zip"))
```
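
To reproduce training-time normalization at inference, also download
`vec_normalize.pkl` and restore it with `VecNormalize.load("vec_normalize.pkl", env)`
from `stable_baselines3.common.vec_env` (the stats are uploaded alongside the model).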
"""
with open("/home/user/app/README.md", "w") as f:
f.write(readme)
candidates = [
("/home/user/app/spindleflow_model.zip", "spindleflow_model.zip"),
("/home/user/app/vec_normalize.pkl", "vec_normalize.pkl"),
("/home/user/app/assets/reward_curve.png", "reward_curve.png"),
("/home/user/app/assets/reward_curve.json", "reward_curve.json"),
("/home/user/app/assets/training_log.txt", "training_log.txt"),
("/home/user/app/README.md", "README.md"),
("/home/user/app/data/specialist_memory.json", "data/specialist_memory.json"),
("/home/user/app/data/spawn_memory.jsonl", "data/spawn_memory.jsonl"),
("/home/user/app/data/resolution_memory.jsonl", "data/resolution_memory.jsonl"),
]
ops = [
CommitOperationAdd(path_in_repo=dst, path_or_fileobj=src)
for src, dst in candidates
if os.path.exists(src)
]
api.create_commit(
repo_id=HF_REPO, repo_type="model", operations=ops,
commit_message="Add trained SpindleFlow RL policy",
token=HF_TOKEN,
)
_log(f"Uploaded {len(ops)} files.")
_log(f"Model live at: https://huggingface.co/{HF_REPO}")
_write_status("complete", done=True)
except Exception as exc:
import traceback
_log(f"ERROR: {exc}")
_log(traceback.format_exc())
_write_status("error", error=str(exc))
# ── Start training immediately on Space boot ──────────────────
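# daemon=True: the thread dies with the process; demo.launch() at the bottom
# keeps the process (and therefore training) alive for the Space's lifetime.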
_thread = threading.Thread(target=_training_thread, daemon=True)
_thread.start()
# ── Gradio UI ─────────────────────────────────────────────────
def _get_state():
# Read from files β€” works across Gradio worker processes
try:
with open(_STATUS_FILE, "r", encoding="utf-8") as f:
s = json.load(f)
phase, done, error = s.get("phase", "starting"), s.get("done", False), s.get("error")
except Exception:
phase, done, error = "starting", False, None
try:
with open(_LOG_FILE, "r", encoding="utf-8") as f:
lines = f.readlines()
log_text = "".join(lines[-120:])
except Exception:
log_text = ""
if done:
label = "βœ… Training complete β€” model pushed to HF Hub"
elif error:
label = f"❌ Error: {error}"
else:
icons = {"starting": "⏳", "training": "πŸ”„", "saving": "πŸ’Ύ", "uploading": "πŸ“€"}
label = f"{icons.get(phase, 'πŸ”„')} {phase.capitalize()}..."
return label, log_text
CSS = """
body, .gradio-container { background: #0f172a !important; }
.status-box textarea {
font-size: 1.05rem !important; font-weight: 700 !important;
background: #1e293b !important; color: #f1f5f9 !important;
border: 1px solid #334155 !important;
}
.log-box textarea {
font-family: 'Courier New', monospace !important;
font-size: 0.8rem !important; line-height: 1.5 !important;
background: #0f172a !important; color: #94a3b8 !important;
border: 1px solid #1e293b !important;
}
h1 { color: #f1f5f9 !important; }
p, label { color: #94a3b8 !important; }
footer { display: none !important; }
"""
with gr.Blocks(title="SpindleFlow RL Training", css=CSS) as demo:
gr.Markdown("# πŸ€– SpindleFlow RL β€” Training Dashboard")
    gr.Markdown(
        "Live training log — updates every 10 s automatically. "
        "When complete, the trained model is pushed to your HF Hub repo."
    )
with gr.Row():
status_box = gr.Textbox(
label="Status",
value="⏳ Starting...",
interactive=False,
scale=4,
elem_classes="status-box",
)
        refresh_btn = gr.Button("🔄 Refresh now", scale=1, variant="primary")
log_box = gr.Textbox(
label="Training log (last 120 lines)",
value="",
lines=28,
max_lines=40,
interactive=False,
elem_classes="log-box",
)
refresh_btn.click(fn=_get_state, outputs=[status_box, log_box])
demo.load(fn=_get_state, outputs=[status_box, log_box])
timer = gr.Timer(value=10)
timer.tick(fn=_get_state, outputs=[status_box, log_box])
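    # gr.Timer requires a recent Gradio; on older versions the same polling can
    # be done with demo.load(fn=_get_state, outputs=[...], every=10).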
demo.launch()