# Replicate-1 / app.py
# diamond-in's picture
# Update app.py
# a84d02b verified
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import os
import pty
import select
import subprocess
import fcntl
import signal
import asyncio
import time
# Application object; the route decorators in this file register handlers on it.
app = FastAPI()
# Global Session: a single terminal shared by every request.
#   "fd"            - pty master descriptor, None until /start has run
#   "process"       - subprocess.Popen handle of the shell, None until /start
#   "last_activity" - time.time() stamp refreshed on start, on each output
#                     chunk, and on each heartbeat sent by the stream
SESSION = {"fd": None, "process": None, "last_activity": time.time()}
class InputData(BaseModel):
    # Text to forward to the terminal; it is encoded and written unmodified.
    data: str
@app.post("/start")
def start_terminal():
    """Start a fresh interactive shell on a new pseudo-terminal.

    Any previous session is torn down first: its pty master is closed (so the
    descriptor is not leaked and the old shell sees its terminal go away) and
    its process is terminated and reaped so it does not linger as a zombie.

    Returns:
        ``{"status": "started"}`` once the new shell has been spawned.

    Raises:
        OSError: if the pty cannot be allocated or the shell cannot be spawned.
    """
    # Detach the old session from the shared state before tearing it down so
    # other handlers never pick up a descriptor that is about to be closed.
    previous_fd = SESSION["fd"]
    previous_proc = SESSION["process"]
    SESSION["fd"] = None
    SESSION["process"] = None

    if previous_fd is not None:
        try:
            os.close(previous_fd)
        except OSError:
            pass  # Already closed; nothing left to release.

    if previous_proc is not None:
        try:
            # terminate() sends SIGTERM and is a no-op if the process has
            # already been reaped, which avoids signalling a recycled PID.
            previous_proc.terminate()
        except OSError:
            pass  # Best effort: the process may be gone or not ours to signal.
        try:
            previous_proc.wait(timeout=1)
        except subprocess.TimeoutExpired:
            try:
                previous_proc.kill()
                previous_proc.wait(timeout=1)
            except (OSError, subprocess.TimeoutExpired):
                pass  # Give up rather than block the request indefinitely.

    master, slave = pty.openpty()
    try:
        # '-i' forces interactive mode (fixes "no job control").
        # start_new_session=True runs setsid() in the child; unlike
        # preexec_fn=os.setsid it is safe to use from a threaded server.
        p = subprocess.Popen(
            ["sudo", "bash", "-i"],
            stdin=slave,
            stdout=slave,
            stderr=slave,
            start_new_session=True,
            close_fds=True,
        )
    except Exception:
        os.close(master)
        raise
    finally:
        # The child holds its own copies; the parent only needs the master.
        os.close(slave)

    # Non-blocking read so the streaming loop never stalls on an idle shell.
    fl = fcntl.fcntl(master, fcntl.F_GETFL)
    fcntl.fcntl(master, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    SESSION["fd"] = master
    SESSION["process"] = p
    SESSION["last_activity"] = time.time()
    return {"status": "started"}
async def stream_generator():
    """Yield terminal output as bytes, with a heartbeat while idle.

    The current descriptor is re-read from ``SESSION`` on every pass, so a
    stream opened before the terminal is started (or across a restart) picks
    up the active session instead of waiting forever on a stale value.

    Yields:
        Chunks of up to 4096 bytes read from the pty master, or a single
        null byte (``b'\\x00'``) when nothing has been sent for more than
        10 seconds, to keep the HTTP connection open. The client is expected
        to ignore that byte.

    The generator ends when the descriptor reports end-of-file or a read
    error, which is what happens once the shell has exited.
    """
    while True:
        fd = SESSION["fd"]
        if fd is None:
            # No terminal yet; check again later.
            await asyncio.sleep(1)
            continue

        try:
            # Zero timeout: poll without blocking the event loop. The idle
            # delay is provided by the awaited sleep below instead.
            readable, _, _ = select.select([fd], [], [], 0)
            data = os.read(fd, 4096) if readable else None
        except BlockingIOError:
            # Spurious readiness on the non-blocking fd; treat as idle.
            data = None
        except OSError:
            if SESSION["fd"] != fd:
                # The session was replaced underneath us; follow the new one.
                continue
            break

        if data is None:
            # Heartbeat: if nothing was sent for 10 seconds, emit a null byte.
            if time.time() - SESSION["last_activity"] > 10:
                yield b'\x00'  # Null byte (Heartbeat)
                SESSION["last_activity"] = time.time()
            await asyncio.sleep(0.01)
            continue

        if not data:
            # Empty read means end-of-file; stop instead of spinning.
            break

        SESSION["last_activity"] = time.time()
        yield data
@app.get("/stream")
def stream_output():
    """Return the terminal output as a streamed binary HTTP response."""
    body = stream_generator()
    return StreamingResponse(body, media_type="application/octet-stream")
@app.post("/write")
def write_input(payload: InputData):
    """Forward client input to the terminal.

    Args:
        payload: request body whose ``data`` string is encoded and written
            to the pty master exactly as received.

    Returns:
        ``{"status": "ok"}``, including when no terminal session exists (the
        input is then dropped).

    Raises:
        BlockingIOError: if the terminal does not accept the data within
            about five seconds.
        OSError: if the descriptor is no longer usable.
    """
    fd = SESSION["fd"]
    if fd is not None:
        # The master fd is non-blocking, so a single os.write() may accept
        # only part of the data; keep going until everything is written.
        pending = memoryview(payload.data.encode())
        deadline = time.monotonic() + 5.0
        while pending:
            try:
                written = os.write(fd, pending)
            except BlockingIOError:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise
                # Buffer full: wait until the pty can take more input.
                select.select([], [fd], [], remaining)
                continue
            pending = pending[written:]
    return {"status": "ok"}