Spaces:
Sleeping
Sleeping
File size: 2,623 Bytes
6755903 5bf33af cb439ed 5bf33af 4004b7a cb439ed 4882e1d 4004b7a b33cbee 4882e1d fa6afcc b33cbee fa6afcc 4882e1d b33cbee 4882e1d 5bf33af cb439ed 6755903 5bf33af 6755903 cb439ed 4882e1d 5bf33af cb439ed 5bf33af 6755903 4004b7a 6755903 4004b7a 5bf33af 6755903 cb439ed 6755903 5bf33af 6755903 5bf33af 6755903 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | from fastapi import FastAPI, Body
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timezone, timedelta
import requests
app = FastAPI()
# In-memory store of last run timestamps (GMT+08)
last_runs: dict[str, str] = {}
# Root endpoint to confirm the service is running
@app.get("/")
async def root():
return {"message": "Scheduler Space is running. Use /schedule_every5_multiple/"}
# Status endpoint to view last run and next run times
@app.get("/status/")
async def status():
tz = timezone(timedelta(hours=8)) # GMT+08
next_runs: dict[str, str] = {}
for job in scheduler.get_jobs():
if job.id.startswith("every5_"):
url = job.args[0]
nrt = job.next_run_time
if nrt:
if nrt.tzinfo:
nrt_local = nrt.astimezone(tz)
else:
nrt_local = nrt.replace(tzinfo=timezone.utc).astimezone(tz)
next_runs[url] = nrt_local.isoformat()
return {"last_runs": last_runs, "next_runs": next_runs}
# Initialize and start the scheduler
scheduler = BackgroundScheduler()
scheduler.start()
# Job: send POST to given URL and record timestamp in GMT+08
def do_post(url: str):
try:
resp = requests.post(url, json={})
tz = timezone(timedelta(hours=8)) # GMT+08
timestamp = datetime.now(tz).isoformat()
last_runs[url] = timestamp
print(f"[{timestamp}] POST {url} → {resp.status_code}")
except Exception as e:
tz = timezone(timedelta(hours=8))
err_time = datetime.now(tz).isoformat()
print(f"[{err_time}] ERROR posting to {url}: {e}")
# Schedule multiple endpoints every 5 minutes
default_description = "List of URLs to call every 5 minutes"
@app.post("/schedule_every5_multiple/")
async def schedule_every5_multiple(
endpoints: list[str] = Body(..., embed=True, description=default_description)
):
for url in endpoints:
job_id = f"every5_{url}"
# Remove existing job if present
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
# Schedule interval job every 5 minutes, first run now
scheduler.add_job(
func=do_post,
trigger="interval",
minutes=5,
next_run_time=datetime.now(timezone(timedelta(hours=8))),
args=[url],
id=job_id,
)
return {"status": "scheduled every 5 minutes", "jobs": len(endpoints)}
# Optional self-ping to prevent idle sleep
@app.get("/heartbeat/")
async def heartbeat_ok():
return {"alive": True} |