KManager / services /machine_id_service.py
StarrySkyWorld's picture
Initial commit
494c89b
"""
Machine ID Service - manages Kiro telemetry IDs and the system Machine ID
"""
import hashlib
import json
import logging
import os
import platform
import sqlite3
import uuid
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
import sys
from pathlib import Path as SysPath
sys.path.insert(0, str(SysPath(__file__).parent.parent))
from core.paths import get_paths
from core.config import get_config
from core.exceptions import MachineIdError, KiroNotInstalledError, KiroRunningError
from core.process_utils import is_kiro_running
# Windows-specific: the registry module only ships with Windows builds of
# Python. Bind ``winreg`` to None everywhere else so the name is always
# defined and callers can simply test its truthiness.
if platform.system() == 'Windows':
    try:
        import winreg
    except ImportError:
        winreg = None
else:
    winreg = None
@dataclass
class TelemetryInfo:
    """Snapshot of the Kiro telemetry identifiers.

    Every identifier defaults to ``None`` (unknown / not read) and
    ``kiro_installed`` defaults to ``False``.
    """

    # Value of the 'telemetry.machineId' entry in storage.json.
    machine_id: Optional[str] = None

    # Value of the 'telemetry.sqmId' entry in storage.json.
    sqm_id: Optional[str] = None

    # Value of the 'telemetry.devDeviceId' entry in storage.json.
    dev_device_id: Optional[str] = None

    # Value of the 'storage.serviceMachineId' row in state.vscdb.
    service_machine_id: Optional[str] = None

    # Whether a Kiro installation was detected.
    kiro_installed: bool = False
@dataclass
class SystemMachineInfo:
    """Description of the operating-system level machine identifier."""

    # Current MachineGuid value; None when it could not be read.
    machine_guid: Optional[str] = None

    # Lower-cased platform name the information was collected on.
    os_type: str = ""

    # True when the identifier was read successfully and may be rewritten.
    can_modify: bool = False

    # Whether changing the identifier needs administrator privileges.
    requires_admin: bool = True

    # True when a MachineGuid backup file is present.
    backup_exists: bool = False

    # 'backupTime' value recorded inside the backup file, if readable.
    backup_time: Optional[str] = None
