"""Space Rebooter: restart a set of Hugging Face Spaces on a daily schedule.

Expects HF_TOKEN, REPOS, and RESTART_TIMES env vars. REPOS and RESTART_TIMES
are comma-separated lists; RESTART_TIMES entries use the "HH:MM" format.

API endpoints:
  /call/status       - Returns the current countdown and target time.
  /call/force_reboot - Immediately triggers the restart of all defined
                       repositories.
"""

import os
import threading
import time
from datetime import datetime, timedelta
from time import sleep

import gradio as gr
from huggingface_hub import HfApi


def _split_env_list(raw):
    """Split a comma-separated env value into stripped, non-empty items."""
    if not raw:
        return []
    return [item.strip() for item in raw.split(",") if item.strip()]


HF_TOKEN = os.getenv("HF_TOKEN")
repos_env = os.getenv("REPOS")
times_env = os.getenv("RESTART_TIMES")

# Fallback for local testing to avoid crashes if envs are missing
REPOS = _split_env_list(repos_env)
RESTART_TIMES = _split_env_list(times_env) or ["00:00"]

HELPER = "lainlives/starter"
SELF_TIME = "00:30"

# Seconds to wait between pausing a space and restarting it.
PAUSE_BEFORE_RESTART_SECONDS = 3

# Global variable to track the specific target timestamp
next_reboot_datetime = None


def get_next_scheduled_time():
    """Find the next scheduled time from RESTART_TIMES relative to now.

    Each entry is parsed as "HH:MM" and projected onto today's date. Invalid
    entries are reported and skipped. If no entry is usable, midnight is used
    so the scheduler keeps working instead of raising.

    Returns:
        datetime: the earliest scheduled time strictly after now; when every
        time for today has already passed, the earliest one, tomorrow.
    """
    now = datetime.now()
    candidates = []
    for t_str in RESTART_TIMES:
        try:
            h, m = map(int, t_str.split(':'))
            candidates.append(
                now.replace(hour=h, minute=m, second=0, microsecond=0)
            )
        except ValueError:
            # Covers malformed strings as well as out-of-range hours/minutes.
            print(f"Invalid time format: {t_str}")

    if not candidates:
        # Every configured entry was invalid; fall back to midnight.
        candidates.append(
            now.replace(hour=0, minute=0, second=0, microsecond=0)
        )

    candidates.sort()
    for dt in candidates:
        if dt > now:
            return dt
    # All of today's slots are in the past: roll over to the first one tomorrow.
    return candidates[0] + timedelta(days=1)


# Initialize first target
next_reboot_datetime = get_next_scheduled_time()


def _restart_space(api, repo_id, factory_reboot=False):
    """Pause a space, wait briefly, then restart it."""
    api.pause_space(repo_id=repo_id)
    sleep(PAUSE_BEFORE_RESTART_SECONDS)
    api.restart_space(
        repo_id=repo_id, token=HF_TOKEN, factory_reboot=factory_reboot
    )


def perform_restarts(source="Manual"):
    """Restart every repository listed in REPOS.

    Args:
        source: label included in the console log line for each restart.

    Returns:
        str: one line per repository (success or failure), joined by
        newlines, or an error line when HF_TOKEN is not set.
    """
    if not HF_TOKEN:
        return "Error: HF_TOKEN not found."

    log_messages = []
    # Pass the token to the client so pause and restart both use it.
    api = HfApi(token=HF_TOKEN)
    for repo in REPOS:
        try:
            print(f"[{source}] Restarting space {repo}...")
            _restart_space(api, repo, factory_reboot=True)
            log_messages.append(f"Successfully restarted {repo}")
        except Exception as e:
            # One failing repo must not prevent the remaining restarts.
            err = f"Failed to restart {repo}: {e}"
            print(err)
            log_messages.append(err)
    return "\n".join(log_messages)


