In [9]:
# CUDA test
import torch

print("✅ CUDA 사용 가능:", torch.cuda.is_available())
print("🔢 CUDA 장치 수:", torch.cuda.device_count())
print("🔥 PyTorch 버전:", torch.__version__)
if torch.cuda.is_available():
 print("🖥️ CUDA 장치 이름:", torch.cuda.get_device_name(0))
 print("🧱 PyTorch 빌드된 CUDA 버전:", torch.version.cuda) # type: ignore
else:
 print("⚠️ CUDA를 인식하지 못했습니다.")


✅ CUDA 사용 가능: True
🔢 CUDA 장치 수: 1
🔥 PyTorch 버전: 2.7.1+cu128
🖥️ CUDA 장치 이름: NVIDIA GeForce RTX 3060 Laptop GPU
🧱 PyTorch 빌드된 CUDA 버전: 12.8


In [1]:
from Models.Vector2MIDI import Vector2MIDI # 클래스 정의가 필요
import torch.optim as optim
import torch

device = torch.device("cuda") # GPU 사용
#device = torch.device("cpu") # CPU 사용

model = Vector2MIDI(hidden_dim=1024).to(device)
optimizer = optim.Adam(model.parameters(), lr=15e-5)

In [2]:
# 전처리 데이터 로드
from torch.utils.data import DataLoader
from utility.dataset import MIDIDataset
import torch

data = torch.load("DIVA_dataset.pt")
X_tensor = data["X"]
Y_tensor = data["Y"]

print("X_tensor shape:", X_tensor.shape)
print("Y_tensor shape:", Y_tensor.shape)

dataset = MIDIDataset(X_tensor, Y_tensor) # 객체를 만들어서 쉽게 tensor를 꺼낼 수 있게 함
dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

X_tensor shape: torch.Size([34, 25])
Y_tensor shape: torch.Size([34, 128, 7])


In [None]:
# 디버깅 환경변수 설정

import os

os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [4]:
import copy

EPOCH = 50

# 최고 성능 추적을 위한 변수들
best_loss = float('inf') # 가장 좋은 loss 값
best_model_state = None # 최고 성능 모델의 state_dict
best_loss:float

model.train()

for i in range(EPOCH):
 total_loss = 0
 for X_batch, Y_batch in dataloader:
 X_batch = X_batch.to(device, non_blocking=True) # non_blocking으로 성능 향상
 Y_batch = Y_batch.to(device, non_blocking=True)
 
 with torch.autograd.detect_anomaly(): # loss nan 발생 시 연산오류 출력
 optimizer.zero_grad()
 loss = model.calc_loss(X_batch, Y_batch)
 
 loss.backward()

 # 그래디언트 클리핑 적용
 grad_norm = torch.nn.utils.clip_grad_norm_(
 model.parameters(), 
 max_norm=1.0
 )
 
 # 옵션: 그래디언트 모니터링
 if grad_norm > 1.0 * 2:
 print(f"Large gradient detected: {grad_norm:.4f}")
 
 optimizer.step()

 total_loss += loss.item()

 # 평균 loss 계산
 avg_loss = total_loss / len(dataloader)
 
 # 최고 성능 모델 업데이트
 if avg_loss < best_loss:
 best_loss = avg_loss
 best_model_state = copy.deepcopy(model.state_dict())# 모델의 현재 상태를 깊은 복사로 저장
 
 print(f"Epoch {i+1}, Loss: {avg_loss:.4f}, Best: {best_loss:.4f}")

# 학습 완료 후 최고 성능 모델 로드
if best_model_state != None:
 model.load_state_dict(best_model_state)
print(f"\nTraining completed!")

 with torch.autograd.detect_anomaly(): # loss nan 발생 시 연산오류 출력
 gamma = torch.cuda.FloatTensor([gamma])
 with torch.autograd.detect_anomaly(): # loss nan 발생 시 연산오류 출력


Epoch 1, Loss: 4.0590, Best: 4.0590
Epoch 2, Loss: 4.8194, Best: 4.0590
Epoch 3, Loss: 4.8279, Best: 4.0590
Epoch 4, Loss: 3.7606, Best: 3.7606
Epoch 5, Loss: 4.2284, Best: 3.7606
Epoch 6, Loss: 4.5423, Best: 3.7606
Epoch 7, Loss: 4.5410, Best: 3.7606
Epoch 8, Loss: 4.1297, Best: 3.7606
Epoch 9, Loss: 4.2703, Best: 3.7606
Epoch 10, Loss: 3.6612, Best: 3.6612
Epoch 11, Loss: 4.0261, Best: 3.6612
Epoch 12, Loss: 4.5013, Best: 3.6612
Epoch 13, Loss: 3.8839, Best: 3.6612
Epoch 14, Loss: 4.4046, Best: 3.6612
Epoch 15, Loss: 4.4510, Best: 3.6612
Epoch 16, Loss: 4.4549, Best: 3.6612
Epoch 17, Loss: 4.1500, Best: 3.6612
Epoch 18, Loss: 4.5133, Best: 3.6612
Epoch 19, Loss: 3.5382, Best: 3.5382
Epoch 20, Loss: 3.8307, Best: 3.5382
Epoch 21, Loss: 3.8997, Best: 3.5382
Epoch 22, Loss: 3.9955, Best: 3.5382
Epoch 23, Loss: 3.9440, Best: 3.5382
Epoch 24, Loss: 3.9549, Best: 3.5382
Large gradient detected: 2.3065
Epoch 25, Loss: 4.4964, Best: 3.5382
Epoch 26, Loss: 4.0543, Best: 3.5382
Epoch 27, Loss:

