| import os |
| import time |
| from time import sleep |
| from huggingface_hub import HfApi |
| from datetime import datetime, timedelta |
| import threading |
| import gradio as gr |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
# --- Runtime configuration, read once from the environment at import time ---

# Hugging Face access token. perform_restarts() reports an error and
# trigger_helper() does nothing when it is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
repos_env = os.getenv("REPOS")
times_env = os.getenv("RESTART_TIMES")

# REPOS and RESTART_TIMES are comma-separated lists. Entries are stripped and
# blank ones are dropped, so a trailing comma or a stray ", ," cannot produce
# an empty repo id or an unparseable restart time.
REPOS = [r.strip() for r in (repos_env or "").split(",") if r.strip()]
# "HH:MM" strings; fall back to midnight when the variable is unset or
# contains no usable entries.
RESTART_TIMES = [
    t.strip() for t in (times_env or "").split(",") if t.strip()
] or ["00:00"]

# Space restarted by trigger_helper(), and the "HH:MM" time at which
# sanitize() schedules that restart.
HELPER = "lainlives/starter"
SELF_TIME = "00:30"

# Next scheduled restart as a naive datetime; None until it is first computed.
next_reboot_datetime = None
|
|
|
|
def get_next_scheduled_time(now=None, times=None):
    """Return the next scheduled restart time strictly after *now*.

    Each entry of *times* is an "HH:MM" string. Entries that cannot be parsed
    (wrong shape, non-numeric, or out-of-range hour/minute) are reported on
    stdout and skipped. If no entry is usable, midnight ("00:00") is used so
    the caller always receives a valid datetime instead of an IndexError.

    Args:
        now: Reference datetime; defaults to ``datetime.now()`` (naive, local
            time of the host).
        times: Iterable of "HH:MM" strings; defaults to the module-level
            ``RESTART_TIMES``.

    Returns:
        The earliest candidate later today that is after *now*, or, when all
        of today's candidates have passed, the earliest candidate moved to
        tomorrow.
    """
    if now is None:
        now = datetime.now()
    if times is None:
        times = RESTART_TIMES

    candidates = []
    for t_str in times:
        try:
            h, m = map(int, t_str.split(':'))
            # replace() raises ValueError for out-of-range hours/minutes.
            candidates.append(now.replace(hour=h, minute=m, second=0, microsecond=0))
        except ValueError:
            print(f"Invalid time format: {t_str}")

    if not candidates:
        # Nothing usable was configured: behave as if the default "00:00"
        # had been supplied rather than crashing on an empty list.
        print("No valid restart times configured; defaulting to 00:00")
        candidates.append(now.replace(hour=0, minute=0, second=0, microsecond=0))

    upcoming = [dt for dt in candidates if dt > now]
    if upcoming:
        return min(upcoming)

    # Every slot for today has already passed; wrap to the first one tomorrow.
    return min(candidates) + timedelta(days=1)
|
|
|
|
| |
# Compute the first restart target at import time, replacing the None
# placeholder assigned earlier, so later module-level code that formats the
# target (the UI's initial textbox values) has a real datetime to work with.
next_reboot_datetime = get_next_scheduled_time()
|
|
|
|
def perform_restarts(source="Manual"):
    """Pause and then factory-reboot every Space listed in ``REPOS``.

    Args:
        source: Label included in the console output to show what triggered
            the restart (the file uses "Manual", "Schedule" and "API Force").

    Returns:
        A newline-joined log with one line per repo (success or failure), or
        the single line "Error: HF_TOKEN not found." when no token is set.
    """
    if not HF_TOKEN:
        return "Error: HF_TOKEN not found."

    # Bind the token to the client so that pause_space and restart_space
    # authenticate identically, instead of only the restart call receiving it.
    api = HfApi(token=HF_TOKEN)
    log_messages = []
    for repo in REPOS:
        try:
            print(f"[{source}] Restarting space {repo}...")
            api.pause_space(repo_id=repo)
            sleep(3)  # wait 3 s between the pause and the restart request
            api.restart_space(repo_id=repo, factory_reboot=True)
            log_messages.append(f"Successfully restarted {repo}")
        except Exception as e:
            # Best effort: a failure on one repo must not stop the others.
            err = f"Failed to restart {repo}: {e}"
            print(err)
            log_messages.append(err)

    return "\n".join(log_messages)
