import json
import uuid

import httpx

from config.constants import (
    APP_LANGUAGE,
    BASE_URL,
    APP_NAME,
    APP_VERSION,
    DISPLAY_NAME,
    HADWARE_INFO,
    SYSTEM_INFO,
)
from protos import telemetry_pb2


class TelemetryAPI:
    """Async client that posts telemetry-related requests to ``BASE_URL``.

    All requests go to RPC methods under
    ``{BASE_URL}/exa.api_server_pb.ApiServerService/``. The instance owns an
    ``httpx.AsyncClient``; use it as an async context manager so the client
    is closed on exit::

        async with TelemetryAPI(api_key) as api:
            await api.do_telemetry()
    """

    # Path prefix shared by every RPC this class calls.
    _SERVICE_PATH = "exa.api_server_pb.ApiServerService"

    # Headers sent with the body-less Ping / GetUnleashContextFields calls.
    _CONNECT_HEADERS = {
        'User-Agent': 'connect-go/1.16.2 (go1.23.2 X:nocoverageredesign)',
        'Accept-Encoding': 'gzip',
        'Connect-Protocol-Version': '1',
        'Content-Type': 'application/proto',
    }

    def __init__(self, api_key: str, *, verify: bool = False, timeout: float = 3):
        """Create the client.

        Args:
            api_key: Key written into the telemetry payload's ``system_info``.
            verify: Passed to ``httpx.AsyncClient(verify=...)``. Defaults to
                ``False`` to preserve the historical behaviour.
            timeout: Request timeout in seconds passed to ``httpx.AsyncClient``.
        """
        self.api_key = api_key
        # NOTE(review): verify=False disables TLS certificate verification,
        # and the API key is sent inside the telemetry payload. Consider
        # passing verify=True unless an intercepting proxy requires otherwise.
        self.client = httpx.AsyncClient(verify=verify, timeout=timeout)

    @classmethod
    def _endpoint(cls, rpc_name: str) -> str:
        """Return the full URL of ``rpc_name`` on the API server service."""
        return f"{BASE_URL}/{cls._SERVICE_PATH}/{rpc_name}"

    async def _post_empty(self, rpc_name: str, label: str):
        """POST a body-less request to ``rpc_name`` and print its status code.

        Returns the ``httpx.Response``, or ``None`` when the request failed
        with ``httpx.RequestError`` (the error is printed, not raised).
        """
        try:
            response = await self.client.post(
                self._endpoint(rpc_name), headers=self._CONNECT_HEADERS
            )
        except httpx.RequestError as e:
            print(f"请求发生错误: {e}")
            return None
        print(f"{label} Status Code: {response.status_code}")
        return response

    async def do_telemetry(self) -> None:
        """Run the three requests in order: telemetry, ping, unleash context."""
        await self.send_telemetry()
        await self.send_ping()
        await self.send_unleash_request()

    async def send_telemetry(self) -> bool:
        """Build a ``TelemetryData`` message and POST it to RecordAsyncTelemetry.

        A fresh UUID4 is generated as the session id on every call.

        Returns:
            ``True`` if the server answered with HTTP 200; ``False`` for any
            other status code or when the request failed at transport level.
        """
        telemetry = telemetry_pb2.TelemetryData()

        # System info
        system_info = telemetry.system_info
        system_info.api_key = self.api_key
        system_info.locale = APP_LANGUAGE
        system_info.extension_name = APP_NAME
        system_info.ide_name = APP_NAME
        system_info.extension_version = APP_VERSION
        system_info.os = SYSTEM_INFO
        system_info.ide_version = DISPLAY_NAME
        system_info.hardware = HADWARE_INFO
        system_info.session_id = str(uuid.uuid4())

        try:
            response = await self.client.post(
                url=self._endpoint("RecordAsyncTelemetry"),
                headers={
                    "Content-Type": "application/proto",
                },
                content=telemetry.SerializeToString(),
            )
        except httpx.RequestError as e:
            # Same best-effort handling as send_ping / send_unleash_request,
            # so one failed request does not abort the rest of do_telemetry().
            print(f"请求发生错误: {e}")
            return False
        return response.status_code == 200

    async def send_ping(self) -> None:
        """POST an empty request to the Ping RPC and print the status code."""
        await self._post_empty("Ping", "Ping")

    async def send_unleash_request(self) -> None:
        """POST an empty request to GetUnleashContextFields and print the reply.

        Prints the status code, then a "地区:" ("region") label line followed
        by the raw response text.
        """
        response = await self._post_empty(
            "GetUnleashContextFields", "GetUnleashContextFields"
        )
        if response is None:
            return
        print("地区:")
        print(response.text)

    async def __aenter__(self) -> "TelemetryAPI":
        """Return this instance for use in ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()