| | """ |
| | Support for streaming http requests in emscripten. |
| | |
| | A few caveats - |
| | |
| | If your browser (or Node.js) has WebAssembly JavaScript Promise Integration enabled |
| | https://github.com/WebAssembly/js-promise-integration/blob/main/proposals/js-promise-integration/Overview.md |
| | *and* you launch pyodide using `pyodide.runPythonAsync`, this will fetch data using the |
| | JavaScript asynchronous fetch api (wrapped via `pyodide.ffi.call_sync`). In this case |
| | timeouts and streaming should just work. |
| | |
| | Otherwise, it uses a combination of XMLHttpRequest and a web-worker for streaming. |
| | |
| | This approach has several caveats: |
| | |
| | Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed. |
| | Streaming only works if you're running pyodide in a web worker. |
| | |
| | Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch |
| | operation, so it requires that you have crossOriginIsolation enabled, by serving over https |
| | (or from localhost) with the two headers below set: |
| | |
| | Cross-Origin-Opener-Policy: same-origin |
| | Cross-Origin-Embedder-Policy: require-corp |
| | |
| | You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in |
JavaScript console. If it isn't, streaming requests will fall back to XMLHttpRequest, i.e. getting the whole
request into a buffer and then returning it. It shows a warning in the JavaScript console in this case.
| | |
Finally, the web worker which does the streaming fetch is created on initial import, but will only be started once
control is returned to JavaScript. Call `await wait_for_streaming_ready()` to wait until streaming fetch is ready.
| | |
| | NB: in this code, there are a lot of JavaScript objects. They are named js_* |
| | to make it clear what type of object they are. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import io |
| | import json |
| | from email.parser import Parser |
| | from importlib.resources import files |
| | from typing import TYPE_CHECKING, Any |
| |
|
| | import js |
| | from pyodide.ffi import ( |
| | JsArray, |
| | JsException, |
| | JsProxy, |
| | to_js, |
| | ) |
| |
|
| | if TYPE_CHECKING: |
| | from typing_extensions import Buffer |
| |
|
| | from .request import EmscriptenRequest |
| | from .response import EmscriptenResponse |
| |
|
# There are some headers that trigger unintended CORS preflight requests.
# See also https://github.com/koenvo/pyodide-http/issues/22
HEADERS_TO_IGNORE = ("user-agent",)

# Status codes written by the fetch worker into the first int32 slot of the
# shared buffer. Positive values there mean "length of data chunk" instead.
SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

# JavaScript source of the streaming fetch web worker, shipped as package data
# next to this module.
_STREAMING_WORKER_CODE = (
    files(__package__)
    .joinpath("emscripten_fetch_worker.js")
    .read_text(encoding="utf-8")
)
| |
|
| |
|
class _RequestError(Exception):
    """Base error for emscripten requests, carrying request/response context.

    :param message: human-readable description of the failure
    :param request: the request being processed when the error occurred
    :param response: the response being processed, if one exists yet
    """

    def __init__(
        self,
        message: str | None = None,
        *,
        request: EmscriptenRequest | None = None,
        response: EmscriptenResponse | None = None,
    ):
        super().__init__(message)
        self.message = message
        self.request = request
        self.response = response
| |
|
| |
|
class _StreamingError(_RequestError):
    """Raised when the streaming fetch machinery fails."""
| |
|
| |
|
class _TimeoutError(_RequestError):
    """Raised when a request or stream read exceeds its timeout."""
| |
|
| |
|
def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """Convert a Python dict into a plain JavaScript object."""
    converter = js.Object.fromEntries
    return to_js(dict_val, dict_converter=converter)
| |
|
| |
|
class _ReadStream(io.RawIOBase):
    """Readable stream over a response body fed by the streaming fetch worker.

    The worker writes chunks into the shared byte buffer; the first int32 of
    the shared area doubles as a status word (the negative sentinels above) or
    the length of the most recently written chunk. Synchronization uses
    Atomics.wait, which is why this only works off the main browser thread.

    :param int_buffer: Int32Array view of the shared control area
    :param byte_buffer: Uint8Array view of the shared data area
    :param timeout: timeout in seconds (<= 0 means wait forever)
    :param worker: the web worker performing the fetch
    :param connection_id: worker-side identifier for this connection
    :param request: the request this stream belongs to (for error reporting)
    """

    def __init__(
        self,
        int_buffer: JsArray,
        byte_buffer: JsArray,
        timeout: float,
        worker: JsProxy,
        connection_id: int,
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        self.read_pos = 0
        self.read_len = 0
        self.connection_id = connection_id
        self.worker = worker
        # Seconds -> milliseconds for Atomics.wait; None means wait forever.
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        self.is_live = True
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        self.close()

    # Method form kept for compatibility with urllib3's connection interface.
    def is_closed(self) -> bool:
        return self._is_closed

    # Property form expected by io.RawIOBase consumers.
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Release buffers and tell the worker to drop this connection."""
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.int_buffer = None
        self.byte_buffer = None
        self._is_closed = True
        self.request = None
        if self.is_live:
            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
            self.is_live = False
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        """Copy up to len(byte_obj) bytes into *byte_obj*; 0 means EOF."""
        if not self.int_buffer:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # Current chunk exhausted: ask the worker for more and block on
            # the status word until it changes away from ERROR_TIMEOUT.
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                # Include context, consistent with _StreamingFetcher.send's
                # timeout error (was a bare `raise _TimeoutError`).
                raise _TimeoutError(
                    "Timeout in streaming read", request=self.request, response=None
                )
            data_len = self.int_buffer[0]
            if data_len > 0:
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                # Worker reports a fetch exception; its text follows in bytes.
                string_len = self.int_buffer[1]
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # SUCCESS_EOF: stream finished cleanly; worker already closed
                # its end, so skip the close message.
                self.is_live = False
                self.close()
                return 0
        # Copy out of the shared buffer into the caller's buffer.
        ret_length = min(self.read_len, len(memoryview(byte_obj)))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        memoryview(byte_obj)[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length
| |
|
| |
|
class _StreamingFetcher:
    """Owns the web worker that performs asynchronous fetches on our behalf.

    Communication happens over a SharedArrayBuffer: the first 8 bytes are
    viewed as an Int32Array (slot 0 = status/length, slot 1 = JSON payload
    length), the remainder as a Uint8Array data area. The calling thread
    blocks on Atomics.wait until the worker signals a result.
    """

    def __init__(self) -> None:
        # Set to True by onMsg once the worker posts its first message.
        self.streaming_ready = False

        # Package the worker source as a Blob so it can be launched via URL.
        js_data_blob = js.Blob.new(
            to_js([_STREAMING_WORKER_CODE], create_pyproxies=False),
            _obj_from_dict({"type": "application/javascript"}),
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            # Runs when Promise.new is invoked below — i.e. after
            # self.js_worker has been assigned, so the references are valid.
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        # Resolves when the worker posts its initial "ready" message; awaited
        # by wait_for_streaming_ready().
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """Post *request* to the worker and block until response headers arrive.

        :raises _TimeoutError: if the worker doesn't respond within the timeout
        :raises _StreamingError: if the fetch threw, or an unknown status appears
        """
        # Drop headers that would trigger unwanted CORS preflight requests.
        headers = {
            k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # Seconds -> milliseconds for Atomics.wait; None means wait forever.
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
        # 1 MiB shared buffer; byte data starts at offset 8, after two int32 slots.
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        # Pre-set the status word to ERROR_TIMEOUT; the worker overwrites it.
        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        # Resolve the request URL against the current location before handing
        # it to the worker.
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # Block until the worker stores a status other than ERROR_TIMEOUT.
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        if js_int_buffer[0] == ERROR_TIMEOUT:
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif js_int_buffer[0] == SUCCESS_HEADER:
            # Headers arrived; slot 1 holds the length of the JSON metadata.
            string_len = js_int_buffer[1]
            # Decode the JSON response metadata from the shared byte area.
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            response_obj = json.loads(json_str)
            # Body is a lazy stream that keeps pulling chunks from the worker.
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif js_int_buffer[0] == ERROR_EXCEPTION:
            # The fetch threw in the worker; the buffer holds the error text.
            string_len = js_int_buffer[1]
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {js_int_buffer[0]}",
                request=request,
                response=None,
            )
| |
|
| |
|
class _JSPIReadStream(io.RawIOBase):
    """
    A read stream that uses pyodide.ffi.run_sync to read from a JavaScript fetch
    response. This requires support for WebAssembly JavaScript Promise Integration
    in the containing browser, and for pyodide to be launched via runPythonAsync.

    :param js_read_stream:
        The JavaScript stream reader

    :param timeout:
        Timeout in seconds

    :param request:
        The request we're handling

    :param response:
        The response this stream relates to

    :param js_abort_controller:
        A JavaScript AbortController object, used for timeouts
    """

    def __init__(
        self,
        js_read_stream: Any,
        timeout: float,
        request: EmscriptenRequest,
        response: EmscriptenResponse,
        js_abort_controller: Any,
    ):
        self.js_read_stream = js_read_stream
        self.timeout = timeout
        self._is_closed = False
        self._is_done = False
        self.request: EmscriptenRequest | None = request
        self.response: EmscriptenResponse | None = response
        # Most recent chunk from the JS reader, and how far it is consumed.
        self.current_buffer = None
        self.current_buffer_pos = 0
        self.js_abort_controller = js_abort_controller

    def __del__(self) -> None:
        self.close()

    # Method form kept for compatibility with urllib3's connection interface.
    def is_closed(self) -> bool:
        return self._is_closed

    # Property form expected by io.RawIOBase consumers.
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Cancel the underlying JS reader and drop held references.

        (Dead ``read_len``/``read_pos`` assignments copied from _ReadStream
        were removed — this class never defines or reads those attributes.)
        """
        if self.is_closed():
            return
        self.js_read_stream.cancel()
        self.js_read_stream = None
        self._is_closed = True
        self._is_done = True
        self.request = None
        self.response = None
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def _get_next_buffer(self) -> bool:
        """Pull the next chunk from the JS reader; False when the stream is done."""
        result_js = _run_sync_with_timeout(
            self.js_read_stream.read(),
            self.timeout,
            self.js_abort_controller,
            request=self.request,
            response=self.response,
        )
        if result_js.done:
            self._is_done = True
            return False
        else:
            self.current_buffer = result_js.value.to_py()
            self.current_buffer_pos = 0
            return True

    def readinto(self, byte_obj: Buffer) -> int:
        """Copy up to len(byte_obj) bytes into *byte_obj*; 0 means EOF."""
        if self.current_buffer is None:
            if not self._get_next_buffer() or self.current_buffer is None:
                self.close()
                return 0
        # Wrap the target in a memoryview so any buffer-protocol object works,
        # consistent with _ReadStream.readinto.
        view = memoryview(byte_obj)
        ret_length = min(
            len(view), len(self.current_buffer) - self.current_buffer_pos
        )
        view[0:ret_length] = self.current_buffer[
            self.current_buffer_pos : self.current_buffer_pos + ret_length
        ]
        self.current_buffer_pos += ret_length
        if self.current_buffer_pos == len(self.current_buffer):
            # Chunk fully consumed; fetch a fresh one on the next call.
            self.current_buffer = None
        return ret_length
| |
|
| |
|
| | |
def is_in_browser_main_thread() -> bool:
    """Return True when running on the browser's main (UI) thread."""
    if not (hasattr(js, "window") and hasattr(js, "self")):
        return False
    # On the main thread the global `self` is the window object itself.
    return js.self == js.window
| |
|
| |
|
def is_cross_origin_isolated() -> bool:
    """Return the JavaScript ``crossOriginIsolated`` flag, or False if absent."""
    try:
        return js.crossOriginIsolated
    except AttributeError:
        return False
| |
|
| |
|
def is_in_node() -> bool:
    """Return True when running under Node.js (checks process.release.name)."""
    process = getattr(js, "process", None)
    release = getattr(process, "release", None)
    return getattr(release, "name", None) == "node"
| |
|
| |
|
def is_worker_available() -> bool:
    """Return True if the Worker and Blob constructors exist in this runtime."""
    return all(hasattr(js, name) for name in ("Worker", "Blob"))
| |
|
| |
|
# Module-level singleton. The streaming fetch worker is only usable when web
# workers exist, the page is cross-origin isolated, we are off the main
# browser thread, and we are not under Node.js. (The original had a redundant
# `else: _fetcher = None` branch after already initializing to None.)
_fetcher: _StreamingFetcher | None = None

if (
    is_worker_available()
    and is_cross_origin_isolated()
    and not is_in_browser_main_thread()
    and not is_in_node()
):
    _fetcher = _StreamingFetcher()
| |
|
| |
|
# Error shown when requests are attempted under Node.js without JSPI support.
# NOTE(review): the concatenated literal yields a double space before
# "versions" — left as-is here since the message text is runtime behavior.
NODE_JSPI_ERROR = (
    "urllib3 only works in Node.js with pyodide.runPythonAsync"
    " and requires the flag --experimental-wasm-stack-switching in "
    " versions of node <24."
)
| |
|
| |
|
def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """Send *request*, streaming the response body.

    Returns None when no streaming transport is available, in which case the
    caller should fall back to a buffered request.
    """
    if has_jspi():
        return send_jspi_request(request, True)
    if is_in_node():
        # Node.js without JSPI cannot do any of our transports: fail loudly.
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    if _fetcher and streaming_ready():
        return _fetcher.send(request)
    _show_streaming_warning()
    return None
| |
|
| |
|
# Tracks whether the one-time timeout warning has already been emitted.
_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Warn once per session that timeouts don't work on the main thread."""
    global _SHOWN_TIMEOUT_WARNING
    if _SHOWN_TIMEOUT_WARNING:
        return
    _SHOWN_TIMEOUT_WARNING = True
    js.console.warn("Warning: Timeout is not available on main browser thread")
| |
|
| |
|
# Tracks whether the one-time streaming warning has already been emitted.
_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    """Warn once per session why streaming requests are unavailable."""
    global _SHOWN_STREAMING_WARNING
    if _SHOWN_STREAMING_WARNING:
        return
    _SHOWN_STREAMING_WARNING = True
    reasons = []
    if not is_cross_origin_isolated():
        reasons.append(" Page is not cross-origin isolated\n")
    if is_in_browser_main_thread():
        reasons.append(" Python is running in main browser thread\n")
    if not is_worker_available():
        reasons.append(" Worker or Blob classes are not available in this environment.")
    if streaming_ready() is False:
        reasons.append(
            """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
is working, you need to call: 'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
        )
    message = "Can't stream HTTP requests because: \n" + "".join(reasons)
    from js import console

    console.warn(message)
| |
|
| |
|
def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """Send a non-streaming request, buffering the whole response body.

    Prefers JSPI fetch when available; otherwise falls back to a synchronous
    XMLHttpRequest.

    :raises _TimeoutError: on XHR timeout
    :raises _RequestError: on network or other JavaScript errors, or under
        Node.js without JSPI
    """
    if has_jspi():
        return send_jspi_request(request, False)
    elif is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    try:
        js_xhr = js.XMLHttpRequest.new()

        if not is_in_browser_main_thread():
            # Off the main thread we can get raw bytes and real timeouts.
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            # On the main thread, arraybuffer responses and timeouts are not
            # available for synchronous XHR; force a charset whose decoding is
            # byte-transparent so the body can be recovered losslessly below.
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # Timeout isn't available on the main thread; warn instead.
                _show_timeout_warning()

        # Third argument False = synchronous request.
        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        # Response headers arrive as one RFC822-style string; parse to a dict.
        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            # Undo the byte-transparent text decoding forced above.
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request)
        elif err.name == "NetworkError":
            raise _RequestError(err.message, request=request)
        else:
            # Any other JavaScript error is treated as a generic request error.
            raise _RequestError(err.message, request=request)
| |
|
| |
|
def send_jspi_request(
    request: EmscriptenRequest, streaming: bool
) -> EmscriptenResponse:
    """
    Send a request using WebAssembly JavaScript Promise Integration
    to wrap the asynchronous JavaScript fetch api (experimental).

    :param request:
        Request to send

    :param streaming:
        Whether to stream the response

    :return: The response object
    :rtype: EmscriptenResponse
    """
    timeout = request.timeout
    js_abort_controller = js.AbortController.new()
    # Drop headers that would trigger unwanted CORS preflight requests.
    headers = {k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE}
    req_body = request.body
    fetch_data = {
        "headers": headers,
        "body": to_js(req_body),
        "method": request.method,
        "signal": js_abort_controller.signal,
    }
    # Call JavaScript fetch (asynchronous API — returns a promise).
    fetcher_promise_js = js.fetch(request.url, _obj_from_dict(fetch_data))
    # Suspend WebAssembly until the promise resolves (or the abort controller
    # fires on timeout).
    response_js = _run_sync_with_timeout(
        fetcher_promise_js,
        timeout,
        js_abort_controller,
        request=request,
        response=None,
    )
    # Drain the JS Headers iterator into a plain Python dict.
    headers = {}
    header_iter = response_js.headers.entries()
    while True:
        iter_value_js = header_iter.next()
        if getattr(iter_value_js, "done", False):
            break
        else:
            headers[str(iter_value_js.value[0])] = str(iter_value_js.value[1])
    status_code = response_js.status
    body: bytes | io.RawIOBase = b""

    # Response is constructed first so the streaming body can reference it.
    response = EmscriptenResponse(
        status_code=status_code, headers=headers, body=b"", request=request
    )
    if streaming:
        # Body may be None, e.g. for 204 responses or HEAD requests.
        if response_js.body is not None:
            # Get a reader from the fetch response and wrap it lazily.
            body_stream_js = response_js.body.getReader()
            body = _JSPIReadStream(
                body_stream_js, timeout, request, response, js_abort_controller
            )
    else:
        # Fetch the whole body eagerly via arrayBuffer.
        # n.b. this promise must be awaited, hence another suspend.
        body = _run_sync_with_timeout(
            response_js.arrayBuffer(),
            timeout,
            js_abort_controller,
            request=request,
            response=response,
        ).to_py()
    response.body = body
    return response
| |
|
| |
|
def _run_sync_with_timeout(
    promise: Any,
    timeout: float,
    js_abort_controller: Any,
    request: EmscriptenRequest | None,
    response: EmscriptenResponse | None,
) -> Any:
    """
    Await a JavaScript promise synchronously with a timeout which is implemented
    via the AbortController

    :param promise:
        Javascript promise to await

    :param timeout:
        Timeout in seconds

    :param js_abort_controller:
        A JavaScript AbortController object, used on timeout

    :param request:
        The request being handled

    :param response:
        The response being handled (if it exists yet)

    :raises _TimeoutError: If the request times out
    :raises _RequestError: If the request raises a JavaScript exception

    :return: The result of awaiting the promise.
    """
    timer_id = None
    if timeout > 0:
        # Schedule an abort; if the promise resolves first, the timer is
        # cancelled in the finally block below.
        timer_id = js.setTimeout(
            js_abort_controller.abort.bind(js_abort_controller), int(timeout * 1000)
        )
    try:
        from pyodide.ffi import run_sync

        # run_sync uses WebAssembly JavaScript Promise Integration to suspend
        # Python until the promise settles.
        return run_sync(promise)
    except JsException as err:
        if err.name == "AbortError":
            # The abort controller fired, i.e. our timeout elapsed.
            raise _TimeoutError(
                message="Request timed out", request=request, response=response
            )
        else:
            raise _RequestError(message=err.message, request=request, response=response)
    finally:
        if timer_id is not None:
            js.clearTimeout(timer_id)
| |
|
| |
|
def has_jspi() -> bool:
    """Check whether JavaScript Promise Integration is usable right now.

    This needs both runtime support (`run_sync` must exist in pyodide.ffi)
    and the correct WebAssembly state — i.e. the JavaScript call into Python
    must have been asynchronous.

    :return: True if jspi can be used.
    :rtype: bool
    """
    try:
        from pyodide.ffi import can_run_sync, run_sync  # noqa: F401
    except ImportError:
        return False
    return bool(can_run_sync())
| |
|
| |
|
def streaming_ready() -> bool | None:
    """True/False once the worker exists; None if streaming is unavailable."""
    if _fetcher is None:
        return None
    return _fetcher.streaming_ready
| |
|
| |
|
async def wait_for_streaming_ready() -> bool:
    """Await the streaming worker's ready promise; False if there is no worker."""
    if _fetcher is None:
        return False
    await _fetcher.js_worker_ready_promise
    return True
| |
|