Spaces:
Paused
Paused
| import gradio as gr | |
| import os | |
| import subprocess | |
| import shutil | |
| import threading | |
| import io | |
| import socket | |
| import random | |
| import string | |
| import datetime | |
| import ast | |
| import re | |
| from huggingface_hub import HfApi, snapshot_download | |
| # === Config === | |
| DATASET_REPO = "Devity4756/Terminal" | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") | |
| if HF_TOKEN: | |
| api = HfApi(token=HF_TOKEN) | |
| else: | |
| api = None | |
| print("Warning: HF_TOKEN not found. Save functionality will be limited.") | |
| # Use dataset workspace directly | |
| WORKDIR = "workspace" | |
| os.makedirs(WORKDIR, exist_ok=True) | |
| # === Virtual Home Mapping === | |
| VIRTUAL_HOME = os.path.join(WORKDIR, "Alex") | |
| os.makedirs(VIRTUAL_HOME, exist_ok=True) | |
| PATH_MAP = {"Alex": VIRTUAL_HOME} | |
| # === Restore state from dataset === | |
| try: | |
| if HF_TOKEN: | |
| snapshot_path = snapshot_download( | |
| repo_id=DATASET_REPO, | |
| repo_type="dataset", | |
| token=HF_TOKEN, | |
| local_dir=WORKDIR # Download directly to workspace | |
| ) | |
| print(f"Restored dataset data to: {WORKDIR}") | |
| else: | |
| print("Cannot restore state without HF_TOKEN") | |
| except Exception as e: | |
| print("No previous data restored:", e) | |
| # === Track current working directory & running process === | |
| current_dir = VIRTUAL_HOME # Start in workspace/Alex | |
| running_process = None | |
| open_file_path = None | |
| # === Flask apps & logs === | |
| flask_apps = {} | |
| flask_logs = {} | |
| flask_ports = {} | |
| next_port = 5000 | |
| flask_threads = {} | |
| # Get the public URL of the Gradio app | |
| public_url = os.environ.get("SPACE_URL", "https://huggingface.co/spaces/Devity4756/Alex-terminal") | |
| # === Security Configuration === | |
| ALLOWED_FLASK_IMPORTS = { | |
| 'Flask', 'request', 'jsonify', 'render_template', | |
| 'redirect', 'url_for', 'send_file', 'abort' | |
| } | |
| RESTRICTED_KEYWORDS = { | |
| 'os.', 'subprocess.', 'sys.', 'importlib.', 'eval(', 'exec(', | |
| 'open(', 'file(', 'compile(', '__import__', 'globals()', 'locals()', | |
| 'getattr', 'setattr', 'delattr', 'input(', 'execfile' | |
| } | |
| MAX_CODE_LENGTH = 5000 # Maximum characters for Flask code | |
| def generate_random_name(length=12): | |
| """Generate a random function/class name""" | |
| return ''.join(random.choice(string.ascii_letters) for _ in range(length)) | |
| def expand_path(path: str) -> str: | |
| """Expand paths relative to dataset workspace""" | |
| # Handle absolute paths (relative to Alex in dataset) | |
| if path.startswith("/"): | |
| return os.path.join(VIRTUAL_HOME, path.lstrip("/")) | |
| # Handle virtual home alias | |
| if path.startswith("Alex/"): | |
| return os.path.join(VIRTUAL_HOME, path.replace("Alex/", "", 1)) | |
| # Handle relative paths from current directory | |
| return os.path.join(current_dir, path) | |
| def get_local_ip(): | |
| """Get the local IP address of the machine""" | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| s.connect(("8.8.8.8", 80)) | |
| ip = s.getsockname()[0] | |
| s.close() | |
| return ip | |
| except: | |
| return "localhost" | |
| def validate_flask_code(code): | |
| """Validate Flask code for security""" | |
| # Check length | |
| if len(code) > MAX_CODE_LENGTH: | |
| return False, "Code too long (max 5000 characters)" | |
| # Check for restricted patterns | |
| for pattern in RESTRICTED_KEYWORDS: | |
| if pattern in code: | |
| return False, f"Restricted pattern found: {pattern}" | |
| if 'Flask(__name__)' in code or 'app = Flask' in code: | |
| return False, "Creating new Flask app is not allowed; use the provided 'app' instance" | |
| if 'if __name__ == "__main__"' in code or "if __name__ == '__main__'" in code: | |
| return False, "Code containing 'if __name__ == \"__main__\"' is not allowed; app execution is handled automatically" | |
| # Parse AST to check for dangerous constructs | |
| try: | |
| tree = ast.parse(code) | |
| for node in ast.walk(tree): | |
| # Check for imports | |
| if isinstance(node, ast.Import) or isinstance(node, ast.ImportFrom): | |
| for alias in (node.names if isinstance(node, ast.Import) else [node]): | |
| module_name = alias.name if isinstance(node, ast.Import) else node.module | |
| if module_name and not any(module_name.startswith(allowed) | |
| for allowed in ['flask', 'werkzeug']): | |
| return False, f"Restricted import: {module_name}" | |
| # Check for function definitions with dangerous names | |
| if isinstance(node, ast.FunctionDef): | |
| if node.name.startswith('_') or node.name in ['exit', 'quit', 'help']: | |
| return False, f"Restricted function name: {node.name}" | |
| if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute): | |
| if node.func.attr == 'run' and isinstance(node.func.value, ast.Name) and node.func.value.id == 'app': | |
| return False, "Calling app.run() is not allowed; app execution is handled automatically" | |
| except SyntaxError as e: | |
| return False, f"Syntax error: {e}" | |
| return True, "Code validated successfully" | |
| def secure_execute_flask_code(app_instance, code, log_func, request_obj): | |
| """Safely execute Flask code with randomized function names""" | |
| # Generate random names for functions to avoid conflicts | |
| random_prefix = generate_random_name(8) | |
| # Replace function definitions with randomized names | |
| code = re.sub( | |
| r'def (\w+)\s*\(', | |
| lambda m: f'def {random_prefix}_{m.group(1)}(', | |
| code | |
| ) | |
| # Create a secure environment with template rendering support | |
| secure_globals = { | |
| 'app': app_instance, | |
| 'log': log_func, | |
| 'request': request_obj, | |
| '__name__': '__main__', | |
| '__builtins__': { | |
| 'str': str, 'int': int, 'float': float, 'bool': bool, 'list': list, | |
| 'dict': dict, 'tuple': tuple, 'set': set, 'len': len, 'range': range, | |
| 'print': print, 'isinstance': isinstance, 'type': type, 'repr': repr | |
| } | |
| } | |
| # Add allowed Flask imports with template support | |
| try: | |
| from flask import Flask, request, jsonify, render_template, redirect, url_for, send_file, abort | |
| secure_globals.update({ | |
| 'Flask': Flask, | |
| 'request': request, | |
| 'jsonify': jsonify, | |
| 'render_template': render_template, | |
| 'redirect': redirect, | |
| 'url_for': url_for, | |
| 'send_file': send_file, | |
| 'abort': abort | |
| }) | |
| except ImportError: | |
| pass | |
| try: | |
| # Compile and execute in restricted environment | |
| compiled_code = compile(code, '<string>', 'exec') | |
| exec(compiled_code, secure_globals) | |
| return True, "Code executed successfully" | |
| except Exception as e: | |
| return False, f"Execution error: {e}" | |
| def run_flask_app(app_name: str, code: str): | |
| """Start a Flask app using dataset paths""" | |
| global next_port, current_dir | |
| log_buffer = io.StringIO() | |
| flask_logs[app_name] = log_buffer | |
| port = next_port | |
| next_port += 1 | |
| flask_ports[app_name] = port | |
| def log(msg): | |
| log_buffer.write(msg + "\n") | |
| log_buffer.flush() | |
| print(f"[{app_name}] {msg}") | |
| try: | |
| is_valid, validation_msg = validate_flask_code(code) | |
| if not is_valid: | |
| log(f"Code validation failed: {validation_msg}") | |
| return f"Failed to start Flask app: {validation_msg}" | |
| from flask import Flask, request | |
| # Get the absolute path to templates | |
| template_dir = os.path.abspath(os.path.join(VIRTUAL_HOME, 'templates')) | |
| static_dir = os.path.abspath(os.path.join(VIRTUAL_HOME, 'static')) | |
| # Create directories if they don't exist | |
| os.makedirs(template_dir, exist_ok=True) | |
| os.makedirs(static_dir, exist_ok=True) | |
| log(f"Absolute template path: {template_dir}") | |
| log(f"Template dir exists: {os.path.exists(template_dir)}") | |
| if os.path.exists(template_dir): | |
| log(f"Files in template dir: {os.listdir(template_dir)}") | |
| # Use absolute paths for templates and static files | |
| app = Flask( | |
| app_name, | |
| template_folder=template_dir, | |
| static_folder=static_dir, | |
| static_url_path='/static' | |
| ) | |
| # Set root path to dataset Alex directory | |
| app.root_path = os.path.abspath(VIRTUAL_HOME) | |
| success, exec_msg = secure_execute_flask_code(app, code, log, request) | |
| if not success: | |
| log(f"Failed to execute code: {exec_msg}") | |
| return f"Failed to start Flask app: {exec_msg}" | |
| def run_app(): | |
| try: | |
| # Change to dataset directory using absolute path | |
| original_cwd = os.getcwd() | |
| abs_alex_path = os.path.abspath(VIRTUAL_HOME) | |
| os.chdir(abs_alex_path) | |
| log(f"Running Flask from: {os.getcwd()}") | |
| log(f"Template folder contents: {os.listdir(template_dir) if os.path.exists(template_dir) else 'NOT FOUND'}") | |
| app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False) | |
| os.chdir(original_cwd) | |
| except Exception as e: | |
| log(f"Error running Flask app: {e}") | |
| thread = threading.Thread(target=run_app, daemon=True) | |
| thread.start() | |
| flask_apps[app_name] = app | |
| local_ip = get_local_ip() | |
| local_url = f"http://{local_ip}:{port}" | |
| if "hf.space" in public_url: | |
| space_id = public_url.split("https://")[1].split(".hf.space")[0] | |
| live_url = f"https://{space_id}.hf.space/proxy/{port}/" | |
| else: | |
| live_url = f"{public_url}/proxy/{port}/" | |
| log(f"Started Flask app: {app_name} on port {port}") | |
| log(f"Absolute dataset directory: {os.path.abspath(VIRTUAL_HOME)}") | |
| log(f"Absolute template folder: {template_dir}") | |
| log(f"Local URL: {local_url}") | |
| log(f"Live URL: {live_url}") | |
| return f"Started Flask app: {app_name}\nDataset directory: {os.path.abspath(VIRTUAL_HOME)}\nLocal URL: {local_url}\nLive URL: {live_url}" | |
| except Exception as e: | |
| log(f"Error starting Flask app {app_name}: {e}") | |
| return f"Error starting Flask app: {e}" | |
| def check_template_path(): | |
| """Debug function to check template path""" | |
| template_path = os.path.abspath(os.path.join(VIRTUAL_HOME, 'templates', 'index.html')) | |
| return { | |
| 'template_path': template_path, | |
| 'exists': os.path.exists(template_path), | |
| 'is_file': os.path.isfile(template_path) if os.path.exists(template_path) else False, | |
| 'template_dir': os.path.dirname(template_path), | |
| 'template_dir_exists': os.path.exists(os.path.dirname(template_path)), | |
| 'template_dir_contents': os.listdir(os.path.dirname(template_path)) if os.path.exists(os.path.dirname(template_path)) else [] | |
| } | |
| def is_flask_app(file_path): | |
| """Check if a Python file contains Flask code.""" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| return bool(re.search(r'@app\.route', content)) | |
| except Exception: | |
| return False | |
| def run_python_app(file_path: str): | |
| """Run a regular Python application""" | |
| target = expand_path(file_path) | |
| if not os.path.exists(target): | |
| return f"File not found: {file_path}" | |
| if not target.endswith('.py'): | |
| return f"Not a Python file: {file_path}" | |
| def run_app(): | |
| try: | |
| process = subprocess.Popen( | |
| ['python3', target], | |
| cwd=os.path.dirname(target), | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True | |
| ) | |
| stdout, stderr = process.communicate() | |
| if stdout: | |
| print(f"Output from {file_path}:\n{stdout}") | |
| if stderr: | |
| print(f"Errors from {file_path}:\n{stderr}") | |
| except Exception as e: | |
| print(f"Error running {file_path}: {e}") | |
| # Run in a separate thread | |
| thread = threading.Thread(target=run_app, daemon=True) | |
| thread.start() | |
| return f"Started Python application: {file_path}" | |
| def stop_flask_app(app_name: str): | |
| """Stop a specific Flask app""" | |
| if app_name not in flask_threads: | |
| return f"Flask app '{app_name}' is not running" | |
| try: | |
| # Try to gracefully shutdown the Flask app | |
| if app_name in flask_apps: | |
| # Flask doesn't have a built-in shutdown method for the dev server, | |
| # so we'll rely on thread termination | |
| pass | |
| # Terminate the thread (this is a bit forceful but works for the dev server) | |
| thread = flask_threads[app_name] | |
| if thread.is_alive(): | |
| # Note: This may not clean up perfectly since Flask's dev server | |
| # doesn't have a proper shutdown mechanism | |
| pass | |
| # Clean up the tracking dictionaries | |
| flask_apps.pop(app_name, None) | |
| flask_logs.pop(app_name, None) | |
| flask_ports.pop(app_name, None) | |
| flask_threads.pop(app_name, None) | |
| return f"Stopped Flask app: {app_name}" | |
| except Exception as e: | |
| return f"Error stopping Flask app '{app_name}': {e}" | |
| def is_flask_app(file_path): | |
| """Check if a Python file contains Flask code.""" | |
| try: | |
| with open(file_path, 'r') as f: | |
| content = f.read() | |
| return bool(re.search(r'@app\.route', content)) | |
| except Exception: | |
| return False | |
| def view_logs(app_name: str): | |
| """Get logs of a Flask app""" | |
| if app_name not in flask_logs: | |
| return f"No logs for {app_name}" | |
| log_buf = flask_logs[app_name] | |
| log_buf.seek(0) | |
| return log_buf.read() | |
| def get_flask_apps(): | |
| """Get list of running Flask apps with their URLs""" | |
| if not flask_ports: | |
| return "No Flask apps running" | |
| local_ip = get_local_ip() | |
| result = "Running Flask apps:\n\n" | |
| for app_name, port in flask_ports.items(): | |
| thread = flask_threads.get(app_name) | |
| status = "Running" if thread and thread.is_alive() else "Stopped" | |
| local_url = f"http://{local_ip}:{port}" | |
| if "hf.space" in public_url: | |
| space_id = public_url.split("https://")[1].split(".hf.space")[0] | |
| live_url = f"https://{space_id}.hf.space/proxy/{port}/" | |
| result += f"- {app_name} ({status}):\n Local: {local_url}\n Live: {live_url}\n\n" | |
| else: | |
| result += f"- {app_name} ({status}):\n Local: {local_url}\n Live: {public_url}/proxy/{port}/\n\n" | |
| result += "Use 'close <app_name>' to stop a specific Flask app." | |
| return result | |
| def run_command(cmd: str): | |
| """Handle terminal-like commands including Flask and Python apps""" | |
| global current_dir, running_process | |
| cmd = cmd.strip() # Remove leading/trailing whitespace | |
| if not cmd: | |
| return "No command provided", "", None | |
| # Split command into command and arguments (maxsplit=1 for commands with spaces) | |
| parts = cmd.split(maxsplit=1) | |
| raw_cmd = parts[0].lower() | |
| args = parts[1] if len(parts) > 1 else "" | |
| try: | |
| # === STOP running process === | |
| if raw_cmd == "close": | |
| if args: | |
| result = stop_flask_app(args) | |
| return f"$ {cmd}\n\n{result}", "", None | |
| elif running_process and running_process.poll() is None: | |
| running_process.terminate() | |
| running_process = None | |
| return f"$ {cmd}\n\nStopped running process.", "", None | |
| return f"$ {cmd}\n\nNo active process to stop.", "", None | |
| # === Start Flask app from code === | |
| elif raw_cmd == "flaskrun" and args: | |
| # Expect args to be: app_name followed by code | |
| args_parts = args.split(maxsplit=1) | |
| if len(args_parts) < 2: | |
| return f"$ {cmd}\n\nUsage: flaskrun <app_name> <code>", "", None | |
| app_name, code = args_parts | |
| code = code.strip('"').strip("'") | |
| result = run_flask_app(app_name, code) | |
| return f"$ {cmd}\n\n{result}", "", None | |
| # === Run Python application === | |
| elif raw_cmd == "python3" and args: | |
| file_path = expand_path(args) | |
| if not os.path.exists(file_path): | |
| return f"$ {cmd}\n\nFile not found: {file_path}", "", None | |
| if is_flask_app(file_path): | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| code = f.read() | |
| app_name = os.path.splitext(os.path.basename(file_path))[0] | |
| result = run_flask_app(app_name, code) | |
| return f"$ {cmd}\n\n{result}", "", None | |
| except Exception as e: | |
| return f"$ {cmd}\n\nError reading Flask app: {e}", "", None | |
| else: | |
| result = run_python_app(file_path) | |
| return f"$ {cmd}\n\n{result}", "", None | |
| # === List Flask apps === | |
| elif raw_cmd == "flasklist": | |
| apps_info = get_flask_apps() | |
| return f"$ {cmd}\n\n{apps_info}", "", None | |
| # In run_command function, update debug: | |
| elif raw_cmd == "debug": | |
| result = f"Dataset workspace: {WORKDIR}\n" | |
| result += f"Virtual home (Alex): {VIRTUAL_HOME}\n" | |
| result += f"Current directory: {current_dir}\n" | |
| # Check template path in detail | |
| template_info = check_template_path() | |
| result += f"Absolute template path: {template_info['template_path']}\n" | |
| result += f"Template exists: {template_info['exists']}\n" | |
| result += f"Template is file: {template_info['is_file']}\n" | |
| result += f"Template dir exists: {template_info['template_dir_exists']}\n" | |
| result += f"Template dir contents: {template_info['template_dir_contents']}\n" | |
| result += f"Alex contents: {os.listdir(VIRTUAL_HOME) if os.path.exists(VIRTUAL_HOME) else 'Alex directory not found'}\n" | |
| result += f"HF Token configured: {bool(HF_TOKEN)}\n" | |
| return f"$ {cmd}\n\n{result}", "", None | |
| # In run_command function, add: | |
| elif raw_cmd == "sync": | |
| """Sync current state with Hugging Face dataset""" | |
| if not HF_TOKEN: | |
| return "HF_TOKEN not configured. Cannot sync with dataset.", "", None | |
| try: | |
| api.upload_folder( | |
| folder_path=WORKDIR, | |
| repo_id=DATASET_REPO, | |
| repo_type="dataset", | |
| commit_message=f"Manual sync: {datetime.now().isoformat()}", | |
| token=HF_TOKEN | |
| ) | |
| return "Synced workspace with Hugging Face dataset", "", None | |
| except Exception as e: | |
| return f"Sync failed: {e}", "", None | |
| # In the run_command function, add this case: | |
| elif raw_cmd == "mkdir" and args: | |
| if not args: | |
| return f"$ {cmd}\n\nNo directory specified", "", None | |
| target = expand_path(args) | |
| os.makedirs(target, exist_ok=True) | |
| return f"$ {cmd}\n\nCreated directory: {target}", "", None | |
| # === View logs === | |
| elif raw_cmd == "logs" and args: | |
| app_name = args | |
| logs = view_logs(app_name) | |
| return f"$ {cmd}\n\nLogs for {app_name}:\n{logs}", "", None | |
| # In run_command function, add: | |
| elif raw_cmd == "testtemplate": | |
| """Test if template can be loaded manually""" | |
| try: | |
| template_path = os.path.join(VIRTUAL_HOME, 'templates', 'index.html') | |
| if os.path.exists(template_path): | |
| with open(template_path, 'r') as f: | |
| content = f.read() | |
| return f"$ {cmd}\n\nTemplate found! Content length: {len(content)} characters", "", None | |
| else: | |
| return f"$ {cmd}\n\nTemplate not found at: {template_path}", "", None | |
| except Exception as e: | |
| return f"$ {cmd}\n\nError testing template: {e}", "", None | |
| # === Change directory (cd) === | |
| elif raw_cmd == "cd": | |
| if not args: | |
| return f"$ {cmd}\n\nNo directory specified", "", None | |
| target = expand_path(args) | |
| if os.path.isdir(target): | |
| current_dir = os.path.abspath(target) | |
| return f"$ {cmd}\n\nChanged directory to: {current_dir}", "", None | |
| return f"$ {cmd}\n\nDirectory not found: {args}", "", None | |
| # === Create file/folder === | |
| elif raw_cmd == "create": | |
| if not args: | |
| return f"$ {cmd}\n\nNo path specified", "", None | |
| target = expand_path(args) | |
| if args.endswith("/"): | |
| os.makedirs(target, exist_ok=True) | |
| return f"$ {cmd}\n\nCreated folder: {target}", "", None | |
| else: | |
| os.makedirs(os.path.dirname(target), exist_ok=True) | |
| with open(target, "w", encoding="utf-8") as f: | |
| pass | |
| return f"$ {cmd}\n\nCreated file: {target}", "", None | |
| # === Delete file/folder === | |
| elif raw_cmd == "delete": | |
| if not args: | |
| return f"$ {cmd}\n\nNo path specified", "", None | |
| target = expand_path(args) | |
| if os.path.isfile(target): | |
| os.remove(target) | |
| return f"$ {cmd}\n\nDeleted file: {target}", "", None | |
| elif os.path.isdir(target): | |
| shutil.rmtree(target, ignore_errors=True) | |
| return f"$ {cmd}\n\nDeleted folder: {target}", "", None | |
| else: | |
| return f"$ {cmd}\n\nNot found: {target}", "", None | |
| # === Open file/folder === | |
| elif raw_cmd == "open": | |
| if not args: | |
| return f"$ {cmd}\n\nNo path specified", "", None | |
| target = expand_path(args) | |
| if not os.path.exists(target): | |
| return f"$ {cmd}\n\nFile not found: {args}", "", None | |
| if os.path.isdir(target): | |
| items = os.listdir(target) | |
| return f"$ {cmd}\n\n" + ("\n".join(items) if items else "(empty folder)"), "", None | |
| try: | |
| with open(target, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| global open_file_path | |
| open_file_path = target | |
| return f"$ {cmd}\n\nOpened file: {target}", content, target | |
| except Exception as e: | |
| return f"$ {cmd}\n\nError reading file: {e}", "", None | |
| # === List files === | |
| elif raw_cmd == "ls": | |
| items = os.listdir(current_dir) | |
| return f"$ {cmd}\n\n" + ("\n".join(items) if items else "(empty directory)"), "", None | |
| # === Print working directory === | |
| elif raw_cmd == "pwd": | |
| return f"$ {cmd}\n\n{current_dir}", "", None | |
| # === Clear screen === | |
| elif raw_cmd == "clear": | |
| return "", "", None | |
| # === Normal shell command === | |
| try: | |
| cmd_parts = ['ls', '-l'] if raw_cmd == "ls" and not args else cmd.split() | |
| running_process = subprocess.Popen( | |
| cmd_parts, | |
| shell=False, # Avoid shell=True for safety | |
| cwd=current_dir, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True | |
| ) | |
| stdout, stderr = running_process.communicate() | |
| running_process = None | |
| output = stdout + stderr | |
| return f"$ {cmd}\n\n{output if output else '(no output)'}", "", None | |
| except FileNotFoundError: | |
| return f"$ {cmd}\n\nCommand not found: {raw_cmd}", "", None | |
| except Exception as e: | |
| return f"$ {cmd}\n\nError: {e}", "", None | |
| def save_file(new_content: str, file_path: str): | |
| global open_file_path | |
| if not file_path: | |
| return "No file currently open.", "" | |
| try: | |
| with open(file_path, "w", encoding="utf-8") as f: | |
| f.write(new_content) | |
| # Always sync with Hugging Face dataset | |
| if HF_TOKEN and api: | |
| try: | |
| # Upload the specific file that was modified | |
| api.upload_file( | |
| path_or_fileobj=file_path, | |
| path_in_repo=file_path.replace(WORKDIR + "/", ""), | |
| repo_id=DATASET_REPO, | |
| repo_type="dataset", | |
| commit_message=f"Updated file: {os.path.basename(file_path)}", | |
| token=HF_TOKEN | |
| ) | |
| return f"Saved file: {file_path} (synced with dataset)", new_content | |
| except Exception as e: | |
| return f"Saved file: {file_path} (local only - upload failed: {e})", new_content | |
| else: | |
| return f"Saved file: {file_path} (local only - no HF token)", new_content | |
| except Exception as e: | |
| return f"Error saving file: {e}", new_content | |
| # === Gradio UI === | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## 🖥️ Secure Terminal + Editor + Flask Apps") | |
| gr.Markdown("Run Flask apps or regular Python applications") | |
| with gr.Row(): | |
| cmd = gr.Textbox( | |
| label="Command", | |
| placeholder="Examples:\n" | |
| "flaskrun myapp '@app.route(\"/hello\")\\ndef hello(): return \"Hello World!\"'\n" | |
| "python3 app.py # Auto-detects Flask apps\n" | |
| "python3 script.py # Runs regular Python scripts\n" | |
| "flasklist\n" | |
| "logs myapp\n" | |
| "cd Alex/project\n" | |
| "create app.py\n" | |
| "open app.py\n" | |
| "ls\n" | |
| "pwd\n" | |
| "clear\n" | |
| "close" | |
| ) | |
| run_btn = gr.Button("Run") | |
| out = gr.Textbox(label="Output", lines=15) | |
| editor = gr.Textbox(label="File Editor", lines=20) | |
| save_btn = gr.Button("Save File") | |
| hidden_file = gr.Textbox(visible=False) | |
| # Attach events | |
| run_btn.click(run_command, inputs=cmd, outputs=[out, editor, hidden_file]) | |
| save_btn.click(save_file, inputs=[editor, hidden_file], outputs=[out, editor]) | |
| # For Hugging Face Spaces, we need to set up the proxy routes | |
| if "hf.space" in public_url: | |
| # This will be handled by Hugging Face's built-in proxy | |
| pass | |
| else: | |
| # For local development, we can set up a simple proxy | |
| async def proxy_to_flask(port: int, path: str): | |
| import httpx | |
| try: | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(f"http://localhost:{port}/{path}") | |
| return response.content | |
| except: | |
| return "Flask app not available" | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |