Chronos-1.5B / quantum_kernel.py
squ11z1's picture
Upload quantum_kernel.py: IBM Quantum kernel implementation
25f5493 verified
raw
history blame
16.3 kB
"""
Быстрое создание квантового ядра для IBM Quantum
Оптимизировано для выполнения за ~8 минут
"""
# ===== ШАГ 1: УСТАНОВКА И ИМПОРТЫ =====
# Выполните в терминале:
# pip install qiskit qiskit-ibm-runtime qiskit-machine-learning scikit-learn
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
from qiskit import QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.circuit.library import ZZFeatureMap, PauliFeatureMap
from qiskit_ibm_runtime import QiskitRuntimeService, Sampler
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
# ===== ШАГ 2: СОЗДАНИЕ ДАННЫХ =====
# Создаём простой датасет для бинарной классификации
print("Создание датасета...")
# Рассчитываем оптимальный размер на основе вашей скорости
# Ваша скорость: 108 схем за 44.3 секунды = 2.44 схемы/сек
# Доступно времени: ~7.5 минут = 450 секунд
# Максимум схем: 450 * 2.44 = ~1100 схем
# Оптимально: ~900 схем (запас 20%)
# Для 900 схем нужно: sqrt(900 * 0.75) ≈ 26 train образцов
# 26 train + 9 test = 35 образцов → 26^2 + 9*26 = 676 + 234 = 910 схем
X, y = make_classification(
n_samples=35, # Оптимально под вашу скорость!
n_features=4, # 4 признака -> 2 кубита
n_informative=4,
n_redundant=0,
n_clusters_per_class=1,
random_state=42
)
# Нормализация данных в диапазон [0, 2π]
X_min, X_max = X.min(), X.max()
X_normalized = 2 * np.pi * (X - X_min) / (X_max - X_min)
# Разделение на train/test (74/26)
X_train, X_test, y_train, y_test = train_test_split(
X_normalized, y, test_size=0.26, random_state=42 # ~26 train, ~9 test
)
print(f"Train: {len(X_train)} образцов, Test: {len(X_test)} образцов")
total_circuits = len(X_train)**2 + len(X_test)*len(X_train)
expected_time = total_circuits / 2.44 # На основе вашей реальной скорости
print(f"ВАЖНО: Общее количество схем = {len(X_train)**2} + {len(X_test)*len(X_train)} = {total_circuits}")
print(f"📊 Ожидаемое время (на основе вашей скорости 2.44 схем/сек):")
print(f" ~{expected_time:.0f} секунд ({expected_time/60:.1f} минут)")
print(f" Запас до 8 минут: {8 - expected_time/60:.1f} минут")
# ===== ШАГ 3: СОЗДАНИЕ FEATURE MAP =====
num_qubits = 2 # Используем 2 кубита (4 признака / 2)
# ZZFeatureMap с минимальными reps для ibm_fez
feature_map = ZZFeatureMap(
feature_dimension=num_qubits,
reps=1, # МИНИМУМ для экономии времени!
entanglement='linear'
)
print(f"\nFeature map создана с {num_qubits} кубитами")
print(f"Количество параметров: {feature_map.num_parameters}")
print(f"Глубина схемы: {feature_map.depth()}")
# ===== ШАГ 4: ПОДКЛЮЧЕНИЕ К IBM QUANTUM =====
print("\nПодключение к IBM Quantum...")
# ibm_quantum_platform - это канал по умолчанию, можно не указывать
try:
# Простейший вариант: token + instance (необязательно)
service = QiskitRuntimeService(token='YDU7vuX8jQb4gFHEaxnLPkuyg05ueEzfB2baajl3dmP_')
# Сохраняем для будущего использования
QiskitRuntimeService.save_account(
token='YDU7vuX8jQb4gFHEaxnLPkuyg05ueEzfB2baajl3dmP_',
overwrite=True
)
print("✅ API токен успешно сохранён!")
print("✅ Подключение установлено!")
except Exception as e:
print(f"❌ Ошибка подключения: {e}")
raise
# Подключаемся к ibm_fez (реальный квантовый компьютер!)
try:
backend = service.backend('ibm_fez')
print(f"\n{'='*50}")
print(f"Используем РЕАЛЬНЫЙ квантовый компьютер: {backend.name}")
print(f"Количество кубитов: {backend.num_qubits}")
print(f"Статус: {'✅ Operational' if backend.status().operational else '❌ Not operational'}")
print(f"{'='*50}\n")
except Exception as e:
print(f"\n⚠️ Backend ibm_fez недоступен: {e}")
print("\nДоступные backends:")
backends = service.backends()
for b in backends:
status = "✅" if b.status().operational else "❌"
print(f" {status} {b.name} ({b.num_qubits} qubits)")
print("\n💡 Используем наименее загруженный backend...")
backend = service.least_busy(operational=True, simulator=False, min_num_qubits=2)
print(f"Выбран: {backend.name}\n")
# ===== ШАГ 5: ФУНКЦИЯ ДЛЯ ВЫЧИСЛЕНИЯ ЯДРА =====
def compute_kernel_matrix(X1, X2, feature_map, backend, shots=1024):
"""
Вычисляет матрицу квантового ядра между X1 и X2
Параметры:
- X1, X2: массивы данных
- feature_map: квантовая карта признаков
- backend: IBM Quantum backend
- shots: количество измерений на схему
"""
n1, n2 = len(X1), len(X2)
kernel_matrix = np.zeros((n1, n2))
circuits = []
indices = []
# Создаём схемы для всех пар (i, j)
for i in range(n1):
for j in range(n2):
# Берём только первые num_qubits признаков
x1_features = X1[i][:num_qubits]
x2_features = X2[j][:num_qubits]
# Создаём overlap circuit
qc = QuantumCircuit(num_qubits)
# Применяем feature map для x1
qc.compose(feature_map.assign_parameters(x1_features), inplace=True)
# Применяем inverse feature map для x2
qc.compose(
feature_map.assign_parameters(x2_features).inverse(),
inplace=True
)
qc.measure_all()
circuits.append(qc)
indices.append((i, j))
# Оптимизация схем для ibm_fez
print(f"Транспиляция {len(circuits)} схем для ibm_fez...")
pm = generate_preset_pass_manager(
optimization_level=2, # Хороший баланс для реального QPU
backend=backend
)
transpiled_circuits = pm.run(circuits)
# Выполнение на квантовом компьютере ibm_fez
print(f"Отправка задачи на {backend.name} (shots={shots})...")
print("⏳ Ожидание выполнения на реальном квантовом компьютере...")
sampler = Sampler(mode=backend)
job = sampler.run(transpiled_circuits, shots=shots)
# Показываем ID задачи для отслеживания
print(f"Job ID: {job.job_id()}")
print(f"Статус: {job.status()}")
result = job.result()
print("✅ Выполнение завершено!")
# Обработка результатов
for idx, (i, j) in enumerate(indices):
counts = result[idx].data.meas.get_counts()
# Вероятность измерить |00...0>
zero_state = '0' * num_qubits
prob_zero = counts.get(zero_state, 0) / shots
kernel_matrix[i, j] = prob_zero
return kernel_matrix
# ===== ШАГ 6: ВЫЧИСЛЕНИЕ МАТРИЦ ЯДРА =====
print("\n" + "="*60)
print("ВЫЧИСЛЕНИЕ КВАНТОВОГО ЯДРА НА IBM_FEZ")
print("="*60)
# Для реального QPU используем меньше shots для экономии времени
shots = 1024 # Минимум для приемлемой точности
import time
start_time = time.time()
# Вычисляем матрицу ядра для обучающих данных
print(f"\n🔬 Вычисление K_train ({len(X_train)}x{len(X_train)} = {len(X_train)**2} схем)...")
K_train = compute_kernel_matrix(X_train, X_train, feature_map, backend, shots)
train_time = time.time() - start_time
print(f"⏱️ Время: {train_time:.1f} секунд")
# Вычисляем матрицу ядра для тестовых данных
print(f"\n🔬 Вычисление K_test ({len(X_test)}x{len(X_train)} = {len(X_test)*len(X_train)} схем)...")
K_test = compute_kernel_matrix(X_test, X_train, feature_map, backend, shots)
test_time = time.time() - start_time - train_time
print(f"⏱️ Время: {test_time:.1f} секунд")
total_time = time.time() - start_time
print(f"\n✅ Все матрицы ядра готовы!")
print(f"📊 Общее время выполнения: {total_time:.1f} секунд ({total_time/60:.2f} минут)")
print(f"K_train shape: {K_train.shape}")
print(f"K_test shape: {K_test.shape}")
# ===== ШАГ 7: ОБУЧЕНИЕ SVM =====
print("\n=== ОБУЧЕНИЕ SVM КЛАССИФИКАТОРА ===")
# Используем SVM с предвычисленным ядром
svm = SVC(kernel='precomputed')
svm.fit(K_train, y_train)
# Предсказание
y_pred = svm.predict(K_test)
# Оценка точности
accuracy = accuracy_score(y_test, y_pred)
print(f"\nТочность на тестовой выборке: {accuracy:.2%}")
# ===== ШАГ 8: ВИЗУАЛИЗАЦИЯ =====
print("\n=== ВИЗУАЛИЗАЦИЯ РЕЗУЛЬТАТОВ ===")
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
# Визуализация матрицы ядра
im1 = axes[0].imshow(K_train, cmap='hot', interpolation='nearest')
axes[0].set_title('Матрица квантового ядра (Train)')
axes[0].set_xlabel('Образец j')
axes[0].set_ylabel('Образец i')
plt.colorbar(im1, ax=axes[0])
# Визуализация предсказаний
axes[1].scatter(range(len(y_test)), y_test, c='blue',
marker='o', label='Истинные метки', s=100)
axes[1].scatter(range(len(y_pred)), y_pred, c='red',
marker='x', label='Предсказания', s=100)
axes[1].set_title('Предсказания vs Истинные метки')
axes[1].set_xlabel('Индекс образца')
axes[1].set_ylabel('Класс')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('quantum_kernel_results.png', dpi=150, bbox_inches='tight')
print("График сохранён в 'quantum_kernel_results.png'")
plt.show()
# ===== ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ =====
print("\n=== ИТОГИ ===")
print(f"Общее количество схем выполнено: {len(X_train)**2 + len(X_test)*len(X_train)}")
print(f"Shots на схему: {shots}")
print(f"Точность классификации: {accuracy:.2%}")
print(f"\nСредние значения в матрице ядра:")
print(f" K_train: min={K_train.min():.3f}, max={K_train.max():.3f}, mean={K_train.mean():.3f}")
print(f" K_test: min={K_test.min():.3f}, max={K_test.max():.3f}, mean={K_test.mean():.3f}")
# ===== СОХРАНЕНИЕ РЕЗУЛЬТАТОВ =====
print("\n=== СОХРАНЕНИЕ РЕЗУЛЬТАТОВ ===")
import pickle
import json
from datetime import datetime
# Создаём папку для результатов
import os
results_dir = "quantum_kernel_results"
os.makedirs(results_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 1. Сохраняем матрицы ядра (numpy arrays)
np.save(f'{results_dir}/K_train_{timestamp}.npy', K_train)
np.save(f'{results_dir}/K_test_{timestamp}.npy', K_test)
print(f"✅ Матрицы ядра сохранены:")
print(f" - {results_dir}/K_train_{timestamp}.npy")
print(f" - {results_dir}/K_test_{timestamp}.npy")
# 2. Сохраняем данные и метки
np.save(f'{results_dir}/X_train_{timestamp}.npy', X_train)
np.save(f'{results_dir}/X_test_{timestamp}.npy', X_test)
np.save(f'{results_dir}/y_train_{timestamp}.npy', y_train)
np.save(f'{results_dir}/y_test_{timestamp}.npy', y_test)
print(f"✅ Данные и метки сохранены")
# 3. Сохраняем обученную модель SVM
with open(f'{results_dir}/svm_model_{timestamp}.pkl', 'wb') as f:
pickle.dump(svm, f)
print(f"✅ Модель SVM сохранена: {results_dir}/svm_model_{timestamp}.pkl")
# 4. Сохраняем метаданные эксперимента
metadata = {
'timestamp': timestamp,
'backend': backend.name,
'num_qubits': num_qubits,
'shots': shots,
'num_train_samples': len(X_train),
'num_test_samples': len(X_test),
'total_circuits': len(X_train)**2 + len(X_test)*len(X_train),
'execution_time_seconds': total_time,
'accuracy': float(accuracy),
'feature_map': {
'type': 'ZZFeatureMap',
'reps': 1,
'entanglement': 'linear'
},
'kernel_stats': {
'K_train_min': float(K_train.min()),
'K_train_max': float(K_train.max()),
'K_train_mean': float(K_train.mean()),
'K_test_min': float(K_test.min()),
'K_test_max': float(K_test.max()),
'K_test_mean': float(K_test.mean()),
}
}
with open(f'{results_dir}/metadata_{timestamp}.json', 'w') as f:
json.dump(metadata, f, indent=2)
print(f"✅ Метаданные сохранены: {results_dir}/metadata_{timestamp}.json")
# 5. Сохраняем график
plt.savefig(f'{results_dir}/results_{timestamp}.png', dpi=150, bbox_inches='tight')
print(f"✅ График сохранён: {results_dir}/results_{timestamp}.png")
print(f"\n📁 Все результаты в папке: {results_dir}/")
print("\n" + "="*60)
print("КАК ИСПОЛЬЗОВАТЬ СОХРАНЁННЫЕ ДАННЫЕ:")
print("="*60)
print("""
# Загрузить матрицы ядра:
import numpy as np
K_train = np.load('quantum_kernel_results/K_train_TIMESTAMP.npy')
K_test = np.load('quantum_kernel_results/K_test_TIMESTAMP.npy')
# Загрузить обученную модель:
import pickle
with open('quantum_kernel_results/svm_model_TIMESTAMP.pkl', 'rb') as f:
svm = pickle.load(f)
# Использовать для новых предсказаний:
# 1. Вычислите квантовое ядро для новых данных X_new
# 2. predictions = svm.predict(K_new)
# Загрузить метаданные:
import json
with open('quantum_kernel_results/metadata_TIMESTAMP.json', 'r') as f:
metadata = json.load(f)
print(f"Точность модели: {metadata['accuracy']}")
""")
# СОВЕТЫ ПО ОПТИМИЗАЦИИ ДЛЯ IBM_FEZ:
print("\n" + "="*60)
print("ИСПОЛЬЗОВАНО НА IBM_FEZ:")
print("="*60)
print(f"✅ Образцов данных: {len(X_train) + len(X_test)}")
print(f"✅ Всего схем выполнено: {len(X_train)**2 + len(X_test)*len(X_train)}")
print(f"✅ Shots на схему: {shots}")
print(f"✅ Общее время: {total_time/60:.2f} минут")
remaining = 8 - total_time/60
print(f"✅ Оставшееся время: ~{remaining:.2f} минут")
print(f"\n📈 Статистика:")
print(f" Скорость: ~{(len(X_train)**2 + len(X_test)*len(X_train)) / total_time:.1f} схем/сек")
print(f" Среднее время на схему: ~{total_time / (len(X_train)**2 + len(X_test)*len(X_train)):.2f} сек")
if remaining > 0:
print(f"\n💡 Времени хватило! Можно было выполнить ещё ~{int(remaining * 60 / (total_time / (len(X_train)**2 + len(X_test)*len(X_train))))} схем")