import base64
import hmac
import io
import json
import logging
import os
import queue
import re
import threading
import time
import uuid

import jupyter_client
from flask import Flask, Response, jsonify, request
from flask_cors import CORS

logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app, resources={r"/*": {"origins": "*"}})

VALID_API_KEY = os.getenv("SANDBOX_API_KEY", "")

# ANSI escape sequences (colour codes) embedded in kernel tracebacks.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
# Markers printed by the /download helper code around the base64 payload.
# ``.*?`` (not ``.+?``) so that an existing but empty file is still "found".
_FILE_B64_RE = re.compile(r"FILE_B64_START:(.*?):FILE_B64_END")


def _json_body():
    """Return the request's JSON object, or ``{}`` when missing or not an object."""
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _tmp_path(filename):
    """Map a client-supplied file name to a path directly inside /tmp.

    Only the final path component is kept so the name cannot point outside
    /tmp. Returns None when no usable component remains.
    """
    name = os.path.basename(str(filename))
    if name in ("", ".", ".."):
        return None
    return f"/tmp/{name}"


def check_auth():
    """Return True when the current request carries the configured API key.

    The key is read from ``X-API-KEY`` or from ``Authorization: Bearer <key>``.
    When ``SANDBOX_API_KEY`` is unset or empty, authentication is disabled and
    every request is accepted.
    """
    if not VALID_API_KEY:
        return True
    key = request.headers.get("X-API-KEY", "")
    if not key:
        key = request.headers.get("Authorization", "")
        # Strip the scheme only as a prefix, never from the middle of the key.
        if key.startswith("Bearer "):
            key = key[len("Bearer "):]
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(key.encode("utf-8"), VALID_API_KEY.encode("utf-8"))


sessions = {}
sessions_lock = threading.Lock()


def get_or_create_kernel(session_id="default"):
    """Return the kernel client for ``session_id``, starting a kernel if needed.

    A new ``python3`` kernel is started on first use and registered in
    ``sessions``; later calls refresh ``last_used``. If the new kernel does
    not become ready, it is shut down again and the exception is re-raised.
    """
    with sessions_lock:
        entry = sessions.get(session_id)
        if entry is None:
            km = jupyter_client.KernelManager(kernel_name='python3')
            km.start_kernel()
            kc = km.client()
            try:
                kc.start_channels()
                kc.wait_for_ready(timeout=30)
            except Exception:
                # Do not leave an unregistered kernel process running.
                kc.stop_channels()
                km.shutdown_kernel(now=True)
                raise
            now = time.time()
            entry = {"km": km, "kc": kc, "created_at": now, "last_used": now}
            sessions[session_id] = entry
        else:
            entry["last_used"] = time.time()
        return entry["kc"]


def kill_kernel(session_id):
    """Remove ``session_id`` from ``sessions`` and shut its kernel down.

    Unknown session ids are ignored. Shutdown is best-effort: failures are
    logged, and a failure to stop the channels does not prevent the kernel
    shutdown from being attempted.
    """
    with sessions_lock:
        entry = sessions.pop(session_id, None)
    if entry is None:
        return
    try:
        entry["kc"].stop_channels()
    except Exception:
        logger.exception("Failed to stop channels for session %s", session_id)
    try:
        entry["km"].shutdown_kernel(now=True)
    except Exception:
        logger.exception("Failed to shut down kernel for session %s", session_id)


