File size: 4,076 Bytes
6f72e2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Replacements for sys.displayhook that publish over ZMQ."""

# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from __future__ import annotations

import builtins
import sys
import typing as t
from contextvars import ContextVar

from IPython.core.displayhook import DisplayHook
from jupyter_client.session import Session, extract_header
from traitlets import Any, Instance

from ipykernel.jsonutil import encode_images, json_clean


class ZMQDisplayHook:
    """A simple displayhook that publishes the object's repr over a ZeroMQ
    socket."""

    # Passed as ``ident`` to ``session.send`` for every published result.
    topic = b"execute_result"

    def __init__(self, session, pub_socket):
        """Initialize the hook.

        Parameters
        ----------
        session
            Object whose ``send`` method is used to publish results.
        pub_socket
            Socket handed to ``session.send`` as the destination.
        """
        self.session = session
        self.pub_socket = pub_socket

        # The parent header is tracked twice: per context (ContextVar) and as
        # a plain attribute that serves as a fallback when the current context
        # holds no value for the variable (see ``parent_header``).
        self._parent_header: ContextVar[dict[str, t.Any]] = ContextVar("parent_header")
        self._parent_header.set({})
        self._parent_header_global: dict[str, t.Any] = {}

    def get_execution_count(self):
        """This method is replaced in kernelapp"""
        return 0

    def __call__(self, obj):
        """Handle a hook call.

        Does nothing when ``obj`` is None. Otherwise binds ``obj`` to
        ``builtins._``, flushes stdout/stderr and sends an ``execute_result``
        message whose ``text/plain`` data is ``repr(obj)``.
        """
        if obj is None:
            return

        builtins._ = obj  # type:ignore[attr-defined]
        # Flush pending stream output before the result message is sent.
        sys.stdout.flush()
        sys.stderr.flush()
        contents = {
            "execution_count": self.get_execution_count(),
            "data": {"text/plain": repr(obj)},
            "metadata": {},
        }
        self.session.send(
            self.pub_socket,
            "execute_result",
            contents,
            parent=self.parent_header,
            ident=self.topic,
        )

    @property
    def parent_header(self):
        """Return the parent header for the current context.

        Falls back to the header most recently passed through ``set_parent``
        (or the initial empty dict) when the context variable has no value in
        the current context.
        """
        try:
            return self._parent_header.get()
        except LookupError:
            return self._parent_header_global

    def set_parent(self, parent):
        """Set the parent header.

        The header extracted from ``parent`` is stored both in the current
        context and in the global fallback.
        """
        parent_header = extract_header(parent)
        self._parent_header.set(parent_header)
        self._parent_header_global = parent_header


class ZMQShellDisplayHook(DisplayHook):
    """A displayhook subclass that publishes data using ZeroMQ. This is intended
    to work with an InteractiveShell instance. It sends a dict of different
    representations of the object."""

    # Passed as ``ident`` to ``session.send`` when the result is published.
    topic = None

    session = Instance(Session, allow_none=True)
    pub_socket = Any(allow_none=True)
    # Per-context parent header plus a plain-attribute fallback used when the
    # current context holds no value for the context variable.
    _parent_header: ContextVar[dict[str, t.Any]]
    _parent_header_global: dict[str, t.Any]
    # Message under construction between start_displayhook() and
    # finish_displayhook(); None when no message is pending.
    msg: dict[str, t.Any] | None

    def __init__(self, *args, **kwargs):
        """Initialize the hook; all arguments are forwarded to ``DisplayHook``."""
        super().__init__(*args, **kwargs)
        self._parent_header = ContextVar("parent_header")
        self._parent_header.set({})
        # Initialise the fallback so that reading ``parent_header`` from a
        # context without a value works even before ``set_parent`` is called.
        self._parent_header_global = {}
        # ``session`` may be None, in which case start_displayhook() never
        # assigns ``msg``; give it a defined value so the other hook methods
        # can test it safely.
        self.msg = None

    @property
    def parent_header(self):
        """Return the parent header for the current context.

        Falls back to the header most recently passed through ``set_parent``
        (or the initial empty dict) when the context variable has no value in
        the current context.
        """
        try:
            return self._parent_header.get()
        except LookupError:
            return self._parent_header_global

    def set_parent(self, parent):
        """Set the parent header.

        The header extracted from ``parent`` is stored both in the current
        context and in the global fallback.
        """
        parent_header = extract_header(parent)
        self._parent_header.set(parent_header)
        self._parent_header_global = parent_header

    def start_displayhook(self):
        """Start the display hook.

        Builds an ``execute_result`` message with empty data and metadata when
        a session is configured; without a session no message is created.
        """
        if self.session:
            self.msg = self.session.msg(
                "execute_result",
                {
                    "data": {},
                    "metadata": {},
                },
                parent=self.parent_header,
            )

    def write_output_prompt(self):
        """Write the output prompt.

        Records the current ``prompt_count`` as the pending message's
        ``execution_count``.
        """
        if self.msg:
            self.msg["content"]["execution_count"] = self.prompt_count

    def write_format_data(self, format_dict, md_dict=None):
        """Write format data to the message.

        Parameters
        ----------
        format_dict
            Representations of the object; passed through ``encode_images``
            and ``json_clean`` before being stored as the message data.
        md_dict
            Metadata stored on the message. ``None`` is stored as an empty
            dict so the metadata field is always a dict, as in the message
            created by ``start_displayhook``.
        """
        if self.msg:
            self.msg["content"]["data"] = json_clean(encode_images(format_dict))
            self.msg["content"]["metadata"] = {} if md_dict is None else md_dict

    def finish_displayhook(self):
        """Finish up all displayhook activities.

        Flushes stdout/stderr, sends the pending message if it carries any
        data and a session is available, then clears the pending message.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        if self.msg and self.msg["content"]["data"] and self.session:
            self.session.send(self.pub_socket, self.msg, ident=self.topic)
        self.msg = None