|
|
"""test_conversational_neo_manual.py - Manual generation for NeoMini model"""
|
|
|
|
|
|
from model_neo import NeoMini, NeoMiniConfig
|
|
|
from transformers import AutoTokenizer
|
|
|
import torch
|
|
|
import torch.nn.functional as F
|
|
|
import json
|
|
|
|
|
|
def load_conversational_model(model_path="conversational_neo_extended"):
    """Load the fine-tuned conversational model, its tokenizer and a device.

    Args:
        model_path: Directory holding ``model_config.json`` (optional),
            ``conversational_model.pt`` and the tokenizer files.

    Returns:
        A ``(model, tokenizer, device)`` tuple. The model is in eval mode and
        has been moved to CUDA when available, otherwise it stays on the CPU.

    Raises:
        Whatever ``torch.load``, ``load_state_dict`` or
        ``AutoTokenizer.from_pretrained`` raise when the checkpoint or the
        tokenizer cannot be loaded. Only problems with the optional config
        file are absorbed.
    """
    print("Loading fine-tuned conversational model...")

    # The config file is optional: fall back to a 4096-token window when it is
    # missing, unreadable, not valid JSON, or not a JSON object. Catching only
    # these keeps Ctrl-C and genuine programming errors visible.
    default_max_seq_len = 4096
    try:
        with open(f"{model_path}/model_config.json", 'r', encoding="utf-8") as f:
            model_config = json.load(f)
        max_seq_len = model_config.get('max_seq_len', default_max_seq_len)
    except (OSError, ValueError, AttributeError):
        max_seq_len = default_max_seq_len

    # Default architecture config with only the context window overridden.
    config = NeoMiniConfig()
    config.max_seq_len = max_seq_len

    model = NeoMini(config)
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # that come from a trusted source.
    checkpoint = torch.load(f"{model_path}/conversational_model.pt", map_location='cpu')
    model.load_state_dict(checkpoint['model_state_dict'])

    tokenizer = AutoTokenizer.from_pretrained(model_path)

    model.eval()

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    print(f"✅ Model loaded with {max_seq_len} token context window on {device}")
    return model, tokenizer, device
|
|
|
|
|
|
def generate_response(model, tokenizer, prompt, device, max_new_tokens=200, temperature=0.8, top_k=50, top_p=0.9):
    """Generate a continuation of ``prompt`` one token at a time.

    The full running sequence is passed to ``model`` on every step; the logits
    of the last position of the first batch row are scaled by ``temperature``,
    filtered with top-k and nucleus (top-p) filtering, and one token is
    sampled from what remains.

    Args:
        model: Callable mapping a ``(1, seq)`` tensor of token ids to logits
            indexed as ``[batch, position, vocab]``; must expose
            ``config.max_seq_len``.
        tokenizer: Object providing ``encode(..., return_tensors="pt")``,
            ``decode(..., skip_special_tokens=True)`` and ``eos_token_id``.
        prompt: Text to continue.
        device: Device the encoded prompt is moved to.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Softmax temperature. Values ``<= 0`` select the most
            likely token deterministically (greedy decoding) instead of
            dividing by zero.
        top_k: Keep only the ``top_k`` highest logits; ``<= 0`` disables the
            filter. Values above the vocabulary size are clamped to it.
        top_p: Nucleus threshold; ``>= 1.0`` disables the filter.

    Returns:
        The decoded newly generated tokens (prompt excluded), stripped of
        surrounding whitespace.
    """
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    original_length = input_ids.shape[1]

    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(input_ids)
            next_token_logits = logits[0, -1, :]

            if temperature <= 0:
                # Scaling by a zero temperature would produce inf/NaN logits
                # and make multinomial fail; greedy choice is the limit case.
                next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            else:
                next_token_logits = next_token_logits / temperature

                if top_k > 0:
                    # torch.topk raises when k exceeds the vocabulary size.
                    k = min(top_k, next_token_logits.size(-1))
                    top_k_logits, top_k_indices = torch.topk(next_token_logits, k)
                    next_token_logits = torch.full_like(next_token_logits, float('-inf'))
                    next_token_logits.scatter_(0, top_k_indices, top_k_logits)

                if top_p < 1.0:
                    sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
                    cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)

                    # Shift the mask right by one so the token that crosses
                    # the threshold is kept and the best token always survives.
                    sorted_indices_to_remove = cumulative_probs > top_p
                    sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].clone()
                    sorted_indices_to_remove[0] = 0

                    indices_to_remove = sorted_indices[sorted_indices_to_remove]
                    next_token_logits[indices_to_remove] = float('-inf')

                probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

            # next_token has shape (1,); add the batch dimension to append it.
            input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)

            # Stop on end-of-sequence (the EOS token itself stays in the
            # sequence and is dropped by skip_special_tokens when decoding).
            if next_token.item() == tokenizer.eos_token_id:
                break

            # Stop once the model's context window is full.
            if input_ids.shape[1] >= model.config.max_seq_len:
                break

    generated_tokens = input_ids[0, original_length:]
    generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    return generated_text.strip()