## 모델 저장

In [5]:
import torch

torch.save(model.state_dict(), 'DIVA_Model_dict.pt') # 모델 가중치, 매개변수 저장
torch.save(model, 'DIVA_Model_full.pt') # 모델 전체 저장

In [3]:
model.load_state_dict(torch.load('DIVA_Model_dict.pt')) # 모델 가중치, 매개변수 불러오기



In [4]:
from HarmonyMIDIToken import HarmonyMIDIToken as Tokenizer
from random import randint

x = X_tensor[randint(0, 33)].unsqueeze(0).to(device)
print(x.shape)
Y = model.generate(x) # 스타일 벡터 하나로 시퀀스 생성
print(Y.shape)

MIDI = Tokenizer()
MIDI.set_id(Y.tolist())

midi= MIDI.to_midi() # This should generate MIDI from the stored melody and chords
midi.write('midi', fp='test_output.mid') # Save the generated MIDI to a file

torch.Size([1, 25])
torch.Size([49, 7])


'test_output.mid'

## 차원 별로 다른 EOS 사용 시 방법

---

### 📌 현재 구조 요약

* 입력이 다차원(예: 7개의 categorical feature)
* 각 차원(feature)마다 다른 vocab이 있음
* 그리고 **각 차원마다 EOS 인덱스가 다름**

예:

```
EOS = [100, 15, 72, 14, 15, 58, 15] # len = 7 (feature 수)
```

이건 말하자면, **시퀀스의 각 column마다 "끝나는 값"이 다르다**는 뜻입니다.

---

### ❗ 문제: 생성 시 EOS를 어떻게 감지/제어하냐?

예를 들어:

* RNN/LSTM/Transformer로 다음 step을 생성할 때,
* 어떤 timestep에서 **모든 feature들이 EOS 값에 도달했는지**를 판단해야
* **생성 종료** (`stop generation`)를 할 수 있음

---

### ✅ 해결 전략

#### 👇 핵심은: **각 feature의 EOS를 별도로 추적 & 마스킹**하는 것

---

#### ✅ 방법 1: EOS를 차원별로 체크하여 "모두 EOS"인지 판단

```python
# 예: generated: [batch, time, num_features]
# EOS: list of ints, len == num_features

eos_tensor = torch.tensor(EOS, device=generated.device) # [num_features]
last_step = generated[:, -1, :] # [batch, num_features]

# EOS 매칭 여부: [batch, num_features]
eos_flags = last_step == eos_tensor # broadcasting

# 모두 EOS면 -> 종료 조건
should_stop = eos_flags.all(dim=-1) # [batch]
```

---

#### ✅ 방법 2: 각 차원이 EOS이면, 해당 차원의 logits를 마스킹

생성 시 특정 feature의 EOS가 나왔으면, **해당 feature 차원만 마스킹**해서 더 이상 생성 못 하게 하는 방식입니다.

```python
for feature_idx in range(num_features):
 eos_value = EOS[feature_idx]

 # 이전에 EOS가 나온 샘플은 더 이상 해당 feature 안 생성하게
 eos_mask = generated[:, :, feature_idx] == eos_value # [B, T]

 # logits: [B, vocab_size_per_feature]
 # 마스킹 적용
 logits[feature_idx][eos_mask] = -float('inf')
```

⚠️ 이건 logits를 feature마다 따로 생성하거나, feature별 decoder를 쓰는 경우에 사용 가능

---

#### ✅ 방법 3: 생성 과정에서 차원별 EOS 도달 상태 추적

```python
# eos_reached: [batch, num_features], 처음에는 False
eos_reached = torch.zeros(batch_size, num_features, dtype=torch.bool, device=device)

for t in range(max_steps):
 output = model(...)

 # token: [batch, num_features]
 token = sample_from_output(output)

 eos_reached |= token == eos_tensor # feature-wise EOS 체크

 # 모두 EOS인 sample이면 중단
 should_stop = eos_reached.all(dim=1) # [batch]
```

---

#### ✅ 방법 4: EOS를 하나로 통일할 수 있다면 가장 깔끔

가능하다면 EOS를 모든 feature에서 하나로 통일하는 게 가장 쉬운 방법이지만,
이미 전 feature마다 vocab이 충돌해서 다른 EOS를 쓸 수밖에 없다면 위 전략들을 써야 합니다.

---

### 🔧 추가 제안: EOS 마스킹을 위한 구조화된 추적 클래스

```python
class EOSManager:
 def __init__(self, eos_values, num_features):
 self.eos_values = torch.tensor(eos_values) # [num_features]
 self.num_features = num_features

 def update(self, tokens): # tokens: [batch, num_features]
 return tokens == self.eos_values # [batch, num_features]

 def all_eos(self, flags): # flags: [batch, num_features]
 return flags.all(dim=1) # [batch]
```

이렇게 하면 깔끔하게 재사용 가능해요.

---

### 🧠 요약

| 목적 | 방법 |
| -------------------------- | ------------------------------------- |
| 각 feature의 EOS 체크 | `tokens == eos_tensor` (broadcasting) |
| 전체 시퀀스 종료 조건 | `.all(dim=-1)` |
| feature별로 EOS 후 logits 마스킹 | logits\[:, eos\_idx] = -inf |
| 자동 추적 관리 | `EOSManager` 같은 헬퍼 클래스 사용 |

---

필요하시면 생성 루프 코드나 EOS 헬퍼 전체 구조 예제도 드릴 수 있어요.
사용 중인 모델 구조 (RNN, Transformer 등) 말씀주시면 거기에 맞춰 드릴게요.