|
|
|
|
def time_until_next_reboot():
    """Return the time left until ``next_reboot_datetime`` as "HH:MM:SS".

    If the module-level target has not been computed yet it is initialised
    here via ``get_next_scheduled_time()``.

    Returns:
        "Rebooting..." when the target is now or in the past; otherwise a
        zero-padded "HH:MM:SS" string. Hours are not capped at 23, so a
        target more than a day away is shown as e.g. "49:00:30".
    """
    global next_reboot_datetime
    if next_reboot_datetime is None:
        next_reboot_datetime = get_next_scheduled_time()

    remaining = (next_reboot_datetime - datetime.now()).total_seconds()
    if remaining <= 0:
        return "Rebooting..."

    # total_seconds() includes whole days; timedelta.seconds would silently
    # drop them and under-report any wait of 24 hours or more.
    hours, rem = divmod(int(remaining), 3600)
    mins, secs = divmod(rem, 60)
    return f"{hours:02}:{mins:02}:{secs:02}"
|
|
|
|
def get_last_reboot_status():
    """Return a "Next target: YYYY-MM-DD HH:MM:SS" line for the current target.

    Despite the name, this reports the upcoming scheduled restart held in the
    module-level ``next_reboot_datetime``, not a past one.
    """
    stamp = next_reboot_datetime.strftime('%Y-%m-%d %H:%M:%S')
    return f"Next target: {stamp}"
|
|
|
|
def check_and_reboot():
    """Run the scheduled restart if the target time has been reached.

    If the module-level target has not been computed yet it is initialised
    here, matching what ``time_until_next_reboot()`` does.

    Returns:
        "Space running normally." when the target is still in the future;
        otherwise a "Just rebooted. Next scheduled: HH:MM:SS" message after
        the restarts have been attempted.
    """
    global next_reboot_datetime
    if next_reboot_datetime is None:
        next_reboot_datetime = get_next_scheduled_time()

    now = datetime.now()
    if now < next_reboot_datetime:
        return "Space running normally."

    print(f"Reboot triggered at {now.strftime('%H:%M:%S')}")

    # Advance the target before the (slow, multi-second) restart call so that
    # a later check cannot fire the same slot again while, or if, the restart
    # is still running or raises.
    next_reboot_datetime = get_next_scheduled_time()
    perform_restarts(source="Schedule")

    return f"Just rebooted. Next scheduled: {next_reboot_datetime.strftime('%H:%M:%S')}"
|
|
| |
|
|
|
|
def api_get_info():
    """Build the status dictionary returned by the "status" API handler.

    The payload carries a fixed "active" status, the stringified restart
    target, the formatted countdown, and the list of monitored repos.
    """
    payload = {"status": "active"}
    # The target is stringified before the countdown is computed, because
    # time_until_next_reboot() may assign the global if it is still None.
    payload["next_reboot"] = str(next_reboot_datetime)
    payload["time_remaining"] = time_until_next_reboot()
    payload["monitored_repos"] = REPOS
    return payload
|
|
|
|
def api_trigger_now():
    """Restart all monitored repos immediately and report the outcome.

    Returns a dictionary with a fixed "executed" status, a timestamp taken
    after the restarts have finished, and the log text from
    ``perform_restarts``.
    """
    restart_log = perform_restarts(source="API Force")
    finished_at = datetime.now()

    response = {"status": "executed"}
    response["timestamp"] = str(finished_at)
    response["logs"] = restart_log
    return response
