# File size: 10,175 Bytes
# a683148
"""test_conversational_neo_manual.py - Manual generation for NeoMini model"""
from model_neo import NeoMini, NeoMiniConfig
from transformers import AutoTokenizer
import torch
import torch.nn.functional as F
import json
def load_conversational_model(model_path="conversational_neo_extended"):
    """Load the fine-tuned conversational model and its tokenizer.

    Reads ``max_seq_len`` from ``<model_path>/model_config.json`` (falling back
    to 4096 when that file cannot be read or parsed), builds a ``NeoMini`` with
    that context length, restores the weights stored under the
    ``model_state_dict`` key of ``<model_path>/conversational_model.pt``, and
    moves the model to CUDA when it is available.

    Args:
        model_path: Directory containing ``model_config.json``,
            ``conversational_model.pt`` and the tokenizer files.

    Returns:
        A ``(model, tokenizer, device)`` tuple. The model is in eval mode and
        already placed on ``device``.
    """
    default_max_seq_len = 4096
    print("Loading fine-tuned conversational model...")

    # The config file is optional: a missing, unreadable or malformed file
    # (or one whose top level is not a mapping) falls back to the default.
    try:
        with open(f"{model_path}/model_config.json", "r", encoding="utf-8") as f:
            model_config = json.load(f)
        max_seq_len = model_config.get("max_seq_len", default_max_seq_len)
    except (OSError, ValueError, AttributeError):
        max_seq_len = default_max_seq_len

    # Build the model with the resolved context length, then restore weights.
    config = NeoMiniConfig()
    config.max_seq_len = max_seq_len
    model = NeoMini(config)
    # NOTE(review): torch.load unpickles the checkpoint; only load files from
    # a trusted source.
    checkpoint = torch.load(f"{model_path}/conversational_model.pt", map_location="cpu")
    model.load_state_dict(checkpoint["model_state_dict"])

    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model.eval()

    # Move to GPU if available.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # NOTE(review): the leading glyph looks like a mis-decoded emoji that had
    # split this literal across two lines; confirm against the upstream file.
    print(f"β Model loaded with {max_seq_len} token context window on {device}")
    return model, tokenizer, device
def generate_response(model, tokenizer, prompt, device, max_new_tokens=200, temperature=0.8, top_k=50, top_p=0.9):
    """Generate a continuation of *prompt* one token at a time.

    The whole sequence is fed back through the model on every step, and only
    the first sequence of the batch (index 0) is used.

    Args:
        model: Callable such that ``model(input_ids)`` returns logits that can
            be indexed as ``[0, -1, :]``; must expose ``config.max_seq_len``.
        tokenizer: Object providing ``encode(text, return_tensors="pt")``,
            ``decode(ids, skip_special_tokens=True)`` and ``eos_token_id``.
        prompt: Text to continue.
        device: Device the encoded prompt is moved to.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Softmax temperature. Values ``<= 0`` select greedy
            (argmax) decoding instead of dividing the logits by zero.
        top_k: Keep only the ``top_k`` highest logits; ``0`` disables the
            filter. Values above the vocabulary size are clamped.
        top_p: Nucleus threshold; ``1.0`` or more disables the filter.

    Returns:
        The decoded generated text (prompt excluded), stripped of surrounding
        whitespace.
    """
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    prompt_length = input_ids.shape[1]
    greedy = temperature <= 0

    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(input_ids)
            # Only the prediction for the last position is needed.
            next_token_logits = logits[0, -1, :]

            if greedy:
                next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            else:
                filtered = _filter_logits(next_token_logits / temperature, top_k, top_p)
                probs = F.softmax(filtered, dim=-1)
                next_token = torch.multinomial(probs, num_samples=1)

            # next_token has shape (1,); add the batch axis before appending.
            input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)

            # Stop on EOS or once the context window is full.
            if next_token.item() == tokenizer.eos_token_id:
                break
            if input_ids.shape[1] >= model.config.max_seq_len:
                break

    generated_tokens = input_ids[0, prompt_length:]
    return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()


