|
|
import json
|
|
|
import httpx
|
|
|
import uuid
|
|
|
from config.constants import (
|
|
|
APP_LANGUAGE,
|
|
|
BASE_URL,
|
|
|
APP_NAME,
|
|
|
APP_VERSION,
|
|
|
DISPLAY_NAME,
|
|
|
HADWARE_INFO,
|
|
|
SYSTEM_INFO,
|
|
|
)
|
|
|
from protos import telemetry_pb2
|
|
|
|
|
|
|
|
|
class TelemetryAPI:
|
|
|
def __init__(self, api_key: str):
|
|
|
self.api_key = api_key
|
|
|
self.client = httpx.AsyncClient(verify=False, timeout=3)
|
|
|
|
|
|
async def do_telemetry(self):
|
|
|
await self.send_telemetry()
|
|
|
await self.send_ping()
|
|
|
await self.send_unleash_request()
|
|
|
|
|
|
async def send_telemetry(self):
|
|
|
telemetry = telemetry_pb2.TelemetryData()
|
|
|
|
|
|
|
|
|
system_info = telemetry.system_info
|
|
|
system_info.api_key = self.api_key
|
|
|
system_info.locale = APP_LANGUAGE
|
|
|
system_info.extension_name = APP_NAME
|
|
|
system_info.ide_name = APP_NAME
|
|
|
system_info.extension_version = APP_VERSION
|
|
|
system_info.os = SYSTEM_INFO
|
|
|
system_info.ide_version = DISPLAY_NAME
|
|
|
system_info.hardware = HADWARE_INFO
|
|
|
|
|
|
system_info.session_id = str(uuid.uuid4())
|
|
|
|
|
|
response = await self.client.post(
|
|
|
url=f"{BASE_URL}/exa.api_server_pb.ApiServerService/RecordAsyncTelemetry",
|
|
|
headers={
|
|
|
"Content-Type": "application/proto",
|
|
|
},
|
|
|
content=telemetry.SerializeToString(),
|
|
|
)
|
|
|
|
|
|
return response.status_code == 200
|
|
|
|
|
|
async def send_ping(self):
|
|
|
url = f"{BASE_URL}/exa.api_server_pb.ApiServerService/Ping"
|
|
|
|
|
|
headers = {
|
|
|
'User-Agent': 'connect-go/1.16.2 (go1.23.2 X:nocoverageredesign)',
|
|
|
'Accept-Encoding': 'gzip',
|
|
|
'Connect-Protocol-Version': '1',
|
|
|
'Content-Type': 'application/proto'
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(url, headers=headers)
|
|
|
print(f"Ping Status Code: {response.status_code}")
|
|
|
except httpx.RequestError as e:
|
|
|
print(f"请求发生错误: {e}")
|
|
|
|
|
|
async def send_unleash_request(self):
|
|
|
url = f"{BASE_URL}/exa.api_server_pb.ApiServerService/GetUnleashContextFields"
|
|
|
|
|
|
headers = {
|
|
|
'User-Agent': 'connect-go/1.16.2 (go1.23.2 X:nocoverageredesign)',
|
|
|
'Accept-Encoding': 'gzip',
|
|
|
'Connect-Protocol-Version': '1',
|
|
|
'Content-Type': 'application/proto'
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
response = await self.client.post(url, headers=headers)
|
|
|
|
|
|
print(f"GetUnleashContextFields Status Code: {response.status_code}")
|
|
|
print(f"地区:")
|
|
|
print(f"{response.text}")
|
|
|
|
|
|
except httpx.RequestError as e:
|
|
|
print(f"请求发生错误: {e}")
|
|
|
|
|
|
async def __aenter__(self):
|
|
|
return self
|
|
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
await self.client.aclose() |