def execute_code(kc, code, timeout=60):
    """Run ``code`` on the kernel client ``kc`` and collect its output.

    Args:
        kc: A started jupyter_client kernel client.
        code: Source text to execute.
        timeout: Maximum number of seconds to wait for the kernel to go idle.

    Returns:
        A dict with ``stdout``, ``stderr`` and ``result`` (strings),
        ``artifacts`` (list of ``{"type", "data", "filename"}`` dicts for
        image/png, image/jpeg and audio/* display data) and ``error``
        (None, or a dict with ``ename``, ``evalue`` and ``traceback``).
        A timeout or a channel failure is reported through ``error`` too.
    """
    msg_id = kc.execute(code)
    stdout = []
    stderr = []
    result = []
    artifacts = []
    error = None

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            msg = kc.get_iopub_msg(timeout=1)
        except queue.Empty:
            continue
        except Exception as exc:
            logger.exception("Failed to read from the kernel IOPub channel")
            error = {"ename": type(exc).__name__, "evalue": str(exc), "traceback": ""}
            break

        # Ignore messages that belong to other requests (e.g. leftovers of an
        # earlier execution that timed out); otherwise a stale "idle" status
        # would end this request immediately.
        if (msg.get("parent_header") or {}).get("msg_id") != msg_id:
            continue

        msg_type = msg.get("msg_type")
        content = msg.get("content", {})

        if msg_type == "stream":
            target = stdout if content.get("name") == "stdout" else stderr
            target.append(content.get("text", ""))
        elif msg_type == "execute_result":
            data = content.get("data", {})
            result.append(data.get("text/plain", ""))
            if "text/html" in data:
                result.append(data["text/html"])
        elif msg_type == "display_data":
            data = content.get("data", {})
            if "image/png" in data:
                artifacts.append({
                    "type": "image/png",
                    "data": data["image/png"],
                    "filename": f"artifact_{len(artifacts) + 1}.png",
                })
            if "image/jpeg" in data:
                artifacts.append({
                    "type": "image/jpeg",
                    "data": data["image/jpeg"],
                    "filename": f"artifact_{len(artifacts) + 1}.jpg",
                })
            for mime, payload in data.items():
                if mime.startswith("audio/"):
                    artifacts.append({
                        "type": mime,
                        "data": payload,
                        "filename": f"artifact_{len(artifacts) + 1}.{mime.split('/', 1)[1]}",
                    })
            if "text/plain" in data:
                result.append(data["text/plain"])
        elif msg_type == "error":
            error = {
                "ename": content.get("ename", "Error"),
                "evalue": content.get("evalue", ""),
                # Drop colour codes but keep any non-ASCII text intact.
                "traceback": "\n".join(
                    _ANSI_ESCAPE_RE.sub("", line)
                    for line in content.get("traceback", [])
                ),
            }
        elif msg_type == "status" and content.get("execution_state") == "idle":
            break
    else:
        # The loop ran out of time without seeing the kernel go idle.
        if error is None:
            error = {
                "ename": "TimeoutError",
                "evalue": f"Execution did not finish within {timeout} seconds",
                "traceback": "",
            }

    return {
        "stdout": "".join(stdout),
        "stderr": "".join(stderr),
        "result": "\n".join(result),
        "artifacts": artifacts,
        "error": error,
    }


# ── Cleanup idle kernels ──────────────────────────────────────────────────────
def cleanup_loop():
    """Every 30 minutes, kill kernels that have been unused for over an hour."""
    while True:
        time.sleep(1800)
        now = time.time()
        with sessions_lock:
            to_kill = [
                sid for sid, data in sessions.items()
                if now - data["last_used"] > 3600
            ]
        # kill_kernel takes the lock itself, so call it after releasing it.
        for sid in to_kill:
            kill_kernel(sid)


threading.Thread(target=cleanup_loop, daemon=True).start()


# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.route('/')
def status():
    """Report that the service is up and how many sessions are registered."""
    return jsonify({
        "status": "online",
        "active_sessions": len(sessions),
        "message": "Obsidian Jupyter Kernel Sandbox"
    })


@app.route('/execute', methods=['POST'])
def execute():
    """Execute ``code`` in the session's kernel and return its combined output.

    JSON body: ``code`` (required), ``session_id`` (default ``"default"``),
    ``timeout`` in seconds (default 60).
    """
    if not check_auth():
        return jsonify({"output": "Unauthorized"}), 401
    data = _json_body()
    code = data.get("code", "")
    session_id = data.get("session_id", "default")
    timeout = data.get("timeout", 60)
    if not code:
        return jsonify({"output": "No code provided"}), 400
    if isinstance(timeout, bool) or not isinstance(timeout, (int, float)):
        return jsonify({"output": "Invalid timeout"}), 400
    try:
        kc = get_or_create_kernel(session_id)
        result = execute_code(kc, code, timeout=timeout)
        parts = []
        if result["stdout"]:
            parts.append(result["stdout"])
        if result["result"]:
            parts.append(result["result"])
        if result["error"]:
            parts.append(
                f"Error: {result['error']['ename']}: {result['error']['evalue']}\n"
                f"{result['error']['traceback']}"
            )
        elif result["stderr"]:
            # stderr is only surfaced when the run did not fail outright.
            parts.append(result["stderr"])
        return jsonify({
            "output": "\n".join(parts) or "Executed successfully (no output)",
            "artifacts": result["artifacts"],
            "error": result["error"]
        })
    except Exception as e:
        return jsonify({"output": f"Kernel Error: {str(e)}"}), 500


