# test/api/telemetry.py
# Repository page header: uploaded by gaoqilan, commit 1f1b4db ("Upload 103 files"), verified.
import json
import httpx
import uuid
from config.constants import (
APP_LANGUAGE,
BASE_URL,
APP_NAME,
APP_VERSION,
DISPLAY_NAME,
HADWARE_INFO,
SYSTEM_INFO,
)
from protos import telemetry_pb2
class TelemetryAPI:
    """Async client for the telemetry-related ApiServerService endpoints.

    Every request is a POST against ``BASE_URL`` made through one shared
    ``httpx.AsyncClient``. The class is an async context manager; leaving the
    ``async with`` block closes the underlying client.

    All three request methods are best-effort: an ``httpx.RequestError``
    (timeout, connection failure, ...) is printed and swallowed so that one
    failing call does not prevent the remaining ones from running.
    """

    # Headers shared by the body-less calls (Ping, GetUnleashContextFields).
    _CONNECT_HEADERS = {
        'User-Agent': 'connect-go/1.16.2 (go1.23.2 X:nocoverageredesign)',
        'Accept-Encoding': 'gzip',
        'Connect-Protocol-Version': '1',
        'Content-Type': 'application/proto',
    }

    def __init__(self, api_key: str):
        """Store the API key and create the shared HTTP client.

        Args:
            api_key: Key written into the ``system_info.api_key`` field of the
                telemetry message.
        """
        self.api_key = api_key
        # NOTE(review): verify=False turns off TLS certificate verification,
        # which exposes the API key to man-in-the-middle interception.
        # Confirm this is intentional before shipping.
        self.client = httpx.AsyncClient(verify=False, timeout=3)

    async def do_telemetry(self) -> None:
        """Run the three calls in order: telemetry, ping, unleash context."""
        await self.send_telemetry()
        await self.send_ping()
        await self.send_unleash_request()

    async def send_telemetry(self) -> bool:
        """Build a ``TelemetryData`` message and POST it to RecordAsyncTelemetry.

        Returns:
            ``True`` when the server answers with HTTP 200. ``False`` for any
            other status code or when the request fails with
            ``httpx.RequestError``.
        """
        telemetry = telemetry_pb2.TelemetryData()

        # System info
        system_info = telemetry.system_info
        system_info.api_key = self.api_key
        system_info.locale = APP_LANGUAGE
        system_info.extension_name = APP_NAME
        system_info.ide_name = APP_NAME
        system_info.extension_version = APP_VERSION
        system_info.os = SYSTEM_INFO
        system_info.ide_version = DISPLAY_NAME
        # "HADWARE_INFO" (sic) is the name exported by config.constants.
        system_info.hardware = HADWARE_INFO
        # A fresh random session id is generated for every telemetry message.
        system_info.session_id = str(uuid.uuid4())

        try:
            response = await self.client.post(
                url=f"{BASE_URL}/exa.api_server_pb.ApiServerService/RecordAsyncTelemetry",
                headers={
                    "Content-Type": "application/proto",
                },
                content=telemetry.SerializeToString(),
            )
        except httpx.RequestError as e:
            # Handled like the sibling methods so that a transport failure
            # here does not abort do_telemetry() before ping/unleash run.
            print(f"请求发生错误: {e}")
            return False
        return response.status_code == 200

    async def send_ping(self) -> None:
        """POST an empty body to the Ping endpoint and print the status code."""
        url = f"{BASE_URL}/exa.api_server_pb.ApiServerService/Ping"
        try:
            response = await self.client.post(url, headers=dict(self._CONNECT_HEADERS))
            print(f"Ping Status Code: {response.status_code}")
        except httpx.RequestError as e:
            print(f"请求发生错误: {e}")

    async def send_unleash_request(self) -> None:
        """POST an empty body to GetUnleashContextFields and print the reply.

        Prints the status code, then a "地区:" ("Region:") label followed by
        the raw response text.
        """
        url = f"{BASE_URL}/exa.api_server_pb.ApiServerService/GetUnleashContextFields"
        try:
            response = await self.client.post(url, headers=dict(self._CONNECT_HEADERS))
            print(f"GetUnleashContextFields Status Code: {response.status_code}")
            print("地区:")
            print(f"{response.text}")
        except httpx.RequestError as e:
            print(f"请求发生错误: {e}")

    async def __aenter__(self):
        """Enter the async context; returns this instance."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the shared HTTP client when the async context exits."""
        await self.client.aclose()