Spaces:
Sleeping
Sleeping
File size: 2,283 Bytes
45c94ac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | import os
from fastapi import FastAPI, Request, HTTPException
import gradio as gr
from client import CiCdDoctorEnv, CiCdDoctorAction
from models import PipelineObservation
# Optional: your existing Gradio UI
try:
from app import demo
except ImportError:
demo = None
app = FastAPI(
title="CI/CD Doctor",
description="Self-healing CI/CD pipeline agent",
version="1.0.0",
)
_env = None
@app.get("/health")
async def health():
return {"status": "healthy"}
@app.get("/metadata")
async def metadata():
return {
"name": "CI/CD Doctor",
"description": "Fix broken CI/CD pipelines autonomously",
"version": "1.0.0",
"submission_type": "openenv-v4",
}
@app.get("/schema")
async def schema():
return {
"action": CiCdDoctorAction.model_json_schema(),
"observation": PipelineObservation.model_json_schema(),
"state": {"type": "object"},
}
@app.post("/reset")
async def reset(request: Request):
global _env
try:
body = await request.json() if await request.body() else {}
task = body.get("task", "easy")
seed = body.get("seed", 42)
base_url = os.getenv("ENV_BASE_URL", "http://localhost:8000")
_env = await CiCdDoctorEnv(base_url=base_url)
result = await _env.reset(task=task, seed=seed)
return result.observation.model_dump()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/step")
async def step(request: Request):
global _env
if _env is None:
raise HTTPException(status_code=400, detail="Call /reset first")
try:
body = await request.json()
action = CiCdDoctorAction(**body)
result = await _env.step(action)
return result.observation.model_dump()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/state")
async def state():
global _env
if _env is None:
return {"status": "uninitialized"}
return {"status": "running"}
# Mount Gradio UI if available
if demo is not None:
app = gr.mount_gradio_app(app, demo, path="/")
def main():
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
if __name__ == "__main__":
main() |