@app.route('/install', methods=['POST'])
def install():
    """Install ``package`` with pip from inside the session's kernel.

    JSON body: ``package`` (required), ``session_id`` (default ``"default"``).
    Responds 500 with the kernel error when the installation fails.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    package = data.get("package", "")
    session_id = data.get("session_id", "default")
    if not package or not isinstance(package, str):
        return jsonify({"error": "No package specified"}), 400
    kc = get_or_create_kernel(session_id)
    # repr() embeds the name as a properly escaped literal; check=True turns a
    # failing pip run into a kernel error instead of a silent success.
    install_code = (
        "import subprocess, sys; "
        f"subprocess.run([sys.executable, '-m', 'pip', 'install', {package!r}, '-q'], check=True)"
    )
    result = execute_code(kc, install_code, timeout=120)
    if result["error"]:
        return jsonify({
            "error": f"Failed to install {package}",
            "output": result["stdout"],
            "details": result["error"],
        }), 500
    return jsonify({"message": f"{package} installed", "output": result["stdout"]})


@app.route('/sessions', methods=['GET'])
def list_sessions():
    """List the registered sessions with their creation and last-use times."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    with sessions_lock:
        return jsonify({
            "sessions": [
                {"id": sid, "created_at": d["created_at"], "last_used": d["last_used"]}
                for sid, d in sessions.items()
            ]
        })


# The view takes ``session_id``, so the rule must declare it as a URL variable.
@app.route('/sessions/<session_id>', methods=['DELETE'])
def delete_session(session_id):
    """Terminate the kernel of ``session_id`` (no error if it does not exist)."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    kill_kernel(session_id)
    return jsonify({"message": f"Session {session_id} terminated"})


@app.route('/sessions', methods=['POST'])
def create_session():
    """Start a kernel under a fresh UUID and return that session id."""
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    session_id = str(uuid.uuid4())
    get_or_create_kernel(session_id)
    return jsonify({"session_id": session_id})


@app.route('/upload', methods=['POST'])
def upload_file():
    """Write a base64-encoded file into /tmp from inside the session's kernel.

    JSON body: ``filename`` (default ``"uploaded_file"``), ``base64`` (file
    content), ``session_id`` (default ``"default"``). Only the final path
    component of ``filename`` is used.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    filename = data.get("filename", "uploaded_file")
    file_b64 = data.get("base64", "")
    session_id = data.get("session_id", "default")
    try:
        file_bytes = base64.b64decode(file_b64)
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid base64 payload"}), 400
    path = _tmp_path(filename) or "/tmp/uploaded_file"
    kc = get_or_create_kernel(session_id)
    # Re-encode so the literal embedded in the code is canonical base64.
    encoded = base64.b64encode(file_bytes).decode()
    code = (
        "import base64\n"
        f"with open({path!r}, 'wb') as f:\n"
        f"    f.write(base64.b64decode({encoded!r}))\n"
        f"print('File written to', {path!r})\n"
    )
    result = execute_code(kc, code)
    if result["error"]:
        return jsonify({
            "error": result["error"],
            "output": result["stdout"],
            "path": path,
        }), 500
    return jsonify({"output": result["stdout"], "path": path})


