File size: 4,097 Bytes
6f72e2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
Async helper function that are invalid syntax on Python 3.5 and below.

This code is best effort, and may have edge cases not behaving as expected. In
particular it contain a number of heuristics to detect whether code is
effectively async and need to run in an event loop or not.

Some constructs (like top-level `return`, or `yield`) are taken care of
explicitly to actually raise a SyntaxError and stay as close as possible to
Python semantics.
"""

import ast
import asyncio
import inspect
from functools import wraps

_asyncio_event_loop = None


def get_asyncio_loop():
    """asyncio has deprecated get_event_loop

    Replicate it here, with our desired semantics:

    - always returns a valid, not-closed loop
    - not thread-local like asyncio's,
      because we only want one loop for IPython
    - if called from inside a coroutine (e.g. in ipykernel),
      return the running loop

    .. versionadded:: 8.0
    """
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # not inside a coroutine,
        # track our own global
        pass

    # not thread-local like asyncio's,
    # because we only track one event loop to run for IPython itself,
    # always in the main thread.
    global _asyncio_event_loop
    if _asyncio_event_loop is None or _asyncio_event_loop.is_closed():
        _asyncio_event_loop = asyncio.new_event_loop()
    return _asyncio_event_loop


class _AsyncIORunner:
    def __call__(self, coro):
        """
        Handler for asyncio autoawait
        """
        return get_asyncio_loop().run_until_complete(coro)

    def __str__(self):
        return "asyncio"


_asyncio_runner = _AsyncIORunner()


class _AsyncIOProxy:
    """Proxy-object for an asyncio

    Any coroutine methods will be wrapped in event_loop.run_
    """

    def __init__(self, obj, event_loop):
        self._obj = obj
        self._event_loop = event_loop

    def __repr__(self):
        return f"<_AsyncIOProxy({self._obj!r})>"

    def __getattr__(self, key):
        attr = getattr(self._obj, key)
        if inspect.iscoroutinefunction(attr):
            # if it's a coroutine method,
            # return a threadsafe wrapper onto the _current_ asyncio loop
            @wraps(attr)
            def _wrapped(*args, **kwargs):
                concurrent_future = asyncio.run_coroutine_threadsafe(
                    attr(*args, **kwargs), self._event_loop
                )
                return asyncio.wrap_future(concurrent_future)

            return _wrapped
        else:
            return attr

    def __dir__(self):
        return dir(self._obj)


def _curio_runner(coroutine):
    """
    handler for curio autoawait
    """
    import curio

    return curio.run(coroutine)


def _trio_runner(async_fn):
    import trio

    async def loc(coro):
        """
        We need the dummy no-op async def to protect from
        trio's internal. See https://github.com/python-trio/trio/issues/89
        """
        return await coro

    return trio.run(loc, async_fn)


def _pseudo_sync_runner(coro):
    """
    A runner that does not really allow async execution, and just advance the coroutine.

    See discussion in https://github.com/python-trio/trio/issues/608,

    Credit to Nathaniel Smith
    """
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value
    else:
        # TODO: do not raise but return an execution result with the right info.
        raise RuntimeError(
            "{coro_name!r} needs a real async loop".format(coro_name=coro.__name__)
        )


def _should_be_async(cell: str) -> bool:
    """Detect if a block of code needs to be wrapped in an `async def`

    If the code block has a top-level return statement or is otherwise
    invalid, `False` will be returned.
    """
    try:
        code = compile(
            cell, "<>", "exec", flags=getattr(ast, "PyCF_ALLOW_TOP_LEVEL_AWAIT", 0x0)
        )
        return inspect.CO_COROUTINE & code.co_flags == inspect.CO_COROUTINE
    except (SyntaxError, ValueError, MemoryError):
        return False