import gradio as gr
import gymnasium as gym
# from stable_baselines3 import PPO
from huggingface_hub import HfApi, snapshot_download, login
import pandas as pd
import os
import shutil
import time

# --- CONFIGURATION ---
HF_TOKEN = os.environ.get("HF_TOKEN")
REQUESTS_DATASET = "gberseth/rl-leaderboard-requests"  # REPLACE THIS
RESULTS_DATASET = "gberseth/rl-leaderboard-results"    # REPLACE THIS
EVAL_EPISODES = 10  # How many episodes to run the agent for

# Authenticate
# login(token=HF_TOKEN)
api = HfApi()


def evaluate_policy(model_id):
    """
    Downloads a GRP policy from the HF Hub, rolls it out in simulation
    (LIBERO or SimplerEnv, depending on the submitted Hydra config), and
    returns the evaluation rewards.
    """
    print(f"Starting evaluation for: {model_id}")
    try:
        # 1. Download the model repository.
        # Adjust 'allow_patterns' to match what you require users to submit.
        repo_path = snapshot_download(
            repo_id=model_id,
            allow_patterns=["*.pth", "*.pt", "*.zip", "*.yaml", "*.yml", "*.py"],
        )

        # Find the checkpoint, model code, and Hydra config in the snapshot.
        model_file = None
        grp_file_path = None
        hydra_config_file_path = None
        for root, dirs, files in os.walk(repo_path):
            for file in files:
                if file.endswith(".pth"):
                    model_file = os.path.join(root, file)
                if file.endswith("model.py"):
                    grp_file_path = os.path.join(root, file)
                if file.endswith((".yaml", ".yml")):
                    hydra_config_file_path = os.path.join(root, file)

        if not model_file:
            return None, "Error: No .pth model file found in repo."
        if not hydra_config_file_path:
            return None, "Error: No Hydra config (.yaml) found in repo."

        # 2. Load the GRP agent.
        import torch

        ## Load the Hydra config and skip dataset loading; only the model is
        ## needed for evaluation.
        # log_dir = hydra.core.hydra_config.HydraConfig.get().runtime.output_dir
        from omegaconf import OmegaConf
        cfg = OmegaConf.load(hydra_config_file_path)
        cfg.dataset.load_dataset = "skip"

        ## Load the GRP model from the file downloaded in the snapshot.
        ## Putting the repo on sys.path makes `grp_model` importable, which
        ## torch.load needs in order to unpickle the GRP class.
        import sys
        sys.path.insert(0, repo_path + "/")  ## Dangerous for security, but OK for now.
        from grp_model import GRP
        model_ = torch.load(model_file)
        # model_._cfg = cfg
        # model = PPO.load(model_file)
        print("Memory used by the model:",
              torch.cuda.memory_allocated(cfg.device) / 1e6,
              "MB")  ## TODO: log this to the results database later.

        # 3. Run the evaluation loop.
        tokenizer = None
        text_model = None
        if cfg.dataset.encode_with_t5:
            ## Load the T5 model used to encode language goals.
            from transformers import T5Tokenizer, T5ForConditionalGeneration
            tokenizer = T5Tokenizer.from_pretrained(cfg.dataset.t5_version)
            text_model = T5ForConditionalGeneration.from_pretrained(cfg.dataset.t5_version)

        if "libero" in cfg.simEval:
            from sim_eval import eval_libero  ## Assumed to live alongside eval_model_in_sim.
            results = eval_libero(model_.to(cfg.device), device=cfg.device, cfg=cfg,
                                  iter_=0, tokenizer=tokenizer, text_model=text_model,
                                  wandb=None, log_dir="./")
        if "simple_env" in cfg.simEval:
            import simpler_env
            task_name = "widowx_carrot_on_plate"  # @param ["google_robot_pick_coke_can", "google_robot_move_near", "google_robot_open_drawer", "google_robot_close_drawer", "widowx_spoon_on_towel", "widowx_carrot_on_plate", "widowx_stack_cube", "widowx_put_eggplant_in_basket"]
            if 'env' in locals():
                print("Closing existing env")
                env.close()
                del env
            env = simpler_env.make(task_name)
            env_unwrapped = env.env.env.env  ## The updated gymnasium wrapper adds lots of wrappers.
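            ## Roll the policy out in the chosen SimplerEnv task. `eval_model_in_sim`
            ## is expected to return a dict of metrics whose 'rewards' entry is what
            ## gets reported below; `env_unwrapped` is passed alongside the wrapped
            ## env, presumably so the eval code can reach the base environment's API.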
            from sim_eval import eval_model_in_sim
            results = eval_model_in_sim(cfg, model_.to(cfg.device), device=cfg.device,
                                        log_dir="./", env=env, env_unwrapped=env_unwrapped,
                                        wandb=None, iter_=0,
                                        tokenizer=tokenizer, text_model=text_model)
            print("results:", results)
            # cbuffer.save(cfg.dataset.to_name)
            env.close()
            del env
        return results['rewards'], "Success"
    except Exception as e:
        print(f"Evaluation failed: {e}")
        return None, str(e)


def run_evaluation_loop():
    """
    Main loop: pulls requests, checks for 'Pending' ones, evaluates, and
    updates the datasets.
    """
    print("Checking for new submissions...")

    # 1. Load the requests dataset.
    # We use pandas to read the CSV directly from the Hub.
    try:
        requests_df = pd.read_csv(f"hf://datasets/{REQUESTS_DATASET}/requests.csv")
    except Exception:
        # If the dataset doesn't exist yet, there is nothing to evaluate.
        print("Requests dataset not found or empty.")
        return "No requests found."

    # 2. Filter for pending submissions.
    # Assuming columns: [model_id, status, submitted_by]
    # pending_rows = requests_df[requests_df["status"] == "Pending"]
    pending_rows = requests_df[requests_df["status"].isin(["Pending", "In Progress", "Failed"])]

    if len(pending_rows) == 0:
        return "No pending submissions."

    # 3. Process the first pending submission.
    row_index = pending_rows.index[0]
    model_id = pending_rows.loc[row_index, "model_id"]

    print(f"Evaluating {model_id}...")

    # Run the eval.
    score, status_msg = evaluate_policy(model_id)

    # 4. Update the dataframes.
    # Update requests (mark as Done or Failed).
    requests_df.loc[row_index, "status"] = "Done" if score is not None else "Failed"

    # Prepare the results row.
    if score is not None:
        new_result = {
            "model_id": model_id,
            "mean_reward": score,
            "status": "Success"
        }

        # Load the results dataset.
        try:
            results_df = pd.read_csv(f"hf://datasets/{RESULTS_DATASET}/results.csv")
        except Exception:
            results_df = pd.DataFrame(columns=["model_id", "mean_reward", "status"])

        # Append the new result.
        results_df = pd.concat([results_df, pd.DataFrame([new_result])], ignore_index=True)

        # Save the results to the Hub.
        results_df.to_csv("results.csv", index=False)
        api.upload_file(
            path_or_fileobj="results.csv",
            path_in_repo="results.csv",
            repo_id=RESULTS_DATASET,
            repo_type="dataset"
        )

    # Save the request updates to the Hub.
    requests_df.to_csv("requests.csv", index=False)
    api.upload_file(
        path_or_fileobj="requests.csv",
        path_in_repo="requests.csv",
        repo_id=REQUESTS_DATASET,
        repo_type="dataset"
    )

    return f"Processed {model_id}: Score {score}"


# # --- GRADIO UI (to keep the Space running) ---
# with gr.Blocks() as demo:
#     gr.Markdown("# RL Evaluation Backend")
#     gr.Markdown("This space runs in the background to evaluate new submissions.")
#     # A button to manually trigger an eval (useful for debugging).
#     eval_btn = gr.Button("Run Evaluator Now")
#     output = gr.Textbox(label="Logs")
#     eval_btn.click(fn=run_evaluation_loop, outputs=output)
#     # Auto-run every 60 seconds (requires Gradio 'live' updates or an external scheduler).
#     # In a real deployment, you might use a simple cron loop or `gradio.Timer`.
#     demo.queue().launch()

if __name__ == "__main__":
    # while True:
    log = run_evaluation_loop()
    print(log)
    # time.sleep(60)  # Check every 60 seconds
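

# ---------------------------------------------------------------------------
# For reference: a minimal sketch of the submission side of this pipeline,
# i.e. how a row could end up in the requests dataset that
# run_evaluation_loop() polls. The [model_id, status, submitted_by] schema
# matches what the loop assumes above; the helper name `submit_request` is
# illustrative and not part of this repo.
def submit_request(model_id, submitted_by):
    # Read the existing requests, or start an empty table if none exist yet.
    try:
        df = pd.read_csv(f"hf://datasets/{REQUESTS_DATASET}/requests.csv")
    except Exception:
        df = pd.DataFrame(columns=["model_id", "status", "submitted_by"])
    # Append a new row marked "Pending" so the evaluator picks it up.
    new_row = {"model_id": model_id, "status": "Pending", "submitted_by": submitted_by}
    df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
    # Push the updated CSV back to the Hub.
    df.to_csv("requests.csv", index=False)
    api.upload_file(
        path_or_fileobj="requests.csv",
        path_in_repo="requests.csv",
        repo_id=REQUESTS_DATASET,
        repo_type="dataset",
    )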