from typing import List, Callable, AsyncGenerator, Dict, Tuple import logging import asyncio async def _queue_to_generator(queue: asyncio.Queue, queue_event: asyncio.Event, finish_event: asyncio.Event): while True: await queue_event.wait() if queue.empty(): queue_event.clear() if finish_event.is_set(): break else: yield await queue.get() async def _multiplex(in_stream: AsyncGenerator, queue_list: List[asyncio.Queue], queue_event_list: List[asyncio.Event], finish_event: asyncio.Event): async for in_d in in_stream: for q in queue_list: await q.put(dict(in_d)) for qe in queue_event_list: qe.set() for qe in queue_event_list: qe.set() finish_event.set() def multiplexor( func_d: Dict[str, Callable[[AsyncGenerator], AsyncGenerator | None]], in_stream: AsyncGenerator ) -> Tuple[Dict[str, AsyncGenerator], asyncio.Task]: queue_list: List[asyncio.Queue] = list() queue_event_list: List[asyncio.Event] = list() stream_end_event = asyncio.Event() result_d = dict() for fun_key in func_d: q = asyncio.Queue() q_event = asyncio.Event() agen = func_d[fun_key](_queue_to_generator(q,q_event,stream_end_event)) result_d[fun_key] = agen queue_list.append(q) queue_event_list.append(q_event) multi_task = asyncio.create_task(_multiplex(in_stream, queue_list, queue_event_list, stream_end_event)) return result_d, multi_task