|
|
|
|
|
|
def chat_with_model(model, tokenizer, device):
    """Run an interactive console chat with the conversational model.

    Reads user turns from standard input until ``quit``/``exit`` is typed or
    input ends. ``clear`` resets the conversation history and ``test`` runs
    ``test_model_quality``. Each prompt is the system prompt followed by the
    most recent turns of the conversation and a trailing ``Assistant:`` cue.

    Args:
        model: Model handed to ``generate_response``; its
            ``config.max_seq_len`` (when present) is used for the token-usage
            readout.
        tokenizer: Tokenizer used to measure prompt length and by
            ``generate_response``.
        device: Device handed to ``generate_response``.
    """
    print("\nπ€ MAP-NEO Conversational AI (Fine-Tuned)")
    print("Type 'quit' to exit, 'clear' to clear history, 'test' for quality tests")
    print("="*70)

    conversation_history = []
    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."

    # Report usage against the model's real context window rather than a
    # fixed number; 4096 is only the fallback when the model exposes none.
    context_limit = getattr(getattr(model, "config", None), "max_seq_len", None) or 4096

    max_prompt_tokens = 1800   # above this the context is shortened once
    reply_prefix = "Assistant:"

    def build_prompt(turns):
        """Format the system prompt plus the given turns into a model prompt."""
        context = "\n".join(turns)
        return f"{system_prompt}\n\n{context}\nAssistant:"

    while True:
        try:
            user_input = input("\nπ§ You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the prompt ends the chat cleanly.
            print()
            print("π Goodbye!")
            break

        command = user_input.lower()

        if command in ['quit', 'exit']:
            print("π Goodbye!")
            break

        if command == 'clear':
            conversation_history = []
            print("π Conversation cleared!")
            continue

        if command == 'test':
            test_model_quality(model, tokenizer, device)
            continue

        if not user_input:
            continue

        user_turn = f"User: {user_input}"
        conversation_history.append(user_turn)

        # Use the last 10 turns; fall back to the last 6 when that prompt is
        # too long.
        prompt = build_prompt(conversation_history[-10:])
        prompt_tokens = tokenizer.encode(prompt)
        if len(prompt_tokens) > max_prompt_tokens:
            prompt = build_prompt(conversation_history[-6:])

        print("π€ MAP-NEO: ", end="", flush=True)

        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=150,
                temperature=0.8,
                top_k=50,
                top_p=0.9
            )

            # Drop the role label if the model echoed it.
            if assistant_response.startswith(reply_prefix):
                assistant_response = assistant_response[len(reply_prefix):].strip()

            print(assistant_response)

            conversation_history.append(f"Assistant: {assistant_response}")

            total_tokens = len(tokenizer.encode(prompt + assistant_response))
            print(f" π Tokens: {total_tokens}/{context_limit} ({total_tokens/context_limit*100:.1f}%)")

        except Exception as e:
            print(f"β Error generating response: {e}")
            print("Try again with a different prompt.")
            # Remove the unanswered user turn so the next prompt does not
            # contain two consecutive "User:" lines. If the reply was already
            # recorded, the last entry is the assistant turn and stays.
            if conversation_history and conversation_history[-1] == user_turn:
                conversation_history.pop()
