| import select | |
| import socket | |
| import sys | |
| import typing | |
| def is_socket_readable(sock: typing.Optional[socket.socket]) -> bool: | |
| """ | |
| Return whether a socket, as identifed by its file descriptor, is readable. | |
| "A socket is readable" means that the read buffer isn't empty, i.e. that calling | |
| .recv() on it would immediately return some data. | |
| """ | |
| # NOTE: we want check for readability without actually attempting to read, because | |
| # we don't want to block forever if it's not readable. | |
| # In the case that the socket no longer exists, or cannot return a file | |
| # descriptor, we treat it as being readable, as if it the next read operation | |
| # on it is ready to return the terminating `b""`. | |
| sock_fd = None if sock is None else sock.fileno() | |
| if sock_fd is None or sock_fd < 0: # pragma: nocover | |
| return True | |
| # The implementation below was stolen from: | |
| # https://github.com/python-trio/trio/blob/20ee2b1b7376db637435d80e266212a35837ddcc/trio/_socket.py#L471-L478 | |
| # See also: https://github.com/encode/httpcore/pull/193#issuecomment-703129316 | |
| # Use select.select on Windows, and when poll is unavailable and select.poll | |
| # everywhere else. (E.g. When eventlet is in use. See #327) | |
| if ( | |
| sys.platform == "win32" or getattr(select, "poll", None) is None | |
| ): # pragma: nocover | |
| rready, _, _ = select.select([sock_fd], [], [], 0) | |
| return bool(rready) | |
| p = select.poll() | |
| p.register(sock_fd, select.POLLIN) | |
| return bool(p.poll(0)) | |