Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Body | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from datetime import datetime, timezone, timedelta | |
| import requests | |
| # FastAPI application object for this service | |
| app = FastAPI() | |
| # In-memory store of last run timestamps (GMT+08), keyed by target URL. | |
| # Values are ISO-8601 strings written by do_post and returned by status. | |
| # Being a plain module-level dict, it starts empty each time the module loads. | |
| last_runs: dict[str, str] = {} | |
# Root endpoint to confirm the service is running.
# NOTE(review): no route decorator (e.g. @app.get("/")) is visible on this
# handler in this view — confirm it is actually registered on `app`.
async def root():
    """Return a short banner confirming the service is up.

    Returns:
        dict: A single "message" entry whose text points callers at the
        scheduling endpoint.
    """
    banner = "Scheduler Space is running. Use /schedule_every5_multiple/"
    return {"message": banner}
# Status endpoint to view last run and next run times.
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is actually registered on `app`.
async def status():
    """Report the last and next run time of every "every5_" job.

    Returns:
        dict: "last_runs" is the module-level ``last_runs`` mapping as-is.
        "next_runs" maps each matching job's first positional argument (the
        target URL) to its next run time, as an ISO-8601 string in GMT+08.
        Jobs without a next run time are left out of "next_runs".
    """
    gmt8 = timezone(timedelta(hours=8))  # GMT+08
    upcoming: dict[str, str] = {}

    for job in scheduler.get_jobs():
        # Only jobs created by the every-5-minutes scheduling endpoint.
        if not job.id.startswith("every5_"):
            continue

        target = job.args[0]
        when = job.next_run_time
        if not when:
            continue

        # A naive datetime is interpreted as UTC before converting to GMT+08.
        aware = when if when.tzinfo else when.replace(tzinfo=timezone.utc)
        upcoming[target] = aware.astimezone(gmt8).isoformat()

    return {"last_runs": last_runs, "next_runs": upcoming}
| # Initialize and start the scheduler. | |
| # Both statements run at module load, so the scheduler is already started | |
| # by the time any endpoint adds or lists jobs on it. | |
| scheduler = BackgroundScheduler() | |
| scheduler.start() | |
# Job: send POST to given URL and record timestamp in GMT+08
def do_post(url: str, timeout: float = 10.0):
    """POST an empty JSON object to *url* and record when it completed.

    When a response comes back (whatever its status code), the completion
    time is stored in ``last_runs[url]`` as an ISO-8601 string in GMT+08 and
    a log line is printed. When anything fails, the error is printed instead
    and ``last_runs`` is left untouched; the exception is not re-raised.

    Args:
        url: Endpoint to call.
        timeout: Seconds to wait for the server before giving up. Without an
            explicit timeout ``requests`` waits indefinitely, so a single
            unresponsive endpoint could hold this job open forever.
    """
    tz = timezone(timedelta(hours=8))  # GMT+08
    try:
        resp = requests.post(url, json={}, timeout=timeout)
        timestamp = datetime.now(tz).isoformat()
        last_runs[url] = timestamp
        print(f"[{timestamp}] POST {url} → {resp.status_code}")
    except Exception as e:
        # Deliberately best-effort: report the failure (including timeouts)
        # and return normally rather than raising out of the job.
        err_time = datetime.now(tz).isoformat()
        print(f"[{err_time}] ERROR posting to {url}: {e}")
# Schedule multiple endpoints every 5 minutes
default_description = "List of URLs to call every 5 minutes"


# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is actually registered on `app`.
async def schedule_every5_multiple(
    endpoints: list[str] = Body(..., embed=True, description=default_description)
):
    """Schedule ``do_post`` for each URL every 5 minutes, starting now.

    Each URL gets one job whose id is ``"every5_" + url``. Scheduling a URL
    that already has a job replaces that job, which also resets its first
    run to the current time.

    Args:
        endpoints: URLs to call, read from the request body.

    Returns:
        dict: A fixed "status" string and "jobs", the number of entries in
        *endpoints* (duplicates included).
    """
    # NOTE(review): these URLs come straight from the request body and are
    # later POSTed to by this service without any validation — consider
    # restricting schemes/hosts if this endpoint is reachable by untrusted
    # callers.
    tz = timezone(timedelta(hours=8))  # GMT+08
    for url in endpoints:
        # replace_existing swaps out a job with the same id in one call,
        # instead of a separate get_job/remove_job lookup before adding.
        scheduler.add_job(
            func=do_post,
            trigger="interval",
            minutes=5,
            next_run_time=datetime.now(tz),  # first run now
            args=[url],
            id=f"every5_{url}",
            replace_existing=True,
        )
    return {"status": "scheduled every 5 minutes", "jobs": len(endpoints)}
# Optional self-ping to prevent idle sleep.
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm it is actually registered on `app`.
async def heartbeat_ok():
    """Return a constant liveness payload.

    Returns:
        dict: Always ``{"alive": True}``.
    """
    payload = {"alive": True}
    return payload