File size: 14,536 Bytes
494c89b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
"""
Machine ID Service - управление Machine ID
"""

import json
import os
import sqlite3
import hashlib
import uuid
import platform
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any
from dataclasses import dataclass

import sys
from pathlib import Path as SysPath
sys.path.insert(0, str(SysPath(__file__).parent.parent))

from core.paths import get_paths
from core.config import get_config
from core.exceptions import MachineIdError, KiroNotInstalledError, KiroRunningError
from core.process_utils import is_kiro_running

# Windows-specific
if platform.system() == 'Windows':
    try:
        import winreg
    except ImportError:
        winreg = None


@dataclass
class TelemetryInfo:
    """Информация о Kiro telemetry IDs"""
    machine_id: Optional[str] = None
    sqm_id: Optional[str] = None
    dev_device_id: Optional[str] = None
    service_machine_id: Optional[str] = None
    kiro_installed: bool = False


@dataclass
class SystemMachineInfo:
    """Информация о системном Machine ID"""
    machine_guid: Optional[str] = None
    os_type: str = ""
    can_modify: bool = False
    requires_admin: bool = True
    backup_exists: bool = False
    backup_time: Optional[str] = None


class MachineIdService:
    """Сервис для управления Machine ID"""
    
    def __init__(self):
        self.paths = get_paths()
        self.config = get_config()
        self.os_type = platform.system().lower()
    
    # =========================================================================
    # Kiro Telemetry IDs
    # =========================================================================
    
    def get_telemetry_info(self) -> TelemetryInfo:
        """Получить все Kiro telemetry IDs"""
        info = TelemetryInfo(kiro_installed=self.paths.is_kiro_installed())
        
        if not info.kiro_installed:
            return info
        
        # Читаем из storage.json
        if self.paths.kiro_storage_json and self.paths.kiro_storage_json.exists():
            try:
                data = json.loads(self.paths.kiro_storage_json.read_text())
                info.machine_id = data.get('telemetry.machineId')
                info.sqm_id = data.get('telemetry.sqmId')
                info.dev_device_id = data.get('telemetry.devDeviceId')
            except Exception:
                pass
        
        # Читаем serviceMachineId из state.vscdb
        if self.paths.kiro_state_db and self.paths.kiro_state_db.exists():
            try:
                conn = sqlite3.connect(str(self.paths.kiro_state_db))
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT value FROM ItemTable WHERE key = 'storage.serviceMachineId'"
                )
                row = cursor.fetchone()
                if row:
                    info.service_machine_id = row[0]
                conn.close()
            except Exception:
                pass
        
        return info
    
    def backup_telemetry(self) -> Path:
        """Создать бэкап Kiro telemetry IDs"""
        info = self.get_telemetry_info()
        
        if not info.kiro_installed:
            raise KiroNotInstalledError("Kiro is not installed")
        
        backup_data = {
            'machineId': info.machine_id,
            'sqmId': info.sqm_id,
            'devDeviceId': info.dev_device_id,
            'serviceMachineId': info.service_machine_id,
            'backupTime': datetime.now().isoformat(),
            'osType': self.os_type
        }
        
        backup_file = self.paths.get_backup_file('kiro-telemetry')
        backup_file.write_text(json.dumps(backup_data, indent=2))
        
        return backup_file
    
    def reset_telemetry(self, check_running: bool = True) -> TelemetryInfo:
        """
        Сбросить все Kiro telemetry IDs
        
        Args:
            check_running: Проверять запущен ли Kiro
        
        Returns:
            TelemetryInfo с новыми ID
        
        Raises:
            KiroRunningError: если Kiro запущен
            KiroNotInstalledError: если Kiro не установлен
        """
        if not self.paths.is_kiro_installed():
            raise KiroNotInstalledError("Kiro is not installed")
        
        if check_running and is_kiro_running():
            raise KiroRunningError("Kiro is running. Please close it first.")
        
        # Бэкапим если настроено
        if self.config.machine_id.backup_before_reset:
            self.backup_telemetry()
        
        # Генерируем новые ID
        new_ids = TelemetryInfo(
            machine_id=self._generate_machine_id(),
            sqm_id=self._generate_sqm_id(),
            dev_device_id=self._generate_dev_device_id(),
            service_machine_id=self._generate_machine_id(),
            kiro_installed=True
        )
        
        # Обновляем storage.json
        if self.paths.kiro_storage_json.exists():
            data = json.loads(self.paths.kiro_storage_json.read_text())
            data['telemetry.machineId'] = new_ids.machine_id
            data['telemetry.sqmId'] = new_ids.sqm_id
            data['telemetry.devDeviceId'] = new_ids.dev_device_id
            self.paths.kiro_storage_json.write_text(json.dumps(data, indent=2))
        
        # Обновляем state.vscdb
        if self.paths.kiro_state_db.exists():
            try:
                conn = sqlite3.connect(str(self.paths.kiro_state_db))
                cursor = conn.cursor()
                cursor.execute(
                    "UPDATE ItemTable SET value = ? WHERE key = 'storage.serviceMachineId'",
                    (new_ids.service_machine_id,)
                )
                conn.commit()
                conn.close()
            except Exception:
                pass
        
        return new_ids
    
    def restore_telemetry(self, backup_file: Path = None) -> bool:
        """Восстановить telemetry из бэкапа"""
        if backup_file is None:
            backups = self.paths.list_backups('kiro-telemetry')
            if not backups:
                raise MachineIdError("No backup found")
            backup_file = backups[0]
        
        if not backup_file.exists():
            raise MachineIdError(f"Backup file not found: {backup_file}")
        
        data = json.loads(backup_file.read_text())
        
        # Восстанавливаем storage.json
        if self.paths.kiro_storage_json.exists():
            storage = json.loads(self.paths.kiro_storage_json.read_text())
            if data.get('machineId'):
                storage['telemetry.machineId'] = data['machineId']
            if data.get('sqmId'):
                storage['telemetry.sqmId'] = data['sqmId']
            if data.get('devDeviceId'):
                storage['telemetry.devDeviceId'] = data['devDeviceId']
            self.paths.kiro_storage_json.write_text(json.dumps(storage, indent=2))
        
        # Восстанавливаем state.vscdb
        if self.paths.kiro_state_db.exists() and data.get('serviceMachineId'):
            try:
                conn = sqlite3.connect(str(self.paths.kiro_state_db))
                cursor = conn.cursor()
                cursor.execute(
                    "UPDATE ItemTable SET value = ? WHERE key = 'storage.serviceMachineId'",
                    (data['serviceMachineId'],)
                )
                conn.commit()
                conn.close()
            except Exception:
                pass
        
        return True
    
    # =========================================================================
    # System Machine GUID (Windows)
    # =========================================================================
    
    def get_system_machine_info(self) -> SystemMachineInfo:
        """Получить информацию о системном Machine ID"""
        info = SystemMachineInfo(os_type=self.os_type)
        
        if self.os_type == 'windows' and winreg:
            try:
                key = winreg.OpenKey(
                    winreg.HKEY_LOCAL_MACHINE,
                    r"SOFTWARE\Microsoft\Cryptography",
                    0, winreg.KEY_READ
                )
                value, _ = winreg.QueryValueEx(key, "MachineGuid")
                winreg.CloseKey(key)
                info.machine_guid = value
                info.can_modify = True
                info.requires_admin = True
            except Exception:
                pass
        
        # Проверяем бэкап
        backup_file = self.paths.backups_dir / 'machine-guid-backup.json'
        if backup_file.exists():
            info.backup_exists = True
            try:
                data = json.loads(backup_file.read_text())
                info.backup_time = data.get('backupTime')
            except:
                pass
        
        return info
    
    def backup_system_machine_guid(self) -> Optional[Path]:
        """Бэкап системного MachineGuid"""
        if self.os_type != 'windows':
            return None
        
        info = self.get_system_machine_info()
        if not info.machine_guid:
            return None
        
        backup_data = {
            'machineGuid': info.machine_guid,
            'backupTime': datetime.now().isoformat(),
            'computerName': os.environ.get('COMPUTERNAME', 'Unknown'),
            'osType': self.os_type
        }
        
        backup_file = self.paths.backups_dir / 'machine-guid-backup.json'
        backup_file.write_text(json.dumps(backup_data, indent=2))
        
        return backup_file
    
    def reset_system_machine_guid(self) -> Optional[str]:
        """
        Сбросить системный MachineGuid (требует админ прав)
        
        Returns:
            Новый GUID или None при ошибке
        """
        if self.os_type != 'windows' or not winreg:
            raise MachineIdError("This feature is only available on Windows")
        
        # Бэкапим
        self.backup_system_machine_guid()
        
        new_guid = str(uuid.uuid4()).upper()
        
        try:
            key = winreg.OpenKey(
                winreg.HKEY_LOCAL_MACHINE,
                r"SOFTWARE\Microsoft\Cryptography",
                0, winreg.KEY_SET_VALUE
            )
            winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, new_guid)
            winreg.CloseKey(key)
            return new_guid
        except PermissionError:
            raise MachineIdError("Administrator privileges required")
        except Exception as e:
            raise MachineIdError(f"Failed to reset MachineGuid: {e}")
    
    def restore_system_machine_guid(self) -> bool:
        """Восстановить системный MachineGuid из бэкапа"""
        if self.os_type != 'windows' or not winreg:
            raise MachineIdError("This feature is only available on Windows")
        
        backup_file = self.paths.backups_dir / 'machine-guid-backup.json'
        if not backup_file.exists():
            raise MachineIdError("No backup found")
        
        data = json.loads(backup_file.read_text())
        machine_guid = data.get('machineGuid')
        
        if not machine_guid:
            raise MachineIdError("Invalid backup file")
        
        try:
            key = winreg.OpenKey(
                winreg.HKEY_LOCAL_MACHINE,
                r"SOFTWARE\Microsoft\Cryptography",
                0, winreg.KEY_SET_VALUE
            )
            winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, machine_guid)
            winreg.CloseKey(key)
            return True
        except PermissionError:
            raise MachineIdError("Administrator privileges required")
        except Exception as e:
            raise MachineIdError(f"Failed to restore MachineGuid: {e}")
    
    # =========================================================================
    # Full Reset
    # =========================================================================
    
    def full_reset(self, reset_system: bool = False, check_running: bool = True) -> Dict[str, Any]:
        """
        Полный сброс всех ID
        
        Args:
            reset_system: Также сбросить системный MachineGuid
            check_running: Проверять запущен ли Kiro
        
        Returns:
            dict с результатами
        """
        results = {
            'kiro_reset': False,
            'system_reset': False,
            'new_telemetry': None,
            'new_system_guid': None,
            'errors': []
        }
        
        # Сбрасываем Kiro telemetry
        try:
            new_telemetry = self.reset_telemetry(check_running=check_running)
            results['kiro_reset'] = True
            results['new_telemetry'] = new_telemetry
        except Exception as e:
            results['errors'].append(f"Kiro telemetry: {e}")
        
        # Сбрасываем системный MachineGuid
        if reset_system and self.os_type == 'windows':
            try:
                new_guid = self.reset_system_machine_guid()
                results['system_reset'] = True
                results['new_system_guid'] = new_guid
            except Exception as e:
                results['errors'].append(f"System MachineGuid: {e}")
        
        return results
    
    # =========================================================================
    # Helpers
    # =========================================================================
    
    def _generate_machine_id(self) -> str:
        """Генерирует machineId (64-символьный hex)"""
        random_bytes = os.urandom(32)
        timestamp = datetime.now().timestamp()
        
        hasher = hashlib.sha256()
        hasher.update(random_bytes)
        hasher.update(str(timestamp).encode())
        
        return hasher.hexdigest()
    
    def _generate_sqm_id(self) -> str:
        """Генерирует sqmId (GUID в фигурных скобках)"""
        return '{' + str(uuid.uuid4()).upper() + '}'
    
    def _generate_dev_device_id(self) -> str:
        """Генерирует devDeviceId (UUID)"""
        return str(uuid.uuid4())