| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | """Internal utilities for gRPC Python.""" |
| |
|
| | import collections |
| | import logging |
| | import threading |
| | import time |
| | from typing import Callable, Dict, Optional, Sequence |
| |
|
| | import grpc |
| | from grpc import _common |
| | from grpc._typing import DoneCallbackType |
| |
|
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Message logged when a future's "done" callback raises an exception.
_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
    "Exception calling connectivity future "
    '"done" callback!'
)
| |
|
| |
|
class RpcMethodHandler(
        collections.namedtuple(
            '_RpcMethodHandler',
            [
                'request_streaming',
                'response_streaming',
                'request_deserializer',
                'response_serializer',
                'unary_unary',
                'unary_stream',
                'stream_unary',
                'stream_stream',
            ],
        ),
        grpc.RpcMethodHandler):
    """Immutable, namedtuple-backed implementation of grpc.RpcMethodHandler.

    All eight fields are positional namedtuple fields, in the order listed
    in the class header.
    """
| |
|
| |
|
class DictionaryGenericHandler(grpc.ServiceRpcHandler):
    """A grpc.ServiceRpcHandler that looks method handlers up in a dict."""

    _name: str
    _method_handlers: Dict[str, grpc.RpcMethodHandler]

    def __init__(self, service: str,
                 method_handlers: Dict[str, grpc.RpcMethodHandler]):
        """Index the given handlers by their fully qualified method name.

        Args:
          service: The name of the service these handlers belong to.
          method_handlers: A mapping from method name to the handler that
            serves that method.
        """
        self._name = service
        # Re-key every handler by whatever
        # `_common.fully_qualified_method(service, method)` returns, since
        # that is the key `service()` looks incoming calls up by.
        handlers_by_path = {}
        for method_name, handler in method_handlers.items():
            path = _common.fully_qualified_method(service, method_name)
            handlers_by_path[path] = handler
        self._method_handlers = handlers_by_path

    def service_name(self) -> str:
        """Return the service name passed at construction."""
        return self._name

    def service(
        self, handler_call_details: grpc.HandlerCallDetails
    ) -> Optional[grpc.RpcMethodHandler]:
        """Return the handler registered for the call's method, if any.

        Args:
          handler_call_details: Details of the incoming call; only its
            `method` attribute is read.

        Returns:
          The matching grpc.RpcMethodHandler, or None when no handler is
          registered under `handler_call_details.method`.
        """
        return self._method_handlers.get(handler_call_details.method)
| |
|
| |
|
class _ChannelReadyFuture(grpc.Future):
    """A grpc.Future that matures once its channel reports READY.

    The future subscribes to the channel's connectivity updates in `start`
    and finishes in exactly one of two ways: it matures when an update
    reports `grpc.ChannelConnectivity.READY`, or it is cancelled through
    `cancel`. It carries no value: `result`, `exception` and `traceback`
    all return None once the future has matured.
    """

    _condition: threading.Condition
    _channel: grpc.Channel
    _matured: bool
    _cancelled: bool
    # Pending "done" callbacks while the future is running; replaced by
    # None once the future has matured or been cancelled.
    _done_callbacks: Sequence[Callable]

    def __init__(self, channel: grpc.Channel):
        """Create a not-yet-started future for `channel`.

        Args:
          channel: The channel whose connectivity is observed.
        """
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        self._done_callbacks = []

    def _block(self, timeout: Optional[float]) -> None:
        """Block until the future matures, is cancelled, or times out.

        Args:
          timeout: Maximum number of seconds to wait, or None to wait
            without a limit.

        Raises:
          grpc.FutureCancelledError: If the future is or becomes cancelled.
          grpc.FutureTimeoutError: If `timeout` elapses first.
        """
        # Use the monotonic clock so that wall-clock adjustments can neither
        # shorten nor extend the requested timeout.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise grpc.FutureCancelledError()
                if self._matured:
                    return
                if deadline is None:
                    self._condition.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining < 0:
                        raise grpc.FutureTimeoutError()
                    self._condition.wait(timeout=remaining)

    def _finish_locked(self) -> Sequence[Callable]:
        """Finish the future; the caller must hold `self._condition`.

        Unsubscribes from the channel, wakes every blocked waiter and
        detaches the registered callbacks.

        Returns:
          A snapshot of the callbacks that were registered, to be invoked
          by the caller after releasing the condition.
        """
        self._channel.unsubscribe(self._update)
        self._condition.notify_all()
        done_callbacks = tuple(self._done_callbacks)
        self._done_callbacks = None
        return done_callbacks

    def _run_done_callbacks(self, done_callbacks: Sequence[Callable]) -> None:
        """Invoke each callback with this future, logging any exception."""
        for done_callback in done_callbacks:
            try:
                done_callback(self)
            except Exception:
                # A failing callback must not prevent the remaining ones
                # from running.
                _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)

    def _update(self, connectivity: Optional[grpc.ChannelConnectivity]) -> None:
        """Connectivity callback: mature the future on the first READY.

        Args:
          connectivity: The connectivity state reported by the channel.
        """
        with self._condition:
            # Ignore updates once the future is done: the callback list has
            # already been detached, so finishing a second time would fail.
            if (self._cancelled or self._matured or
                    connectivity is not grpc.ChannelConnectivity.READY):
                return
            self._matured = True
            done_callbacks = self._finish_locked()

        # Callbacks run outside the condition.
        self._run_done_callbacks(done_callbacks)

    def cancel(self) -> bool:
        """Cancel the future unless it has already matured.

        Returns:
          False if the future has already matured; True if it was cancelled
          by this call or by an earlier one.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # Already cancelled: the callbacks were run by the first
                # cancellation and the callback list is gone.
                return True
            self._cancelled = True
            done_callbacks = self._finish_locked()

        # Callbacks run outside the condition.
        self._run_done_callbacks(done_callbacks)

        return True

    def cancelled(self) -> bool:
        """Return True if the future was cancelled."""
        with self._condition:
            return self._cancelled

    def running(self) -> bool:
        """Return True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self) -> bool:
        """Return True once the future is cancelled or matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout: Optional[float] = None) -> None:
        """Block until the future matures; see `_block` for what is raised."""
        self._block(timeout)

    def exception(self, timeout: Optional[float] = None) -> None:
        """Block until the future matures; see `_block` for what is raised."""
        self._block(timeout)

    def traceback(self, timeout: Optional[float] = None) -> None:
        """Block until the future matures; see `_block` for what is raised."""
        self._block(timeout)

    def add_done_callback(self, fn: DoneCallbackType):
        """Register `fn` to be called with this future when it is done.

        If the future is already done, `fn` is called immediately in the
        calling thread, and any exception it raises propagates to the
        caller.

        Args:
          fn: A callable taking this future as its only argument.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        fn(self)

    def start(self):
        """Subscribe to the channel's connectivity, asking it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        """Unsubscribe from the channel if the future never finished."""
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)
| |
|
| |
|
def channel_ready_future(channel: grpc.Channel) -> _ChannelReadyFuture:
    """Create and start a future that observes `channel`'s connectivity.

    Args:
      channel: The channel to observe.

    Returns:
      A `_ChannelReadyFuture` on which `start()` has already been called.
    """
    future = _ChannelReadyFuture(channel)
    future.start()
    return future
| |
|