| from __future__ import annotations |
|
|
| import orjson |
| from aiohttp import ClientSession, ClientTimeout |
| from loguru import logger |
|
|
| from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent |
|
|
# Default URL that events are POSTed to when no ``base_url`` is supplied.
AMPLITUDE_ENDPOINT = "https://api2.amplitude.com/2/httpapi"


class AmplitudeTelegramLogger(AbstractAnalyticsLogger):
    """Analytics logger that delivers events to Amplitude over HTTP.

    Each event is serialized with ``orjson`` and POSTed to ``base_url``
    together with the API token; the JSON reply is then validated and the
    outcome is written to the ``loguru`` logger.
    """

    # Value of the ``code`` field in the reply body that is treated as success.
    # Kept on the class (not assigned in ``__init__``) because it is a constant;
    # it is still readable as ``self.SUCCESS_STATUS_CODE``.
    SUCCESS_STATUS_CODE = 200

    def __init__(self, api_token: str, base_url: str = AMPLITUDE_ENDPOINT) -> None:
        """Store the credentials and the per-request settings.

        Args:
            api_token: Sent as ``api_key`` in the body of every request.
            base_url: URL the events are POSTed to.
        """
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {"Content-Type": "application/json", "Accept": "*/*"}
        # Upper bound for one whole request (aiohttp ``ClientTimeout.total``).
        self._timeout = ClientTimeout(total=15)

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> None:
        """POST a single event and validate the reply.

        Args:
            event: Event to send; its ``to_dict()`` result becomes the only
                entry of the ``events`` list in the request body.

        Raises:
            ValueError: If the reply body's ``code`` is not
                ``SUCCESS_STATUS_CODE`` (see ``_validate_response``).

        Exceptions raised by aiohttp while connecting, sending or decoding
        the reply are not caught here and propagate to the caller.
        """
        data = {"api_key": self._api_token, "events": [event.to_dict()]}

        # A new session is opened for every call and closed when the
        # ``async with`` block exits.
        async with (
            ClientSession() as session,
            session.post(
                self._base_url,
                headers=self._headers,
                data=orjson.dumps(data),
                timeout=self._timeout,
            ) as response,
        ):
            json_response = await response.json(content_type="application/json")

        # Validation happens after the connection has been released.
        self._validate_response(json_response)

    def _validate_response(self, response: dict[str, str | int]) -> None:
        """Check the decoded reply body and log the outcome.

        Args:
            response: Decoded JSON body of the reply.

        Raises:
            ValueError: If ``response["code"]`` is missing or differs from
                ``SUCCESS_STATUS_CODE``; the error is logged first.
        """
        code = response.get("code")
        if code != self.SUCCESS_STATUS_CODE:
            error = response.get("error")

            logger.error(f"get error from amplitude api | error: {error} | code: {code}")
            msg = f"Error in amplitude api call | error: {error} | code: {code}"
            raise ValueError(msg)

        # ``.get`` instead of indexing: a reply that reports success but lacks
        # the timestamp must not turn an accepted event into a ``KeyError``.
        upload_time = response.get("server_upload_time")
        logger.info(f"successfully send to Amplitude | server_upload_time: {upload_time}")

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Send ``event`` to Amplitude.

        Args:
            event: Event to deliver.

        Raises:
            ValueError: If the reply does not report success.
        """
        await self._send_request(event)
|
|