Spaces:
Sleeping
Sleeping
| """ | |
| Machine ID Service - управление Machine ID | |
| """ | |
| import json | |
| import os | |
| import sqlite3 | |
| import hashlib | |
| import uuid | |
| import platform | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Optional, Dict, Any | |
| from dataclasses import dataclass | |
| import sys | |
| from pathlib import Path as SysPath | |
| sys.path.insert(0, str(SysPath(__file__).parent.parent)) | |
| from core.paths import get_paths | |
| from core.config import get_config | |
| from core.exceptions import MachineIdError, KiroNotInstalledError, KiroRunningError | |
| from core.process_utils import is_kiro_running | |
| # Windows-specific | |
| if platform.system() == 'Windows': | |
| try: | |
| import winreg | |
| except ImportError: | |
| winreg = None | |
| class TelemetryInfo: | |
| """Информация о Kiro telemetry IDs""" | |
| machine_id: Optional[str] = None | |
| sqm_id: Optional[str] = None | |
| dev_device_id: Optional[str] = None | |
| service_machine_id: Optional[str] = None | |
| kiro_installed: bool = False | |
| class SystemMachineInfo: | |
| """Информация о системном Machine ID""" | |
| machine_guid: Optional[str] = None | |
| os_type: str = "" | |
| can_modify: bool = False | |
| requires_admin: bool = True | |
| backup_exists: bool = False | |
| backup_time: Optional[str] = None | |
| class MachineIdService: | |
| """Сервис для управления Machine ID""" | |
| def __init__(self): | |
| self.paths = get_paths() | |
| self.config = get_config() | |
| self.os_type = platform.system().lower() | |
| # ========================================================================= | |
| # Kiro Telemetry IDs | |
| # ========================================================================= | |
| def get_telemetry_info(self) -> TelemetryInfo: | |
| """Получить все Kiro telemetry IDs""" | |
| info = TelemetryInfo(kiro_installed=self.paths.is_kiro_installed()) | |
| if not info.kiro_installed: | |
| return info | |
| # Читаем из storage.json | |
| if self.paths.kiro_storage_json and self.paths.kiro_storage_json.exists(): | |
| try: | |
| data = json.loads(self.paths.kiro_storage_json.read_text()) | |
| info.machine_id = data.get('telemetry.machineId') | |
| info.sqm_id = data.get('telemetry.sqmId') | |
| info.dev_device_id = data.get('telemetry.devDeviceId') | |
| except Exception: | |
| pass | |
| # Читаем serviceMachineId из state.vscdb | |
| if self.paths.kiro_state_db and self.paths.kiro_state_db.exists(): | |
| try: | |
| conn = sqlite3.connect(str(self.paths.kiro_state_db)) | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT value FROM ItemTable WHERE key = 'storage.serviceMachineId'" | |
| ) | |
| row = cursor.fetchone() | |
| if row: | |
| info.service_machine_id = row[0] | |
| conn.close() | |
| except Exception: | |
| pass | |
| return info | |
| def backup_telemetry(self) -> Path: | |
| """Создать бэкап Kiro telemetry IDs""" | |
| info = self.get_telemetry_info() | |
| if not info.kiro_installed: | |
| raise KiroNotInstalledError("Kiro is not installed") | |
| backup_data = { | |
| 'machineId': info.machine_id, | |
| 'sqmId': info.sqm_id, | |
| 'devDeviceId': info.dev_device_id, | |
| 'serviceMachineId': info.service_machine_id, | |
| 'backupTime': datetime.now().isoformat(), | |
| 'osType': self.os_type | |
| } | |
| backup_file = self.paths.get_backup_file('kiro-telemetry') | |
| backup_file.write_text(json.dumps(backup_data, indent=2)) | |
| return backup_file | |
| def reset_telemetry(self, check_running: bool = True) -> TelemetryInfo: | |
| """ | |
| Сбросить все Kiro telemetry IDs | |
| Args: | |
| check_running: Проверять запущен ли Kiro | |
| Returns: | |
| TelemetryInfo с новыми ID | |
| Raises: | |
| KiroRunningError: если Kiro запущен | |
| KiroNotInstalledError: если Kiro не установлен | |
| """ | |
| if not self.paths.is_kiro_installed(): | |
| raise KiroNotInstalledError("Kiro is not installed") | |
| if check_running and is_kiro_running(): | |
| raise KiroRunningError("Kiro is running. Please close it first.") | |
| # Бэкапим если настроено | |
| if self.config.machine_id.backup_before_reset: | |
| self.backup_telemetry() | |
| # Генерируем новые ID | |
| new_ids = TelemetryInfo( | |
| machine_id=self._generate_machine_id(), | |
| sqm_id=self._generate_sqm_id(), | |
| dev_device_id=self._generate_dev_device_id(), | |
| service_machine_id=self._generate_machine_id(), | |
| kiro_installed=True | |
| ) | |
| # Обновляем storage.json | |
| if self.paths.kiro_storage_json.exists(): | |
| data = json.loads(self.paths.kiro_storage_json.read_text()) | |
| data['telemetry.machineId'] = new_ids.machine_id | |
| data['telemetry.sqmId'] = new_ids.sqm_id | |
| data['telemetry.devDeviceId'] = new_ids.dev_device_id | |
| self.paths.kiro_storage_json.write_text(json.dumps(data, indent=2)) | |
| # Обновляем state.vscdb | |
| if self.paths.kiro_state_db.exists(): | |
| try: | |
| conn = sqlite3.connect(str(self.paths.kiro_state_db)) | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "UPDATE ItemTable SET value = ? WHERE key = 'storage.serviceMachineId'", | |
| (new_ids.service_machine_id,) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| except Exception: | |
| pass | |
| return new_ids | |
| def restore_telemetry(self, backup_file: Path = None) -> bool: | |
| """Восстановить telemetry из бэкапа""" | |
| if backup_file is None: | |
| backups = self.paths.list_backups('kiro-telemetry') | |
| if not backups: | |
| raise MachineIdError("No backup found") | |
| backup_file = backups[0] | |
| if not backup_file.exists(): | |
| raise MachineIdError(f"Backup file not found: {backup_file}") | |
| data = json.loads(backup_file.read_text()) | |
| # Восстанавливаем storage.json | |
| if self.paths.kiro_storage_json.exists(): | |
| storage = json.loads(self.paths.kiro_storage_json.read_text()) | |
| if data.get('machineId'): | |
| storage['telemetry.machineId'] = data['machineId'] | |
| if data.get('sqmId'): | |
| storage['telemetry.sqmId'] = data['sqmId'] | |
| if data.get('devDeviceId'): | |
| storage['telemetry.devDeviceId'] = data['devDeviceId'] | |
| self.paths.kiro_storage_json.write_text(json.dumps(storage, indent=2)) | |
| # Восстанавливаем state.vscdb | |
| if self.paths.kiro_state_db.exists() and data.get('serviceMachineId'): | |
| try: | |
| conn = sqlite3.connect(str(self.paths.kiro_state_db)) | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "UPDATE ItemTable SET value = ? WHERE key = 'storage.serviceMachineId'", | |
| (data['serviceMachineId'],) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| except Exception: | |
| pass | |
| return True | |
| # ========================================================================= | |
| # System Machine GUID (Windows) | |
| # ========================================================================= | |
| def get_system_machine_info(self) -> SystemMachineInfo: | |
| """Получить информацию о системном Machine ID""" | |
| info = SystemMachineInfo(os_type=self.os_type) | |
| if self.os_type == 'windows' and winreg: | |
| try: | |
| key = winreg.OpenKey( | |
| winreg.HKEY_LOCAL_MACHINE, | |
| r"SOFTWARE\Microsoft\Cryptography", | |
| 0, winreg.KEY_READ | |
| ) | |
| value, _ = winreg.QueryValueEx(key, "MachineGuid") | |
| winreg.CloseKey(key) | |
| info.machine_guid = value | |
| info.can_modify = True | |
| info.requires_admin = True | |
| except Exception: | |
| pass | |
| # Проверяем бэкап | |
| backup_file = self.paths.backups_dir / 'machine-guid-backup.json' | |
| if backup_file.exists(): | |
| info.backup_exists = True | |
| try: | |
| data = json.loads(backup_file.read_text()) | |
| info.backup_time = data.get('backupTime') | |
| except: | |
| pass | |
| return info | |
| def backup_system_machine_guid(self) -> Optional[Path]: | |
| """Бэкап системного MachineGuid""" | |
| if self.os_type != 'windows': | |
| return None | |
| info = self.get_system_machine_info() | |
| if not info.machine_guid: | |
| return None | |
| backup_data = { | |
| 'machineGuid': info.machine_guid, | |
| 'backupTime': datetime.now().isoformat(), | |
| 'computerName': os.environ.get('COMPUTERNAME', 'Unknown'), | |
| 'osType': self.os_type | |
| } | |
| backup_file = self.paths.backups_dir / 'machine-guid-backup.json' | |
| backup_file.write_text(json.dumps(backup_data, indent=2)) | |
| return backup_file | |
| def reset_system_machine_guid(self) -> Optional[str]: | |
| """ | |
| Сбросить системный MachineGuid (требует админ прав) | |
| Returns: | |
| Новый GUID или None при ошибке | |
| """ | |
| if self.os_type != 'windows' or not winreg: | |
| raise MachineIdError("This feature is only available on Windows") | |
| # Бэкапим | |
| self.backup_system_machine_guid() | |
| new_guid = str(uuid.uuid4()).upper() | |
| try: | |
| key = winreg.OpenKey( | |
| winreg.HKEY_LOCAL_MACHINE, | |
| r"SOFTWARE\Microsoft\Cryptography", | |
| 0, winreg.KEY_SET_VALUE | |
| ) | |
| winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, new_guid) | |
| winreg.CloseKey(key) | |
| return new_guid | |
| except PermissionError: | |
| raise MachineIdError("Administrator privileges required") | |
| except Exception as e: | |
| raise MachineIdError(f"Failed to reset MachineGuid: {e}") | |
| def restore_system_machine_guid(self) -> bool: | |
| """Восстановить системный MachineGuid из бэкапа""" | |
| if self.os_type != 'windows' or not winreg: | |
| raise MachineIdError("This feature is only available on Windows") | |
| backup_file = self.paths.backups_dir / 'machine-guid-backup.json' | |
| if not backup_file.exists(): | |
| raise MachineIdError("No backup found") | |
| data = json.loads(backup_file.read_text()) | |
| machine_guid = data.get('machineGuid') | |
| if not machine_guid: | |
| raise MachineIdError("Invalid backup file") | |
| try: | |
| key = winreg.OpenKey( | |
| winreg.HKEY_LOCAL_MACHINE, | |
| r"SOFTWARE\Microsoft\Cryptography", | |
| 0, winreg.KEY_SET_VALUE | |
| ) | |
| winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, machine_guid) | |
| winreg.CloseKey(key) | |
| return True | |
| except PermissionError: | |
| raise MachineIdError("Administrator privileges required") | |
| except Exception as e: | |
| raise MachineIdError(f"Failed to restore MachineGuid: {e}") | |
| # ========================================================================= | |
| # Full Reset | |
| # ========================================================================= | |
| def full_reset(self, reset_system: bool = False, check_running: bool = True) -> Dict[str, Any]: | |
| """ | |
| Полный сброс всех ID | |
| Args: | |
| reset_system: Также сбросить системный MachineGuid | |
| check_running: Проверять запущен ли Kiro | |
| Returns: | |
| dict с результатами | |
| """ | |
| results = { | |
| 'kiro_reset': False, | |
| 'system_reset': False, | |
| 'new_telemetry': None, | |
| 'new_system_guid': None, | |
| 'errors': [] | |
| } | |
| # Сбрасываем Kiro telemetry | |
| try: | |
| new_telemetry = self.reset_telemetry(check_running=check_running) | |
| results['kiro_reset'] = True | |
| results['new_telemetry'] = new_telemetry | |
| except Exception as e: | |
| results['errors'].append(f"Kiro telemetry: {e}") | |
| # Сбрасываем системный MachineGuid | |
| if reset_system and self.os_type == 'windows': | |
| try: | |
| new_guid = self.reset_system_machine_guid() | |
| results['system_reset'] = True | |
| results['new_system_guid'] = new_guid | |
| except Exception as e: | |
| results['errors'].append(f"System MachineGuid: {e}") | |
| return results | |
| # ========================================================================= | |
| # Helpers | |
| # ========================================================================= | |
| def _generate_machine_id(self) -> str: | |
| """Генерирует machineId (64-символьный hex)""" | |
| random_bytes = os.urandom(32) | |
| timestamp = datetime.now().timestamp() | |
| hasher = hashlib.sha256() | |
| hasher.update(random_bytes) | |
| hasher.update(str(timestamp).encode()) | |
| return hasher.hexdigest() | |
| def _generate_sqm_id(self) -> str: | |
| """Генерирует sqmId (GUID в фигурных скобках)""" | |
| return '{' + str(uuid.uuid4()).upper() + '}' | |
| def _generate_dev_device_id(self) -> str: | |
| """Генерирует devDeviceId (UUID)""" | |
| return str(uuid.uuid4()) | |