Db3_update / app.py
alesamodio's picture
update link to the trigger
713f934
from fastapi import FastAPI, Request
import os
import datetime
import threading
import time
# Import your pipeline only once at startup
from db3_pipeline import run_db3_pipeline
app = FastAPI()
is_ready = False # Global readiness flag
# --- Helper function for consistent logging ---
def log_event(message: str):
timestamp = datetime.datetime.utcnow().isoformat()
msg = f"{timestamp} | {message}"
print(msg)
return msg
# --- Background delayed readiness signal ---
def delayed_ready_signal(delay: int = 180):
"""Run in background thread to mark readiness after delay."""
global is_ready
time.sleep(delay)
is_ready = True
log_event(f"🌍 External ready signal sent (after {delay}s).")
# --- FastAPI startup event ---
@app.on_event("startup")
async def announce_startup():
log_event("βœ… FastAPI startup event triggered (internal ready).")
# Launch a background thread so startup isn’t blocked
threading.Thread(target=delayed_ready_signal, daemon=True).start()
# --- Health check endpoint ---
@app.get("/")
def home():
log_event("πŸ’  GET / (health check)")
return {"status": "running", "message": "Db3 update service is alive."}
# --- Readiness endpoint for GitHub polling ---
@app.get("/ready")
def ready():
"""
Returns {"status": "ready"} once all startup tasks and model loads
have completed (after delayed_ready_signal runs).
"""
if is_ready:
return {"status": "ready"}
else:
return {"status": "loading"}
# @app.get("/status")
# def status():
# """Lightweight endpoint for GitHub readiness polling."""
# return {"ready": is_ready}
# --- Main trigger endpoint ---
@app.get("/run")
def run_db3(request: Request):
log_event("πŸš€ Received /run request.")
secret = request.query_params.get("secret")
# --- Security check ---
if secret != os.getenv("CRON_SECRET"):
log_event("β›” Unauthorized request (invalid secret).")
return {"error": "unauthorized"}
try:
log_event("🟒 Starting db3_pipeline.run_db3_pipeline()...")
result = run_db3_pipeline()
log_event(f"βœ… db3_pipeline finished successfully: {result}")
return {"status": "ok", "result": str(result)}
except Exception as e:
log_event(f"❌ Pipeline failed with error: {e}")
return {"status": "failed", "error": str(e)}
# --- Optional simple log endpoint (for debugging only) ---
@app.get("/log")
def read_log():
"""Optional: read last few lines of logs (helpful when debugging)."""
return {"message": "Check HF Logs tab for full output."}