File size: 2,954 Bytes
1f1b4db
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import httpx
import uuid
from config.constants import (
    APP_LANGUAGE,
    BASE_URL,
    APP_NAME,
    APP_VERSION,
    DISPLAY_NAME,
    HADWARE_INFO,
    SYSTEM_INFO,
)
from protos import telemetry_pb2


class TelemetryAPI:
    """Async client that sends telemetry-related requests to ``BASE_URL``.

    The instance owns one ``httpx.AsyncClient``. It can be used as an async
    context manager; leaving the ``async with`` block closes that client.
    """

    # Headers shared by the body-less requests (Ping and
    # GetUnleashContextFields). Kept in one place so the two calls cannot
    # drift apart.
    _CONNECT_HEADERS = {
        'User-Agent': 'connect-go/1.16.2 (go1.23.2 X:nocoverageredesign)',
        'Accept-Encoding': 'gzip',
        'Connect-Protocol-Version': '1',
        'Content-Type': 'application/proto'
    }

    def __init__(self, api_key: str):
        """Store the API key and create the shared HTTP client.

        Args:
            api_key: Key written into the ``system_info.api_key`` field of
                every telemetry message.
        """
        self.api_key = api_key
        # NOTE(review): verify=False turns off TLS certificate verification
        # for every request made through this client. Left unchanged because
        # it may be deliberate (e.g. intercepting proxies) -- confirm, and
        # prefer the default verification if it is not required.
        self.client = httpx.AsyncClient(verify=False, timeout=3)

    async def do_telemetry(self) -> None:
        """Run the three requests in order: telemetry, ping, unleash fields.

        Transport errors (``httpx.RequestError``) in any step are reported on
        stdout by that step and do not prevent the following steps from
        running.
        """
        await self.send_telemetry()
        await self.send_ping()
        await self.send_unleash_request()

    async def send_telemetry(self) -> bool:
        """Build a ``TelemetryData`` message and POST it to RecordAsyncTelemetry.

        A fresh random ``session_id`` (UUID4) is generated on every call.

        Returns:
            ``True`` when the server answers with HTTP 200; ``False`` for any
            other status code or when the request fails with an
            ``httpx.RequestError`` (handled the same way as in ``send_ping``
            and ``send_unleash_request``).
        """
        telemetry = telemetry_pb2.TelemetryData()

        # System info
        system_info = telemetry.system_info
        system_info.api_key = self.api_key
        system_info.locale = APP_LANGUAGE
        system_info.extension_name = APP_NAME
        system_info.ide_name = APP_NAME
        system_info.extension_version = APP_VERSION
        system_info.os = SYSTEM_INFO
        system_info.ide_version = DISPLAY_NAME
        system_info.hardware = HADWARE_INFO

        system_info.session_id = str(uuid.uuid4())

        try:
            response = await self.client.post(
                url=f"{BASE_URL}/exa.api_server_pb.ApiServerService/RecordAsyncTelemetry",
                headers={
                    "Content-Type": "application/proto",
                },
                content=telemetry.SerializeToString(),
            )
        except httpx.RequestError as e:
            # Telemetry is best-effort: report the failure like the sibling
            # requests do instead of aborting the rest of do_telemetry().
            print(f"请求发生错误: {e}")
            return False

        return response.status_code == 200

    async def _post_empty(self, rpc_name: str):
        """POST a body-less request to ``ApiServerService/<rpc_name>``.

        Returns:
            The ``httpx`` response, or ``None`` when the request raised
            ``httpx.RequestError`` (the error is printed to stdout).
        """
        url = f"{BASE_URL}/exa.api_server_pb.ApiServerService/{rpc_name}"
        try:
            return await self.client.post(url, headers=self._CONNECT_HEADERS)
        except httpx.RequestError as e:
            print(f"请求发生错误: {e}")
            return None

    async def send_ping(self) -> None:
        """POST to the Ping endpoint and print the resulting status code."""
        response = await self._post_empty("Ping")
        if response is not None:
            print(f"Ping Status Code: {response.status_code}")

    async def send_unleash_request(self) -> None:
        """POST to GetUnleashContextFields and print status code and body."""
        response = await self._post_empty("GetUnleashContextFields")
        if response is None:
            return

        print(f"GetUnleashContextFields Status Code: {response.status_code}")
        print("地区:")
        print(f"{response.text}")

    async def __aenter__(self) -> "TelemetryAPI":
        """Enter the async context; returns this instance."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the underlying HTTP client when the context exits."""
        await self.client.aclose()