|
|
|
|
|
|
def test_model_quality(model, tokenizer, device):
    """Print the model's replies to a fixed set of sample prompts.

    Only the first five entries of the prompt list are run. Each is wrapped
    in the system prompt and a single ``User:``/``Assistant:`` exchange and
    passed to ``generate_response``. A failure on one prompt is reported and
    the remaining prompts still run.

    Args:
        model: Model handed to ``generate_response``.
        tokenizer: Tokenizer handed to ``generate_response``.
        device: Device handed to ``generate_response``.
    """
    print("\nπ§ͺ Testing Model Quality...")
    print("="*60)

    test_prompts = [
        "Hello! Can you help me understand machine learning?",
        "What's the difference between AI and machine learning?",
        "I'm feeling stressed about work. Any advice?",
        "Can you write a short story about a robot?",
        "Explain quantum physics in simple terms.",
        "How do I make a good cup of coffee?",
        "What are the benefits of exercise?"
    ]

    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."

    # The "i/total" label is derived from the slice so the two cannot drift.
    selected_prompts = test_prompts[:5]
    total = len(selected_prompts)

    for i, user_prompt in enumerate(selected_prompts, 1):
        print(f"\n--- Test {i}/{total} ---")
        print(f"π§ User: {user_prompt}")

        prompt = f"{system_prompt}\n\nUser: {user_prompt}\nAssistant:"

        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=120,
                temperature=0.7,
                top_k=50,
                top_p=0.9
            )
            print(f"π€ MAP-NEO: {assistant_response}")
        except Exception as e:
            # Best-effort smoke test: report and move on to the next prompt.
            print(f"β Error: {e}")

    print("\n✅ Quality tests completed!")
|
|
|
|
|
|
def compare_before_after(model, tokenizer, device):
    """Print one prompt answered by the original and the fine-tuned model.

    The original weights are read from
    ``checkpoints/checkpoint_step_99999.pt`` into a default-configured
    ``NeoMini``. Any failure along the way (for example a missing checkpoint)
    is reported as "Comparison unavailable" instead of being raised.

    Args:
        model: The fine-tuned model.
        tokenizer: Tokenizer shared by both models.
        device: Device both models run on.
    """
    print("\nπ Before vs After Fine-Tuning Comparison")
    print("="*60)

    try:
        print("Loading original model for comparison...")
        baseline = NeoMini(NeoMiniConfig())
        baseline_state = torch.load('checkpoints/checkpoint_step_99999.pt', map_location='cpu')
        baseline.load_state_dict(baseline_state['model_state_dict'])
        baseline.eval().to(device)

        test_prompt = "Hello! Can you help me learn about artificial intelligence?"
        prompt = f"You are MAP-NEO, a helpful AI assistant.\n\nUser: {test_prompt}\nAssistant:"

        print(f"\nπ§ User: {test_prompt}")

        # Same prompt and sampling settings for both models, original first.
        contenders = (
            ("\nπ€ Original Model:", baseline),
            ("\nπ€ Fine-Tuned Model:", model),
        )
        for heading, candidate in contenders:
            print(heading)
            reply = generate_response(candidate, tokenizer, prompt, device, max_new_tokens=100, temperature=0.7)
            print(reply)

        print("\nπ The fine-tuned model should be much more conversational and helpful!")

    except Exception as e:
        print(f"Comparison unavailable: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load the model, run the scripted checks, then hand
    # control to the interactive chat loop.
    print("π MAP-NEO Conversational AI Testing Suite")
    print("="*60)

    model, tokenizer, device = load_conversational_model()

    # Non-interactive checks run first, in this order.
    test_model_quality(model, tokenizer, device)
    compare_before_after(model, tokenizer, device)

    divider = "="*70
    print("\n" + divider)
    print("π Ready for interactive conversation!")
    print(divider)

    chat_with_model(model, tokenizer, device)
|
|
|
|