|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from transformers import GPT2TokenizerFast |
|
|
from gpt_modern_8b import JiRackPyTorch |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TEMPERATURE = 0.7 |
|
|
|
|
|
|
|
|
|
|
|
TOP_K = 50 |
|
|
|
|
|
|
|
|
MAX_LENGTH = 120 |
|
|
|
|
|
|
|
|
LAST_TRAINED_PATH = Path("build/fine_tuning_output/epoch2/gpt_finetuned.pt") |
|
|
FINAL_OUTPUT_DIR = Path("build/fine_tuning_output/epoch2") |
|
|
MODEL_SAVE_NAME = "gpt_finetuned.pt" |
|
|
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
print(f"Using device: {device}") |
|
|
|
|
|
|
|
|
class Chatbot: |
|
|
def __init__(self, model_path: Path): |
|
|
|
|
|
print("Loading standard GPT-2 tokenizer...") |
|
|
|
|
|
self.tokenizer = GPT2TokenizerFast.from_pretrained("gpt2") |
|
|
self.tokenizer.pad_token = self.tokenizer.eos_token |
|
|
|
|
|
|
|
|
print("Initializing JiRackPyTorch model...") |
|
|
self.model = JiRackPyTorch().to(device) |
|
|
self.model.eval() |
|
|
|
|
|
|
|
|
load_path = None |
|
|
candidate1 = FINAL_OUTPUT_DIR / MODEL_SAVE_NAME |
|
|
candidate2 = model_path if model_path.is_file() else None |
|
|
|
|
|
if candidate1.exists(): |
|
|
load_path = candidate1 |
|
|
print(f"Found weights in final folder: {load_path}") |
|
|
elif candidate2 and candidate2.exists(): |
|
|
load_path = candidate2 |
|
|
print(f"Loading weights from: {load_path}") |
|
|
else: |
|
|
print("Warning: No trained weights found. Running with randomly initialized model.") |
|
|
|
|
|
if load_path: |
|
|
print(f"Loading state dict from {load_path}...") |
|
|
self.model.load_state_dict(torch.load(load_path, map_location=device)) |
|
|
print("Weights loaded successfully!") |
|
|
|
|
|
print(f"Model is now running on {device} — ready for chat!\n") |
|
|
|
|
|
def generate_response(self, prompt: str, max_length: int = MAX_LENGTH, |
|
|
temperature: float = TEMPERATURE, top_k: int = TOP_K) -> str: |
|
|
input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
for _ in range(max_length): |
|
|
|
|
|
logits, _ = self.model(input_ids) |
|
|
|
|
|
|
|
|
next_token_logits = logits[:, -1, :] |
|
|
|
|
|
|
|
|
if temperature != 1.0: |
|
|
next_token_logits = next_token_logits / temperature |
|
|
|
|
|
|
|
|
if top_k > 0: |
|
|
values, indices = torch.topk(next_token_logits, top_k) |
|
|
next_token_logits = torch.full_like(next_token_logits, float('-inf')) |
|
|
next_token_logits.scatter_(1, indices, values) |
|
|
|
|
|
|
|
|
probabilities = F.softmax(next_token_logits, dim=-1) |
|
|
next_token = torch.multinomial(probabilities, num_samples=1) |
|
|
|
|
|
|
|
|
input_ids = torch.cat([input_ids, next_token], dim=-1) |
|
|
|
|
|
|
|
|
token_str = self.tokenizer.decode(next_token.item()) |
|
|
if "__eou__" in token_str or next_token.item() == self.tokenizer.eos_token_id: |
|
|
break |
|
|
|
|
|
|
|
|
full_output = self.tokenizer.decode(input_ids[0], skip_special_tokens=False) |
|
|
response = full_output[len(prompt):].strip() |
|
|
|
|
|
|
|
|
response = response.replace("__eou__", "").strip() |
|
|
|
|
|
return response |
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
global TEMPERATURE, TOP_K |
|
|
|
|
|
print("Starting JiRack Chatbot...") |
|
|
chatbot = Chatbot(LAST_TRAINED_PATH) |
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print(f"JIRACK CHATBOT ONLINE") |
|
|
print(f"Temperature: {TEMPERATURE} | Top-K: {TOP_K} | Max Length: {MAX_LENGTH}") |
|
|
print("Type 'quit' or 'exit' to exit") |
|
|
print("Change settings: set temp=0.8 or set k=80") |
|
|
print("=" * 70 + "\n") |
|
|
|
|
|
while True: |
|
|
try: |
|
|
user_input = input("You: ").strip() |
|
|
|
|
|
if user_input.lower() in {"quit", "exit", "bye"}: |
|
|
print("Goodbye!") |
|
|
break |
|
|
|
|
|
|
|
|
if user_input.lower().startswith("set temp="): |
|
|
try: |
|
|
TEMPERATURE = float(user_input.split("=")[1]) |
|
|
print(f"Temperature → {TEMPERATURE}") |
|
|
except: |
|
|
print("Invalid format. Use: set temp=0.7") |
|
|
continue |
|
|
|
|
|
if user_input.lower().startswith("set k="): |
|
|
try: |
|
|
TOP_K = int(user_input.split("=")[1]) |
|
|
print(f"Top-K → {TOP_K}") |
|
|
except: |
|
|
print("Invalid format. Use: set k=50") |
|
|
continue |
|
|
|
|
|
if not user_input: |
|
|
continue |
|
|
|
|
|
print("Generating...", end="\r") |
|
|
response = chatbot.generate_response(user_input) |
|
|
print(f"JiRack: {response}\n") |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
print("\n\nShutting down...") |
|
|
break |
|
|
except Exception as e: |
|
|
print(f"Error: {e}") |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |