TestGPT / app.py
daniilkolbasenko's picture
Create app.py
c0b8285 verified
import os
import time
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
import tiktoken
import gradio as gr
from tqdm import tqdm
import numpy as np
from datasets import load_dataset
# ---------- 1. Hard resource limits ----------
# Use 12 CPU threads (intra-op and inter-op) and roughly 13 GB of RAM.
torch.set_num_threads(12)
torch.set_num_interop_threads(12)
# PyTorch memory limiting (optional, for safety):
# torch.cuda.empty_cache() is not needed because everything runs on the CPU.

# --- Model hyperparameters (sized for 13 GB of RAM) ---
vocab_size = 50257
block_size = 256
n_embd = 384
n_head = 6
n_layer = 6
dropout = 0.1

# --- Training hyperparameters (reduced to save memory) ---
batch_size = 24  # was 32 -> lowered
learning_rate = 5e-4
max_iters = 15000
eval_interval = 500
eval_iters = 100
warmup_iters = 500

# --- DataLoader parameters (moderate) ---
num_workers = 6  # was 8 -> lowered
prefetch_factor = 4
device = 'cpu'
# Pinned (page-locked) host memory only speeds up host-to-CUDA copies. On a
# CPU-only run it buys nothing and makes the DataLoader emit a warning, so it
# is enabled only when the target device is a CUDA device.
pin_memory = device.startswith('cuda')
print(f"Устройство: {device}")
print(f"Используется CPU потоков: {torch.get_num_threads()}")
# ---------- 2. Dataset and tokenization ----------
print("\n[1/5] Загрузка и токенизация датасета...")
dataset = load_dataset("JoshKeesee/Alfred-Indigo", split="train")

# Flatten every conversation into "role: content" lines, one message per line.
dialogue_texts = [
    "\n".join([f"{msg['role']}: {msg['content']}" for msg in example['messages']])
    for example in dataset
]
# Conversations are separated from each other by a blank line.
all_text = "\n\n".join(dialogue_texts)
print(f"Загружено {len(dialogue_texts)} диалогов. Общий объём: {len(all_text)} символов.")

# Encode the whole corpus with the "gpt2" tiktoken encoding and keep the
# token ids as a single 1-D int64 tensor.
enc = tiktoken.get_encoding("gpt2")
data = torch.tensor(enc.encode_ordinary(all_text), dtype=torch.long)

# The first 90% of the tokens are used for training, the rest for validation.
n = int(0.9 * len(data))
train_data, val_data = data[:n], data[n:]
class TextDataset(Dataset):
    """Sliding-window view over a token sequence for next-token prediction.

    Item ``i`` is the pair
    ``(data[i:i + block_size], data[i + 1:i + block_size + 1])``: a window of
    tokens and the same window shifted one position to the right.
    """

    def __init__(self, data, block_size):
        self.data = data
        self.block_size = block_size

    def __len__(self):
        # One item per start offset whose shifted target window still fits.
        return len(self.data) - self.block_size

    def __getitem__(self, idx):
        stop = idx + self.block_size
        inputs = self.data[idx:stop]
        targets = self.data[idx + 1:stop + 1]
        return inputs, targets
# DataLoaders with a moderate number of workers. Both splits share the same
# batching / worker / prefetch settings; only the training split is shuffled.
_loader_options = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    pin_memory=pin_memory,
    prefetch_factor=prefetch_factor,
)
train_dataset = TextDataset(train_data, block_size)
val_dataset = TextDataset(val_data, block_size)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)
# ---------- 3. Model architecture (optimized) ----------
class AttentionHead(nn.Module):
    """A single head of causal (masked) self-attention.

    Args:
        head_size: Output width of the key, query and value projections.
    """

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        self.dropout = nn.Dropout(dropout)
        # Lower-triangular mask: position t may only attend to positions <= t.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    def forward(self, x):
        """Apply masked self-attention to a 3-D input.

        Args:
            x: Tensor whose last dimension is ``n_embd`` and whose middle
                dimension (sequence length) is at most ``block_size``.

        Returns:
            Tensor with the same leading dimensions and a last dimension of
            ``head_size``.
        """
        _, T, _ = x.shape
        k = self.key(x)
        q = self.query(x)
        # Scale by 1/sqrt(d_k), where d_k is the key/query width (head_size).
        # Scaling by the full embedding width instead would shrink the scores
        # by an extra factor of sqrt(n_head) and flatten the softmax.
        wei = q @ k.transpose(-2, -1) * (k.shape[-1] ** -0.5)
        # Hide future positions before normalising.
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        v = self.value(x)
        return wei @ v
class MultiHeadAttention(nn.Module):
    """Runs ``n_head`` attention heads in parallel and mixes their outputs.

    Each head gets a width of ``n_embd // n_head``; the concatenated head
    outputs go through a linear projection followed by dropout.
    """

    def __init__(self):
        super().__init__()
        per_head_width = n_embd // n_head
        head_modules = [AttentionHead(per_head_width) for _ in range(n_head)]
        self.heads = nn.ModuleList(head_modules)
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Concatenate the per-head results along the feature axis.
        head_outputs = [head(x) for head in self.heads]
        concatenated = torch.cat(head_outputs, dim=-1)
        projected = self.proj(concatenated)
        return self.dropout(projected)
class FeedForward(nn.Module):
    """Per-position MLP: expand to ``4 * n_embd``, GELU, project back, dropout."""

    def __init__(self):
        super().__init__()
        hidden_width = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden_width),
            nn.GELU(),
            nn.Linear(hidden_width, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        transformed = self.net(x)
        return transformed
class TransformerBlock(nn.Module):
    """One transformer layer: attention, then MLP, each with a residual.

    LayerNorm is applied to the input of each sub-layer (before attention
    and before the feed-forward network), and the sub-layer output is added
    back onto its un-normalised input.
    """

    def __init__(self):
        super().__init__()
        self.ln1 = nn.LayerNorm(n_embd)
        self.attn = MultiHeadAttention()
        self.ln2 = nn.LayerNorm(n_embd)
        self.ffwd = FeedForward()

    def forward(self, x):
        attn_out = self.attn(self.ln1(x))
        after_attn = x + attn_out
        mlp_out = self.ffwd(self.ln2(after_attn))
        return after_attn + mlp_out
class GPTLanguageModel(nn.Module):
    """Decoder-only transformer language model.

    Token and learned position embeddings are summed, passed through
    ``n_layer`` transformer blocks and a final LayerNorm, and projected to
    ``vocab_size`` logits.
    """

    def __init__(self):
        super().__init__()
        self.token_embedding = nn.Embedding(vocab_size, n_embd)
        self.position_embedding = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[TransformerBlock() for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute logits and, when ``targets`` is given, the training loss.

        Args:
            idx: 2-D tensor of token ids, shape (B, T) with T <= ``block_size``.
            targets: Optional tensor of next-token ids with B * T elements.

        Returns:
            ``(logits, loss)``. Without ``targets``, ``logits`` has shape
            (B, T, vocab_size) and ``loss`` is ``None``. With ``targets``,
            ``logits`` is flattened to (B * T, vocab_size) and ``loss`` is the
            cross-entropy over all positions.
        """
        B, T = idx.shape
        tok_emb = self.token_embedding(idx)
        # Build the position indices on the input's own device rather than a
        # module-level global, so the model keeps working if it is moved.
        pos_emb = self.position_embedding(torch.arange(T, device=idx.device))
        x = tok_emb + pos_emb
        x = self.blocks(x)
        x = self.ln_f(x)
        logits = self.lm_head(x)
        loss = None
        if targets is not None:
            B, T, C = logits.shape
            # cross_entropy expects (N, C) logits against (N,) class ids.
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=0.8, top_k=40):
        """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

        Sampling runs without autograd and with the module temporarily in
        eval mode (dropout disabled); the previous train/eval mode is
        restored on exit.

        Args:
            idx: 2-D tensor of token ids, shape (B, T) with T >= 1.
            max_new_tokens: Number of tokens to append.
            temperature: Logits are divided by this value before sampling.
            top_k: If not ``None``, only the ``top_k`` most likely tokens
                are eligible at each step.

        Returns:
            Tensor of shape (B, T + max_new_tokens): the input followed by
            the sampled tokens.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # The position embedding only covers block_size positions.
                idx_cond = idx[:, -block_size:]
                logits, _ = self(idx_cond)
                logits = logits[:, -1, :] / temperature
                if top_k is not None:
                    v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                    # Mask out everything below the k-th largest logit.
                    logits[logits < v[:, [-1]]] = -float('Inf')
                probs = F.softmax(logits, dim=-1)
                idx_next = torch.multinomial(probs, num_samples=1)
                idx = torch.cat((idx, idx_next), dim=1)
        finally:
            self.train(was_training)
        return idx
# Build the network and wrap it with torch.compile (available in PyTorch 2.x).
# Later code reaches the wrapped module through `model._orig_mod`.
model = torch.compile(GPTLanguageModel())
print(f"Модель создана. Параметров: {sum(p.numel() for p in model.parameters())/1e6:.2f}M")
# ---------- 4. Training ----------
def get_batch_from_loader(loader):
    """Yield ``(x, y)`` tuples from ``loader``, one per item it produces.

    Every item produced by ``loader`` must unpack into exactly two values.
    """
    for batch in loader:
        inputs, targets = batch
        yield inputs, targets
@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss on the train and validation splits.

    Runs ``eval_iters`` batches from each loader with the model in eval mode
    and autograd disabled, so no graph is built or kept for evaluation-only
    forward passes. The model is put back into training mode before
    returning, even if evaluation raises.

    Returns:
        Dict with keys ``'train'`` and ``'val'``, each a 0-d tensor holding
        the mean loss over the evaluated batches.
    """
    out = {}
    model.eval()
    try:
        for split, loader in [('train', train_loader), ('val', val_loader)]:
            losses = torch.zeros(eval_iters)
            loader_iter = iter(loader)
            for k in range(eval_iters):
                try:
                    X, Y = next(loader_iter)
                except StopIteration:
                    # The split holds fewer than eval_iters batches: wrap around.
                    loader_iter = iter(loader)
                    X, Y = next(loader_iter)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        model.train()
    return out
# AdamW over all model parameters. The learning rate given here is only the
# initial value; the training loop overwrites it on every step.
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=learning_rate,
    weight_decay=0.1,
)
def get_lr(it):
    """Return the learning rate for step ``it`` (0-based).

    The rate ramps linearly over the first ``warmup_iters`` steps, reaching
    ``learning_rate`` on the last warm-up step, and stays there afterwards.
    """
    if it >= warmup_iters:
        return learning_rate
    return learning_rate * (it + 1) / warmup_iters
print("\n[2/5] Старт обучения (ограничение 12 CPU / 13 ГБ RAM)...")
start_time = time.time()
# Keep ONE iterator alive across steps. Calling iter(train_loader) on every
# step would start a fresh set of worker processes each time and throw away
# everything they had prefetched, which dominates the step time.
train_iter = iter(train_loader)
for iter_num in tqdm(range(max_iters), desc="Обучение"):
    # Push the scheduled learning rate from get_lr into every parameter group.
    lr = get_lr(iter_num)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    # Report train/val loss periodically and on the final step.
    if iter_num % eval_interval == 0 or iter_num == max_iters - 1:
        losses = estimate_loss()
        elapsed = time.time() - start_time
        print(f"\nШаг {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f} (время {elapsed:.2f} с)")

    try:
        xb, yb = next(train_iter)
    except StopIteration:
        # End of an epoch: start the next pass over the training data.
        train_iter = iter(train_loader)
        xb, yb = next(train_iter)

    _, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    # Clip the global gradient norm before the optimizer step.
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    optimizer.step()
print(f"\nОбучение завершено! Время: {(time.time() - start_time)/60:.2f} мин")
# Saving: write the state dict of the module wrapped by torch.compile
# (`model._orig_mod`), creating the target directory if needed.
checkpoint_path = 'checkpoints/model_final.pth'
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
torch.save(model._orig_mod.state_dict(), checkpoint_path)
print("Модель сохранена в 'checkpoints/model_final.pth'")
# ---------- 5. Gradio interface ----------
def generate_response(prompt, max_new_tokens=150, temperature=0.7, top_k=40):
    """Continue ``prompt`` with the trained model and return the decoded text.

    Args:
        prompt: Text to condition on.
        max_new_tokens: Number of tokens to sample after the prompt.
        temperature: Sampling temperature forwarded to ``model.generate``.
        top_k: Top-k cutoff forwarded to ``model.generate``.

    Returns:
        The decoded prompt tokens followed by the sampled continuation, or
        an empty string when ``prompt`` encodes to no tokens.
    """
    prompt_ids = enc.encode_ordinary(prompt)
    if not prompt_ids:
        # A zero-length context has no last position to sample from.
        return ""
    context = torch.tensor(prompt_ids, dtype=torch.long, device=device).unsqueeze(0)
    # Training leaves the model in train mode; switch to eval so dropout is
    # off while serving, and skip autograd bookkeeping during sampling.
    model.eval()
    with torch.no_grad():
        generated = model.generate(
            context,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_k=top_k,
        )
    generated_ids = generated[0].tolist()
    return enc.decode(generated_ids)
def chat_function(message, history):
    """Chat callback: answer ``message`` with the default generation settings.

    ``history`` is accepted but not used; each reply is conditioned on the
    latest message only.
    """
    reply = generate_response(message)
    return reply
# Chat UI wired to chat_function; launched only when run as a script.
_chat_title = "🤖 GPT обучена с нуля (12 CPU / 13 ГБ RAM)"
_chat_description = "Модель обучена на Alfred-Indigo, 6 слоёв, 6 голов внимания, контекст 256 токенов. Ограничение ресурсов: 12 ядер CPU, ~13 ГБ RAM."
demo = gr.ChatInterface(
    fn=chat_function,
    title=_chat_title,
    description=_chat_description,
    theme="soft",
)

if __name__ == "__main__":
    demo.launch()