"""
Payload implementation for coroutines as data provider.

As a simple case, you can upload data from file::

    @aiohttp.streamer
    async def file_sender(writer, file_name=None):
        with open(file_name, 'rb') as f:
            chunk = f.read(2**16)
            while chunk:
                await writer.write(chunk)

                chunk = f.read(2**16)

Then you can use `file_sender` like this::

    async with session.post('http://httpbin.org/post',
                            data=file_sender(file_name='huge_file')) as resp:
        print(await resp.text())

.. note:: The coroutine must accept `writer` as its first argument.

"""
| |
|
| | import types |
| | import warnings |
| | from typing import Any, Awaitable, Callable, Dict, Tuple |
| |
|
| | from .abc import AbstractStreamWriter |
| | from .payload import Payload, payload_type |
| |
|
| | __all__ = ("streamer",) |
| |
|
| |
|
class _stream_wrapper:
    """Pair a streaming callable with the arguments it will be invoked with.

    Calling an instance with a writer awaits
    ``coro(writer, *args, **kwargs)``.
    """

    def __init__(
        self,
        coro: Callable[..., Awaitable[None]],
        args: Tuple[Any, ...],
        kwargs: Dict[str, Any],
    ) -> None:
        """Store the callable (passed through ``types.coroutine``) and its arguments.

        :param coro: callable that takes the writer as its first argument.
        :param args: extra positional arguments forwarded after the writer.
        :param kwargs: keyword arguments forwarded to ``coro``.
        """
        # types.coroutine() makes a generator-based callable awaitable as well.
        wrapped = types.coroutine(coro)
        self.coro, self.args, self.kwargs = wrapped, args, kwargs

    async def __call__(self, writer: AbstractStreamWriter) -> None:
        """Run the stored callable, passing *writer* as its first argument."""
        pending = self.coro(writer, *self.args, **self.kwargs)
        await pending
| |
|
| |
|
class streamer:
    """Deprecated decorator that turns a coroutine function into a data provider.

    Instantiating the class (i.e. applying the decorator) emits a
    ``DeprecationWarning``. Calling the resulting object captures the given
    arguments in a ``_stream_wrapper``.
    """

    def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
        """Warn about the deprecation and remember the decorated callable.

        :param coro: the callable being decorated.
        """
        message = "@streamer is deprecated, use async generators instead"
        # stacklevel=2 attributes the warning to the code applying the decorator
        # rather than to this constructor.
        warnings.warn(message, DeprecationWarning, stacklevel=2)
        self.coro = coro

    def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
        """Return a ``_stream_wrapper`` binding the callable to these arguments."""
        bound = _stream_wrapper(self.coro, args, kwargs)
        return bound
| |
|
| |
|
@payload_type(_stream_wrapper)
class StreamWrapperPayload(Payload):
    """Payload registered (via ``payload_type``) for ``_stream_wrapper`` values."""

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the stored value, handing it *writer*.

        NOTE(review): ``self._value`` is presumably the wrapped object stored
        by ``Payload.__init__`` -- confirm against the base class.
        """
        produce = self._value
        await produce(writer)

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Always raise ``TypeError``; this payload does not support decoding.

        :raises TypeError: unconditionally.
        """
        raise TypeError("Unable to decode.")
| |
|
| |
|
@payload_type(streamer)
class StreamPayload(StreamWrapperPayload):
    """Payload registered (via ``payload_type``) for ``streamer`` values."""

    def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
        """Call *value* with no arguments and pass the result to the parent.

        :param value: callable invoked with no arguments; its result is passed
            to the parent initializer.
        :param args: extra positional arguments passed through to the parent.
        :param kwargs: keyword arguments passed through to the parent.
        """
        wrapper = value()
        super().__init__(wrapper, *args, **kwargs)

    async def write(self, writer: AbstractStreamWriter) -> None:
        """Await the stored value, handing it *writer*."""
        stream = self._value
        await stream(writer)
| |
|