Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import subprocess | |
| import tempfile | |
| import os | |
| import json | |
| import docker | |
| import time | |
| from typing import Dict, List, Tuple | |
| import black | |
| import autopep8 | |
| import ast | |
| import sys | |
| from contextlib import redirect_stdout, redirect_stderr | |
| from io import StringIO | |
| class CodeRunner: | |
| def __init__(self): | |
| self.supported_languages = { | |
| 'python': {'ext': '.py', 'cmd': ['python'], 'formatter': self.format_python}, | |
| 'javascript': {'ext': '.js', 'cmd': ['node'], 'formatter': self.format_javascript}, | |
| 'typescript': {'ext': '.ts', 'cmd': ['ts-node'], 'formatter': None}, | |
| 'go': {'ext': '.go', 'cmd': ['go', 'run'], 'formatter': None}, | |
| 'rust': {'ext': '.rs', 'cmd': ['rustc', '--edition=2021'], 'formatter': None}, | |
| 'c': {'ext': '.c', 'cmd': ['gcc', '-o', '/tmp/output'], 'formatter': None}, | |
| 'cpp': {'ext': '.cpp', 'cmd': ['g++', '-o', '/tmp/output'], 'formatter': None}, | |
| 'java': {'ext': '.java', 'cmd': ['javac'], 'formatter': None}, | |
| 'bash': {'ext': '.sh', 'cmd': ['bash'], 'formatter': None} | |
| } | |
| # Security restrictions | |
| self.forbidden_imports = { | |
| 'python': ['os', 'subprocess', 'sys', 'importlib', '__import__'], | |
| 'javascript': ['fs', 'child_process', 'cluster', 'worker_threads'], | |
| 'bash': ['rm', 'sudo', 'chmod', 'chown'] | |
| } | |
| def run_code_sandboxed(self, code: str, language: str = "python", timeout: int = 10) -> Dict: | |
| """ | |
| Safely execute code with multiple sandboxing layers | |
| """ | |
| if language not in self.supported_languages: | |
| return { | |
| "success": False, | |
| "output": f"Language '{language}' not supported. Available: {list(self.supported_languages.keys())}", | |
| "execution_time": 0 | |
| } | |
| # Security check | |
| security_issues = self.check_security(code, language) | |
| if security_issues: | |
| return { | |
| "success": False, | |
| "output": f"Security violations detected: {', '.join(security_issues)}", | |
| "execution_time": 0 | |
| } | |
| start_time = time.time() | |
| try: | |
| # Method 1: Direct execution for Python (fastest) | |
| if language == "python": | |
| result = self._run_python_restricted(code, timeout) | |
| # Method 2: Docker sandboxing for other languages | |
| else: | |
| result = self._run_in_docker(code, language, timeout) | |
| execution_time = time.time() - start_time | |
| result["execution_time"] = round(execution_time, 3) | |
| return result | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "output": f"Execution error: {str(e)}", | |
| "execution_time": time.time() - start_time | |
| } | |
| def _run_python_restricted(self, code: str, timeout: int) -> Dict: | |
| """ | |
| Run Python code with restricted builtins and memory/time limits | |
| """ | |
| # Create restricted environment | |
| restricted_builtins = { | |
| 'print': print, | |
| 'len': len, | |
| 'str': str, | |
| 'int': int, | |
| 'float': float, | |
| 'list': list, | |
| 'dict': dict, | |
| 'tuple': tuple, | |
| 'set': set, | |
| 'range': range, | |
| 'enumerate': enumerate, | |
| 'zip': zip, | |
| 'map': map, | |
| 'filter': filter, | |
| 'sorted': sorted, | |
| 'sum': sum, | |
| 'min': min, | |
| 'max': max, | |
| 'abs': abs, | |
| 'round': round, | |
| 'isinstance': isinstance, | |
| 'type': type, | |
| 'hasattr': hasattr, | |
| 'getattr': getattr, | |
| 'setattr': setattr, | |
| } | |
| # Capture output | |
| stdout_buffer = StringIO() | |
| stderr_buffer = StringIO() | |
| try: | |
| # Parse and validate AST | |
| tree = ast.parse(code) | |
| # Execute with restrictions | |
| with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): | |
| exec(compile(tree, '<string>', 'exec'), { | |
| '__builtins__': restricted_builtins, | |
| '__name__': '__main__' | |
| }) | |
| return { | |
| "success": True, | |
| "output": stdout_buffer.getvalue(), | |
| "errors": stderr_buffer.getvalue() | |
| } | |
| except SyntaxError as e: | |
| return { | |
| "success": False, | |
| "output": f"Syntax Error: {str(e)}" | |
| } | |
| except Exception as e: | |
| return { | |
| "success": False, | |
| "output": f"Runtime Error: {str(e)}" | |
| } | |
| def _run_in_docker(self, code: str, language: str, timeout: int) -> Dict: | |
| """ | |
| Run code in isolated Docker container (requires Docker daemon) | |
| """ | |
| try: | |
| client = docker.from_env() | |
| # Language-specific Docker images | |
| images = { | |
| 'javascript': 'node:18-alpine', | |
| 'typescript': 'node:18-alpine', | |
| 'go': 'golang:1.21-alpine', | |
| 'rust': 'rust:1.70-alpine', | |
| 'c': 'gcc:latest', | |
| 'cpp': 'gcc:latest', | |
| 'java': 'openjdk:17-alpine', | |
| 'bash': 'alpine:latest' | |
| } | |
| image = images.get(language, 'alpine:latest') | |
| lang_config = self.supported_languages[language] | |
| # Create temporary file | |
| with tempfile.NamedTemporaryFile(mode='w', suffix=lang_config['ext'], delete=False) as f: | |
| f.write(code) | |
| temp_file = f.name | |
| try: | |
| # Run in container | |
| result = client.containers.run( | |
| image, | |
| command=lang_config['cmd'] + [f'/tmp/code{lang_config["ext"]}'], | |
| volumes={temp_file: {'bind': f'/tmp/code{lang_config["ext"]}', 'mode': 'ro'}}, | |
| mem_limit='128m', | |
| cpu_quota=50000, # 50% CPU | |
| network_disabled=True, | |
| timeout=timeout, | |
| remove=True, | |
| capture_output=True, | |
| text=True | |
| ) | |
| return { | |
| "success": True, | |
| "output": result.decode('utf-8') if isinstance(result, bytes) else str(result) | |
| } | |
| finally: | |
| os.unlink(temp_file) | |
| except docker.errors.ContainerError as e: | |
| return {"success": False, "output": f"Container error: {e.stderr.decode()}"} | |
| except docker.errors.ImageNotFound: | |
| return {"success": False, "output": f"Docker image not found for {language}"} | |
| except Exception as e: | |
| return {"success": False, "output": f"Docker execution failed: {str(e)}"} | |
| def _run_with_subprocess(self, code: str, language: str, timeout: int) -> Dict: | |
| """ | |
| Fallback: Run using subprocess with basic sandboxing | |
| """ | |
| lang_config = self.supported_languages[language] | |
| with tempfile.NamedTemporaryFile(mode='w', suffix=lang_config['ext'], delete=False) as f: | |
| f.write(code) | |
| temp_file = f.name | |
| try: | |
| result = subprocess.run( | |
| lang_config['cmd'] + [temp_file], | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| timeout=timeout, | |
| cwd='/tmp', # Restrict to /tmp directory | |
| env={'PATH': '/usr/bin:/bin'} # Minimal environment | |
| ) | |
| return { | |
| "success": result.returncode == 0, | |
| "output": result.stdout, | |
| "errors": result.stderr | |
| } | |
| except subprocess.TimeoutExpired: | |
| return {"success": False, "output": f"Execution timed out after {timeout}s"} | |
| except FileNotFoundError: | |
| return {"success": False, "output": f"Language runtime not found for {language}"} | |
| finally: | |
| os.unlink(temp_file) | |
| def check_security(self, code: str, language: str) -> List[str]: | |
| """ | |
| Basic security checks for dangerous patterns | |
| """ | |
| issues = [] | |
| forbidden = self.forbidden_imports.get(language, []) | |
| code_lower = code.lower() | |
| # Check forbidden imports/modules | |
| for forbidden_item in forbidden: | |
| if forbidden_item in code_lower: | |
| issues.append(f"Forbidden: {forbidden_item}") | |
| # Language-specific checks | |
| if language == "python": | |
| dangerous_patterns = ['eval(', 'exec(', '__import__', 'open(', 'file('] | |
| for pattern in dangerous_patterns: | |
| if pattern in code_lower: | |
| issues.append(f"Dangerous pattern: {pattern}") | |
| elif language == "bash": | |
| dangerous_cmds = ['rm -rf', 'sudo', '> /dev/', 'curl', 'wget'] | |
| for cmd in dangerous_cmds: | |
| if cmd in code_lower: | |
| issues.append(f"Dangerous command: {cmd}") | |
| return issues | |
| def format_python(self, code: str) -> str: | |
| """Format Python code using black and autopep8""" | |
| try: | |
| # First pass with autopep8 | |
| formatted = autopep8.fix_code(code) | |
| # Second pass with black | |
| formatted = black.format_str(formatted, mode=black.Mode()) | |
| return formatted | |
| except Exception as e: | |
| return f"Formatting error: {str(e)}" | |
| def format_javascript(self, code: str) -> str: | |
| """Basic JavaScript formatting (would use prettier in production)""" | |
| # This is a simplified formatter - in production, use prettier | |
| lines = code.split('\n') | |
| formatted_lines = [] | |
| indent_level = 0 | |
| for line in lines: | |
| stripped = line.strip() | |
| if not stripped: | |
| formatted_lines.append('') | |
| continue | |
| if stripped.endswith('{'): | |
| formatted_lines.append(' ' * indent_level + stripped) | |
| indent_level += 1 | |
| elif stripped.startswith('}'): | |
| indent_level = max(0, indent_level - 1) | |
| formatted_lines.append(' ' * indent_level + stripped) | |
| else: | |
| formatted_lines.append(' ' * indent_level + stripped) | |
| return '\n'.join(formatted_lines) | |
| def format_code(self, code: str, language: str = "python") -> str: | |
| """Format code based on language""" | |
| if language not in self.supported_languages: | |
| return f"Formatting not supported for {language}" | |
| formatter = self.supported_languages[language]['formatter'] | |
| if formatter: | |
| return formatter(code) | |
| else: | |
| return f"No formatter available for {language}" | |
| def analyze_code(self, code: str, language: str = "python") -> List[str]: | |
| """Analyze code for potential issues""" | |
| issues = [] | |
| # Security analysis | |
| security_issues = self.check_security(code, language) | |
| issues.extend([f"Security: {issue}" for issue in security_issues]) | |
| # Language-specific analysis | |
| if language == "python": | |
| try: | |
| tree = ast.parse(code) | |
| # Check for common issues | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Global): | |
| issues.append("Code Quality: Avoid global variables") | |
| elif isinstance(node, ast.Import): | |
| for alias in node.names: | |
| if alias.name.startswith('_'): | |
| issues.append(f"Style: Avoid importing private modules: {alias.name}") | |
| elif isinstance(node, ast.FunctionDef): | |
| if len(node.args.args) > 5: | |
| issues.append(f"Code Quality: Function '{node.name}' has too many parameters") | |
| except SyntaxError as e: | |
| issues.append(f"Syntax Error: {str(e)}") | |
| return issues if issues else ["No issues found"] | |
| def generate_code(self, prompt: str, language: str = "python") -> str: | |
| """Generate basic code templates based on prompt""" | |
| templates = { | |
| "python": { | |
| "function": "def {name}():\n \"\"\"\n {description}\n \"\"\"\n pass", | |
| "class": "class {name}:\n def __init__(self):\n pass", | |
| "api": "import requests\n\ndef fetch_data(url):\n response = requests.get(url)\n return response.json()", | |
| "file": "with open('file.txt', 'r') as f:\n content = f.read()\nprint(content)" | |
| }, | |
| "javascript": { | |
| "function": "function {name}() {\n // {description}\n return null;\n}", | |
| "async": "async function {name}() {\n try {\n // {description}\n return await someAsyncOperation();\n } catch (error) {\n console.error(error);\n }\n}", | |
| "api": "fetch('/api/data')\n .then(response => response.json())\n .then(data => console.log(data));" | |
| } | |
| } | |
| prompt_lower = prompt.lower() | |
| lang_templates = templates.get(language, {}) | |
| # Simple template matching | |
| if "function" in prompt_lower: | |
| template = lang_templates.get("function", f"// {prompt}") | |
| return template.format(name="myFunction", description=prompt) | |
| elif "class" in prompt_lower and language == "python": | |
| template = lang_templates.get("class", f"# {prompt}") | |
| return template.format(name="MyClass") | |
| elif "api" in prompt_lower or "fetch" in prompt_lower: | |
| return lang_templates.get("api", f"// API code for: {prompt}") | |
| else: | |
| return f"# Generated code for: {prompt}\n# TODO: Implement functionality" | |
| # Initialize the CodeRunner | |
| runner = CodeRunner() | |
| # Gradio interface functions | |
| def run_code_interface(code: str, language: str, timeout: int = 10): | |
| result = runner.run_code_sandboxed(code, language.lower(), timeout) | |
| output = [] | |
| output.append(f"β Success: {result['success']}") | |
| output.append(f"β±οΈ Execution Time: {result.get('execution_time', 0)}s") | |
| output.append(f"\nπ€ Output:\n{result.get('output', '')}") | |
| if result.get('errors'): | |
| output.append(f"\nβ Errors:\n{result['errors']}") | |
| return "\n".join(output) | |
| def format_code_interface(code: str, language: str): | |
| return runner.format_code(code, language.lower()) | |
| def analyze_code_interface(code: str, language: str): | |
| issues = runner.analyze_code(code, language.lower()) | |
| return "\n".join([f"β’ {issue}" for issue in issues]) | |
| def generate_code_interface(prompt: str, language: str): | |
| return runner.generate_code(prompt, language.lower()) | |
| # Create Gradio interfaces | |
| with gr.Blocks(title="CodeRunner - Multi-Language Code Execution Tool") as demo: | |
| gr.Markdown("# π CodeRunner - Secure Multi-Language Code Execution") | |
| gr.Markdown("Execute, format, analyze, and generate code in multiple programming languages with built-in security sandboxing.") | |
| with gr.Tab("π Run Code"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| code_input = gr.Textbox( | |
| label="Code", | |
| lines=10, | |
| placeholder="Enter your code here...", | |
| value="print('Hello, World!')" | |
| ) | |
| language_select = gr.Dropdown( | |
| choices=list(runner.supported_languages.keys()), | |
| value="python", | |
| label="Language" | |
| ) | |
| timeout_slider = gr.Slider(1, 30, value=10, label="Timeout (seconds)") | |
| with gr.Column(scale=2): | |
| output = gr.Textbox(label="Output", lines=12, max_lines=20) | |
| run_btn = gr.Button("βΆοΈ Run Code", variant="primary") | |
| run_btn.click( | |
| run_code_interface, | |
| inputs=[code_input, language_select, timeout_slider], | |
| outputs=output | |
| ) | |
| with gr.Tab("β¨ Format Code"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| format_input = gr.Textbox(label="Code to Format", lines=8) | |
| format_lang = gr.Dropdown(choices=["python", "javascript"], value="python", label="Language") | |
| format_btn = gr.Button("π¨ Format Code") | |
| with gr.Column(): | |
| format_output = gr.Textbox(label="Formatted Code", lines=8) | |
| format_btn.click(format_code_interface, inputs=[format_input, format_lang], outputs=format_output) | |
| with gr.Tab("π Analyze Code"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| analyze_input = gr.Textbox(label="Code to Analyze", lines=8) | |
| analyze_lang = gr.Dropdown(choices=list(runner.supported_languages.keys()), value="python", label="Language") | |
| analyze_btn = gr.Button("π Analyze Code") | |
| with gr.Column(): | |
| analyze_output = gr.Textbox(label="Analysis Results", lines=8) | |
| analyze_btn.click(analyze_code_interface, inputs=[analyze_input, analyze_lang], outputs=analyze_output) | |
| with gr.Tab("π€ Generate Code"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt_input = gr.Textbox(label="Describe what you want to generate", lines=3, placeholder="e.g., 'Create a function that calculates fibonacci numbers'") | |
| generate_lang = gr.Dropdown(choices=["python", "javascript"], value="python", label="Language") | |
| generate_btn = gr.Button("π Generate Code") | |
| with gr.Column(): | |
| generate_output = gr.Textbox(label="Generated Code", lines=10) | |
| generate_btn.click(generate_code_interface, inputs=[prompt_input, generate_lang], outputs=generate_output) | |
| with gr.Tab("π API Documentation"): | |
| gr.Markdown(""" | |
| ## API Endpoints | |
| This CodeRunner can be used as a HuggingChat tool with the following functions: | |
| ### 1. `run_code(code: str, language: str, timeout: int = 10)` | |
| - Executes code safely in a sandboxed environment | |
| - Supports: Python, JavaScript, TypeScript, Go, Rust, C, C++, Java, Bash | |
| - Returns: execution result with output and timing | |
| ### 2. `format_code(code: str, language: str)` | |
| - Formats and prettifies code | |
| - Currently supports: Python (black), JavaScript (basic) | |
| - Returns: formatted code string | |
| ### 3. `analyze_code(code: str, language: str)` | |
| - Analyzes code for security issues and code quality | |
| - Checks for dangerous patterns and common mistakes | |
| - Returns: list of issues and recommendations | |
| ### 4. `generate_code(prompt: str, language: str)` | |
| - Generates basic code templates from natural language | |
| - Supports common patterns: functions, classes, API calls | |
| - Returns: generated code template | |
| ## Security Features | |
| - Restricted Python execution environment | |
| - Docker containerization for other languages | |
| - Memory and CPU limits | |
| - Network isolation | |
| - Forbidden imports/commands detection | |
| """) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |