|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from transformers import GPT2TokenizerFast |
|
|
from gpt_pytorch import GPTPyTorch |
|
|
import os |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Sampling defaults ------------------------------------------------------
# Divisor applied to the next-token logits before softmax (lower = sharper).
TEMPERATURE = 0.7

# Number of highest-scoring tokens kept for sampling; a value <= 0 disables
# the top-k filter.
TOP_K = 50

# Upper bound on the number of tokens sampled per reply.
MAX_LENGTH = 120

# --- Checkpoint locations ---------------------------------------------------
# Fallback checkpoint file handed to Chatbot by main().
LAST_TRAINED_PATH = Path("build/fine_tuning_output/epoch49/gpt_finetuned.pt")

# Directory holding the final checkpoint. This has to be the *directory*:
# Chatbot joins it with MODEL_SAVE_NAME, so pointing it at the .pt file itself
# produced ".../gpt_finetuned.pt/gpt_finetuned.pt", which can never exist.
FINAL_OUTPUT_DIR = Path("build/fine_tuning_output/epoch49")
MODEL_SAVE_NAME = "gpt_finetuned.pt"

# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
|
|
|
class Chatbot:
    """Text generator that samples replies from a ``GPTPyTorch`` model.

    The stock ``gpt2`` tokenizer converts between text and token ids; the
    model is moved to the module-level ``device`` and kept in eval mode.
    """

    # Marker string that ends an utterance in the generated text.
    _END_OF_UTTERANCE = "__eou__"

    def __init__(self, model_path):
        """Build the tokenizer and model, then restore trained weights.

        The checkpoint at ``FINAL_OUTPUT_DIR / MODEL_SAVE_NAME`` is preferred;
        ``model_path`` is used only when that file does not exist. When
        neither exists the model keeps its freshly initialized weights and a
        warning is printed.

        Args:
            model_path: Fallback checkpoint location (``str`` or ``Path``).
        """
        print("Loading standard tokenizer (gpt2)...")
        self.tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
        self.tokenizer.pad_token = self.tokenizer.eos_token

        print("Initializing model...")
        self.model = GPTPyTorch().to(device)
        self.model.eval()

        final_checkpoint = FINAL_OUTPUT_DIR / MODEL_SAVE_NAME
        # Accept plain strings as well as Path objects for the fallback.
        fallback = None if model_path is None else Path(model_path)

        load_path = None
        if final_checkpoint.exists():
            load_path = final_checkpoint
            print(f"Weights from Epoch 50 found. Loading and moving to {device}...")
        elif fallback is not None and fallback.exists():
            load_path = fallback
            print(f"Loading weights from {load_path} and moving to {device}...")

        if load_path:
            # NOTE(security): torch.load unpickles the file; only load
            # checkpoints from a trusted source.
            self.model.load_state_dict(torch.load(load_path, map_location=device))
        else:
            print("Warning: No trained weights found. Using randomly initialized model.")

        print(f"Model successfully loaded on {device} and ready for chat!")

    def generate_response(self, prompt, max_length=MAX_LENGTH, temperature=TEMPERATURE, top_k=TOP_K):
        """Sample a reply to ``prompt`` one token at a time.

        Sampling stops after ``max_length`` tokens, when the tokenizer's EOS
        token is drawn, or as soon as the generated text contains the
        ``__eou__`` marker.

        Args:
            prompt: Text the model continues.
            max_length: Maximum number of tokens to sample.
            temperature: Divisor applied to the logits; must be > 0.
            top_k: Sample only from the ``top_k`` best tokens; ``<= 0``
                disables the filter. Values above the vocabulary size are
                clamped.

        Returns:
            The generated text only (prompt excluded), cut at the first
            ``__eou__`` marker and stripped of surrounding whitespace.

        Raises:
            ValueError: If ``temperature`` is not greater than 0.
        """
        if temperature <= 0:
            # Dividing the logits by 0 or a negative number yields inf/nan or
            # an inverted distribution; fail early with a clear message.
            raise ValueError("temperature must be greater than 0")

        input_ids = self.tokenizer.encode(prompt, return_tensors='pt').to(device)
        if input_ids.shape[-1] == 0:
            # Nothing to condition on: the model would get an empty sequence.
            return ""

        eos_id = self.tokenizer.eos_token_id
        marker = self._END_OF_UTTERANCE
        generated_ids = []
        generated_text = ""

        with torch.no_grad():
            for _ in range(max_length):
                # The model returns a pair; only the logits are used here.
                logits, _unused = self.model(input_ids)

                # Scores for the token following the last position.
                next_token_logits = logits[:, -1, :] / temperature

                if top_k > 0:
                    # torch.topk raises when k exceeds the dimension size.
                    k = min(top_k, next_token_logits.size(-1))
                    values, indices = torch.topk(next_token_logits, k)
                    # Mask everything outside the top k with -inf so softmax
                    # assigns those tokens zero probability.
                    next_token_logits = torch.full_like(next_token_logits, float('-inf'))
                    next_token_logits.scatter_(1, indices, values)

                probabilities = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probabilities, num_samples=1)
                input_ids = torch.cat([input_ids, next_token], dim=-1)

                token_id = next_token.item()
                if token_id == eos_id:
                    # Stop without keeping EOS so it never shows in the reply.
                    break
                generated_ids.append(token_id)

                # The marker spans several gpt2 tokens, so it can only be
                # detected in the decoded text, not in a single token.
                generated_text = self.tokenizer.decode(generated_ids)
                if marker in generated_text:
                    break

        # Decoding only the generated ids avoids slicing by len(prompt),
        # which breaks whenever decode(encode(prompt)) differs from prompt.
        return generated_text.split(marker, 1)[0].strip()
