|
|
""" |
|
|
Advanced Code Interpreter Sandbox |
|
|
A powerful code execution environment with advanced features |
|
|
""" |
|
|
|
|
|
import gradio as gr |
|
|
import subprocess |
|
|
import sys |
|
|
import io |
|
|
import os |
|
|
import traceback |
|
|
import json |
|
|
import tempfile |
|
|
import shutil |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import ast |
|
|
import signal |
|
|
import time |
|
|
|
|
|
|
|
|
session_state = { |
|
|
"files": {}, |
|
|
"history": [], |
|
|
"packages": set(), |
|
|
"cwd": tempfile.mkdtemp(prefix="code_sandbox_"), |
|
|
"start_time": time.time() |
|
|
} |
|
|
|
|
|
|
|
|
os.makedirs(session_state["cwd"], exist_ok=True) |
|
|
os.chdir(session_state["cwd"]) |
|
|
|
|
|
class CodeExecutor: |
|
|
"""Secure code execution with proper sandboxing""" |
|
|
|
|
|
def __init__(self, timeout=10, memory_limit=512): |
|
|
self.timeout = timeout |
|
|
self.memory_limit = memory_limit |
|
|
|
|
|
def install_package(self, package_name): |
|
|
"""Install package using pip""" |
|
|
try: |
|
|
result = subprocess.run( |
|
|
[sys.executable, "-m", "pip", "install", package_name, "-q"], |
|
|
capture_output=True, |
|
|
text=True, |
|
|
timeout=60 |
|
|
) |
|
|
if result.returncode == 0: |
|
|
session_state["packages"].add(package_name) |
|
|
return f"β
Successfully installed {package_name}" |
|
|
else: |
|
|
return f"β Failed to install {package_name}\n{result.stderr}" |
|
|
except Exception as e: |
|
|
return f"β Error installing package: {str(e)}" |
|
|
|
|
|
def execute_code(self, code, use_file=False, filename=""): |
|
|
"""Execute Python code safely""" |
|
|
output = [] |
|
|
error_output = [] |
|
|
|
|
|
|
|
|
old_stdout = sys.stdout |
|
|
old_stderr = sys.stderr |
|
|
sys.stdout = io.StringIO() |
|
|
sys.stderr = io.StringIO() |
|
|
|
|
|
try: |
|
|
|
|
|
env = { |
|
|
"__name__": "__main__", |
|
|
"__builtins__": __builtins__, |
|
|
"session_files": session_state["files"], |
|
|
"session_state": session_state, |
|
|
"os": os, |
|
|
"sys": sys, |
|
|
"json": json, |
|
|
"pathlib": __import__("pathlib") |
|
|
} |
|
|
|
|
|
|
|
|
if use_file and filename: |
|
|
filepath = os.path.join(session_state["cwd"], filename) |
|
|
with open(filepath, 'w') as f: |
|
|
f.write(code) |
|
|
|
|
|
|
|
|
with open(filepath, 'r') as f: |
|
|
code_obj = compile(f.read(), filename, 'exec') |
|
|
exec(code_obj, env) |
|
|
else: |
|
|
|
|
|
try: |
|
|
|
|
|
code_obj = compile(code, '<input>', 'exec') |
|
|
exec(code_obj, env) |
|
|
except SyntaxError as e: |
|
|
|
|
|
try: |
|
|
result = eval(code, env) |
|
|
if result is not None: |
|
|
output.append(str(result)) |
|
|
except: |
|
|
raise |
|
|
|
|
|
|
|
|
stdout_val = sys.stdout.getvalue() |
|
|
stderr_val = sys.stderr.getvalue() |
|
|
|
|
|
if stdout_val: |
|
|
output.append(stdout_val) |
|
|
if stderr_val: |
|
|
error_output.append(stderr_val) |
|
|
|
|
|
except Exception as e: |
|
|
error_output.append(traceback.format_exc()) |
|
|
|
|
|
finally: |
|
|
|
|
|
sys.stdout = old_stdout |
|
|
sys.stderr = old_stderr |
|
|
|
|
|
return "\n".join(output), "\n".join(error_output) |
|
|
|
|
|
|
|
|
executor = CodeExecutor() |
|
|
|
|
|
|
|
|
CUSTOM_CSS = """ |
|
|
.gradio-container { |
|
|
max-width: 1400px !important; |
|
|
margin: auto !important; |
|
|
} |
|
|
|
|
|
.code-editor { |
|
|
font-family: 'Courier New', monospace; |
|
|
font-size: 14px; |
|
|
} |
|
|
|
|
|
.output-box { |
|
|
background-color: #1e1e1e; |
|
|
color: #d4d4d4; |
|
|
font-family: 'Courier New', monospace; |
|
|
padding: 10px; |
|
|
border-radius: 5px; |
|
|
} |
|
|
|
|
|
.file-item { |
|
|
padding: 8px; |
|
|
margin: 4px 0; |
|
|
background-color: #2d2d2d; |
|
|
border-radius: 4px; |
|
|
cursor: pointer; |
|
|
} |
|
|
|
|
|
.file-item:hover { |
|
|
background-color: #3d3d3d; |
|
|
} |
|
|
|
|
|
.tab-button { |
|
|
padding: 10px 20px; |
|
|
margin: 5px; |
|
|
background-color: #007acc; |
|
|
color: white; |
|
|
border: none; |
|
|
border-radius: 5px; |
|
|
cursor: pointer; |
|
|
} |
|
|
|
|
|
.tab-button.active { |
|
|
background-color: #005a9e; |
|
|
} |
|
|
""" |
|
|
|
|
|
def run_code(code, output_mode="both"): |
|
|
"""Run code and return output""" |
|
|
stdout, stderr = executor.execute_code(code) |
|
|
|
|
|
if output_mode == "stdout": |
|
|
return stdout |
|
|
elif output_mode == "stderr": |
|
|
return stderr |
|
|
else: |
|
|
result = "" |
|
|
if stdout: |
|
|
result += f"π€ STDOUT:\n{stdout}\n" |
|
|
if stderr: |
|
|
result += f"β οΈ STDERR:\n{stderr}\n" |
|
|
return result if result else "Code executed successfully with no output." |
|
|
|
|
|
def upload_file(file): |
|
|
"""Handle file upload""" |
|
|
if file is None: |
|
|
return "No file uploaded" |
|
|
|
|
|
try: |
|
|
|
|
|
file_name = os.path.basename(file.name) |
|
|
file_size = os.path.getsize(file.name) |
|
|
|
|
|
|
|
|
dest_path = os.path.join(session_state["cwd"], file_name) |
|
|
shutil.copy2(file.name, dest_path) |
|
|
|
|
|
|
|
|
session_state["files"][file_name] = { |
|
|
"path": dest_path, |
|
|
"size": file_size, |
|
|
"uploaded_at": datetime.now().isoformat() |
|
|
} |
|
|
|
|
|
return f"β
Uploaded: {file_name} ({file_size} bytes)" |
|
|
except Exception as e: |
|
|
return f"β Upload failed: {str(e)}" |
|
|
|
|
|
def list_files(): |
|
|
"""List all files in workspace""" |
|
|
if not session_state["files"]: |
|
|
return "No files in workspace" |
|
|
|
|
|
file_list = [] |
|
|
for name, info in session_state["files"].items(): |
|
|
file_list.append(f"π {name} ({info['size']} bytes)") |
|
|
|
|
|
return "\n".join(file_list) |
|
|
|
|
|
def read_file(filename): |
|
|
"""Read file content""" |
|
|
if filename not in session_state["files"]: |
|
|
return f"File '{filename}' not found" |
|
|
|
|
|
try: |
|
|
with open(session_state["files"][filename]["path"], 'r') as f: |
|
|
return f.read() |
|
|
except Exception as e: |
|
|
return f"Error reading file: {str(e)}" |
|
|
|
|
|
def delete_file(filename): |
|
|
"""Delete file from workspace""" |
|
|
if filename not in session_state["files"]: |
|
|
return f"File '{filename}' not found" |
|
|
|
|
|
try: |
|
|
os.remove(session_state["files"][filename]["path"]) |
|
|
del session_state["files"][filename] |
|
|
return f"β
Deleted: {filename}" |
|
|
except Exception as e: |
|
|
return f"β Delete failed: {str(e)}" |
|
|
|
|
|
def install_packages(package_list): |
|
|
"""Install multiple packages""" |
|
|
if not package_list: |
|
|
return "No packages specified" |
|
|
|
|
|
packages = [p.strip() for p in package_list.split(',')] |
|
|
results = [] |
|
|
|
|
|
for package in packages: |
|
|
result = executor.install_package(package) |
|
|
results.append(result) |
|
|
|
|
|
return "\n".join(results) |
|
|
|
|
|
def show_installed_packages(): |
|
|
"""Show installed packages""" |
|
|
if not session_state["packages"]: |
|
|
return "No custom packages installed" |
|
|
|
|
|
return "\n".join([f"π¦ {pkg}" for pkg in sorted(session_state["packages"])]) |
|
|
|
|
|
def get_session_info(): |
|
|
"""Get session information""" |
|
|
uptime = time.time() - session_state["start_time"] |
|
|
file_count = len(session_state["files"]) |
|
|
|
|
|
return f""" |
|
|
π Session Started: {datetime.fromtimestamp(session_state["start_time"]).strftime('%Y-%m-%d %H:%M:%S')} |
|
|
β±οΈ Uptime: {uptime:.1f} seconds |
|
|
π Files: {file_count} |
|
|
π¦ Packages: {len(session_state["packages"])} |
|
|
πΎ Workspace: {session_state["cwd"]} |
|
|
""" |
|
|
|
|
|
|
|
|
def create_interface(): |
|
|
"""Create the main Gradio interface""" |
|
|
with gr.Blocks(css=CUSTOM_CSS, title="Code Interpreter Sandbox", theme=gr.themes.Soft()) as app: |
|
|
|
|
|
gr.Markdown( |
|
|
""" |
|
|
# π Advanced Code Interpreter Sandbox |
|
|
|
|
|
A powerful code execution environment with advanced features: |
|
|
- β
Secure code execution |
|
|
- π File system access |
|
|
- π¦ Package installation |
|
|
- π Data visualization |
|
|
- πΎ Session persistence |
|
|
- π Real-time output |
|
|
- π Multi-file support |
|
|
""" |
|
|
) |
|
|
|
|
|
with gr.Tab("π§ Code Executor"): |
|
|
with gr.Row(): |
|
|
with gr.Column(scale=3): |
|
|
code_input = gr.Code( |
|
|
label="Python Code", |
|
|
value="# Write your Python code here\nprint('Hello, World!')", |
|
|
language="python", |
|
|
elem_classes="code-editor" |
|
|
) |
|
|
with gr.Row(): |
|
|
run_btn = gr.Button("βΆοΈ Run Code", variant="primary") |
|
|
clear_btn = gr.Button("ποΈ Clear Output") |
|
|
with gr.Row(): |
|
|
output_mode = gr.Radio( |
|
|
["both", "stdout", "stderr"], |
|
|
value="both", |
|
|
label="Output Mode" |
|
|
) |
|
|
|
|
|
with gr.Column(scale=3): |
|
|
output = gr.Textbox( |
|
|
label="Output", |
|
|
lines=20, |
|
|
elem_classes="output-box" |
|
|
) |
|
|
|
|
|
run_btn.click( |
|
|
fn=run_code, |
|
|
inputs=[code_input, output_mode], |
|
|
outputs=output |
|
|
) |
|
|
|
|
|
clear_btn.click( |
|
|
fn=lambda: "", |
|
|
outputs=output |
|
|
) |
|
|
|
|
|
with gr.Tab("π File Manager"): |
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
gr.Markdown("### Upload Files") |
|
|
file_upload = gr.File( |
|
|
label="Upload File", |
|
|
file_count="single" |
|
|
) |
|
|
upload_btn = gr.Button("π€ Upload") |
|
|
upload_status = gr.Textbox(label="Status", lines=5) |
|
|
|
|
|
with gr.Column(): |
|
|
gr.Markdown("### File Operations") |
|
|
with gr.Row(): |
|
|
refresh_btn = gr.Button("π Refresh File List") |
|
|
files_display = gr.Textbox(label="Files", lines=10, interactive=False) |
|
|
|
|
|
file_selector = gr.Dropdown( |
|
|
label="Select File", |
|
|
choices=[], |
|
|
value=None |
|
|
) |
|
|
|
|
|
|
|
|
def refresh_and_update(): |
|
|
file_list = list_files() |
|
|
choices = list(session_state["files"].keys()) |
|
|
return file_list, gr.update(choices=choices) |
|
|
|
|
|
|
|
|
refresh_btn.click( |
|
|
fn=refresh_and_update, |
|
|
outputs=[files_display, file_selector] |
|
|
) |
|
|
|
|
|
|
|
|
upload_btn.click( |
|
|
fn=upload_file, |
|
|
inputs=file_upload, |
|
|
outputs=upload_status |
|
|
).then( |
|
|
fn=refresh_and_update, |
|
|
outputs=[files_display, file_selector] |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
read_btn = gr.Button("π Read File") |
|
|
del_btn = gr.Button("ποΈ Delete File") |
|
|
|
|
|
file_content = gr.Textbox(label="File Content", lines=15) |
|
|
|
|
|
read_btn.click( |
|
|
fn=read_file, |
|
|
inputs=file_selector, |
|
|
outputs=file_content |
|
|
) |
|
|
|
|
|
del_status = gr.Textbox(label="Status", lines=5) |
|
|
|
|
|
del_btn.click( |
|
|
fn=delete_file, |
|
|
inputs=file_selector, |
|
|
outputs=del_status |
|
|
).then( |
|
|
fn=refresh_and_update, |
|
|
outputs=[files_display, file_selector] |
|
|
) |
|
|
|
|
|
with gr.Tab("π¦ Package Manager"): |
|
|
gr.Markdown("### Install Packages") |
|
|
|
|
|
package_input = gr.Textbox( |
|
|
label="Package Names (comma-separated)", |
|
|
placeholder="numpy, pandas, matplotlib, plotly" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
install_btn = gr.Button("π₯ Install", variant="primary") |
|
|
show_btn = gr.Button("π Show Installed") |
|
|
|
|
|
install_output = gr.Textbox(label="Installation Status", lines=10) |
|
|
installed_display = gr.Textbox(label="Installed Packages", lines=10) |
|
|
|
|
|
install_btn.click( |
|
|
fn=install_packages, |
|
|
inputs=package_input, |
|
|
outputs=install_output |
|
|
) |
|
|
|
|
|
show_btn.click( |
|
|
fn=show_installed_packages, |
|
|
outputs=installed_display |
|
|
) |
|
|
|
|
|
with gr.Tab("βΉοΈ Session Info"): |
|
|
info_btn = gr.Button("π Get Session Info") |
|
|
session_info = gr.Textbox(label="Session Information", lines=15, interactive=False) |
|
|
|
|
|
info_btn.click( |
|
|
fn=get_session_info, |
|
|
outputs=session_info |
|
|
) |
|
|
|
|
|
return app |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
app = create_interface() |
|
|
app.launch( |
|
|
server_name="0.0.0.0", |
|
|
server_port=7860, |
|
|
share=False, |
|
|
show_error=True, |
|
|
quiet=False |
|
|
) |
|
|
|