Spaces:
Runtime error
Runtime error
| """Event loop integration for the ZeroMQ-based kernels.""" | |
| # Copyright (c) IPython Development Team. | |
| # Distributed under the terms of the Modified BSD License. | |
| import os | |
| import platform | |
| import sys | |
| from functools import partial | |
| import zmq | |
| from packaging.version import Version as V | |
| from traitlets.config.application import Application | |
| def _use_appnope(): | |
| """Should we use appnope for dealing with OS X app nap? | |
| Checks if we are on OS X 10.9 or greater. | |
| """ | |
| return sys.platform == "darwin" and V(platform.mac_ver()[0]) >= V("10.9") | |
| # mapping of keys to loop functions | |
| loop_map = { | |
| "inline": None, | |
| "nbagg": None, | |
| "webagg": None, | |
| "notebook": None, | |
| "ipympl": None, | |
| "widget": None, | |
| None: None, | |
| } | |
| def register_integration(*toolkitnames): | |
| """Decorator to register an event loop to integrate with the IPython kernel | |
| The decorator takes names to register the event loop as for the %gui magic. | |
| You can provide alternative names for the same toolkit. | |
| The decorated function should take a single argument, the IPython kernel | |
| instance, arrange for the event loop to yield the asyncio loop when a | |
| message is received by the main shell zmq stream or at least every | |
| ``kernel._poll_interval`` seconds, and start the event loop. | |
| :mod:`ipykernel.eventloops` provides and registers such functions | |
| for a few common event loops. | |
| """ | |
| def decorator(func): | |
| """Integration registration decorator.""" | |
| for name in toolkitnames: | |
| loop_map[name] = func | |
| func.exit_hook = lambda kernel: None # noqa: ARG005 | |
| def exit_decorator(exit_func): | |
| """@func.exit is now a decorator | |
| to register a function to be called on exit | |
| """ | |
| func.exit_hook = exit_func | |
| return exit_func | |
| func.exit = exit_decorator | |
| return func | |
| return decorator | |
| def get_shell_stream(kernel): | |
| # Return the zmq stream that receives messages for the main shell. | |
| if kernel._supports_kernel_subshells: | |
| manager = kernel.shell_channel_thread.manager | |
| socket_pair = manager.get_shell_channel_to_subshell_pair(None) | |
| return socket_pair.to_stream | |
| return kernel.shell_stream | |
| def _notify_stream_qt(kernel): | |
| import operator | |
| from functools import lru_cache | |
| from IPython.external.qt_for_kernel import QtCore | |
| try: | |
| from IPython.external.qt_for_kernel import enum_helper | |
| except ImportError: | |
| def enum_helper(name): | |
| return operator.attrgetter(name.rpartition(".")[0])(sys.modules[QtCore.__package__]) | |
| def exit_loop(): | |
| """fall back to main loop""" | |
| kernel._qt_notifier.setEnabled(False) | |
| kernel.app.qt_event_loop.quit() | |
| def process_stream_events_wrap(shell_stream, *args, **kwargs): | |
| """fall back to main loop when there's a socket event""" | |
| # call flush to ensure that the stream doesn't lose events | |
| # due to our consuming of the edge-triggered FD | |
| # flush returns the number of events consumed. | |
| # if there were any, wake it up | |
| if shell_stream.flush(limit=1): | |
| exit_loop() | |
| shell_stream = get_shell_stream(kernel) | |
| process_stream_events = partial(process_stream_events_wrap, shell_stream) | |
| if not hasattr(kernel, "_qt_notifier"): | |
| fd = shell_stream.getsockopt(zmq.FD) | |
| kernel._qt_notifier = QtCore.QSocketNotifier( | |
| fd, enum_helper("QtCore.QSocketNotifier.Type").Read, kernel.app.qt_event_loop | |
| ) | |
| kernel._qt_notifier.activated.connect(process_stream_events) | |
| else: | |
| kernel._qt_notifier.setEnabled(True) | |
| # allow for scheduling exits from the loop in case a timeout needs to | |
| # be set from the kernel level | |
| def _schedule_exit(delay): | |
| """schedule fall back to main loop in [delay] seconds""" | |
| # The signatures of QtCore.QTimer.singleShot are inconsistent between PySide and PyQt | |
| # if setting the TimerType, so we create a timer explicitly and store it | |
| # to avoid a memory leak. | |
| # PreciseTimer is needed so we exit after _at least_ the specified delay, not within 5% of it | |
| if not hasattr(kernel, "_qt_timer"): | |
| kernel._qt_timer = QtCore.QTimer(kernel.app) | |
| kernel._qt_timer.setSingleShot(True) | |
| kernel._qt_timer.setTimerType(enum_helper("QtCore.Qt.TimerType").PreciseTimer) | |
| kernel._qt_timer.timeout.connect(exit_loop) | |
| kernel._qt_timer.start(int(1000 * delay)) | |
| loop_qt._schedule_exit = _schedule_exit | |
| # there may already be unprocessed events waiting. | |
| # these events will not wake zmq's edge-triggered FD | |
| # since edge-triggered notification only occurs on new i/o activity. | |
| # process all the waiting events immediately | |
| # so we start in a clean state ensuring that any new i/o events will notify. | |
| # schedule first call on the eventloop as soon as it's running, | |
| # so we don't block here processing events | |
| QtCore.QTimer.singleShot(0, process_stream_events) | |
| def loop_qt(kernel): | |
| """Event loop for all supported versions of Qt.""" | |
| _notify_stream_qt(kernel) # install hook to stop event loop. | |
| # Start the event loop. | |
| kernel.app._in_event_loop = True | |
| # `exec` blocks until there's ZMQ activity. | |
| el = kernel.app.qt_event_loop # for brevity | |
| el.exec() if hasattr(el, "exec") else el.exec_() | |
| kernel.app._in_event_loop = False | |
| # NOTE: To be removed in version 7 | |
| loop_qt5 = loop_qt | |
| # exit and watch are the same for qt 4 and 5 | |
| def loop_qt_exit(kernel): | |
| kernel.app.exit() | |
| def _loop_wx(app): | |
| """Inner-loop for running the Wx eventloop | |
| Pulled from guisupport.start_event_loop in IPython < 5.2, | |
| since IPython 5.2 only checks `get_ipython().active_eventloop` is defined, | |
| rather than if the eventloop is actually running. | |
| """ | |
| app._in_event_loop = True | |
| app.MainLoop() | |
| app._in_event_loop = False | |
| def loop_wx(kernel): | |
| """Start a kernel with wx event loop support.""" | |
| import wx | |
| # We have to put the wx.Timer in a wx.Frame for it to fire properly. | |
| # We make the Frame hidden when we create it in the main app below. | |
| class TimerFrame(wx.Frame): # type:ignore[misc] | |
| def __init__(self, kernel): | |
| self.kernel = kernel | |
| self.shell_stream = get_shell_stream(kernel) | |
| wx.Frame.__init__(self, None, -1) | |
| self.timer = wx.Timer(self) | |
| self.Bind(wx.EVT_CLOSE, self.on_exit) | |
| self.Bind(wx.EVT_TIMER, self.on_timer) | |
| # Units for the timer are in milliseconds | |
| self.timer.Start(int(1000 * self.kernel._poll_interval)) | |
| def wake(self): | |
| """wake from wx""" | |
| try: | |
| if self.shell_stream.flush(limit=1): | |
| self.kernel.app.ExitMainLoop() | |
| except Exception: | |
| pass | |
| def on_timer(self, event): | |
| self.wake() | |
| def on_exit(self, event): | |
| self.timer.Stop() | |
| self.wake() | |
| self.Destroy() | |
| # We need a custom wx.App to create our Frame subclass that has the | |
| # wx.Timer to defer back to the tornado event loop. | |
| class IPWxApp(wx.App): # type:ignore[misc] | |
| def OnInit(self): | |
| self.frame = TimerFrame(kernel) | |
| self.frame.Show(False) | |
| return True | |
| # The redirect=False here makes sure that wx doesn't replace | |
| # sys.stdout/stderr with its own classes. | |
| if not (getattr(kernel, "app", None) and isinstance(kernel.app, wx.App)): | |
| kernel.app = IPWxApp(redirect=False) | |
| # The import of wx on Linux sets the handler for signal.SIGINT | |
| # to 0. This is a bug in wx or gtk. We fix by just setting it | |
| # back to the Python default. | |
| import signal | |
| if not callable(signal.getsignal(signal.SIGINT)): | |
| signal.signal(signal.SIGINT, signal.default_int_handler) | |
| _loop_wx(kernel.app) | |
| def loop_wx_exit(kernel): | |
| """Exit the wx loop.""" | |
| import wx | |
| wx.Exit() | |
| def loop_tk(kernel): | |
| """Start a kernel with the Tk event loop.""" | |
| from tkinter import READABLE, Tk | |
| app = Tk() | |
| # Capability detection: | |
| # per https://docs.python.org/3/library/tkinter.html#file-handlers | |
| # file handlers are not available on Windows | |
| if hasattr(app, "createfilehandler"): | |
| # A basic wrapper for structural similarity with the Windows version | |
| class BasicAppWrapper: | |
| def __init__(self, app): | |
| self.app = app | |
| self.app.withdraw() | |
| def exit_loop(): | |
| """fall back to main loop""" | |
| app.tk.deletefilehandler(shell_stream.getsockopt(zmq.FD)) | |
| app.quit() | |
| app.destroy() | |
| del kernel.app_wrapper | |
| def process_stream_events_wrap(shell_stream, *a, **kw): | |
| """fall back to main loop when there's a socket event""" | |
| if shell_stream.flush(limit=1): | |
| exit_loop() | |
| # allow for scheduling exits from the loop in case a timeout needs to | |
| # be set from the kernel level | |
| def _schedule_exit(delay): | |
| """schedule fall back to main loop in [delay] seconds""" | |
| app.after(int(1000 * delay), exit_loop) | |
| loop_tk._schedule_exit = _schedule_exit | |
| # For Tkinter, we create a Tk object and call its withdraw method. | |
| kernel.app_wrapper = BasicAppWrapper(app) | |
| shell_stream = get_shell_stream(kernel) | |
| process_stream_events = partial(process_stream_events_wrap, shell_stream) | |
| app.tk.createfilehandler(shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events) | |
| # schedule initial call after start | |
| app.after(0, process_stream_events) | |
| app.mainloop() | |
| else: | |
| import asyncio | |
| import nest_asyncio | |
| nest_asyncio.apply() | |
| # Tk uses milliseconds | |
| poll_interval = int(1000 * kernel._poll_interval) | |
| shell_stream = get_shell_stream(kernel) | |
| class TimedAppWrapper: | |
| def __init__(self, app, shell_stream): | |
| self.app = app | |
| self.shell_stream = shell_stream | |
| self.app.withdraw() | |
| async def func(self): | |
| self.shell_stream.flush(limit=1) | |
| def on_timer(self): | |
| loop = asyncio.get_event_loop() | |
| try: | |
| loop.run_until_complete(self.func()) | |
| except Exception: | |
| kernel.log.exception("Error in message handler") | |
| self.app.after(poll_interval, self.on_timer) | |
| def start(self): | |
| self.on_timer() # Call it once to get things going. | |
| self.app.mainloop() | |
| kernel.app_wrapper = TimedAppWrapper(app, shell_stream) | |
| kernel.app_wrapper.start() | |
| def loop_tk_exit(kernel): | |
| """Exit the tk loop.""" | |
| try: | |
| kernel.app_wrapper.app.quit() | |
| kernel.app_wrapper.app.destroy() | |
| del kernel.app_wrapper | |
| kernel.eventloop = None | |
| except (RuntimeError, AttributeError): | |
| pass | |
| def loop_gtk(kernel): | |
| """Start the kernel, coordinating with the GTK event loop""" | |
| from .gui.gtkembed import GTKEmbed | |
| gtk_kernel = GTKEmbed(kernel) | |
| gtk_kernel.start() | |
| kernel._gtk = gtk_kernel | |
| def loop_gtk_exit(kernel): | |
| """Exit the gtk loop.""" | |
| kernel._gtk.stop() | |
| def loop_gtk3(kernel): | |
| """Start the kernel, coordinating with the GTK event loop""" | |
| from .gui.gtk3embed import GTKEmbed | |
| gtk_kernel = GTKEmbed(kernel) | |
| gtk_kernel.start() | |
| kernel._gtk = gtk_kernel | |
| def loop_gtk3_exit(kernel): | |
| """Exit the gtk3 loop.""" | |
| kernel._gtk.stop() | |
| def loop_cocoa(kernel): | |
| """Start the kernel, coordinating with the Cocoa CFRunLoop event loop | |
| via the matplotlib MacOSX backend. | |
| """ | |
| from ._eventloop_macos import mainloop, stop | |
| real_excepthook = sys.excepthook | |
| shell_stream = get_shell_stream(kernel) | |
| def handle_int(etype, value, tb): | |
| """don't let KeyboardInterrupts look like crashes""" | |
| # wake the eventloop when we get a signal | |
| stop() | |
| if etype is KeyboardInterrupt: | |
| print("KeyboardInterrupt caught in CFRunLoop", file=sys.__stdout__) | |
| else: | |
| real_excepthook(etype, value, tb) | |
| while not kernel.shell.exit_now: | |
| try: | |
| # double nested try/except, to properly catch KeyboardInterrupt | |
| # due to pyzmq Issue #130 | |
| try: | |
| # don't let interrupts during mainloop invoke crash_handler: | |
| sys.excepthook = handle_int | |
| mainloop(kernel._poll_interval) | |
| if shell_stream.flush(limit=1): | |
| # events to process, return control to kernel | |
| return | |
| except BaseException: | |
| raise | |
| except KeyboardInterrupt: | |
| # Ctrl-C shouldn't crash the kernel | |
| print("KeyboardInterrupt caught in kernel", file=sys.__stdout__) | |
| finally: | |
| # ensure excepthook is restored | |
| sys.excepthook = real_excepthook | |
| def loop_cocoa_exit(kernel): | |
| """Exit the cocoa loop.""" | |
| from ._eventloop_macos import stop | |
| stop() | |
| def loop_asyncio(kernel): | |
| """Start a kernel with asyncio event loop support.""" | |
| import asyncio | |
| loop = asyncio.get_event_loop() | |
| # loop is already running (e.g. tornado 5), nothing left to do | |
| if loop.is_running(): | |
| return | |
| if loop.is_closed(): | |
| # main loop is closed, create a new one | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| loop._should_close = False # type:ignore[attr-defined] | |
| # pause eventloop when there's an event on a zmq socket | |
| def process_stream_events(shell_stream): | |
| """fall back to main loop when there's a socket event""" | |
| if shell_stream.flush(limit=1): | |
| loop.stop() | |
| shell_stream = get_shell_stream(kernel) | |
| notifier = partial(process_stream_events, shell_stream) | |
| loop.add_reader(shell_stream.getsockopt(zmq.FD), notifier) | |
| loop.call_soon(notifier) | |
| while True: | |
| error = None | |
| try: | |
| loop.run_forever() | |
| except KeyboardInterrupt: | |
| continue | |
| except Exception as e: | |
| error = e | |
| if loop._should_close: # type:ignore[attr-defined] | |
| loop.close() | |
| if error is not None: | |
| raise error | |
| break | |
| def loop_asyncio_exit(kernel): | |
| """Exit hook for asyncio""" | |
| import asyncio | |
| loop = asyncio.get_event_loop() | |
| async def close_loop(): | |
| if hasattr(loop, "shutdown_asyncgens"): | |
| yield loop.shutdown_asyncgens() | |
| loop._should_close = True # type:ignore[attr-defined] | |
| loop.stop() | |
| if loop.is_running(): | |
| close_loop() | |
| elif not loop.is_closed(): | |
| loop.run_until_complete(close_loop) # type:ignore[arg-type] | |
| loop.close() | |
| def set_qt_api_env_from_gui(gui): | |
| """ | |
| Sets the QT_API environment variable by trying to import PyQtx or PySidex. | |
| The user can generically request `qt` or a specific Qt version, e.g. `qt6`. | |
| For a generic Qt request, we let the mechanism in IPython choose the best | |
| available version by leaving the `QT_API` environment variable blank. | |
| For specific versions, we check to see whether the PyQt or PySide | |
| implementations are present and set `QT_API` accordingly to indicate to | |
| IPython which version we want. If neither implementation is present, we | |
| leave the environment variable set so IPython will generate a helpful error | |
| message. | |
| Notes | |
| ----- | |
| - If the environment variable is already set, it will be used unchanged, | |
| regardless of what the user requested. | |
| """ | |
| qt_api = os.environ.get("QT_API", None) | |
| from IPython.external.qt_loaders import ( | |
| QT_API_PYQT5, | |
| QT_API_PYQT6, | |
| QT_API_PYSIDE2, | |
| QT_API_PYSIDE6, | |
| loaded_api, | |
| ) | |
| loaded = loaded_api() | |
| qt_env2gui = { | |
| QT_API_PYSIDE2: "qt5", | |
| QT_API_PYQT5: "qt5", | |
| QT_API_PYSIDE6: "qt6", | |
| QT_API_PYQT6: "qt6", | |
| } | |
| if loaded is not None and gui != "qt" and qt_env2gui[loaded] != gui: | |
| print(f"Cannot switch Qt versions for this session; you must use {qt_env2gui[loaded]}.") | |
| return | |
| if qt_api is not None and gui != "qt": | |
| if qt_env2gui[qt_api] != gui: | |
| print( | |
| f'Request for "{gui}" will be ignored because `QT_API` ' | |
| f'environment variable is set to "{qt_api}"' | |
| ) | |
| return | |
| else: | |
| if gui == "qt5": | |
| try: | |
| import PyQt5 # noqa: F401 | |
| os.environ["QT_API"] = "pyqt5" | |
| except ImportError: | |
| try: | |
| import PySide2 # noqa: F401 | |
| os.environ["QT_API"] = "pyside2" | |
| except ImportError: | |
| os.environ["QT_API"] = "pyqt5" | |
| elif gui == "qt6": | |
| try: | |
| import PyQt6 # noqa: F401 | |
| os.environ["QT_API"] = "pyqt6" | |
| except ImportError: | |
| try: | |
| import PySide6 # noqa: F401 | |
| os.environ["QT_API"] = "pyside6" | |
| except ImportError: | |
| os.environ["QT_API"] = "pyqt6" | |
| elif gui == "qt": | |
| # Don't set QT_API; let IPython logic choose the version. | |
| if "QT_API" in os.environ: | |
| del os.environ["QT_API"] | |
| else: | |
| print(f'Unrecognized Qt version: {gui}. Should be "qt5", "qt6", or "qt".') | |
| return | |
| # Do the actual import now that the environment variable is set to make sure it works. | |
| try: | |
| pass | |
| except Exception as e: | |
| # Clear the environment variable for the next attempt. | |
| if "QT_API" in os.environ: | |
| del os.environ["QT_API"] | |
| print(f"QT_API couldn't be set due to error {e}") | |
| return | |
| def make_qt_app_for_kernel(gui, kernel): | |
| """Sets the `QT_API` environment variable if it isn't already set.""" | |
| if hasattr(kernel, "app"): | |
| # Kernel is already running a Qt event loop, so there's no need to | |
| # create another app for it. | |
| return | |
| set_qt_api_env_from_gui(gui) | |
| # This import is guaranteed to work now: | |
| from IPython.external.qt_for_kernel import QtCore | |
| from IPython.lib.guisupport import get_app_qt4 | |
| kernel.app = get_app_qt4([" "]) | |
| kernel.app.qt_event_loop = QtCore.QEventLoop(kernel.app) | |
| def enable_gui(gui, kernel=None): | |
| """Enable integration with a given GUI""" | |
| if gui not in loop_map: | |
| e = f"Invalid GUI request {gui!r}, valid ones are:{loop_map.keys()}" | |
| raise ValueError(e) | |
| if kernel is None: | |
| if Application.initialized(): | |
| kernel = getattr(Application.instance(), "kernel", None) | |
| if kernel is None: | |
| msg = ( | |
| "You didn't specify a kernel," | |
| " and no IPython Application with a kernel appears to be running." | |
| ) | |
| raise RuntimeError(msg) | |
| if gui is None: | |
| # User wants to turn off integration; clear any evidence if Qt was the last one. | |
| if hasattr(kernel, "app"): | |
| delattr(kernel, "app") | |
| if hasattr(kernel, "_qt_notifier"): | |
| delattr(kernel, "_qt_notifier") | |
| if hasattr(kernel, "_qt_timer"): | |
| delattr(kernel, "_qt_timer") | |
| else: | |
| if gui.startswith("qt"): | |
| # Prepare the kernel here so any exceptions are displayed in the client. | |
| make_qt_app_for_kernel(gui, kernel) | |
| loop = loop_map[gui] | |
| if ( | |
| loop and kernel.eventloop is not None and kernel.eventloop is not loop # type:ignore[unreachable] | |
| ): | |
| msg = "Cannot activate multiple GUI eventloops" # type:ignore[unreachable] | |
| raise RuntimeError(msg) | |
| kernel.eventloop = loop | |
| # We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus | |
| # any exceptions raised during the event loop will not be shown in the client. | |