def _filter_logits(logits, top_k, top_p):
    """Return 1-D *logits* with everything outside the top-k / top-p set at -inf."""
    if top_k > 0:
        # Clamp so a top_k larger than the vocabulary does not make topk raise.
        k = min(top_k, logits.size(-1))
        kept_values, kept_indices = torch.topk(logits, k)
        logits = torch.full_like(logits, float("-inf")).scatter_(0, kept_indices, kept_values)

    if top_p < 1.0:
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        remove = cumulative_probs > top_p
        # Shift right by one so the token that crosses the threshold is kept,
        # and always keep the most likely token.
        remove[1:] = remove[:-1].clone()
        remove[0] = False
        logits = logits.clone()
        logits[sorted_indices[remove]] = float("-inf")

    return logits
def chat_with_model(model, tokenizer, device):
    """Run an interactive chat loop on stdin/stdout.

    Commands typed at the prompt: ``quit`` / ``exit`` end the session,
    ``clear`` resets the conversation history and ``test`` runs
    ``test_model_quality``. Any other non-empty input is appended to the
    history and answered with ``generate_response``. End-of-input or Ctrl-C
    at the prompt also ends the session.

    Args:
        model: Model passed through to ``generate_response``.
        tokenizer: Tokenizer used for prompt-length checks and generation.
        device: Device passed through to ``generate_response``.
    """
    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."
    max_history_messages = 10
    prompt_token_budget = 1800  # Leave room for response
    reply_prefix = "Assistant:"
    # Report usage against the model's own context window when it exposes one.
    context_window = getattr(getattr(model, "config", None), "max_seq_len", 4096)

    def build_prompt(messages):
        context = "\n".join(messages)
        return f"{system_prompt}\n\n{context}\nAssistant:"

    print("\nπ€ MAP-NEO Conversational AI (Fine-Tuned)")
    print("Type 'quit' to exit, 'clear' to clear history, 'test' for quality tests")
    print("=" * 70)

    conversation_history = []
    while True:
        try:
            user_input = input("\nπ§ You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C at the prompt: leave cleanly.
            print("\nπ Goodbye!")
            break

        command = user_input.lower()
        if command in ("quit", "exit"):
            print("π Goodbye!")
            break
        if command == "clear":
            conversation_history = []
            print("π Conversation cleared!")
            continue
        if command == "test":
            test_model_quality(model, tokenizer, device)
            continue
        if not user_input:
            continue

        user_message = f"User: {user_input}"
        conversation_history.append(user_message)

        # Start from the most recent messages and drop the oldest one at a
        # time until the prompt fits the budget (the newest message always stays).
        recent_context = conversation_history[-max_history_messages:]
        prompt = build_prompt(recent_context)
        while len(recent_context) > 1 and len(tokenizer.encode(prompt)) > prompt_token_budget:
            recent_context = recent_context[1:]
            prompt = build_prompt(recent_context)

        print("π€ MAP-NEO: ", end="", flush=True)
        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=150,
                temperature=0.8,
                top_k=50,
                top_p=0.9,
            )
            # Strip an echoed role label from the start of the reply.
            if assistant_response.startswith(reply_prefix):
                assistant_response = assistant_response[len(reply_prefix):].strip()
            print(assistant_response)
            conversation_history.append(f"Assistant: {assistant_response}")

            total_tokens = len(tokenizer.encode(prompt + assistant_response))
            print(f" π Tokens: {total_tokens}/{context_window} ({total_tokens/context_window*100:.1f}%)")
        except Exception as e:
            # If no reply was recorded, drop the unanswered user turn so the
            # next prompt does not contain two consecutive "User:" messages.
            if conversation_history and conversation_history[-1] == user_message:
                conversation_history.pop()
            print(f"β Error generating response: {e}")
            print("Try again with a different prompt.")
