Spaces:
Running
Running
| from fastapi import FastAPI | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| import subprocess | |
| import shlex | |
| app = FastAPI() | |
class AgentRequest(BaseModel):
    """Request payload describing a single task to hand to OpenCode.

    ``prompt`` is mandatory. ``model`` is optional and falls back to one of
    the built-in free OpenCode Zen models when the caller leaves it out.
    """

    # Task text forwarded to OpenCode.
    prompt: str
    # Model identifier forwarded to OpenCode; the default is a free Zen model.
    model: str = "opencode/minimax-m2.5-free"
def stream_opencode_logs(prompt: str, model: str):
    """Run OpenCode in headless mode and yield its output as Server-Sent Events.

    The command is executed as an argument list without a shell, so neither
    ``prompt`` nor ``model`` is ever interpreted by a shell.

    Args:
        prompt: Task text passed to ``opencode run`` as one argument.
        model: Model identifier passed as the value of ``--model``.

    Yields:
        One ``data: <line>`` event (terminated by a blank line) per line of
        the process's combined stdout/stderr, then a final ``data: [DONE]``
        event. If the process cannot be started, a single
        ``data: [ERROR] <reason>`` event is emitted before ``[DONE]``.
    """
    argv = ["opencode", "run", "--model", model, prompt]

    try:
        process = subprocess.Popen(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # Merge errors into the same log stream
            text=True,
            bufsize=1,  # Line-buffered so output is forwarded as it appears
        )
    except OSError as exc:
        # Without a shell there is nobody to print "command not found", so
        # report the launch failure in-stream instead of raising mid-response.
        reason = " ".join(str(exc).splitlines())
        yield f"data: [ERROR] {reason}\n\n"
        yield "data: [DONE]\n\n"
        return

    finished = False
    try:
        for line in process.stdout:
            # Drop the line's own newline; the SSE frame supplies the
            # terminating blank line itself.
            payload = line.rstrip("\n")
            yield f"data: {payload}\n\n"
        finished = True
    finally:
        # Reached on normal completion and also when the consumer stops
        # iterating early (generator closed) or an error propagates.
        if not finished and process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
        process.stdout.close()
        process.wait()

    yield "data: [DONE]\n\n"
async def execute_task(req: AgentRequest):
    """Start an OpenCode run for ``req`` and stream its log output back.

    The response body is produced by ``stream_opencode_logs`` and is served
    with the ``text/event-stream`` media type.
    """
    log_events = stream_opencode_logs(req.prompt, req.model)
    return StreamingResponse(log_events, media_type="text/event-stream")
def health_check():
    """Return a static status payload indicating the service is up."""
    status_message = "OpenCode Executor is live and waiting for Super Agent!"
    return {"status": status_message}