# 1. Terminal command execution
@app.route('/terminal', methods=['POST'])
def terminal():
    """Run a command (no shell) in /tmp from inside the session's kernel.

    JSON body: ``command`` (required), ``session_id`` (default ``"default"``).
    Returns the raw ``execute_code`` result.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    cmd = data.get("command", "")
    session_id = data.get("session_id", "default")
    if not cmd or not isinstance(cmd, str):
        return jsonify({"error": "No command"}), 400
    safe_code = (
        "import subprocess, shlex\n"
        "result = subprocess.run(\n"
        f"    shlex.split({cmd!r}),\n"
        "    capture_output=True, text=True, timeout=30, cwd='/tmp'\n"
        ")\n"
        "print(result.stdout)\n"
        "if result.stderr:\n"
        "    print(\"STDERR:\", result.stderr)\n"
        "print(\"Exit code:\", result.returncode)\n"
    )
    kc = get_or_create_kernel(session_id)
    return jsonify(execute_code(kc, safe_code))


# 2. File download — return file from /tmp/ as base64
@app.route('/download', methods=['POST'])
def download_file():
    """Return a file from /tmp as base64, read from inside the session's kernel.

    JSON body: ``filename`` (required), ``session_id`` (default
    ``"default"``). Only the final path component of ``filename`` is used.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    data = _json_body()
    filename = data.get("filename", "")
    session_id = data.get("session_id", "default")
    path = _tmp_path(filename) if filename else None
    if path is None:
        return jsonify({"found": False, "output": "No filename provided"}), 400
    code = (
        "import base64, os\n"
        f"path = {path!r}\n"
        "if os.path.isfile(path):\n"
        "    with open(path, 'rb') as f:\n"
        "        b64 = base64.b64encode(f.read()).decode()\n"
        "    print(f\"FILE_B64_START:{b64}:FILE_B64_END\")\n"
        "    print(f\"Size: {os.path.getsize(path)} bytes\")\n"
        "else:\n"
        "    print(\"File not found:\", path)\n"
    )
    kc = get_or_create_kernel(session_id)
    result = execute_code(kc, code)
    # Extract b64 from output
    m = _FILE_B64_RE.search(result.get("stdout", ""))
    if m:
        return jsonify({"filename": filename, "base64": m.group(1), "found": True})
    return jsonify({"found": False, "output": result.get("stdout", "")})


# 3.
# List /tmp files
@app.route('/files', methods=['GET', 'POST'])
def list_files():
    """List regular files in /tmp as seen from inside the session's kernel.

    For POST, the JSON body may carry ``session_id`` (default ``"default"``);
    GET always uses the default session. Returns ``{"files": [...]}`` sorted
    by modification time, newest first, or ``{"files": [], "raw": <stdout>}``
    when the kernel output is not valid JSON.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    data = {}
    if request.method == 'POST':
        body = request.get_json(silent=True)
        if isinstance(body, dict):
            data = body
    session_id = data.get("session_id", "default")
    code = (
        "import os, json\n"
        "files = []\n"
        "for f in os.listdir('/tmp'):\n"
        "    path = f'/tmp/{f}'\n"
        "    if os.path.isfile(path):\n"
        "        files.append({\"name\": f, \"size\": os.path.getsize(path), "
        "\"modified\": os.path.getmtime(path)})\n"
        "print(json.dumps(sorted(files, key=lambda x: x['modified'], reverse=True)))\n"
    )
    kc = get_or_create_kernel(session_id)
    result = execute_code(kc, code)
    raw = result.get("stdout", "")
    try:
        return jsonify({"files": json.loads(raw)})
    except (TypeError, ValueError):
        # Empty or non-JSON stdout (e.g. the helper code failed in the kernel).
        return jsonify({"files": [], "raw": raw})


# 4. Audio output endpoint
@app.route('/audio', methods=['POST'])
def generate_audio():
    """Run ``code`` in the kernel and return its result plus audio artifacts.

    JSON body: ``code`` (required), ``session_id`` (default ``"default"``).
    ``audio_artifacts`` holds the entries of the result's ``artifacts`` whose
    ``type`` starts with ``audio/``.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    body = request.get_json(silent=True)
    data = body if isinstance(body, dict) else {}
    session_id = data.get("session_id", "default")
    code = data.get("code", "")
    if not code:
        return jsonify({"error": "No code provided"}), 400
    kc = get_or_create_kernel(session_id)
    result = execute_code(kc, code, timeout=120)
    # Collect audio artifacts specifically
    audio_artifacts = [
        a for a in result.get("artifacts", [])
        if a.get("type", "").startswith("audio/")
    ]
    return jsonify({**result, "audio_artifacts": audio_artifacts})


