| from botocore.eventstream import ( | |
| EventStream, | |
| EventStreamBuffer, | |
| NoInitialResponseError, | |
| ) | |
| class AioEventStream(EventStream): | |
| def __iter__(self): | |
| raise NotImplementedError('Use async-for instead') | |
| def __aiter__(self): | |
| return self.__anext__() | |
| async def __anext__(self): | |
| async for event in self._event_generator: | |
| parsed_event = self._parse_event(event) | |
| if parsed_event: | |
| yield parsed_event | |
| async def _create_raw_event_generator(self): | |
| event_stream_buffer = EventStreamBuffer() | |
| async for chunk, _ in self._raw_stream.content.iter_chunks(): | |
| event_stream_buffer.add_data(chunk) | |
| for event in event_stream_buffer: | |
| yield event # unfortunately no yield from async func support | |
| async def get_initial_response(self): | |
| try: | |
| async for event in self._event_generator: | |
| event_type = event.headers.get(':event-type') | |
| if event_type == 'initial-response': | |
| return event | |
| break | |
| except StopIteration: | |
| pass | |
| raise NoInitialResponseError() | |
| # self._raw_stream.close() is sync so no override needed | |