|
|
|
|
def trigger_helper():
    """Pause and then restart the ``HELPER`` Space (no factory reboot).

    Nothing is raised: a missing token or a failed API call is reported on
    stdout only, since this runs from a background timer thread.
    """
    if not HF_TOKEN:
        # Same message perform_restarts() reports, instead of a silent no-op.
        print("Error: HF_TOKEN not found.")
        return

    # Bind the token to the client so that pause_space and restart_space
    # authenticate identically, instead of only the restart call receiving it.
    api = HfApi(token=HF_TOKEN)
    try:
        print(f"Restarting space {HELPER}...")
        api.pause_space(repo_id=HELPER)
        sleep(3)  # wait 3 s between the pause and the restart request
        api.restart_space(repo_id=HELPER)
    except Exception as e:
        print(f"Failed to restart {HELPER}: {e}")
|
|
|
|
def sanitize():
    """Schedule a one-shot ``trigger_helper`` call at the next ``SELF_TIME``.

    The run time is today's SELF_TIME ("HH:MM", local naive time) or, if that
    moment has already passed, the same time tomorrow. The timer fires once;
    it is not re-armed afterwards.

    Returns:
        The started ``threading.Timer``, so a caller may ``cancel()`` it.
        Existing callers that ignore the return value are unaffected.
    """
    now = datetime.now()
    target_time = datetime.strptime(SELF_TIME, "%H:%M").time()
    next_run = datetime.combine(now.date(), target_time)
    if next_run <= now:
        next_run += timedelta(days=1)
    delay = (next_run - now).total_seconds()

    timer = threading.Timer(delay, trigger_helper)
    # Daemon thread: a pending timer (up to 24 h away) must not keep the
    # interpreter alive after the main thread has finished.
    timer.daemon = True
    timer.start()
    print(f"Scheduled helper restart for {next_run}")
    return timer
|
|
|
|
def pipeline():
    """Run one scheduler step and collect the values shown in the UI.

    The reboot check runs first so that the countdown and target text
    reflect any target it has just updated.

    Returns:
        A ``(countdown, status, target_info)`` tuple.
    """
    reboot_status = check_and_reboot()
    remaining = time_until_next_reboot()
    return remaining, reboot_status, get_last_reboot_status()
|
|
|
|
| |
# Gradio UI definition. NOTE(review): the source's indentation was lost; the
# nesting below (two textboxes inside the Row, everything else directly under
# Blocks) is inferred from the blank-line grouping — confirm against the app.
with gr.Blocks() as demo:
    gr.Markdown("## Space Rebooter")

    with gr.Row():
        # Initial values are computed once, when this module is executed.
        next_reboot_display = gr.Textbox(label="Scheduled Target", value=get_last_reboot_status())
        next_reboot_countdown = gr.Textbox(label="Time Until Next Reboot", value=time_until_next_reboot())

    status_output = gr.Textbox(label="Log", value="Space running normally.")

    # Invisible buttons whose click handlers are registered under the names
    # "status" and "force_reboot" — presumably so they can be invoked as named
    # API endpoints without appearing on the page; confirm against Gradio docs.
    api_info_btn = gr.Button("Get Info", visible=False)
    api_info_btn.click(fn=api_get_info, outputs=gr.JSON(), api_name="status")

    api_force_btn = gr.Button("Force Reboot", visible=False)
    api_force_btn.click(fn=api_trigger_now, outputs=gr.JSON(), api_name="force_reboot")

    # Each tick calls pipeline(), which performs any due restart and returns
    # the new countdown, status and target text for the three textboxes.
    # NOTE(review): gr.Timer(1) looks like a 1-second interval — confirm.
    # `timer` is bound to the result of .tick(), not to the Timer itself.
    timer = gr.Timer(1).tick(
        fn=pipeline,
        inputs=None,
        outputs=[next_reboot_countdown, status_output, next_reboot_display]
    )
|
|
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Arm the one-shot helper-Space restart timer before starting the app.
    sanitize()
    # Serve the UI on all network interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)