| | from types import SimpleNamespace |
| | from typing import TYPE_CHECKING, Mapping, Optional, Type, TypeVar |
| |
|
| | import attr |
| | from aiosignal import Signal |
| | from multidict import CIMultiDict |
| | from yarl import URL |
| |
|
| | from .client_reqrep import ClientResponse |
| |
|
| | if TYPE_CHECKING: |
| | from .client import ClientSession |
| |
|
| | _ParamT_contra = TypeVar("_ParamT_contra", contravariant=True) |
| | _TracingSignal = Signal[ClientSession, SimpleNamespace, _ParamT_contra] |
| |
|
| |
|
| | __all__ = ( |
| | "TraceConfig", |
| | "TraceRequestStartParams", |
| | "TraceRequestEndParams", |
| | "TraceRequestExceptionParams", |
| | "TraceConnectionQueuedStartParams", |
| | "TraceConnectionQueuedEndParams", |
| | "TraceConnectionCreateStartParams", |
| | "TraceConnectionCreateEndParams", |
| | "TraceConnectionReuseconnParams", |
| | "TraceDnsResolveHostStartParams", |
| | "TraceDnsResolveHostEndParams", |
| | "TraceDnsCacheHitParams", |
| | "TraceDnsCacheMissParams", |
| | "TraceRequestRedirectParams", |
| | "TraceRequestChunkSentParams", |
| | "TraceResponseChunkReceivedParams", |
| | "TraceRequestHeadersSentParams", |
| | ) |
| |
|
| |
|
| | class TraceConfig: |
| | """First-class used to trace requests launched via ClientSession objects.""" |
| |
|
| | def __init__( |
| | self, trace_config_ctx_factory: Type[SimpleNamespace] = SimpleNamespace |
| | ) -> None: |
| | self._on_request_start: _TracingSignal[TraceRequestStartParams] = Signal(self) |
| | self._on_request_chunk_sent: _TracingSignal[TraceRequestChunkSentParams] = ( |
| | Signal(self) |
| | ) |
| | self._on_response_chunk_received: _TracingSignal[ |
| | TraceResponseChunkReceivedParams |
| | ] = Signal(self) |
| | self._on_request_end: _TracingSignal[TraceRequestEndParams] = Signal(self) |
| | self._on_request_exception: _TracingSignal[TraceRequestExceptionParams] = ( |
| | Signal(self) |
| | ) |
| | self._on_request_redirect: _TracingSignal[TraceRequestRedirectParams] = Signal( |
| | self |
| | ) |
| | self._on_connection_queued_start: _TracingSignal[ |
| | TraceConnectionQueuedStartParams |
| | ] = Signal(self) |
| | self._on_connection_queued_end: _TracingSignal[ |
| | TraceConnectionQueuedEndParams |
| | ] = Signal(self) |
| | self._on_connection_create_start: _TracingSignal[ |
| | TraceConnectionCreateStartParams |
| | ] = Signal(self) |
| | self._on_connection_create_end: _TracingSignal[ |
| | TraceConnectionCreateEndParams |
| | ] = Signal(self) |
| | self._on_connection_reuseconn: _TracingSignal[ |
| | TraceConnectionReuseconnParams |
| | ] = Signal(self) |
| | self._on_dns_resolvehost_start: _TracingSignal[ |
| | TraceDnsResolveHostStartParams |
| | ] = Signal(self) |
| | self._on_dns_resolvehost_end: _TracingSignal[TraceDnsResolveHostEndParams] = ( |
| | Signal(self) |
| | ) |
| | self._on_dns_cache_hit: _TracingSignal[TraceDnsCacheHitParams] = Signal(self) |
| | self._on_dns_cache_miss: _TracingSignal[TraceDnsCacheMissParams] = Signal(self) |
| | self._on_request_headers_sent: _TracingSignal[TraceRequestHeadersSentParams] = ( |
| | Signal(self) |
| | ) |
| |
|
| | self._trace_config_ctx_factory = trace_config_ctx_factory |
| |
|
| | def trace_config_ctx( |
| | self, trace_request_ctx: Optional[Mapping[str, str]] = None |
| | ) -> SimpleNamespace: |
| | """Return a new trace_config_ctx instance""" |
| | return self._trace_config_ctx_factory(trace_request_ctx=trace_request_ctx) |
| |
|
| | def freeze(self) -> None: |
| | self._on_request_start.freeze() |
| | self._on_request_chunk_sent.freeze() |
| | self._on_response_chunk_received.freeze() |
| | self._on_request_end.freeze() |
| | self._on_request_exception.freeze() |
| | self._on_request_redirect.freeze() |
| | self._on_connection_queued_start.freeze() |
| | self._on_connection_queued_end.freeze() |
| | self._on_connection_create_start.freeze() |
| | self._on_connection_create_end.freeze() |
| | self._on_connection_reuseconn.freeze() |
| | self._on_dns_resolvehost_start.freeze() |
| | self._on_dns_resolvehost_end.freeze() |
| | self._on_dns_cache_hit.freeze() |
| | self._on_dns_cache_miss.freeze() |
| | self._on_request_headers_sent.freeze() |
| |
|
| | @property |
| | def on_request_start(self) -> "_TracingSignal[TraceRequestStartParams]": |
| | return self._on_request_start |
| |
|
| | @property |
| | def on_request_chunk_sent( |
| | self, |
| | ) -> "_TracingSignal[TraceRequestChunkSentParams]": |
| | return self._on_request_chunk_sent |
| |
|
| | @property |
| | def on_response_chunk_received( |
| | self, |
| | ) -> "_TracingSignal[TraceResponseChunkReceivedParams]": |
| | return self._on_response_chunk_received |
| |
|
| | @property |
| | def on_request_end(self) -> "_TracingSignal[TraceRequestEndParams]": |
| | return self._on_request_end |
| |
|
| | @property |
| | def on_request_exception( |
| | self, |
| | ) -> "_TracingSignal[TraceRequestExceptionParams]": |
| | return self._on_request_exception |
| |
|
| | @property |
| | def on_request_redirect( |
| | self, |
| | ) -> "_TracingSignal[TraceRequestRedirectParams]": |
| | return self._on_request_redirect |
| |
|
| | @property |
| | def on_connection_queued_start( |
| | self, |
| | ) -> "_TracingSignal[TraceConnectionQueuedStartParams]": |
| | return self._on_connection_queued_start |
| |
|
| | @property |
| | def on_connection_queued_end( |
| | self, |
| | ) -> "_TracingSignal[TraceConnectionQueuedEndParams]": |
| | return self._on_connection_queued_end |
| |
|
| | @property |
| | def on_connection_create_start( |
| | self, |
| | ) -> "_TracingSignal[TraceConnectionCreateStartParams]": |
| | return self._on_connection_create_start |
| |
|
| | @property |
| | def on_connection_create_end( |
| | self, |
| | ) -> "_TracingSignal[TraceConnectionCreateEndParams]": |
| | return self._on_connection_create_end |
| |
|
| | @property |
| | def on_connection_reuseconn( |
| | self, |
| | ) -> "_TracingSignal[TraceConnectionReuseconnParams]": |
| | return self._on_connection_reuseconn |
| |
|
| | @property |
| | def on_dns_resolvehost_start( |
| | self, |
| | ) -> "_TracingSignal[TraceDnsResolveHostStartParams]": |
| | return self._on_dns_resolvehost_start |
| |
|
| | @property |
| | def on_dns_resolvehost_end( |
| | self, |
| | ) -> "_TracingSignal[TraceDnsResolveHostEndParams]": |
| | return self._on_dns_resolvehost_end |
| |
|
| | @property |
| | def on_dns_cache_hit(self) -> "_TracingSignal[TraceDnsCacheHitParams]": |
| | return self._on_dns_cache_hit |
| |
|
| | @property |
| | def on_dns_cache_miss(self) -> "_TracingSignal[TraceDnsCacheMissParams]": |
| | return self._on_dns_cache_miss |
| |
|
| | @property |
| | def on_request_headers_sent( |
| | self, |
| | ) -> "_TracingSignal[TraceRequestHeadersSentParams]": |
| | return self._on_request_headers_sent |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestStartParams: |
| | """Parameters sent by the `on_request_start` signal""" |
| |
|
| | method: str |
| | url: URL |
| | headers: "CIMultiDict[str]" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestChunkSentParams: |
| | """Parameters sent by the `on_request_chunk_sent` signal""" |
| |
|
| | method: str |
| | url: URL |
| | chunk: bytes |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceResponseChunkReceivedParams: |
| | """Parameters sent by the `on_response_chunk_received` signal""" |
| |
|
| | method: str |
| | url: URL |
| | chunk: bytes |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestEndParams: |
| | """Parameters sent by the `on_request_end` signal""" |
| |
|
| | method: str |
| | url: URL |
| | headers: "CIMultiDict[str]" |
| | response: ClientResponse |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestExceptionParams: |
| | """Parameters sent by the `on_request_exception` signal""" |
| |
|
| | method: str |
| | url: URL |
| | headers: "CIMultiDict[str]" |
| | exception: BaseException |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestRedirectParams: |
| | """Parameters sent by the `on_request_redirect` signal""" |
| |
|
| | method: str |
| | url: URL |
| | headers: "CIMultiDict[str]" |
| | response: ClientResponse |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceConnectionQueuedStartParams: |
| | """Parameters sent by the `on_connection_queued_start` signal""" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceConnectionQueuedEndParams: |
| | """Parameters sent by the `on_connection_queued_end` signal""" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceConnectionCreateStartParams: |
| | """Parameters sent by the `on_connection_create_start` signal""" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceConnectionCreateEndParams: |
| | """Parameters sent by the `on_connection_create_end` signal""" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceConnectionReuseconnParams: |
| | """Parameters sent by the `on_connection_reuseconn` signal""" |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceDnsResolveHostStartParams: |
| | """Parameters sent by the `on_dns_resolvehost_start` signal""" |
| |
|
| | host: str |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceDnsResolveHostEndParams: |
| | """Parameters sent by the `on_dns_resolvehost_end` signal""" |
| |
|
| | host: str |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceDnsCacheHitParams: |
| | """Parameters sent by the `on_dns_cache_hit` signal""" |
| |
|
| | host: str |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceDnsCacheMissParams: |
| | """Parameters sent by the `on_dns_cache_miss` signal""" |
| |
|
| | host: str |
| |
|
| |
|
| | @attr.s(auto_attribs=True, frozen=True, slots=True) |
| | class TraceRequestHeadersSentParams: |
| | """Parameters sent by the `on_request_headers_sent` signal""" |
| |
|
| | method: str |
| | url: URL |
| | headers: "CIMultiDict[str]" |
| |
|
| |
|
| | class Trace: |
| | """Internal dependency holder class. |
| | |
| | Used to keep together the main dependencies used |
| | at the moment of send a signal. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | session: "ClientSession", |
| | trace_config: TraceConfig, |
| | trace_config_ctx: SimpleNamespace, |
| | ) -> None: |
| | self._trace_config = trace_config |
| | self._trace_config_ctx = trace_config_ctx |
| | self._session = session |
| |
|
| | async def send_request_start( |
| | self, method: str, url: URL, headers: "CIMultiDict[str]" |
| | ) -> None: |
| | return await self._trace_config.on_request_start.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestStartParams(method, url, headers), |
| | ) |
| |
|
| | async def send_request_chunk_sent( |
| | self, method: str, url: URL, chunk: bytes |
| | ) -> None: |
| | return await self._trace_config.on_request_chunk_sent.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestChunkSentParams(method, url, chunk), |
| | ) |
| |
|
| | async def send_response_chunk_received( |
| | self, method: str, url: URL, chunk: bytes |
| | ) -> None: |
| | return await self._trace_config.on_response_chunk_received.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceResponseChunkReceivedParams(method, url, chunk), |
| | ) |
| |
|
| | async def send_request_end( |
| | self, |
| | method: str, |
| | url: URL, |
| | headers: "CIMultiDict[str]", |
| | response: ClientResponse, |
| | ) -> None: |
| | return await self._trace_config.on_request_end.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestEndParams(method, url, headers, response), |
| | ) |
| |
|
| | async def send_request_exception( |
| | self, |
| | method: str, |
| | url: URL, |
| | headers: "CIMultiDict[str]", |
| | exception: BaseException, |
| | ) -> None: |
| | return await self._trace_config.on_request_exception.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestExceptionParams(method, url, headers, exception), |
| | ) |
| |
|
| | async def send_request_redirect( |
| | self, |
| | method: str, |
| | url: URL, |
| | headers: "CIMultiDict[str]", |
| | response: ClientResponse, |
| | ) -> None: |
| | return await self._trace_config._on_request_redirect.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestRedirectParams(method, url, headers, response), |
| | ) |
| |
|
| | async def send_connection_queued_start(self) -> None: |
| | return await self._trace_config.on_connection_queued_start.send( |
| | self._session, self._trace_config_ctx, TraceConnectionQueuedStartParams() |
| | ) |
| |
|
| | async def send_connection_queued_end(self) -> None: |
| | return await self._trace_config.on_connection_queued_end.send( |
| | self._session, self._trace_config_ctx, TraceConnectionQueuedEndParams() |
| | ) |
| |
|
| | async def send_connection_create_start(self) -> None: |
| | return await self._trace_config.on_connection_create_start.send( |
| | self._session, self._trace_config_ctx, TraceConnectionCreateStartParams() |
| | ) |
| |
|
| | async def send_connection_create_end(self) -> None: |
| | return await self._trace_config.on_connection_create_end.send( |
| | self._session, self._trace_config_ctx, TraceConnectionCreateEndParams() |
| | ) |
| |
|
| | async def send_connection_reuseconn(self) -> None: |
| | return await self._trace_config.on_connection_reuseconn.send( |
| | self._session, self._trace_config_ctx, TraceConnectionReuseconnParams() |
| | ) |
| |
|
| | async def send_dns_resolvehost_start(self, host: str) -> None: |
| | return await self._trace_config.on_dns_resolvehost_start.send( |
| | self._session, self._trace_config_ctx, TraceDnsResolveHostStartParams(host) |
| | ) |
| |
|
| | async def send_dns_resolvehost_end(self, host: str) -> None: |
| | return await self._trace_config.on_dns_resolvehost_end.send( |
| | self._session, self._trace_config_ctx, TraceDnsResolveHostEndParams(host) |
| | ) |
| |
|
| | async def send_dns_cache_hit(self, host: str) -> None: |
| | return await self._trace_config.on_dns_cache_hit.send( |
| | self._session, self._trace_config_ctx, TraceDnsCacheHitParams(host) |
| | ) |
| |
|
| | async def send_dns_cache_miss(self, host: str) -> None: |
| | return await self._trace_config.on_dns_cache_miss.send( |
| | self._session, self._trace_config_ctx, TraceDnsCacheMissParams(host) |
| | ) |
| |
|
| | async def send_request_headers( |
| | self, method: str, url: URL, headers: "CIMultiDict[str]" |
| | ) -> None: |
| | return await self._trace_config._on_request_headers_sent.send( |
| | self._session, |
| | self._trace_config_ctx, |
| | TraceRequestHeadersSentParams(method, url, headers), |
| | ) |
| |
|