starter / app.py
lainlives's picture
Upload folder using huggingface_hub
e64e6b4 verified
import os
import time
from time import sleep
from huggingface_hub import HfApi
from datetime import datetime, timedelta
import threading
import gradio as gr
#
# Expects HF_TOKEN, REPOS, and RESTART_TIMES env vars
#
# API endpoints:
# /call/status - Returns the current countdown and target time.
# /call/force_reboot - Immediately triggers the restart of all defined repositories.
#
#
#
HF_TOKEN = os.getenv("HF_TOKEN")
repos_env = os.getenv("REPOS")
times_env = os.getenv("RESTART_TIMES")
# Fallback for local testing to avoid crashes if envs are missing
REPOS = [t.strip() for t in repos_env.split(",")] if repos_env else []
RESTART_TIMES = [t.strip() for t in times_env.split(",")] if times_env else ["00:00"]
HELPER = "lainlives/starter"
SELF_TIME = "00:30"
# Global variable to track the specific target timestamp
next_reboot_datetime = None
def get_next_scheduled_time():
"""Finds the next scheduled time from the list relative to now."""
now = datetime.now()
candidates = []
for t_str in RESTART_TIMES:
try:
h, m = map(int, t_str.split(':'))
dt = now.replace(hour=h, minute=m, second=0, microsecond=0)
candidates.append(dt)
except ValueError:
print(f"Invalid time format: {t_str}")
candidates.sort()
for dt in candidates:
if dt > now:
return dt
return candidates[0] + timedelta(days=1)
# Initialize first target
next_reboot_datetime = get_next_scheduled_time()
def perform_restarts(source="Manual"):
"""Core logic to restart the repos."""
log_messages = []
if HF_TOKEN:
api = HfApi()
for repo in REPOS:
try:
print(f"[{source}] Restarting space {repo}...")
api.pause_space(repo_id=repo)
sleep(3)
api.restart_space(repo_id=repo, token=HF_TOKEN, factory_reboot=True)
log_messages.append(f"Successfully restarted {repo}")
except Exception as e:
err = f"Failed to restart {repo}: {e}"
print(err)
log_messages.append(err)
else:
log_messages.append("Error: HF_TOKEN not found.")
return "\n".join(log_messages)
def time_until_next_reboot():
"""Calculates and formats the time until the next planned reboot."""
global next_reboot_datetime
if next_reboot_datetime is None:
next_reboot_datetime = get_next_scheduled_time()
remaining_time = next_reboot_datetime - datetime.now()
if remaining_time.total_seconds() <= 0:
return "Rebooting..."
mins, secs = divmod(remaining_time.seconds, 60)
hours, mins = divmod(mins, 60)
return f"{hours:02}:{mins:02}:{secs:02}"
def get_last_reboot_status():
global next_reboot_datetime
return f"Next target: {next_reboot_datetime.strftime('%Y-%m-%d %H:%M:%S')}"
def check_and_reboot():
"""Checks if current time has passed the scheduled time."""
global next_reboot_datetime
now = datetime.now()
if now >= next_reboot_datetime:
print(f"Reboot triggered at {now.strftime('%H:%M:%S')}")
# 1. Execute Reboots
perform_restarts(source="Schedule")
# 2. Advance schedule
next_reboot_datetime = get_next_scheduled_time()
return f"Just rebooted. Next scheduled: {next_reboot_datetime.strftime('%H:%M:%S')}"
return "Space running normally."
# --- API Functions ---
def api_get_info():
"""Returns JSON status for the API."""
return {
"status": "active",
"next_reboot": str(next_reboot_datetime),
"time_remaining": time_until_next_reboot(),
"monitored_repos": REPOS
}
def api_trigger_now():
"""Triggers immediate reboot via API."""
logs = perform_restarts(source="API Force")
return {
"status": "executed",
"timestamp": str(datetime.now()),
"logs": logs
}
def trigger_helper():
if HF_TOKEN:
api = HfApi()
try:
print(f"Restarting space {HELPER}...")
api.pause_space(repo_id=HELPER)
sleep(3)
api.restart_space(repo_id=HELPER, token=HF_TOKEN)
except Exception as e:
print(f"Failed to restart {HELPER}: {e}")
def sanitize():
# Sanitizer logic remains the same
now = datetime.now()
target_time = datetime.strptime(SELF_TIME, "%H:%M").time()
next_run = datetime.combine(now.date(), target_time)
if next_run <= now:
next_run += timedelta(days=1)
delay = (next_run - now).total_seconds()
timer = threading.Timer(delay, trigger_helper)
timer.start()
print(f"Scheduled helper restart for {next_run}")
def pipeline():
status = check_and_reboot()
countdown = time_until_next_reboot()
target_info = get_last_reboot_status()
return countdown, status, target_info
# --- Gradio Interface ---
with gr.Blocks() as demo:
gr.Markdown("## Space Rebooter")
with gr.Row():
next_reboot_display = gr.Textbox(label="Scheduled Target", value=get_last_reboot_status())
next_reboot_countdown = gr.Textbox(label="Time Until Next Reboot", value=time_until_next_reboot())
status_output = gr.Textbox(label="Log", value="Space running normally.")
# --- Hidden API Elements ---
# These buttons are hidden but define the API endpoints via 'api_name'
api_info_btn = gr.Button("Get Info", visible=False)
api_info_btn.click(fn=api_get_info, outputs=gr.JSON(), api_name="status")
api_force_btn = gr.Button("Force Reboot", visible=False)
api_force_btn.click(fn=api_trigger_now, outputs=gr.JSON(), api_name="force_reboot")
# Timer ticks every 1 second
timer = gr.Timer(1).tick(
fn=pipeline,
inputs=None,
outputs=[next_reboot_countdown, status_output, next_reboot_display]
)
if __name__ == "__main__":
sanitize()
demo.launch(server_name="0.0.0.0", server_port=7860)