### ------------------------------------------------------------------------------------------------ ###
### First: do `apt-get update && apt-get install -y fluidsynth` and `pip install miditok midi2audio` ###
### ------------------------------------------------------------------------------------------------ ###

### IMPORTS ###
import os
import requests
import zipfile
import numpy as np
from miditok import REMI
from pathlib import Path
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.nn import functional as F
import time

### DATA LOADING ###
MIDI_URL = "https://storage.googleapis.com/magentadata/datasets/maestro/v3.0.0/maestro-v3.0.0-midi.zip"
ZIP_FILE = "maestro_midi.zip"
EXTRACT_PATH = "maestro_raw"
DATA_DIR = "data/maestro-v3"
os.makedirs(DATA_DIR, exist_ok=True)


def _download_file(url, dest, chunk_size=1 << 20, timeout=60):
    """Stream `url` to `dest` without holding the whole payload in memory.

    The data is written to `dest + ".part"` and only renamed to `dest` once
    the transfer has completed, so an interrupted download never leaves a
    truncated file that later runs would mistake for a finished one.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    tmp_path = dest + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        os.replace(tmp_path, dest)
    finally:
        # After a successful rename the temp file is gone; this only cleans
        # up after a failed or interrupted transfer.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def download_and_prepare():
    """Download the MIDI archive, tokenize it and write train/val token files.

    Steps:
      1. Download `MIDI_URL` to `ZIP_FILE` (skipped if the file exists).
      2. Extract the archive to `EXTRACT_PATH` (skipped if the path exists).
      3. Tokenize every `*.mid*` file with a default `REMI` tokenizer and
         concatenate all token ids into one stream.
      4. Write the first 90% of the stream to `train.bin` and the rest to
         `val.bin` inside `DATA_DIR`, stored as uint16.

    Files that fail to tokenize are skipped, and a summary of the failures is
    printed. If no tokens are produced at all, an error is printed and nothing
    is written.

    Raises:
        ValueError: if a token id does not fit into uint16.
    """
    if not os.path.exists(ZIP_FILE):
        print("Downloading MIDI-Dataset...")
        _download_file(MIDI_URL, ZIP_FILE)

    if not os.path.exists(EXTRACT_PATH):
        print("Extracting files...")
        with zipfile.ZipFile(ZIP_FILE, 'r') as zip_ref:
            zip_ref.extractall(EXTRACT_PATH)

    tokenizer = REMI()
    all_tokens = []
    failed = []
    # Sorted so the 90/10 split below does not depend on filesystem
    # enumeration order.
    midi_paths = sorted(Path(EXTRACT_PATH).rglob("*.mid*"))
    print(f"Tokenizing {len(midi_paths)} MIDI-Files...")

    for path in tqdm(midi_paths):
        try:
            midi_tokens = tokenizer(path)
            # The tokenizer may return one sequence or a list of sequences;
            # in the list case only the first one is used.
            if isinstance(midi_tokens, list):
                ids = midi_tokens[0].ids
            else:
                ids = midi_tokens.ids
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run,
            # but the failure is recorded so it can be reported below.
            failed.append((path, e))
            continue
        if len(ids) > 0:
            all_tokens.extend(ids)

    if failed:
        print(f"WARNING: Skipped {len(failed)} file(s) that could not be tokenized.")
        for path, err in failed[:5]:
            print(f"  {path}: {type(err).__name__}: {err}")

    if len(all_tokens) == 0:
        print("ERROR: No tokens extracted. Check dataset or tokenizer-version.")
        return

    # The token stream is stored as uint16; refuse ids that would not fit
    # rather than writing wrapped-around values to disk.
    max_id = max(all_tokens)
    if max_id > np.iinfo(np.uint16).max:
        raise ValueError(f"Token id {max_id} does not fit into uint16 storage.")

    data = np.array(all_tokens, dtype=np.uint16)
    n = len(data)
    split_at = int(n * 0.9)
    train_data = data[:split_at]
    val_data = data[split_at:]

    train_data.tofile(os.path.join(DATA_DIR, 'train.bin'))
    val_data.tofile(os.path.join(DATA_DIR, 'val.bin'))

    print("Prepared data successfully!")
    print(f"Train Tokens: {len(train_data)} | Val Tokens: {len(val_data)}")
    print(f"Vocab-Size: {len(tokenizer)}")


download_and_prepare()

### TRAINING ###
# --- 1. Hyperparameters ---
batch_size = 32
block_size = 512          # context length in tokens
max_iters = 20000
learning_rate = 1e-5
gradient_accumulation_steps = 8
eval_interval = 250
eval_iters = 100
n_embd = 512
n_head = 8
n_layer = 8
dropout = 0.3
vocab_size = 300          # must be larger than every token id in the data

data_dir = 'data/maestro-v3'
checkpoint_path = 'out/tinymozart_ckpt.pt'
best_model_path = 'out/tinymozart_best.pt'
log_path = 'out/training_log.txt'
# Fall back to CPU so the script still runs on machines without a GPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'


# --- 2. Helpers ---
def get_batch(data):
    """Sample a random batch of (input, target) windows from a token array.

    Args:
        data: 1-D numpy array of token ids.

    Returns:
        Tuple `(x, y)` of int64 tensors on `device`, each of shape
        `(batch_size, block_size)`; `y` is `x` shifted one token ahead.

    Raises:
        ValueError: if `data` is too short to cut out a single window.
    """
    if len(data) <= block_size:
        raise ValueError(
            f"Need more than {block_size} tokens to sample a batch, got {len(data)}."
        )
    # Highest start index is len(data) - block_size - 1, so the target
    # slice (shifted by one) still ends inside the array.
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
    y = torch.stack([torch.from_numpy((data[i+1:i+block_size+1]).astype(np.int64)) for i in ix])
    return x.to(device), y.to(device)


@torch.no_grad()
def estimate_loss(model, train_data, val_data):
    """Average the loss over `eval_iters` random batches of each split.

    The model is put into eval mode for the measurement and switched back to
    train mode before returning.

    Returns:
        Dict with keys 'train' and 'val', each a 0-dim tensor holding the
        mean loss for that split.
    """
    out = {}
    model.eval()
    for split, data in [('train', train_data), ('val', val_data)]:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            x, y = get_batch(data)
            _, loss = model(x, y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out
# --- 3. Architecture ---
class Head(nn.Module):
    """One head of causal (masked) self-attention."""

    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)
        # Lower-triangular mask: position t may only attend to positions <= t.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map `x` of shape (B, T, C) to attention output of shape (B, T, head_size)."""
        B, T, C = x.shape
        k, q, v = self.key(x), self.query(x), self.value(x)
        # Scaled dot-product scores, scaled by 1/sqrt(head_size).
        wei = q @ k.transpose(-2, -1) * k.shape[-1]**-0.5
        # Block attention to future positions before normalizing.
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        wei = F.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        return wei @ v


