Buckets:
ktongue/docker_container / .vscode-server /extensions /ms-python.debugpy-2025.18.0 /bundled /libs /debugpy /server /api.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. See LICENSE in the project root | |
| # for license information. | |
| import codecs | |
| import os | |
| import pydevd | |
| import socket | |
| import sys | |
| import threading | |
| import debugpy | |
| from debugpy import adapter | |
| from debugpy.common import json, log, sockets | |
| from _pydevd_bundle.pydevd_constants import get_global_debugger | |
| from pydevd_file_utils import absolute_path | |
| from debugpy.common.util import hide_debugpy_internals | |
| _tls = threading.local() | |
| # TODO: "gevent", if possible. | |
| _config = { | |
| "qt": "none", | |
| "subProcess": True, | |
| "python": sys.executable, | |
| "pythonEnv": {}, | |
| } | |
| _config_valid_values = { | |
| # If property is not listed here, any value is considered valid, so long as | |
| # its type matches that of the default value in _config. | |
| "qt": ["auto", "none", "pyside", "pyside2", "pyqt4", "pyqt5"], | |
| } | |
| # This must be a global to prevent it from being garbage collected and triggering | |
| # https://bugs.python.org/issue37380. | |
| _adapter_process = None | |
| def _settrace(*args, **kwargs): | |
| log.debug("pydevd.settrace(*{0!r}, **{1!r})", args, kwargs) | |
| # The stdin in notification is not acted upon in debugpy, so, disable it. | |
| kwargs.setdefault("notify_stdin", False) | |
| try: | |
| pydevd.settrace(*args, **kwargs) | |
| except Exception: | |
| raise | |
| def ensure_logging(): | |
| """Starts logging to log.log_dir, if it hasn't already been done.""" | |
| if ensure_logging.ensured: | |
| return | |
| ensure_logging.ensured = True | |
| log.to_file(prefix="debugpy.server") | |
| log.describe_environment("Initial environment:") | |
| if log.log_dir is not None: | |
| pydevd.log_to(log.log_dir + "/debugpy.pydevd.log") | |
| ensure_logging.ensured = False | |
| def log_to(path): | |
| if ensure_logging.ensured: | |
| raise RuntimeError("logging has already begun") | |
| log.debug("log_to{0!r}", (path,)) | |
| if path is sys.stderr: | |
| log.stderr.levels |= set(log.LEVELS) | |
| else: | |
| log.log_dir = path | |
| def configure(properties=None, **kwargs): | |
| ensure_logging() | |
| log.debug("configure{0!r}", (properties, kwargs)) | |
| if properties is None: | |
| properties = kwargs | |
| else: | |
| properties = dict(properties) | |
| properties.update(kwargs) | |
| for k, v in properties.items(): | |
| if k not in _config: | |
| raise ValueError("Unknown property {0!r}".format(k)) | |
| expected_type = type(_config[k]) | |
| if type(v) is not expected_type: | |
| raise ValueError("{0!r} must be a {1}".format(k, expected_type.__name__)) | |
| valid_values = _config_valid_values.get(k) | |
| if (valid_values is not None) and (v not in valid_values): | |
| raise ValueError("{0!r} must be one of: {1!r}".format(k, valid_values)) | |
| _config[k] = v | |
| def _starts_debugging(func): | |
| def debug(address, **kwargs): | |
| try: | |
| _, port = address | |
| except Exception: | |
| port = address | |
| localhost = sockets.get_default_localhost() | |
| address = (localhost, port) | |
| try: | |
| port.__index__() # ensure it's int-like | |
| except Exception: | |
| raise ValueError("expected port or (host, port)") | |
| if not (0 <= port < 2**16): | |
| raise ValueError("invalid port number") | |
| ensure_logging() | |
| log.debug("{0}({1!r}, **{2!r})", func.__name__, address, kwargs) | |
| log.info("Initial debug configuration: {0}", json.repr(_config)) | |
| qt_mode = _config.get("qt", "none") | |
| if qt_mode != "none": | |
| pydevd.enable_qt_support(qt_mode) | |
| settrace_kwargs = { | |
| "suspend": False, | |
| "patch_multiprocessing": _config.get("subProcess", True), | |
| } | |
| if hide_debugpy_internals(): | |
| debugpy_path = os.path.dirname(absolute_path(debugpy.__file__)) | |
| settrace_kwargs["dont_trace_start_patterns"] = (debugpy_path,) | |
| settrace_kwargs["dont_trace_end_patterns"] = (str("debugpy_launcher.py"),) | |
| try: | |
| return func(address, settrace_kwargs, **kwargs) | |
| except Exception: | |
| log.reraise_exception("{0}() failed:", func.__name__, level="info") | |
| return debug | |
| def listen(address, settrace_kwargs, in_process_debug_adapter=False): | |
| # Errors below are logged with level="info", because the caller might be catching | |
| # and handling exceptions, and we don't want to spam their stderr unnecessarily. | |
| if listen.called: | |
| # Multiple calls to listen() cause the debuggee to hang | |
| raise RuntimeError("debugpy.listen() has already been called on this process") | |
| host, port = address | |
| if in_process_debug_adapter: | |
| log.info("Listening: pydevd without debugpy adapter: {0}:{1}", host, port) | |
| settrace_kwargs["patch_multiprocessing"] = False | |
| _settrace( | |
| host=host, | |
| port=port, | |
| wait_for_ready_to_run=False, | |
| block_until_connected=False, | |
| **settrace_kwargs | |
| ) | |
| return | |
| import subprocess | |
| server_access_token = codecs.encode(os.urandom(32), "hex").decode("ascii") | |
| try: | |
| localhost = sockets.get_default_localhost() | |
| endpoints_listener = sockets.create_server(localhost, 0, timeout=30) | |
| except Exception as exc: | |
| log.swallow_exception("Can't listen for adapter endpoints:") | |
| raise RuntimeError("can't listen for adapter endpoints: " + str(exc)) | |
| try: | |
| endpoints_host, endpoints_port = sockets.get_address(endpoints_listener) | |
| log.info( | |
| "Waiting for adapter endpoints on {0}:{1}...", | |
| endpoints_host, | |
| endpoints_port, | |
| ) | |
| host, port = address | |
| adapter_args = [ | |
| _config.get("python", sys.executable), | |
| os.path.dirname(adapter.__file__), | |
| "--for-server", | |
| str(endpoints_port), | |
| "--host", | |
| host, | |
| "--port", | |
| str(port), | |
| "--server-access-token", | |
| server_access_token, | |
| ] | |
| if log.log_dir is not None: | |
| adapter_args += ["--log-dir", log.log_dir] | |
| log.info("debugpy.listen() spawning adapter: {0}", json.repr(adapter_args)) | |
| # On Windows, detach the adapter from our console, if any, so that it doesn't | |
| # receive Ctrl+C from it, and doesn't keep it open once we exit. | |
| creationflags = 0 | |
| if sys.platform == "win32": | |
| creationflags |= 0x08000000 # CREATE_NO_WINDOW | |
| creationflags |= 0x00000200 # CREATE_NEW_PROCESS_GROUP | |
| # On embedded applications, environment variables might not contain | |
| # Python environment settings. | |
| python_env = _config.get("pythonEnv") | |
| if not bool(python_env): | |
| python_env = None | |
| # Adapter will outlive this process, so we shouldn't wait for it. However, we | |
| # need to ensure that the Popen instance for it doesn't get garbage-collected | |
| # by holding a reference to it in a non-local variable, to avoid triggering | |
| # https://bugs.python.org/issue37380. | |
| try: | |
| global _adapter_process | |
| _adapter_process = subprocess.Popen( | |
| adapter_args, | |
| close_fds=True, | |
| creationflags=creationflags, | |
| env=python_env, | |
| ) | |
| if os.name == "posix": | |
| # It's going to fork again to daemonize, so we need to wait on it to | |
| # clean it up properly. | |
| _adapter_process.wait() | |
| else: | |
| # Suppress misleading warning about child process still being alive when | |
| # this process exits (https://bugs.python.org/issue38890). | |
| _adapter_process.returncode = 0 | |
| pydevd.add_dont_terminate_child_pid(_adapter_process.pid) | |
| except Exception as exc: | |
| log.swallow_exception("Error spawning debug adapter:", level="info") | |
| raise RuntimeError("error spawning debug adapter: " + str(exc)) | |
| try: | |
| sock, _ = endpoints_listener.accept() | |
| try: | |
| sock.settimeout(None) | |
| sock_io = sock.makefile("rb", 0) | |
| try: | |
| endpoints = json.loads(sock_io.read().decode("utf-8")) | |
| finally: | |
| sock_io.close() | |
| finally: | |
| sockets.close_socket(sock) | |
| except socket.timeout: | |
| log.swallow_exception( | |
| "Timed out waiting for adapter to connect:", level="info" | |
| ) | |
| raise RuntimeError("timed out waiting for adapter to connect") | |
| except Exception as exc: | |
| log.swallow_exception("Error retrieving adapter endpoints:", level="info") | |
| raise RuntimeError("error retrieving adapter endpoints: " + str(exc)) | |
| finally: | |
| endpoints_listener.close() | |
| log.info("Endpoints received from adapter: {0}", json.repr(endpoints)) | |
| if "error" in endpoints: | |
| raise RuntimeError(str(endpoints["error"])) | |
| try: | |
| server_host = str(endpoints["server"]["host"]) | |
| server_port = int(endpoints["server"]["port"]) | |
| client_host = str(endpoints["client"]["host"]) | |
| client_port = int(endpoints["client"]["port"]) | |
| except Exception as exc: | |
| log.swallow_exception( | |
| "Error parsing adapter endpoints:\n{0}\n", | |
| json.repr(endpoints), | |
| level="info", | |
| ) | |
| raise RuntimeError("error parsing adapter endpoints: " + str(exc)) | |
| log.info( | |
| "Adapter is accepting incoming client connections on {0}:{1}", | |
| client_host, | |
| client_port, | |
| ) | |
| _settrace( | |
| host=server_host, | |
| port=server_port, | |
| wait_for_ready_to_run=False, | |
| block_until_connected=True, | |
| access_token=server_access_token, | |
| **settrace_kwargs | |
| ) | |
| log.info("pydevd is connected to adapter at {0}:{1}", server_host, server_port) | |
| listen.called = True | |
| return client_host, client_port | |
| listen.called = False | |
| def connect(address, settrace_kwargs, access_token=None, parent_session_pid=None): | |
| host, port = address | |
| _settrace(host=host, port=port, client_access_token=access_token, ppid=parent_session_pid or 0, **settrace_kwargs) | |
| class wait_for_client: | |
| def __call__(self): | |
| ensure_logging() | |
| log.debug("wait_for_client()") | |
| pydb = get_global_debugger() | |
| if pydb is None: | |
| raise RuntimeError("listen() or connect() must be called first") | |
| cancel_event = threading.Event() | |
| self.cancel = cancel_event.set | |
| pydevd._wait_for_attach(cancel=cancel_event) | |
| def cancel(): | |
| raise RuntimeError("wait_for_client() must be called first") | |
| wait_for_client = wait_for_client() | |
| def is_client_connected(): | |
| return pydevd._is_attached() | |
| def breakpoint(): | |
| ensure_logging() | |
| if not is_client_connected(): | |
| log.info("breakpoint() ignored - debugger not attached") | |
| return | |
| log.debug("breakpoint()") | |
| # Get the first frame in the stack that's not an internal frame. | |
| pydb = get_global_debugger() | |
| stop_at_frame = sys._getframe().f_back | |
| while ( | |
| stop_at_frame is not None | |
| and pydb.get_file_type(stop_at_frame) == pydb.PYDEV_FILE | |
| ): | |
| stop_at_frame = stop_at_frame.f_back | |
| _settrace( | |
| suspend=True, | |
| trace_only_current_thread=True, | |
| patch_multiprocessing=False, | |
| stop_at_frame=stop_at_frame, | |
| ) | |
| stop_at_frame = None | |
| def debug_this_thread(): | |
| ensure_logging() | |
| log.debug("debug_this_thread()") | |
| _settrace(suspend=False) | |
| def trace_this_thread(should_trace): | |
| ensure_logging() | |
| log.debug("trace_this_thread({0!r})", should_trace) | |
| pydb = get_global_debugger() | |
| if should_trace: | |
| pydb.enable_tracing() | |
| else: | |
| pydb.disable_tracing() | |
Xet Storage Details
- Size:
- 12.3 kB
- Xet hash:
- d1172af119c8fc5741b9184e6c4948020718e457385f5206195c11a2aa9333e0
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.