"""
Remote GPU Execution Server v2.2 (Streaming Support)
=====================================================
FastAPI + Gradio server that receives Python code and runs it on the
remote GPU machine. Supports both synchronous execution and real-time
streaming of the output.

NOTE(security): every execution endpoint runs caller-supplied code in a
subprocess without any authentication, and CORS is open to all origins.
Only expose this server on a trusted network.
"""
import base64
import glob
import os
import shutil
import subprocess
import sys
import tempfile
import traceback

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import gradio as gr

# Upper bound for a synchronous run: 1 hour, to allow long training runs.
EXECUTION_TIMEOUT_SECONDS = 3600
# Generated files larger than this are reported but not returned.
MAX_ARTIFACT_BYTES = 10 * 1024 * 1024
# Glob patterns of generated files returned to the caller. '*.txt' is
# deliberately absent so that log files are not read back.
ARTIFACT_PATTERNS = (
    "*.png",
    "*.jpg",
    "*.svg",
    "*.csv",
    "*.json",
    "*.pth",
    "*.pt",
)
SEPARATOR = "=" * 50

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _has_code(python_code) -> bool:
    """Return True when *python_code* is a string with non-blank content."""
    return isinstance(python_code, str) and bool(python_code.strip())


def _write_script(work_dir: str, python_code: str) -> str:
    """Write *python_code* to ``script.py`` inside *work_dir*; return its path."""
    script_path = os.path.join(work_dir, "script.py")
    with open(script_path, "w", encoding="utf-8") as f:
        f.write(python_code)
    return script_path


def _child_env() -> dict:
    """Return the parent environment with unbuffered output forced on."""
    return {**os.environ, "PYTHONUNBUFFERED": "1"}


def execute_code(python_code: str) -> dict:
    """
    Run *python_code* in a subprocess and collect its output and artifacts.

    The code is written to ``script.py`` in a fresh temporary directory,
    executed there with the current interpreter, and the directory is
    removed afterwards.

    Args:
        python_code: Source code to execute. Empty, blank, or non-string
            values are rejected without running anything.

    Returns:
        A dict with two keys:
          * ``"output"``: the textual report (banner, stdout, stderr and
            exit status, or an error description) joined by newlines.
          * ``"files"``: a list of ``{"name", "data", "mime"}`` dicts, one
            per generated file matching ``ARTIFACT_PATTERNS``; ``"data"``
            is the base64-encoded content.

    Failures (including the timeout) never propagate: they are reported
    inside ``"output"``.
    """
    if not _has_code(python_code):
        return {"output": "❌ Errore: Nessun codice ricevuto.", "files": []}

    # Unique temporary directory for this execution.
    work_dir = tempfile.mkdtemp()
    output_parts = []
    generated_files = []

    try:
        # Written inside the try so a failed write still cleans up work_dir.
        script_path = _write_script(work_dir, python_code)

        output_parts.append("🚀 Avvio esecuzione...")
        output_parts.append(f"📂 Directory: {work_dir}")
        output_parts.append(SEPARATOR)

        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            # Undecodable bytes in the child's output must not discard
            # the whole run with a UnicodeDecodeError.
            errors="replace",
            cwd=work_dir,  # run inside the temporary directory
            timeout=EXECUTION_TIMEOUT_SECONDS,
            env=_child_env(),
        )

        if result.stdout:
            output_parts.append(result.stdout)
        if result.stderr:
            output_parts.append("\n--- STDERR ---")
            output_parts.append(result.stderr)

        output_parts.append(SEPARATOR)
        if result.returncode == 0:
            status = "✅ Successo"
        else:
            status = f"❌ Errore ({result.returncode})"
        output_parts.append(status)

        # Look for generated files (images, csv, json, model weights).
        for pattern in ARTIFACT_PATTERNS:
            for filepath in glob.glob(os.path.join(work_dir, pattern)):
                filename = os.path.basename(filepath)
                try:
                    # Check the size before opening the file at all.
                    if os.path.getsize(filepath) > MAX_ARTIFACT_BYTES:
                        output_parts.append(
                            f"⚠️ File '{filename}' troppo grande (>10MB), saltato."
                        )
                        continue
                    with open(filepath, "rb") as f:
                        encoded = base64.b64encode(f.read()).decode("utf-8")
                except OSError as e:
                    output_parts.append(f"❌ Errore lettura {filename}: {e}")
                    continue
                generated_files.append({
                    "name": filename,
                    "data": encoded,
                    "mime": "application/octet-stream",
                })
                output_parts.append(f"📦 File generato: {filename}")

    except Exception as e:
        # Top-level boundary: report the failure to the caller instead of
        # raising (this also covers subprocess.TimeoutExpired).
        output_parts.append(f"❌ Eccezione: {e}\n{traceback.format_exc()}")
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)

    return {
        "output": "\n".join(output_parts),
        "files": generated_files,
    }