|
|
|
|
|
|
|
|
def main():
    """Run the interactive chat loop on the terminal.

    Commands handled before anything is sent to the model:
      * ``quit`` / ``exit``  - leave the loop.
      * ``set temp=<float>`` - change the sampling temperature (must be > 0).
      * ``set k=<int>``      - change the top-k cutoff.

    Any other non-empty line is passed to the chatbot as a prompt. Ctrl-C or
    end-of-input ends the session; any other exception is reported and also
    ends the session.
    """
    global TEMPERATURE, TOP_K

    chatbot = Chatbot(LAST_TRAINED_PATH)

    print("\n" + "="*60)
    print(f"CHATBOT ACTIVATED (PPL ~2.6 / Temperature {TEMPERATURE} / Top-K {TOP_K})")
    print("Type 'exit' or 'quit' to quit. Use 'set temp=0.x' or 'set k=N' to change settings.")
    print("="*60 + "\n")

    while True:
        try:
            user_input = input(">>> You: ")
            command = user_input.strip().lower()

            if not command:
                # A blank line carries no prompt; ask again.
                continue

            if command in ('quit', 'exit'):
                print("Goodbye!")
                break

            if command.startswith('set temp='):
                try:
                    new_temperature = float(command.split('=', 1)[1].strip())
                    # The logits are divided by this value, so zero, negative,
                    # nan and inf settings are rejected.
                    if not 0 < new_temperature < float('inf'):
                        raise ValueError("temperature out of range")
                except ValueError:
                    print("Invalid temperature. Use format: set temp=0.7")
                else:
                    TEMPERATURE = new_temperature
                    print(f"Temperature updated to {TEMPERATURE}")
                continue

            if command.startswith('set k='):
                try:
                    TOP_K = int(command.split('=', 1)[1].strip())
                    print(f"Top-K updated to {TOP_K}")
                except ValueError:
                    print("Invalid value. Use format: set k=50")
                continue

            print("...Generating...")
            # Pass the current settings explicitly: the defaults of
            # generate_response were bound when the method was defined, so
            # rebinding the globals alone would never reach the sampler.
            response = chatbot.generate_response(
                user_input, temperature=TEMPERATURE, top_k=TOP_K
            )
            print(f"Model: {response}\n")

        except (KeyboardInterrupt, EOFError):
            # Ctrl-C or a closed stdin both end the session quietly.
            print("\nGoodbye!")
            break
        except Exception as e:
            print(f"An error occurred: {e}")
            break
|
|
|
|
|
|
|
|
# Script entry point: start the chat loop only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()