class MachineIdService:
    """Inspect, back up, reset and restore machine identifiers.

    Two independent groups of identifiers are handled:

    * Kiro telemetry IDs kept in ``storage.json`` and in the ``ItemTable``
      of the ``state.vscdb`` SQLite database.
    * The Windows ``MachineGuid`` registry value (see ``_REGISTRY_SUBKEY``).
    """

    _log = logging.getLogger(__name__)

    # Key of the service machine id row inside state.vscdb's ItemTable.
    _SERVICE_ID_KEY = 'storage.serviceMachineId'
    # Registry location (under HKEY_LOCAL_MACHINE) holding MachineGuid.
    _REGISTRY_SUBKEY = r"SOFTWARE\Microsoft\Cryptography"
    # File name of the MachineGuid backup inside the backups directory.
    _GUID_BACKUP_NAME = 'machine-guid-backup.json'

    def __init__(self):
        """Resolve project paths/config and remember the lower-cased OS name."""
        self.paths = get_paths()
        self.config = get_config()
        self.os_type = platform.system().lower()

    # =========================================================================
    # Kiro Telemetry IDs
    # =========================================================================

    def get_telemetry_info(self) -> "TelemetryInfo":
        """Read all Kiro telemetry IDs (best effort).

        Returns:
            TelemetryInfo whose fields stay ``None`` for every identifier
            that could not be read. When Kiro is not installed only
            ``kiro_installed=False`` is reported.
        """
        info = TelemetryInfo(kiro_installed=self.paths.is_kiro_installed())
        if not info.kiro_installed:
            return info

        # Read the telemetry.* entries from storage.json.
        storage_path = self.paths.kiro_storage_json
        if storage_path and storage_path.exists():
            try:
                data = json.loads(storage_path.read_text(encoding='utf-8'))
            except (OSError, ValueError) as exc:
                # Unreadable or malformed file: keep the fields unset.
                self._log.warning("Could not read %s: %s", storage_path, exc)
            else:
                if isinstance(data, dict):
                    info.machine_id = data.get('telemetry.machineId')
                    info.sqm_id = data.get('telemetry.sqmId')
                    info.dev_device_id = data.get('telemetry.devDeviceId')

        # Read serviceMachineId from state.vscdb.
        info.service_machine_id = self._read_service_machine_id()
        return info

    def backup_telemetry(self) -> Path:
        """Write the current Kiro telemetry IDs to a new backup file.

        Returns:
            Path of the backup file that was written.

        Raises:
            KiroNotInstalledError: if Kiro is not installed.
        """
        info = self.get_telemetry_info()
        if not info.kiro_installed:
            raise KiroNotInstalledError("Kiro is not installed")

        backup_data = {
            'machineId': info.machine_id,
            'sqmId': info.sqm_id,
            'devDeviceId': info.dev_device_id,
            'serviceMachineId': info.service_machine_id,
            'backupTime': datetime.now().isoformat(),
            'osType': self.os_type,
        }
        backup_file = self.paths.get_backup_file('kiro-telemetry')
        # Make sure the target directory exists before writing.
        backup_file.parent.mkdir(parents=True, exist_ok=True)
        backup_file.write_text(json.dumps(backup_data, indent=2), encoding='utf-8')
        return backup_file

    def reset_telemetry(self, check_running: bool = True) -> "TelemetryInfo":
        """Replace every Kiro telemetry ID with a freshly generated one.

        Args:
            check_running: refuse to run while Kiro is running.

        Returns:
            TelemetryInfo holding the newly generated IDs.

        Raises:
            KiroRunningError: if Kiro is running and ``check_running`` is set.
            KiroNotInstalledError: if Kiro is not installed.
        """
        if not self.paths.is_kiro_installed():
            raise KiroNotInstalledError("Kiro is not installed")
        if check_running and is_kiro_running():
            raise KiroRunningError("Kiro is running. Please close it first.")

        # Back up the current values first when configured to do so.
        if self.config.machine_id.backup_before_reset:
            self.backup_telemetry()

        new_ids = TelemetryInfo(
            machine_id=self._generate_machine_id(),
            sqm_id=self._generate_sqm_id(),
            dev_device_id=self._generate_dev_device_id(),
            service_machine_id=self._generate_machine_id(),
            kiro_installed=True,
        )

        self._update_storage_json({
            'telemetry.machineId': new_ids.machine_id,
            'telemetry.sqmId': new_ids.sqm_id,
            'telemetry.devDeviceId': new_ids.dev_device_id,
        })
        # Best effort: a failing database update is logged, not raised.
        self._write_service_machine_id(new_ids.service_machine_id)
        return new_ids

    def restore_telemetry(self, backup_file: Optional[Path] = None) -> bool:
        """Restore Kiro telemetry IDs from a backup file.

        Args:
            backup_file: backup to restore; when omitted the first entry
                returned by ``paths.list_backups('kiro-telemetry')`` is used.

        Returns:
            True once the restore steps have run.

        Raises:
            MachineIdError: if no backup is available or the file is missing.
        """
        if backup_file is None:
            backups = self.paths.list_backups('kiro-telemetry')
            if not backups:
                raise MachineIdError("No backup found")
            # NOTE(review): assumes list_backups() returns the newest backup
            # first - confirm against core.paths.
            backup_file = backups[0]
        if not backup_file.exists():
            raise MachineIdError(f"Backup file not found: {backup_file}")

        data = json.loads(backup_file.read_text(encoding='utf-8'))

        # Only identifiers actually present in the backup are written back.
        key_map = (
            ('machineId', 'telemetry.machineId'),
            ('sqmId', 'telemetry.sqmId'),
            ('devDeviceId', 'telemetry.devDeviceId'),
        )
        updates = {
            storage_key: data[backup_key]
            for backup_key, storage_key in key_map
            if data.get(backup_key)
        }
        self._update_storage_json(updates)

        service_id = data.get('serviceMachineId')
        if service_id:
            # Best effort: a failing database update is logged, not raised.
            self._write_service_machine_id(service_id)
        return True

    # =========================================================================
    # System Machine GUID (Windows)
    # =========================================================================

    def get_system_machine_info(self) -> "SystemMachineInfo":
        """Collect the system MachineGuid and the state of its backup.

        The registry is only consulted on Windows; on other platforms, or
        when the value cannot be read, ``machine_guid`` stays ``None``.
        """
        info = SystemMachineInfo(os_type=self.os_type)

        if self.os_type == 'windows' and winreg:
            try:
                # The handle is a context manager, so it is closed even
                # when the query raises.
                with winreg.OpenKey(
                    winreg.HKEY_LOCAL_MACHINE,
                    self._REGISTRY_SUBKEY,
                    0, winreg.KEY_READ
                ) as key:
                    value, _ = winreg.QueryValueEx(key, "MachineGuid")
            except OSError as exc:
                self._log.warning("Could not read MachineGuid: %s", exc)
            else:
                info.machine_guid = value
                info.can_modify = True
                info.requires_admin = True

        # Report whether a backup exists and, if readable, when it was made.
        backup_file = self.paths.backups_dir / self._GUID_BACKUP_NAME
        if backup_file.exists():
            info.backup_exists = True
            try:
                backup = json.loads(backup_file.read_text(encoding='utf-8'))
            except (OSError, ValueError):
                backup = None
            if isinstance(backup, dict):
                info.backup_time = backup.get('backupTime')
        return info

    def backup_system_machine_guid(self) -> Optional[Path]:
        """Save the current system MachineGuid to the backup file.

        Any previous MachineGuid backup is overwritten.

        Returns:
            Path of the backup file, or ``None`` when not on Windows or
            when the current MachineGuid could not be read.
        """
        if self.os_type != 'windows':
            return None
        info = self.get_system_machine_info()
        if not info.machine_guid:
            return None

        backup_data = {
            'machineGuid': info.machine_guid,
            'backupTime': datetime.now().isoformat(),
            'computerName': os.environ.get('COMPUTERNAME', 'Unknown'),
            'osType': self.os_type,
        }
        backup_file = self.paths.backups_dir / self._GUID_BACKUP_NAME
        backup_file.parent.mkdir(parents=True, exist_ok=True)
        backup_file.write_text(json.dumps(backup_data, indent=2), encoding='utf-8')
        return backup_file

    def reset_system_machine_guid(self) -> Optional[str]:
        """Replace the system MachineGuid with a new random GUID.

        The current value is backed up first. Writing the registry value
        requires administrator privileges.

        Returns:
            The new GUID (upper-case).

        Raises:
            MachineIdError: when not on Windows, when privileges are
                missing, or when the registry write fails.
        """
        if self.os_type != 'windows' or not winreg:
            raise MachineIdError("This feature is only available on Windows")

        self.backup_system_machine_guid()
        new_guid = str(uuid.uuid4()).upper()
        self._write_machine_guid(new_guid, "reset")
        return new_guid

    def restore_system_machine_guid(self) -> bool:
        """Write the MachineGuid stored in the backup file back to the registry.

        Returns:
            True on success.

        Raises:
            MachineIdError: when not on Windows, when no usable backup
                exists, when privileges are missing, or when the registry
                write fails.
        """
        if self.os_type != 'windows' or not winreg:
            raise MachineIdError("This feature is only available on Windows")

        backup_file = self.paths.backups_dir / self._GUID_BACKUP_NAME
        if not backup_file.exists():
            raise MachineIdError("No backup found")

        data = json.loads(backup_file.read_text(encoding='utf-8'))
        machine_guid = data.get('machineGuid')
        if not machine_guid:
            raise MachineIdError("Invalid backup file")

        self._write_machine_guid(machine_guid, "restore")
        return True

    # =========================================================================
    # Full Reset
    # =========================================================================

    def full_reset(self, reset_system: bool = False, check_running: bool = True) -> Dict[str, Any]:
        """Reset the Kiro telemetry IDs and, optionally, the system MachineGuid.

        Failures are collected instead of raised, so one failing step does
        not prevent the other from running.

        Args:
            reset_system: also reset the system MachineGuid (Windows only).
            check_running: refuse the telemetry reset while Kiro is running.

        Returns:
            dict with the keys ``kiro_reset``, ``system_reset``,
            ``new_telemetry``, ``new_system_guid`` and ``errors``.
        """
        results = {
            'kiro_reset': False,
            'system_reset': False,
            'new_telemetry': None,
            'new_system_guid': None,
            'errors': [],
        }

        try:
            new_telemetry = self.reset_telemetry(check_running=check_running)
            results['kiro_reset'] = True
            results['new_telemetry'] = new_telemetry
        except Exception as e:
            results['errors'].append(f"Kiro telemetry: {e}")

        if reset_system and self.os_type == 'windows':
            try:
                new_guid = self.reset_system_machine_guid()
                results['system_reset'] = True
                results['new_system_guid'] = new_guid
            except Exception as e:
                results['errors'].append(f"System MachineGuid: {e}")

        return results

    # =========================================================================
    # Helpers
    # =========================================================================

    def _update_storage_json(self, updates: Dict[str, Any]) -> None:
        """Merge ``updates`` into storage.json; no-op when the file is absent."""
        storage_path = self.paths.kiro_storage_json
        if not (storage_path and storage_path.exists()):
            return
        storage = json.loads(storage_path.read_text(encoding='utf-8'))
        for key, value in updates.items():
            storage[key] = value
        storage_path.write_text(json.dumps(storage, indent=2), encoding='utf-8')

    def _read_service_machine_id(self) -> Optional[str]:
        """Return the serviceMachineId stored in state.vscdb, or None."""
        db_path = self.paths.kiro_state_db
        if not (db_path and db_path.exists()):
            return None
        try:
            # closing() guarantees the connection is released on errors too.
            with closing(sqlite3.connect(str(db_path))) as conn:
                row = conn.execute(
                    "SELECT value FROM ItemTable WHERE key = ?",
                    (self._SERVICE_ID_KEY,),
                ).fetchone()
        except sqlite3.Error as exc:
            self._log.warning("Could not read %s from %s: %s",
                              self._SERVICE_ID_KEY, db_path, exc)
            return None
        return row[0] if row else None

    def _write_service_machine_id(self, value: str) -> bool:
        """Update serviceMachineId in state.vscdb; return True if the update ran."""
        db_path = self.paths.kiro_state_db
        if not (db_path and db_path.exists()):
            return False
        try:
            with closing(sqlite3.connect(str(db_path))) as conn:
                conn.execute(
                    "UPDATE ItemTable SET value = ? WHERE key = ?",
                    (value, self._SERVICE_ID_KEY),
                )
                conn.commit()
        except sqlite3.Error as exc:
            self._log.warning("Could not update %s in %s: %s",
                              self._SERVICE_ID_KEY, db_path, exc)
            return False
        return True

    def _write_machine_guid(self, guid: str, action: str) -> None:
        """Write ``guid`` to the MachineGuid registry value.

        ``action`` ("reset" / "restore") only shapes the error message.
        """
        try:
            with winreg.OpenKey(
                winreg.HKEY_LOCAL_MACHINE,
                self._REGISTRY_SUBKEY,
                0, winreg.KEY_SET_VALUE
            ) as key:
                winreg.SetValueEx(key, "MachineGuid", 0, winreg.REG_SZ, guid)
        except PermissionError as e:
            raise MachineIdError("Administrator privileges required") from e
        except Exception as e:
            raise MachineIdError(f"Failed to {action} MachineGuid: {e}") from e

    def _generate_machine_id(self) -> str:
        """Generate a machineId: a 64-character lowercase hex string."""
        hasher = hashlib.sha256()
        hasher.update(os.urandom(32))
        hasher.update(str(datetime.now().timestamp()).encode())
        return hasher.hexdigest()

    def _generate_sqm_id(self) -> str:
        """Generate an sqmId: an upper-case GUID wrapped in curly braces."""
        return '{' + str(uuid.uuid4()).upper() + '}'

    def _generate_dev_device_id(self) -> str:
        """Generate a devDeviceId: a random UUID string."""
        return str(uuid.uuid4())