def get_system_info():
    """Return a multi-line summary of Python, PyTorch and CUDA/GPU status."""
    info = ["System Info:"]
    info.append(f"Python: {sys.version}")
    try:
        # Imported lazily so the server still starts without PyTorch.
        import torch
        info.append(f"PyTorch: {torch.__version__}")
        info.append(f"CUDA Available: {torch.cuda.is_available()}")
        if torch.cuda.is_available():
            info.append(f"GPU: {torch.cuda.get_device_name(0)}")
    except ImportError:
        info.append("PyTorch not installed")
    return "\n".join(info)


@app.get("/")
def read_root():
    """Health-check endpoint: report that the server is running."""
    return {"status": "running", "message": "Remote GPU Execution Server"}


@app.post("/execute")
async def execute(data: dict):
    """
    POST endpoint that runs the Python code found under the ``"code"`` key.

    Returns the dict produced by ``execute_code``; unexpected server-side
    errors are reported in the same ``{"output", "files"}`` shape.
    """
    try:
        code = data.get("code", "")
        # execute_code blocks for as long as the script runs; run it in
        # the thread pool so the event loop keeps serving other requests.
        return await run_in_threadpool(execute_code, code)
    except Exception as e:
        return {
            "output": f"❌ Server Error: {e}\n\n{traceback.format_exc()}",
            "files": [],
        }


def stream_generator(python_code: str):
    """
    Run *python_code* in a subprocess and yield its output as it arrives.

    stderr is merged into stdout. The banner, each output line, a closing
    separator and the exit status are yielded as newline-terminated
    strings. Errors are yielded as text rather than raised.

    If the consumer stops iterating early (for example the client
    disconnects and the generator is closed), the child process is killed
    and the temporary directory is removed.

    Args:
        python_code: Source code to execute. Empty, blank, or non-string
            values yield a single error line and nothing is run.

    Yields:
        str: chunks of the textual report.
    """
    if not _has_code(python_code):
        yield "❌ Errore: Nessun codice ricevuto.\n"
        return

    work_dir = tempfile.mkdtemp()
    process = None
    try:
        script_path = _write_script(work_dir, python_code)

        # These yields are inside the try so that closing the generator
        # at any of them still runs the cleanup below.
        yield "🚀 Avvio esecuzione...\n"
        yield f"📂 Directory: {work_dir}\n"
        yield SEPARATOR + "\n"

        process = subprocess.Popen(
            [sys.executable, "-u", script_path],  # -u for unbuffered output
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Undecodable bytes must not abort the stream mid-run.
            errors="replace",
            cwd=work_dir,
            env=_child_env(),
        )

        # Stream output line by line.
        for line in process.stdout:
            yield line

        process.wait()

        yield "\n" + SEPARATOR + "\n"
        if process.returncode == 0:
            status = "✅ Successo"
        else:
            status = f"❌ Errore ({process.returncode})"
        yield f"{status}\n"

    except Exception as e:
        yield f"❌ Eccezione: {e}\n{traceback.format_exc()}\n"
    finally:
        if process is not None:
            # Still running means we got here through an early close or
            # an error: do not leave the script running on the GPU.
            if process.poll() is None:
                process.kill()
                process.wait()
            if process.stdout is not None:
                process.stdout.close()
        shutil.rmtree(work_dir, ignore_errors=True)


@app.post("/execute_stream")
async def execute_stream(data: dict):
    """POST endpoint that runs the ``"code"`` value and streams its output as plain text."""
    code = data.get("code", "")
    return StreamingResponse(stream_generator(code), media_type="text/plain")


# Gradio interface (simplified wrapper).
def gradio_wrapper(code):
    """Run *code* through ``execute_code`` and return only the textual output."""
    res = execute_code(code)
    return res["output"]


demo = gr.Interface(
    fn=gradio_wrapper,
    inputs=gr.Textbox(
        label="📝 Codice Python",
        placeholder="Incolla qui il tuo codice Python...",
        lines=15,
    ),
    outputs=gr.Textbox(
        label="📤 Output Esecuzione (i file non sono visualizzati qui, usa l'API)"
    ),
    title="🚀 Remote GPU Execution Server",
    description=f"Server attivo. {get_system_info()}",
)

# Mount Gradio on the FastAPI app.
app = gr.mount_gradio_app(app, demo, path="/gradio")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)