s1 / model.py
summerstars's picture
Create model.py
a7b725f verified
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
# ====================================================
# 1. StabilizedInfiniteGPT(推論用フル定義)
# ====================================================
class StabilizedInfiniteGPT(nn.Module):
def __init__(self, state_dim, model_name='gpt2'):
super().__init__()
print(f">>> Loading Backbone: {model_name}")
self.backbone = AutoModelForCausalLM.from_pretrained(model_name)
if hasattr(self.backbone.config, "n_embd"):
self.embed_dim = self.backbone.config.n_embd
else:
self.embed_dim = self.backbone.config.hidden_size
self.vocab_size = self.backbone.config.vocab_size
self.state_dim = state_dim
self.input_proj = nn.Linear(self.embed_dim, state_dim)
self.forget_gate = nn.Linear(state_dim, state_dim)
self.in_gate = nn.Linear(state_dim, state_dim)
self.layer_norm = nn.LayerNorm(state_dim)
self.memory_readout = nn.Linear(state_dim, self.vocab_size, bias=False)
self.gating_param = nn.Parameter(torch.tensor(0.1))
def forward_gen_step(self, context_ids, prev_state):
with torch.no_grad():
gpt_out = self.backbone(context_ids, output_hidden_states=True)
last_hidden = gpt_out.hidden_states[-1][:, -1:, :]
base_logits = gpt_out.logits[:, -1:, :]
if prev_state is None:
prev_state = torch.zeros(
context_ids.size(0), 1, self.state_dim,
device=context_ids.device, dtype=last_hidden.dtype
)
h = torch.tanh(self.input_proj(last_hidden))
f = torch.sigmoid(self.forget_gate(h))
u = torch.tanh(self.in_gate(h))
next_state = f * prev_state + (1 - f) * u
norm_state = self.layer_norm(next_state)
mem_logits = self.memory_readout(norm_state)
gate = torch.tanh(self.gating_param)
final_logits = base_logits + (gate * mem_logits)
return final_logits, next_state
# ====================================================
# 2. モデルロード関数
# ====================================================
def load_infinite_model(save_dir, device="cuda"):
print(f">>> Loading from {save_dir}...")
checkpoint = torch.load(f"{save_dir}/adapter_weights.pt", map_location=device)
config = checkpoint["config"]
tokenizer = AutoTokenizer.from_pretrained(save_dir)
model = StabilizedInfiniteGPT(
state_dim=config["state_dim"],
model_name=config["model_name"]
)
model.load_state_dict(checkpoint["model_state"], strict=False)
model.to(device)
model.eval()
return model, tokenizer
# ====================================================
# 3. 実行
# ====================================================
if __name__ == "__main__":
device = 'cuda' if torch.cuda.is_available() else 'cpu'
save_dir = "/content/my_infinite_model" # ← ここだけ変えれば OK
model, tokenizer = load_infinite_model(save_dir, device)
prompt_text = "def fibonacci(n):"
print(f"\nPrompt: {prompt_text}")
print("-" * 40)
print(prompt_text, end="", flush=True)
input_ids = tokenizer.encode(prompt_text, return_tensors="pt").to(device)
gen_state = None
curr_ids = input_ids
max_new_tokens = 100
for _ in range(max_new_tokens):
context = curr_ids[:, -1024:]
logits, gen_state = model.forward_gen_step(context, prev_state=gen_state)
next_logit = logits[:, -1, :]
top_k = 40
top_k_logits, top_k_indices = torch.topk(next_logit, top_k)
probs = F.softmax(top_k_logits, dim=-1)
idx = torch.multinomial(probs, 1)
next_token = torch.gather(top_k_indices, -1, idx)
word = tokenizer.decode(next_token[0])
print(word, end="", flush=True)
curr_ids = torch.cat([curr_ids, next_token], dim=-1)
print("\n\n>>> Generation Complete.")