class MultiHeadAttention(nn.Module):
    """Several attention heads run side by side, concatenated and projected."""

    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Concatenate the per-head outputs along the feature dimension.
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        return self.dropout(self.proj(out))


class FeedForward(nn.Module):
    """Position-wise MLP: expand to 4 * n_embd, GELU, project back, dropout."""

    def __init__(self, n_embd):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.GELU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.net(x)


class Block(nn.Module):
    """Transformer block: pre-LayerNorm attention and MLP, each with a residual."""

    def __init__(self, n_embd, n_head):
        super().__init__()
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)
        self.ffwd = FeedForward(n_embd)
        self.ln1, self.ln2 = nn.LayerNorm(n_embd), nn.LayerNorm(n_embd)

    def forward(self, x):
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


class TinyMozart(nn.Module):
    """Decoder-only transformer language model over token ids."""

    def __init__(self, vocab_size):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute logits and, if `targets` is given, the cross-entropy loss.

        Args:
            idx: integer tensor of token ids with shape (B, T).
            targets: optional tensor of next-token ids, flattened together
                with the logits for the loss.

        Returns:
            Tuple `(logits, loss)`; `loss` is None when `targets` is None.
        """
        B, T = idx.shape
        positions = torch.arange(T, device=idx.device)
        x = self.token_embedding_table(idx) + self.position_embedding_table(positions)
        x = self.blocks(x)
        logits = self.lm_head(self.ln_f(x))
        if targets is None:
            loss = None
        else:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
        return logits, loss


# --- 4. Main Training ---
def train():
    """Train TinyMozart on the prepared token files, with resume support.

    Reads `train.bin` / `val.bin` from `data_dir`, optionally resumes from
    `checkpoint_path` (or `best_model_path` if the former is missing), then
    runs up to `max_iters` optimizer steps with gradient accumulation. Every
    `eval_interval` steps it evaluates both splits, appends a CSV row to
    `log_path`, saves a rolling checkpoint and, when the validation loss
    improves, a copy of it as the best model.

    Raises:
        ValueError: if a data split contains a token id >= `vocab_size`.
    """
    train_data = np.fromfile(os.path.join(data_dir, 'train.bin'), dtype=np.uint16)
    val_data = np.fromfile(os.path.join(data_dir, 'val.bin'), dtype=np.uint16)

    # Token ids index the embedding table. Report a mismatch here instead of
    # letting it surface as an opaque indexing error in the forward pass.
    for split_name, split in (('train', train_data), ('val', val_data)):
        if split.size and int(split.max()) >= vocab_size:
            raise ValueError(
                f"{split_name} data contains token id {int(split.max())}, "
                f"but vocab_size is {vocab_size}."
            )

    # Create the output directories up front; otherwise the first log write
    # or checkpoint save fails on a fresh checkout.
    for out_path in (checkpoint_path, best_model_path, log_path):
        out_dir = os.path.dirname(out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    model = TinyMozart(vocab_size).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.1)

    start_iter = 0
    best_val_loss = float('inf')

    # Resume-logic: prefer the rolling checkpoint, fall back to the best model.
    if os.path.exists(checkpoint_path):
        target_ckpt = checkpoint_path
    elif os.path.exists(best_model_path):
        target_ckpt = best_model_path
    else:
        target_ckpt = None

    if target_ckpt:
        print(f"Loading checkpoint from {target_ckpt}...")
        checkpoint = torch.load(target_ckpt, map_location=device)
        model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        # The checkpoint is written after the optimizer step of the stored
        # iteration, so training continues with the following one.
        start_iter = checkpoint['iter'] + 1
        # float() also normalizes older checkpoints that stored a tensor.
        best_val_loss = float(checkpoint.get('best_val_loss', float('inf')))
        print(f"Resuming from iter {start_iter} with best_val_loss {best_val_loss:.4f}")

    model.train()
    t0 = time.time()
    steps_since_log = 0

    for step in range(start_iter, max_iters):
        # --- GRADIENT ACCUMULATION LOOP ---
        optimizer.zero_grad(set_to_none=True)
        accum_loss = 0.0
        for _ in range(gradient_accumulation_steps):
            xb, yb = get_batch(train_data)
            _, loss = model(xb, yb)
            # Scale so the accumulated gradient is the mean over micro-batches.
            loss = loss / gradient_accumulation_steps
            loss.backward()
            accum_loss += loss.item()
        optimizer.step()
        steps_since_log += 1

        if step % 50 == 0:
            dt = time.time() - t0
            # Divide by the steps actually timed (1 on the very first log).
            print(f"Iter {step}: Loss {accum_loss:.4f} | {dt*1000/steps_since_log:.1f}ms/step", flush=True)
            t0 = time.time()
            steps_since_log = 0

        if step % eval_interval == 0:
            eval_start = time.time()
            losses = estimate_loss(model, train_data, val_data)
            train_loss = float(losses['train'])
            val_loss = float(losses['val'])
            print(f">>> EVAL {step}: Train {train_loss:.4f}, Val {val_loss:.4f}", flush=True)

            with open(log_path, 'a') as f:
                f.write(f"{step},{train_loss:.4f},{val_loss:.4f}\n")

            # Update the best loss BEFORE saving, so the rolling checkpoint
            # carries the current value. Otherwise a resume would restore a
            # stale best and could overwrite the best model with a worse one.
            is_best = val_loss < best_val_loss
            if is_best:
                best_val_loss = val_loss

            checkpoint = {
                'iter': step,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_loss': best_val_loss,
            }
            torch.save(checkpoint, checkpoint_path)

            if is_best:
                torch.save(checkpoint, best_model_path)
                print(f"✨ New best model saved! (Loss: {best_val_loss:.4f})")

            # Keep evaluation and checkpointing out of the ms/step figure.
            t0 += time.time() - eval_start


if __name__ == "__main__":
    train()