from fastapi import FastAPI from fastapi.responses import StreamingResponse from pydantic import BaseModel import subprocess import shlex app = FastAPI() class AgentRequest(BaseModel): prompt: str # Defaulting to one of the built-in free OpenCode Zen models model: str = "opencode/minimax-m2.5-free" def stream_opencode_logs(prompt: str, model: str): """ Executes OpenCode in headless mode and streams the live terminal output. """ # shlex.quote ensures the prompt doesn't break the shell command cmd = f'opencode run --model {shlex.quote(model)} {shlex.quote(prompt)}' # Run OpenCode as a subprocess and capture stdout live process = subprocess.Popen( cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Pipe errors to the same log stream text=True, bufsize=1 ) # Yield logs line-by-line as Server-Sent Events for line in process.stdout: yield f"data: {line}\n\n" process.stdout.close() process.wait() yield f"data: [DONE]\n\n" @app.post("/run") async def execute_task(req: AgentRequest): return StreamingResponse( stream_opencode_logs(req.prompt, req.model), media_type="text/event-stream" ) @app.get("/") def health_check(): return {"status": "OpenCode Executor is live and waiting for Super Agent!"}