|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StabilizedInfiniteGPT(nn.Module):
    """Pretrained causal LM whose next-token logits are adjusted by a recurrent memory.

    A backbone loaded with ``AutoModelForCausalLM.from_pretrained`` produces the
    base logits. A small gated recurrent adapter keeps a ``state_dim``-wide state
    that is updated once per generation step from the backbone's final hidden
    vector, and a linear readout of that state is added to the base logits,
    scaled by ``tanh(gating_param)``.
    """

    def __init__(self, state_dim, model_name='gpt2'):
        """Load the backbone and create the adapter layers.

        Args:
            state_dim: Width of the recurrent memory state.
            model_name: Name or path handed to
                ``AutoModelForCausalLM.from_pretrained``.
        """
        super().__init__()
        print(f">>> Loading Backbone: {model_name}")

        self.backbone = AutoModelForCausalLM.from_pretrained(model_name)

        # Some configs name the hidden width ``n_embd``; otherwise fall back
        # to ``hidden_size``.
        if hasattr(self.backbone.config, "n_embd"):
            self.embed_dim = self.backbone.config.n_embd
        else:
            self.embed_dim = self.backbone.config.hidden_size

        self.vocab_size = self.backbone.config.vocab_size
        self.state_dim = state_dim

        # Adapter layers. Creation order is kept stable so that state-dict keys
        # and the order of random initialisation do not change.
        self.input_proj = nn.Linear(self.embed_dim, state_dim)
        self.forget_gate = nn.Linear(state_dim, state_dim)
        self.in_gate = nn.Linear(state_dim, state_dim)
        self.layer_norm = nn.LayerNorm(state_dim)

        # Projects the normalised state onto the vocabulary.
        self.memory_readout = nn.Linear(state_dim, self.vocab_size, bias=False)

        # Scalar passed through tanh in forward_gen_step, so the memory
        # contribution is scaled by a factor in (-1, 1).
        self.gating_param = nn.Parameter(torch.tensor(0.1))

    def forward_gen_step(self, context_ids, prev_state):
        """Run one inference step and advance the recurrent memory.

        The whole step runs with autograd disabled: this method is meant for
        generation, and the returned state is fed straight back in on the next
        call, so it must not drag a computation graph along with it.

        Args:
            context_ids: Token-id tensor passed to the backbone; dimension 0 is
                used as the batch size. Assumed to be ``(batch, seq_len)``.
            prev_state: State returned by the previous call, or ``None`` to
                start from an all-zero state of shape ``(batch, 1, state_dim)``.

        Returns:
            A ``(final_logits, next_state)`` tuple. ``final_logits`` is the
            backbone's logits for the last position plus the gated memory
            logits; ``next_state`` is the updated memory state. Both keep a
            length-1 sequence dimension.
        """
        with torch.no_grad():
            gpt_out = self.backbone(context_ids, output_hidden_states=True)
            # Keep only the final position (slice ``-1:`` preserves the axis).
            last_hidden = gpt_out.hidden_states[-1][:, -1:, :]
            base_logits = gpt_out.logits[:, -1:, :]

            if prev_state is None:
                prev_state = torch.zeros(
                    context_ids.size(0), 1, self.state_dim,
                    device=context_ids.device, dtype=last_hidden.dtype,
                )

            # Gated update: f decides how much of the old state survives,
            # (1 - f) how much of the candidate u is written.
            h = torch.tanh(self.input_proj(last_hidden))
            f = torch.sigmoid(self.forget_gate(h))
            u = torch.tanh(self.in_gate(h))
            next_state = f * prev_state + (1 - f) * u

            # The readout sees a normalised copy; the raw state is what gets
            # carried to the next step.
            norm_state = self.layer_norm(next_state)
            mem_logits = self.memory_readout(norm_state)

            gate = torch.tanh(self.gating_param)
            final_logits = base_logits + (gate * mem_logits)

        return final_logits, next_state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_infinite_model(save_dir, device="cuda"):
    """Rebuild a ``StabilizedInfiniteGPT`` and its tokenizer from ``save_dir``.

    Reads ``{save_dir}/adapter_weights.pt``, which must contain a ``"config"``
    dict (with ``"state_dim"`` and ``"model_name"``) and a ``"model_state"``
    state dict, builds the model from that config, loads the saved weights
    non-strictly, moves the model to ``device`` and puts it in eval mode.

    Args:
        save_dir: Directory holding ``adapter_weights.pt`` and the tokenizer
            files.
        device: Device used both as ``map_location`` for the checkpoint and as
            the final location of the model.

    Returns:
        A ``(model, tokenizer)`` tuple.

    Raises:
        KeyError: If the checkpoint lacks one of the expected entries.
    """
    import warnings

    print(f">>> Loading from {save_dir}...")

    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code. Only load checkpoints from a trusted source.
    checkpoint = torch.load(f"{save_dir}/adapter_weights.pt", map_location=device)
    config = checkpoint["config"]

    tokenizer = AutoTokenizer.from_pretrained(save_dir)

    model = StabilizedInfiniteGPT(
        state_dim=config["state_dim"],
        model_name=config["model_name"],
    )

    # strict=False lets a partial checkpoint load, but it also hides key
    # mismatches, so inspect what was actually matched.
    load_result = model.load_state_dict(checkpoint["model_state"], strict=False)

    # Backbone weights were just restored by from_pretrained inside the model
    # constructor, so their absence from the checkpoint is harmless
    # (presumably the file stores adapter weights only - see its name).
    missing_adapter_keys = [
        key for key in load_result.missing_keys
        if not key.startswith("backbone.")
    ]
    unexpected_keys = list(load_result.unexpected_keys)
    if missing_adapter_keys or unexpected_keys:
        warnings.warn(
            "Checkpoint does not fully match the model; "
            f"missing non-backbone keys: {missing_adapter_keys}; "
            f"unexpected keys: {unexpected_keys}. "
            "Unmatched layers keep their freshly initialised weights.",
            stacklevel=2,
        )

    model.to(device)
    model.eval()
    return model, tokenizer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo: load the saved model and stream a top-k sampled continuation of a
    # fixed prompt to stdout.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    save_dir = "/content/my_infinite_model"

    model, tokenizer = load_infinite_model(save_dir, device)

    prompt_text = "def fibonacci(n):"
    print(f"\nPrompt: {prompt_text}")
    print("-" * 40)
    print(prompt_text, end="", flush=True)

    input_ids = tokenizer.encode(prompt_text, return_tensors="pt").to(device)

    gen_state = None
    curr_ids = input_ids
    max_new_tokens = 100
    top_k = 40  # loop-invariant sampling setting

    # Never feed more tokens than the backbone has positions for. The window
    # is only ever clamped downward from the original 1024.
    backbone_config = model.backbone.config
    max_positions = (
        getattr(backbone_config, "n_positions", None)
        or getattr(backbone_config, "max_position_embeddings", None)
        or 1024
    )
    context_window = min(1024, max_positions)

    eos_token_id = tokenizer.eos_token_id  # may be None: then never stop early

    generated_ids = []
    decoded_text = ""
    printed_chars = 0

    # Pure inference: no autograd graph should be built or kept across steps.
    with torch.no_grad():
        for _ in range(max_new_tokens):
            context = curr_ids[:, -context_window:]

            logits, gen_state = model.forward_gen_step(context, prev_state=gen_state)
            next_logit = logits[:, -1, :]

            # Sample from the k most likely tokens (k capped by vocab size).
            k = min(top_k, next_logit.size(-1))
            top_k_logits, top_k_indices = torch.topk(next_logit, k)
            probs = F.softmax(top_k_logits, dim=-1)

            idx = torch.multinomial(probs, 1)
            next_token = torch.gather(top_k_indices, -1, idx)

            # The prompt is a single sequence, so row 0 is the only row.
            token_id = next_token[0, 0].item()
            if eos_token_id is not None and token_id == eos_token_id:
                break

            generated_ids.append(token_id)
            curr_ids = torch.cat([curr_ids, next_token], dim=-1)

            # Decode everything generated so far and print only the new
            # suffix: one character can span several tokens, and decoding
            # tokens one at a time would print replacement characters.
            # Re-decoding is quadratic but negligible at this length.
            decoded_text = tokenizer.decode(
                generated_ids, clean_up_tokenization_spaces=False
            )
            if decoded_text.endswith("\ufffd"):
                # Incomplete character; wait for the next token.
                continue
            print(decoded_text[printed_chars:], end="", flush=True)
            printed_chars = len(decoded_text)

    # Emit whatever was still being held back when generation ended.
    if printed_chars < len(decoded_text):
        print(decoded_text[printed_chars:], end="", flush=True)

    print("\n\n>>> Generation Complete.")
|
|
|