| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | """Internal utilities for gRPC Python.""" |
| |
|
| | import collections |
| | import logging |
| | import threading |
| | import time |
| | from typing import Callable, Dict, Optional, Sequence |
| |
|
| | import grpc |
| | from grpc import _common |
| | from grpc._typing import DoneCallbackType |
| |
|
| | _LOGGER = logging.getLogger(__name__) |
| |
|
| | _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( |
| | 'Exception calling connectivity future "done" callback!' |
| | ) |
| |
|
| |
|
| | class RpcMethodHandler( |
| | collections.namedtuple( |
| | "_RpcMethodHandler", |
| | ( |
| | "request_streaming", |
| | "response_streaming", |
| | "request_deserializer", |
| | "response_serializer", |
| | "unary_unary", |
| | "unary_stream", |
| | "stream_unary", |
| | "stream_stream", |
| | ), |
| | ), |
| | grpc.RpcMethodHandler, |
| | ): |
| | pass |
| |
|
| |
|
| | class DictionaryGenericHandler(grpc.ServiceRpcHandler): |
| | _name: str |
| | _method_handlers: Dict[str, grpc.RpcMethodHandler] |
| |
|
| | def __init__( |
| | self, service: str, method_handlers: Dict[str, grpc.RpcMethodHandler] |
| | ): |
| | self._name = service |
| | self._method_handlers = { |
| | _common.fully_qualified_method(service, method): method_handler |
| | for method, method_handler in method_handlers.items() |
| | } |
| |
|
| | def service_name(self) -> str: |
| | return self._name |
| |
|
| | def service( |
| | self, handler_call_details: grpc.HandlerCallDetails |
| | ) -> Optional[grpc.RpcMethodHandler]: |
| | details_method = handler_call_details.method |
| | return self._method_handlers.get( |
| | details_method |
| | ) |
| |
|
| |
|
| | class _ChannelReadyFuture(grpc.Future): |
| | _condition: threading.Condition |
| | _channel: grpc.Channel |
| | _matured: bool |
| | _cancelled: bool |
| | _done_callbacks: Sequence[Callable] |
| |
|
| | def __init__(self, channel: grpc.Channel): |
| | self._condition = threading.Condition() |
| | self._channel = channel |
| |
|
| | self._matured = False |
| | self._cancelled = False |
| | self._done_callbacks = [] |
| |
|
| | def _block(self, timeout: Optional[float]) -> None: |
| | until = None if timeout is None else time.time() + timeout |
| | with self._condition: |
| | while True: |
| | if self._cancelled: |
| | raise grpc.FutureCancelledError() |
| | if self._matured: |
| | return |
| | if until is None: |
| | self._condition.wait() |
| | else: |
| | remaining = until - time.time() |
| | if remaining < 0: |
| | raise grpc.FutureTimeoutError() |
| | self._condition.wait(timeout=remaining) |
| |
|
| | def _update(self, connectivity: Optional[grpc.ChannelConnectivity]) -> None: |
| | with self._condition: |
| | if ( |
| | not self._cancelled |
| | and connectivity is grpc.ChannelConnectivity.READY |
| | ): |
| | self._matured = True |
| | self._channel.unsubscribe(self._update) |
| | self._condition.notify_all() |
| | done_callbacks = tuple(self._done_callbacks) |
| | self._done_callbacks = None |
| | else: |
| | return |
| |
|
| | for done_callback in done_callbacks: |
| | try: |
| | done_callback(self) |
| | except Exception: |
| | _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) |
| |
|
| | def cancel(self) -> bool: |
| | with self._condition: |
| | if not self._matured: |
| | self._cancelled = True |
| | self._channel.unsubscribe(self._update) |
| | self._condition.notify_all() |
| | done_callbacks = tuple(self._done_callbacks) |
| | self._done_callbacks = None |
| | else: |
| | return False |
| |
|
| | for done_callback in done_callbacks: |
| | try: |
| | done_callback(self) |
| | except Exception: |
| | _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) |
| |
|
| | return True |
| |
|
| | def cancelled(self) -> bool: |
| | with self._condition: |
| | return self._cancelled |
| |
|
| | def running(self) -> bool: |
| | with self._condition: |
| | return not self._cancelled and not self._matured |
| |
|
| | def done(self) -> bool: |
| | with self._condition: |
| | return self._cancelled or self._matured |
| |
|
| | def result(self, timeout: Optional[float] = None) -> None: |
| | self._block(timeout) |
| |
|
| | def exception(self, timeout: Optional[float] = None) -> None: |
| | self._block(timeout) |
| |
|
| | def traceback(self, timeout: Optional[float] = None) -> None: |
| | self._block(timeout) |
| |
|
| | def add_done_callback(self, fn: DoneCallbackType): |
| | with self._condition: |
| | if not self._cancelled and not self._matured: |
| | self._done_callbacks.append(fn) |
| | return |
| |
|
| | fn(self) |
| |
|
| | def start(self): |
| | with self._condition: |
| | self._channel.subscribe(self._update, try_to_connect=True) |
| |
|
| | def __del__(self): |
| | with self._condition: |
| | if not self._cancelled and not self._matured: |
| | self._channel.unsubscribe(self._update) |
| |
|
| |
|
| | def channel_ready_future(channel: grpc.Channel) -> _ChannelReadyFuture: |
| | ready_future = _ChannelReadyFuture(channel) |
| | ready_future.start() |
| | return ready_future |
| |
|
| |
|
| | def first_version_is_lower(version1: str, version2: str) -> bool: |
| | """ |
| | Compares two versions in the format '1.60.1' or '1.60.1.dev0'. |
| | |
| | This method will be used in all stubs generated by grpcio-tools to check whether |
| | the stub version is compatible with the runtime grpcio. |
| | |
| | Args: |
| | version1: The first version string. |
| | version2: The second version string. |
| | |
| | Returns: |
| | True if version1 is lower, False otherwise. |
| | """ |
| | version1_list = version1.split(".") |
| | version2_list = version2.split(".") |
| |
|
| | try: |
| | for i in range(3): |
| | if int(version1_list[i]) < int(version2_list[i]): |
| | return True |
| | if int(version1_list[i]) > int(version2_list[i]): |
| | return False |
| | except ValueError: |
| | |
| | return False |
| |
|
| | |
| | return len(version1_list) < len(version2_list) |
| |
|