AdriBat1
Add /execute_stream endpoint for real-time logs
1145aa7
"""
Remote GPU Execution Server v2.2 (Streaming Support)
=====================================================
Questo script usa FastAPI + Gradio per ricevere codice Python ed eseguirlo sulla GPU remota.
Supporta sia esecuzione sincrona che streaming in tempo reale.
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import gradio as gr
import subprocess
import tempfile
import base64
import glob
import shutil
import os
import sys
app = FastAPI()

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination -- confirm this is intended for deployment.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
def execute_code(python_code: str) -> dict:
    """Run a Python snippet in a subprocess and collect its output and files.

    The code is written to ``script.py`` inside a fresh temporary directory,
    executed there with the current interpreter (1 hour timeout), and the
    directory is always removed before returning.

    NOTE(security): this executes arbitrary caller-supplied code with the
    privileges of the server process; it must only be exposed to trusted users.

    Args:
        python_code: Source code to execute.

    Returns:
        A dict with two keys:

        * ``"output"``: a log made of status lines, the captured stdout and,
          when present, the captured stderr.
        * ``"files"``: a list of ``{"name", "data", "mime"}`` dicts, one per
          generated artifact found at the top level of the working directory;
          ``data`` is the base64-encoded file content.

        Errors raised while writing or running the script are reported inside
        ``"output"`` instead of being propagated.
    """
    if not python_code or not python_code.strip():
        return {"output": "❌ Errore: Nessun codice ricevuto.", "files": []}

    # Artifact patterns sent back to the caller. '*.txt' is deliberately not
    # listed so that text logs are not read back and returned.
    artifact_patterns = ['*.png', '*.jpg', '*.svg', '*.csv', '*.json', '*.pth', '*.pt']
    max_file_bytes = 10 * 1024 * 1024

    # Unique temporary directory for this execution.
    work_dir = tempfile.mkdtemp()
    output_parts = []
    generated_files = []

    try:
        # Writing the script happens inside the try block so that a failure
        # here (e.g. text that cannot be encoded as UTF-8) is reported to the
        # caller and the temporary directory is still cleaned up.
        script_path = os.path.join(work_dir, "script.py")
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(python_code)

        output_parts.append("πŸš€ Avvio esecuzione...")
        output_parts.append(f"πŸ“‚ Directory: {work_dir}")
        output_parts.append("=" * 50)

        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            cwd=work_dir,  # run inside the temporary directory
            timeout=3600,  # 1 hour timeout for long training runs
            env={**os.environ, "PYTHONUNBUFFERED": "1"},
        )

        if result.stdout:
            output_parts.append(result.stdout)
        if result.stderr:
            output_parts.append("\n--- STDERR ---")
            output_parts.append(result.stderr)

        output_parts.append("=" * 50)
        status = "βœ… Successo" if result.returncode == 0 else f"❌ Errore ({result.returncode})"
        output_parts.append(status)

        # Collect generated artifacts (images, tables, model checkpoints).
        for pattern in artifact_patterns:
            for filepath in glob.glob(os.path.join(work_dir, pattern)):
                filename = os.path.basename(filepath)
                try:
                    # Check the size before opening so oversized files are
                    # never opened at all.
                    if os.path.getsize(filepath) > max_file_bytes:
                        output_parts.append(f"⚠️ File '{filename}' troppo grande (>10MB), saltato.")
                        continue
                    with open(filepath, "rb") as f:
                        encoded = base64.b64encode(f.read()).decode('utf-8')
                    generated_files.append({
                        "name": filename,
                        "data": encoded,
                        "mime": "application/octet-stream"
                    })
                    output_parts.append(f"πŸ“¦ File generato: {filename}")
                except OSError as e:
                    # Best effort: one unreadable file must not hide the rest.
                    output_parts.append(f"❌ Errore lettura {filename}: {e}")

    except Exception as e:
        # Top-level boundary: any failure (including the subprocess timeout)
        # is turned into text for the caller.
        import traceback
        output_parts.append(f"❌ Eccezione: {str(e)}\n{traceback.format_exc()}")
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)

    return {
        "output": "\n".join(output_parts),
        "files": generated_files
    }
def get_system_info():
    """Return a multi-line summary of the Python and PyTorch/CUDA environment.

    The report always starts with the interpreter version. If ``torch`` can be
    imported, its version and CUDA availability are added, plus the name of
    GPU 0 when CUDA is available; otherwise a "not installed" line is added.
    """
    report = ["System Info:", f"Python: {sys.version}"]
    add = report.append
    try:
        import torch

        add(f"PyTorch: {torch.__version__}")
        has_cuda = torch.cuda.is_available()
        add(f"CUDA Available: {has_cuda}")
        if has_cuda:
            add(f"GPU: {torch.cuda.get_device_name(0)}")
    except ImportError:
        add("PyTorch not installed")
    return "\n".join(report)
@app.get("/")
def read_root():
    """Return a small status payload showing that the server is running."""
    return dict(status="running", message="Remote GPU Execution Server")
@app.post("/execute")
async def execute(data: dict):
    """POST endpoint that runs the submitted Python code and returns the result.

    Args:
        data: Request body; the source code is read from its ``"code"`` key
            (an empty string is used when the key is missing).

    Returns:
        The dict produced by ``execute_code`` (``"output"`` and ``"files"``),
        or a dict of the same shape describing a server-side error.
    """
    import asyncio

    try:
        code = data.get("code", "")
        # execute_code blocks on a subprocess for up to an hour. Running it
        # directly inside this coroutine would stall the event loop and every
        # other request, so it is delegated to the default thread pool.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, execute_code, code)
    except Exception as e:
        import traceback
        return {"output": f"❌ Server Error: {str(e)}\n\n{traceback.format_exc()}", "files": []}
def stream_generator(python_code: str):
    """Run a Python snippet in a subprocess and yield its output as it arrives.

    The code is written to ``script.py`` in a fresh temporary directory and run
    unbuffered with the current interpreter; stderr is merged into stdout.
    No time limit is applied to the child process.

    NOTE(security): this executes arbitrary caller-supplied code with the
    privileges of the server process.

    Args:
        python_code: Source code to execute.

    Yields:
        Text chunks: a short header, each output line of the script, and a
        final status line. Errors while writing or launching the script are
        yielded as text instead of being raised.

    Cleanup always runs, including when the consumer stops iterating early
    (the generator is closed): the child process is killed if still running,
    its pipe is closed and the temporary directory is removed.
    """
    if not python_code or not python_code.strip():
        yield "❌ Errore: Nessun codice ricevuto.\n"
        return

    work_dir = tempfile.mkdtemp()
    process = None
    try:
        # Everything after mkdtemp lives inside try/finally so that neither a
        # failed write nor an early close of the generator leaks the directory.
        script_path = os.path.join(work_dir, "script.py")
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(python_code)

        yield "πŸš€ Avvio esecuzione...\n"
        yield f"πŸ“‚ Directory: {work_dir}\n"
        yield "=" * 50 + "\n"

        process = subprocess.Popen(
            [sys.executable, "-u", script_path],  # -u for unbuffered
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            cwd=work_dir,
            env={**os.environ, "PYTHONUNBUFFERED": "1"},
        )

        # Stream output line by line as the child produces it.
        for line in process.stdout:
            yield line

        process.wait()

        yield "\n" + "=" * 50 + "\n"
        status = "βœ… Successo" if process.returncode == 0 else f"❌ Errore ({process.returncode})"
        yield f"{status}\n"

    except Exception as e:
        import traceback
        yield f"❌ Eccezione: {str(e)}\n{traceback.format_exc()}\n"
    finally:
        if process is not None:
            # Still running here means iteration was abandoned or failed:
            # do not leave an orphaned script behind.
            if process.poll() is None:
                process.kill()
                process.wait()
            if process.stdout is not None:
                process.stdout.close()
        shutil.rmtree(work_dir, ignore_errors=True)
@app.post("/execute_stream")
async def execute_stream(data: dict):
    """POST endpoint that runs the submitted Python code with live streaming.

    The source is read from the ``"code"`` key of the request body (empty
    string when missing) and the chunks produced by ``stream_generator`` are
    sent back as a ``text/plain`` streaming response.
    """
    source_code = data.get("code", "")
    output_chunks = stream_generator(source_code)
    return StreamingResponse(output_chunks, media_type="text/plain")
# Gradio interface (simplified wrapper)
def gradio_wrapper(code):
    """Run *code* through ``execute_code`` and return only its text output.

    The generated files in the result are dropped.
    """
    return execute_code(code)["output"]
# Gradio UI: one text box for the code, one for the execution log.
_code_input = gr.Textbox(
    label="πŸ“ Codice Python",
    placeholder="Incolla qui il tuo codice Python...",
    lines=15,
)
_log_output = gr.Textbox(
    label="πŸ“€ Output Esecuzione (i file non sono visualizzati qui, usa l'API)",
)
demo = gr.Interface(
    fn=gradio_wrapper,
    inputs=_code_input,
    outputs=_log_output,
    title="πŸš€ Remote GPU Execution Server",
    # The system report is computed once, when the module is loaded.
    description="Server attivo. " + get_system_info(),
)
# Mount the Gradio UI on the FastAPI app under the /gradio path; `app` is
# rebound to the object returned by the mount call.
app = gr.mount_gradio_app(app, demo, path="/gradio")

if __name__ == "__main__":
    # uvicorn is imported only when this file is run directly as a script.
    import uvicorn

    # Serve on all network interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)