| |
| """ |
| Módulo Streamer de Resposta |
| |
| Implementa a lógica para streaming de respostas. Isso é crucial para aplicações |
| que lidam com grandes volumes de dados ou modelos de linguagem, permitindo que |
| a resposta seja enviada em pedaços (chunks) à medida que é gerada. |
| """ |
|
|
| import logging |
| from typing import Generator, Any, Callable, Iterable |
|
|
| from .config import CONFIG |
|
|
| |
| logger = logging.getLogger(f"superezio_enterprise.{__name__}") |
|
|
|
|
| class ResponseStreamer: |
| """ |
| Gerencia o processo de streaming de uma resposta a partir de uma função geradora. |
| |
| Se o streaming estiver desabilitado na configuração, ele pode opcionalmente |
| agregar a resposta e retorná-la de uma só vez. |
| """ |
|
|
| def __init__(self, generator_func: Callable[..., Iterable[Any]]): |
| """ |
| Args: |
| generator_func: Uma função que retorna um iterável (como um gerador) |
| que produz os chunks da resposta. |
| """ |
| self.generator_func = generator_func |
| logger.debug( |
| "ResponseStreamer inicializado para a função: %s", generator_func.__name__ |
| ) |
|
|
| def stream(self, *args, **kwargs) -> Generator[Any, None, None]: |
| """ |
| Inicia o processo de streaming se habilitado. |
| |
| Yields: |
| Os chunks de dados gerados pela função. |
| |
| Raises: |
| RuntimeError: Se o streaming estiver desabilitado e não houver fallback. |
| Qualquer exceção gerada pela `generator_func`. |
| """ |
| if not CONFIG.streaming_enabled: |
| logger.warning( |
| "Tentativa de streaming enquanto a feature está desabilitada. Abortando." |
| ) |
| raise RuntimeError( |
| "O streaming de respostas não está habilitado na configuração." |
| ) |
|
|
| logger.info("Iniciando o streaming da resposta...") |
| try: |
| |
| for chunk in self.generator_func(*args, **kwargs): |
| yield chunk |
| logger.info("Streaming da resposta concluído com sucesso.") |
| except Exception: |
| logger.error( |
| "Ocorreu um erro durante o streaming da resposta.", exc_info=True |
| ) |
| |
| raise |
|
|
| def stream_or_collect(self, *args, **kwargs) -> Any: |
| """ |
| Inicia o streaming se habilitado. Se desabilitado, coleta todos os chunks, |
| os une e retorna o resultado completo. |
| Funciona melhor para chunks de string. |
| """ |
| if CONFIG.streaming_enabled: |
| |
| return self.stream(*args, **kwargs) |
| else: |
| logger.info("Streaming desabilitado. Coletando a resposta completa...") |
| try: |
| |
| response_chunks = list(self.generator_func(*args, **kwargs)) |
| full_response = "".join(map(str, response_chunks)) |
| logger.info("Resposta completa coletada com sucesso.") |
| return full_response |
| except Exception: |
| logger.error("Ocorreu um erro ao coletar a resposta.", exc_info=True) |
| raise |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|