openlipsync / scripts /tests /modal_jobsv2.py
miguelamendez's picture
Initial upload of directory
2795099 verified
# my_job_queue_endpoint.py
import fastapi
import modal
import random
import time
image = modal.Image.debian_slim().pip_install("fastapi[standard]")
app = modal.App("fastapi-modal", image=image)
web_app = fastapi.FastAPI()
@app.function()
def process_job(data):
# Simulate long workload with random sleep between 5-10 seconds
sleep_time = random.randint(5, 10)
time.sleep(sleep_time)
# Simulate potential error
if random.random() < 0.1: # 10% chance of error
raise Exception("Random processing error occurred")
return {"result": data, "processing_time": sleep_time}
@app.function()
@modal.asgi_app()
def fastapi_app():
return web_app
@web_app.post("/submit")
async def submit_job_endpoint(data: str):
call = process_job.spawn(data)
return {"call_id": call.object_id, "status": "queued"}
@web_app.get("/status/{call_id}")
async def get_job_status_endpoint(call_id: str):
function_call = modal.FunctionCall.from_id(call_id)
try:
result = function_call.get(timeout=0)
return {"call_id": call_id, "status": "completed", "result": result}
except modal.exception.OutputExpiredError:
return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "error"}, status_code=404)
except TimeoutError:
# Check if function is still running or queued
try:
# Try to get the function state by checking if it's running
# This is a simplified approach - in practice, you might want more sophisticated state tracking
function_call.get(timeout=1) # This will raise TimeoutError if still running
return {"call_id": call_id, "status": "completed"}
except TimeoutError:
# Function is still running or queued
return {"call_id": call_id, "status": "processing"}
except Exception:
return {"call_id": call_id, "status": "error"}
except Exception as e:
return {"call_id": call_id, "status": "error", "error": str(e)}
@web_app.get("/result/{call_id}")
async def get_job_result_endpoint(call_id: str):
function_call = modal.FunctionCall.from_id(call_id)
try:
result = function_call.get(timeout=0)
return {"call_id": call_id, "status": "completed", "result": result}
except modal.exception.OutputExpiredError:
return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "error"}, status_code=404)
except TimeoutError:
return fastapi.responses.JSONResponse(content={"call_id": call_id, "status": "processing"}, status_code=202)
except Exception as e:
return {"call_id": call_id, "status": "error", "error": str(e)}
# Add a local entrypoint to test locally (optional)
@app.local_entrypoint()
def main():
print("FastAPI app deployed as Modal ASGI app.")
print("Use `modal serve my_job_queue_endpoint.py` to serve the web app.")