def test_model_quality(model, tokenizer, device, num_tests=5):
    """Print the model's responses to a fixed list of sample prompts.

    Each prompt is wrapped in the system prompt and a ``User:`` /
    ``Assistant:`` frame and passed to ``generate_response``. An exception
    from one prompt is printed and the remaining prompts still run.

    Args:
        model: Model passed through to ``generate_response``.
        tokenizer: Tokenizer passed through to ``generate_response``.
        device: Device passed through to ``generate_response``.
        num_tests: How many of the built-in prompts to run, taken from the
            start of the list. ``None`` runs all of them; negative values are
            treated as zero.
    """
    print("\nπ§ͺ Testing Model Quality...")
    print("=" * 60)

    test_prompts = [
        "Hello! Can you help me understand machine learning?",
        "What's the difference between AI and machine learning?",
        "I'm feeling stressed about work. Any advice?",
        "Can you write a short story about a robot?",
        "Explain quantum physics in simple terms.",
        "How do I make a good cup of coffee?",
        "What are the benefits of exercise?",
    ]
    system_prompt = "You are MAP-NEO, a helpful, harmless, and honest AI assistant. Engage in natural conversation and provide thoughtful, accurate responses."

    selected = test_prompts if num_tests is None else test_prompts[:max(num_tests, 0)]
    total = len(selected)

    for i, user_prompt in enumerate(selected, 1):
        print(f"\n--- Test {i}/{total} ---")
        print(f"π§ User: {user_prompt}")
        prompt = f"{system_prompt}\n\nUser: {user_prompt}\nAssistant:"
        try:
            assistant_response = generate_response(
                model, tokenizer, prompt, device,
                max_new_tokens=120,
                temperature=0.7,
                top_k=50,
                top_p=0.9,
            )
            print(f"π€ MAP-NEO: {assistant_response}")
        except Exception as e:
            print(f"β Error: {e}")

    # NOTE(review): the leading glyph looks like a mis-decoded emoji that had
    # split this literal across two lines; confirm against the upstream file.
    print("\nβ Quality tests completed!")
def compare_before_after(model, tokenizer, device, original_checkpoint_path="checkpoints/checkpoint_step_99999.pt"):
    """Print one prompt's response from the original and the fine-tuned model.

    Builds a ``NeoMini`` with a default ``NeoMiniConfig``, restores it from
    the ``model_state_dict`` key of *original_checkpoint_path*, and generates
    a response to the same prompt with both models. Any exception raised
    while loading or generating is reported as "Comparison unavailable"
    instead of being propagated.

    Args:
        model: The fine-tuned model.
        tokenizer: Tokenizer used for both models.
        device: Device both models run on.
        original_checkpoint_path: Checkpoint file of the model before
            fine-tuning.
    """
    print("\nπ Before vs After Fine-Tuning Comparison")
    print("=" * 60)

    try:
        print("Loading original model for comparison...")
        original_config = NeoMiniConfig()
        original_model = NeoMini(original_config)
        # NOTE(review): torch.load unpickles the checkpoint; only load files
        # from a trusted source.
        original_checkpoint = torch.load(original_checkpoint_path, map_location="cpu")
        original_model.load_state_dict(original_checkpoint["model_state_dict"])
        original_model.eval().to(device)

        test_prompt = "Hello! Can you help me learn about artificial intelligence?"
        prompt = f"You are MAP-NEO, a helpful AI assistant.\n\nUser: {test_prompt}\nAssistant:"

        # Original model response
        print(f"\nπ§ User: {test_prompt}")
        print("\nπ€ Original Model:")
        original_response = generate_response(
            original_model, tokenizer, prompt, device,
            max_new_tokens=100, temperature=0.7,
        )
        print(original_response)

        # Fine-tuned model response
        print("\nπ€ Fine-Tuned Model:")
        finetuned_response = generate_response(
            model, tokenizer, prompt, device,
            max_new_tokens=100, temperature=0.7,
        )
        print(finetuned_response)

        print("\nπ The fine-tuned model should be much more conversational and helpful!")
    except Exception as e:
        print(f"Comparison unavailable: {e}")
def main():
    """Load the model, run the quality tests and comparison, then start the chat."""
    print("π MAP-NEO Conversational AI Testing Suite")
    print("=" * 60)

    model, tokenizer, device = load_conversational_model()

    # Non-interactive checks first: sample prompts, then the comparison
    # against the original checkpoint if it is available.
    test_model_quality(model, tokenizer, device)
    compare_before_after(model, tokenizer, device)

    banner_rule = "=" * 70
    print("\n" + banner_rule)
    print("π Ready for interactive conversation!")
    print(banner_rule)

    # Hand control to the interactive loop.
    chat_with_model(model, tokenizer, device)


if __name__ == "__main__":
    main()
|