# 5. Browser actions endpoint (structured)
@app.route('/browser', methods=['POST'])
def browser_action():
    """
    Execute structured browser actions without raw code.

    action: screenshot | extract | pdf

    JSON body: ``action`` (default ``"screenshot"``), ``url`` (required),
    ``session_id`` (default ``"browser"``), ``full_page`` (screenshot only,
    default false), ``selector`` (extract only, default ``"body"``).
    Returns the raw ``execute_code`` result; unknown actions get a 400.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    body = request.get_json(silent=True)
    data = body if isinstance(body, dict) else {}
    action = data.get("action", "screenshot")
    url = data.get("url", "")
    session_id = data.get("session_id", "browser")

    if action not in ("screenshot", "extract", "pdf"):
        return jsonify({"error": f"Unknown action: {action}"}), 400
    if not url or not isinstance(url, str):
        return jsonify({"error": "No url provided"}), 400

    # Client-supplied values are embedded with repr() so quotes, braces or
    # backslashes in them cannot break out of the generated string literals.
    # NOTE(review): Playwright's sync API may refuse to run inside a kernel
    # that already has an asyncio loop running — confirm against the deployed
    # ipykernel/playwright versions.
    if action == "screenshot":
        full_page = bool(data.get('full_page', False))
        code = (
            "from playwright.sync_api import sync_playwright\n"
            "with sync_playwright() as p:\n"
            "    browser = p.chromium.launch(args=['--no-sandbox'])\n"
            "    page = browser.new_page(viewport={'width':1280,'height':720})\n"
            f"    page.goto({url!r}, wait_until='networkidle', timeout=30000)\n"
            f"    screenshot = page.screenshot(full_page={full_page!r})\n"
            "    title = page.title()\n"
            "    browser.close()\n"
            "import IPython.display as d\n"
            "d.display(d.Image(data=screenshot))\n"
            f"print('URL:', {url!r})\n"
            "print(f\"Title: {title}\")\n"
        )
    elif action == "extract":
        selector = str(data.get("selector", "body"))
        code = (
            "from playwright.sync_api import sync_playwright\n"
            "with sync_playwright() as p:\n"
            "    browser = p.chromium.launch(args=['--no-sandbox'])\n"
            "    page = browser.new_page()\n"
            f"    page.goto({url!r}, wait_until='networkidle', timeout=30000)\n"
            f"    text = page.inner_text({selector!r})\n"
            "    browser.close()\n"
            "print(text[:5000])\n"
        )
    else:  # action == "pdf"
        code = (
            "from playwright.sync_api import sync_playwright\n"
            "with sync_playwright() as p:\n"
            "    browser = p.chromium.launch(args=['--no-sandbox'])\n"
            "    page = browser.new_page()\n"
            f"    page.goto({url!r}, wait_until='networkidle', timeout=30000)\n"
            "    pdf_bytes = page.pdf(format='A4')\n"
            "    browser.close()\n"
            "import base64\n"
            "print(\"PDF_B64:\" + base64.b64encode(pdf_bytes).decode())\n"
            "print(f\"PDF size: {len(pdf_bytes)} bytes\")\n"
        )

    kc = get_or_create_kernel(session_id)
    return jsonify(execute_code(kc, code, timeout=60))


# 6.
# Health endpoint with capabilities list
@app.route('/capabilities')
def capabilities():
    """Return the static list of advertised packages and endpoints."""
    return jsonify({
        "python": True,
        "playwright": True,
        "packages": [
            "pandas", "numpy", "matplotlib", "seaborn", "scipy", "plotly",
            "requests", "beautifulsoup4", "pillow", "openpyxl",
            "PyPDF2", "python-docx", "fpdf2", "reportlab", "kaleido"
        ],
        "endpoints": [
            "/execute", "/install", "/upload", "/download",
            "/terminal", "/files", "/browser", "/audio",
            "/sessions", "/capabilities"
        ]
    })


# 7. Interrupt running kernel (for long loops)
# The view takes ``session_id``, so the rule must declare it as a URL variable;
# without it Flask would call the view with no arguments.
@app.route('/interrupt/<session_id>', methods=['POST'])
def interrupt_kernel(session_id):
    """Send an interrupt to the kernel of ``session_id``.

    Responds 404 when the session is not registered and 500 with the
    exception text when the interrupt call fails.
    """
    if not check_auth():
        return jsonify({"error": "Unauthorized"}), 401
    with sessions_lock:
        entry = sessions.get(session_id)
        if entry is None:
            return jsonify({"error": "Session not found"}), 404
        try:
            entry["km"].interrupt_kernel()
        except Exception as e:
            return jsonify({"error": str(e)}), 500
        return jsonify({"message": f"Kernel {session_id} interrupted"})


if __name__ == "__main__":
    print("[SANDBOX] Pre-warming default kernel...")
    get_or_create_kernel("default")
    print("[SANDBOX] Ready on port 7860")
    app.run(host="0.0.0.0", port=7860)