| |
| import fastapi |
| import modal |
| import random |
| import time |
|
|
| image = modal.Image.debian_slim().pip_install("fastapi[standard]") |
| app = modal.App("fastapi-modal", image=image) |
| web_app = fastapi.FastAPI() |
|
|
| @app.function() |
| def process_job(data): |
| |
| sleep_time = random.randint(5, 10) |
| time.sleep(sleep_time) |
| |
| |
| if random.random() < 0.1: |
| raise Exception("Random processing error occurred") |
| |
| return {"result": data, "processing_time": sleep_time} |
|
|
|
|
| @app.function() |
| @modal.asgi_app() |
| def fastapi_app(): |
| return web_app |
|
|
|
|
| @web_app.post("/submit") |
| async def submit_job_endpoint(data: str): |
| call = process_job.spawn(data) |
| return {"call_id": call.object_id, "status": "queued"} |
|
|
|
|
| @web_app.get("/status/{call_id}") |
| async def get_job_status_endpoint(call_id: str): |
| function_call = modal.FunctionCall.from_id(call_id) |
| try: |
| result = function_call.get(timeout=0) |
| return {"call_id": call_id, "status": "completed", "result": result} |
| except modal.exception.OutputExpiredError: |
| return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "error"}, status_code=404) |
| except TimeoutError: |
| |
| try: |
| |
| |
| function_call.get(timeout=1) |
| return {"call_id": call_id, "status": "completed"} |
| except TimeoutError: |
| |
| return {"call_id": call_id, "status": "processing"} |
| except Exception: |
| return {"call_id": call_id, "status": "error"} |
| except Exception as e: |
| return {"call_id": call_id, "status": "error", "error": str(e)} |
|
|
|
|
| @web_app.get("/result/{call_id}") |
| async def get_job_result_endpoint(call_id: str): |
| function_call = modal.FunctionCall.from_id(call_id) |
| try: |
| result = function_call.get(timeout=0) |
| return {"call_id": call_id, "status": "completed", "result": result} |
| except modal.exception.OutputExpiredError: |
| return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "error"}, status_code=404) |
| except TimeoutError: |
| return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "processing"}, status_code=202) |
| except Exception as e: |
| return {"call_id": call_id, "status": "error", "error": str(e)} |
|
|
|
|
| |
| @app.local_entrypoint() |
| def main(): |
| print("FastAPI app deployed as Modal ASGI app.") |
| print("Use `modal serve my_job_queue_endpoint.py` to serve the web app.") |
|
|
|
|