RESTAPI_Docker / main.py
mengfoong123's picture
add path
b33cbee verified
from fastapi import FastAPI, Body
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timezone, timedelta
import requests
app = FastAPI()
# In-memory store of last run timestamps (GMT+08)
last_runs: dict[str, str] = {}
# Root endpoint to confirm the service is running
@app.get("/")
async def root():
return {"message": "Scheduler Space is running. Use /schedule_every5_multiple/"}
# Status endpoint to view last run and next run times
@app.get("/status/")
async def status():
tz = timezone(timedelta(hours=8)) # GMT+08
next_runs: dict[str, str] = {}
for job in scheduler.get_jobs():
if job.id.startswith("every5_"):
url = job.args[0]
nrt = job.next_run_time
if nrt:
if nrt.tzinfo:
nrt_local = nrt.astimezone(tz)
else:
nrt_local = nrt.replace(tzinfo=timezone.utc).astimezone(tz)
next_runs[url] = nrt_local.isoformat()
return {"last_runs": last_runs, "next_runs": next_runs}
# Initialize and start the scheduler
scheduler = BackgroundScheduler()
scheduler.start()
# Job: send POST to given URL and record timestamp in GMT+08
def do_post(url: str):
try:
resp = requests.post(url, json={})
tz = timezone(timedelta(hours=8)) # GMT+08
timestamp = datetime.now(tz).isoformat()
last_runs[url] = timestamp
print(f"[{timestamp}] POST {url}{resp.status_code}")
except Exception as e:
tz = timezone(timedelta(hours=8))
err_time = datetime.now(tz).isoformat()
print(f"[{err_time}] ERROR posting to {url}: {e}")
# Schedule multiple endpoints every 5 minutes
default_description = "List of URLs to call every 5 minutes"
@app.post("/schedule_every5_multiple/")
async def schedule_every5_multiple(
endpoints: list[str] = Body(..., embed=True, description=default_description)
):
for url in endpoints:
job_id = f"every5_{url}"
# Remove existing job if present
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
# Schedule interval job every 5 minutes, first run now
scheduler.add_job(
func=do_post,
trigger="interval",
minutes=5,
next_run_time=datetime.now(timezone(timedelta(hours=8))),
args=[url],
id=job_id,
)
return {"status": "scheduled every 5 minutes", "jobs": len(endpoints)}
# Optional self-ping to prevent idle sleep
@app.get("/heartbeat/")
async def heartbeat_ok():
return {"alive": True}