def time_until_next_reboot():
    """Calculate and format the time until the next planned reboot.

    Returns:
        str: "HH:MM:SS" countdown, or "Rebooting..." once the target time
        has been reached.
    """
    global next_reboot_datetime
    if next_reboot_datetime is None:
        next_reboot_datetime = get_next_scheduled_time()

    remaining_seconds = (next_reboot_datetime - datetime.now()).total_seconds()
    if remaining_seconds <= 0:
        return "Rebooting..."

    # total_seconds() keeps whole days, unlike timedelta.seconds.
    mins, secs = divmod(int(remaining_seconds), 60)
    hours, mins = divmod(mins, 60)
    return f"{hours:02}:{mins:02}:{secs:02}"


def get_last_reboot_status():
    """Return a human-readable line describing the next reboot target."""
    return f"Next target: {next_reboot_datetime.strftime('%Y-%m-%d %H:%M:%S')}"


def check_and_reboot():
    """Restart the repos if the scheduled time has passed.

    Returns:
        str: a status line for the UI log.
    """
    global next_reboot_datetime
    now = datetime.now()
    if now < next_reboot_datetime:
        return "Space running normally."

    print(f"Reboot triggered at {now.strftime('%H:%M:%S')}")
    # 1. Execute Reboots
    perform_restarts(source="Schedule")
    # 2. Advance schedule
    next_reboot_datetime = get_next_scheduled_time()
    return f"Just rebooted. Next scheduled: {next_reboot_datetime.strftime('%H:%M:%S')}"


# --- API Functions ---
def api_get_info():
    """Return the JSON status payload for the API."""
    return {
        "status": "active",
        "next_reboot": str(next_reboot_datetime),
        "time_remaining": time_until_next_reboot(),
        "monitored_repos": REPOS,
    }


def api_trigger_now():
    """Trigger an immediate reboot via the API and return the resulting logs."""
    logs = perform_restarts(source="API Force")
    return {
        "status": "executed",
        "timestamp": str(datetime.now()),
        "logs": logs,
    }


def trigger_helper():
    """Restart the HELPER space; does nothing when HF_TOKEN is not set."""
    if not HF_TOKEN:
        return
    api = HfApi(token=HF_TOKEN)
    try:
        print(f"Restarting space {HELPER}...")
        _restart_space(api, HELPER)
    except Exception as e:
        print(f"Failed to restart {HELPER}: {e}")


def sanitize():
    """Schedule a one-shot restart of the HELPER space at the next SELF_TIME."""
    now = datetime.now()
    target_time = datetime.strptime(SELF_TIME, "%H:%M").time()
    next_run = datetime.combine(now.date(), target_time)
    if next_run <= now:
        next_run += timedelta(days=1)

    delay = (next_run - now).total_seconds()
    timer = threading.Timer(delay, trigger_helper)
    # Daemon thread: a pending timer must not keep the process alive after
    # the server has stopped.
    timer.daemon = True
    timer.start()
    print(f"Scheduled helper restart for {next_run}")


def pipeline():
    """Run one scheduler tick and return the values for the three UI fields.

    Returns:
        tuple[str, str, str]: (countdown, status, target_info).
    """
    # The check runs first so the countdown reflects a freshly advanced target.
    status = check_and_reboot()
    countdown = time_until_next_reboot()
    target_info = get_last_reboot_status()
    return countdown, status, target_info


# --- Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("## Space Rebooter")
    with gr.Row():
        next_reboot_display = gr.Textbox(
            label="Scheduled Target", value=get_last_reboot_status()
        )
        next_reboot_countdown = gr.Textbox(
            label="Time Until Next Reboot", value=time_until_next_reboot()
        )
    status_output = gr.Textbox(label="Log", value="Space running normally.")

    # --- Hidden API Elements ---
    # These buttons are hidden but define the API endpoints via 'api_name'
    api_info_btn = gr.Button("Get Info", visible=False)
    api_info_btn.click(fn=api_get_info, outputs=gr.JSON(), api_name="status")

    api_force_btn = gr.Button("Force Reboot", visible=False)
    api_force_btn.click(
        fn=api_trigger_now, outputs=gr.JSON(), api_name="force_reboot"
    )

    # Timer ticks every 1 second
    timer = gr.Timer(1).tick(
        fn=pipeline,
        inputs=None,
        outputs=[next_reboot_countdown, status_output, next_reboot_display],
    )


if __name__ == "__main__":
    sanitize()
    demo.launch(server_name="0.0.0.0", server_port=7860)