File size: 8,372 Bytes
e207dc8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/env python3
"""
Script untuk testing Textilindo AI Assistant yang sudah di-fine-tune
"""

import os
import sys
import yaml
import torch
import argparse
from pathlib import Path
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import logging

# Configure the root logger at INFO level and create this module's logger,
# which the functions below use for info/warning/error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def load_system_prompt(system_prompt_path):
    """Load the system prompt text from a markdown file.

    If the file contains a ``SYSTEM_PROMPT = `` assignment opened with a
    triple double-quote, the text between that opening and the next triple
    double-quote is returned. If the closing delimiter is missing, everything
    after the opening marker is used. Without the marker, the whole file
    content is used.

    Args:
        system_prompt_path: Path of the file to read (decoded as UTF-8).

    Returns:
        The prompt with surrounding whitespace stripped, or ``None`` when the
        file could not be read (the error is logged).
    """
    marker = 'SYSTEM_PROMPT = """'

    # Only the file read can fail; keep the best-effort "log and return None"
    # contract that the caller relies on.
    try:
        with open(system_prompt_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e:
        logger.error(f"Error loading system prompt: {e}")
        return None

    _, found, tail = content.partition(marker)
    if not found:
        # Fallback: use entire content
        return content.strip()

    # partition() yields the whole tail when the closing delimiter is absent,
    # so an unterminated block no longer loses its final character.
    prompt_body, _, _ = tail.partition('"""')
    return prompt_body.strip()

def load_finetuned_model(model_path, lora_weights_path, system_prompt):
    """Load the base causal LM, optionally wrap it with LoRA weights, and load its tokenizer.

    Args:
        model_path: Location passed to ``from_pretrained`` for both the model
            and the tokenizer.
        lora_weights_path: Path to LoRA weights; applied only when it is
            non-empty and exists on disk.
        system_prompt: Accepted but not used by this function.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    logger.info(f"Loading base model from: {model_path}")

    # Base model in half precision with automatic device placement.
    base_options = {
        "torch_dtype": torch.float16,
        "device_map": "auto",
        "trust_remote_code": True,
    }
    model = AutoModelForCausalLM.from_pretrained(model_path, **base_options)

    # Attach the adapter only when a usable path was given.
    adapter_available = bool(lora_weights_path) and os.path.exists(lora_weights_path)
    if not adapter_available:
        logger.warning("No LoRA weights found, using base model")
    else:
        logger.info(f"Loading LoRA weights from: {lora_weights_path}")
        model = PeftModel.from_pretrained(model, lora_weights_path)

    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    # Fall back to the EOS token when the tokenizer defines no pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    return model, tokenizer

def generate_response(model, tokenizer, user_input, system_prompt, max_length=512):
    """Generate the assistant's reply to a single user message.

    The prompt is built as ``<|system|>`` / ``<|user|>`` / ``<|assistant|>``
    sections, sampled with fixed decoding settings, and only the newly
    generated tokens are decoded and returned.

    Args:
        model: Causal LM exposing ``device`` and ``generate``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        user_input: The user's message.
        system_prompt: System prompt placed in front of the user message.
        max_length: Maximum number of tokens to generate for the reply. It is
            passed as ``max_new_tokens`` so a long system prompt cannot use
            up the generation budget.

    Returns:
        The reply text, cut at the first stop marker and stripped of
        surrounding whitespace.
    """
    stop_strings = ["<|end|>", "<|user|>"]

    # Create full prompt with system prompt
    full_prompt = f"<|system|>\n{system_prompt}\n<|user|>\n{user_input}\n<|assistant|>\n"

    inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
    prompt_token_count = inputs["input_ids"].shape[-1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_length,
            temperature=0.7,
            top_p=0.9,
            top_k=40,
            repetition_penalty=1.1,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            stop_strings=stop_strings,
            # generate() needs the tokenizer to match stop strings against
            # the decoded text; without it the stop_strings argument is rejected.
            tokenizer=tokenizer,
        )

    # Decode only what was generated after the prompt, so the result does not
    # depend on the role markers surviving a decode/split round-trip.
    generated_tokens = outputs[0][prompt_token_count:]
    response = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    # Generation stops *after* emitting a stop string, so trim it (and
    # anything following it) from the reply.
    for stop in stop_strings:
        response = response.split(stop, 1)[0]

    return response.strip()

def interactive_test(model, tokenizer, system_prompt):
    """Run an interactive chat loop on the console.

    Reads customer messages with ``input()``, prints the model's reply for
    each one, and stops on ``quit``/``exit``/``q``, on Ctrl-C, or when
    standard input reaches end-of-file. Errors raised while generating a
    reply are logged and printed, and the loop continues.

    Args:
        model: Model passed through to ``generate_response``.
        tokenizer: Tokenizer passed through to ``generate_response``.
        system_prompt: System prompt passed through to ``generate_response``.
    """
    print("πŸ€– Textilindo AI Assistant - Interactive Mode")
    print("=" * 60)
    print("Type 'quit' to exit")
    print("-" * 60)

    while True:
        try:
            user_input = input("\nπŸ‘€ Customer: ").strip()

            if user_input.lower() in ['quit', 'exit', 'q']:
                print("πŸ‘‹ Terima kasih! Sampai jumpa!")
                break

            # Ignore blank lines instead of sending an empty prompt.
            if not user_input:
                continue

            print("\nπŸ€– Textilindo AI: ", end="", flush=True)
            response = generate_response(model, tokenizer, user_input, system_prompt)
            print(response)

        except (KeyboardInterrupt, EOFError):
            # EOF (closed/piped stdin, Ctrl-D) must end the session: input()
            # would keep raising it, so treating it as a generic error would
            # spin this loop forever.
            print("\nπŸ‘‹ Terima kasih! Sampai jumpa!")
            break
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            print(f"❌ Error: {e}")

def batch_test(model, tokenizer, system_prompt, test_cases):
    """Run every predefined test case through the model and print the results.

    Each test case is a dict with a ``'prompt'`` key and an optional
    ``'expected'`` key. The expected text is only printed next to the
    response; no comparison is performed. A failure in one case is logged
    and printed, and the remaining cases still run.

    Args:
        model: Model passed through to ``generate_response``.
        tokenizer: Tokenizer passed through to ``generate_response``.
        system_prompt: System prompt passed through to ``generate_response``.
        test_cases: Iterable of test-case dicts.
    """
    print("πŸ§ͺ Textilindo AI Assistant - Batch Testing")
    print("=" * 60)

    for index, case in enumerate(test_cases, start=1):
        prompt = case['prompt']
        print(f"\nπŸ“ Test Case {index}: {prompt}")
        print("-" * 40)

        try:
            answer = generate_response(model, tokenizer, prompt, system_prompt)
            print(f"πŸ€– Response: {answer}")

            # The reference answer is optional.
            if 'expected' not in case:
                continue
            print(f"🎯 Expected: {case['expected']}")

        except Exception as e:
            logger.error(f"Error in test case {index}: {e}")
            print(f"❌ Error: {e}")

def main():
    """Command-line entry point for testing the assistant.

    Parses ``--model_path``, ``--lora_path`` and ``--system_prompt``, loads
    the system prompt and the model, then asks on the console which testing
    mode to run: interactive chat, batch testing with the built-in test
    cases, or a single custom prompt.

    Exits with status 1 when the system prompt or the base model path is
    missing, when loading the model fails, or when the selected mode raises.
    """
    parser = argparse.ArgumentParser(description='Test Textilindo AI Assistant')
    parser.add_argument('--model_path', type=str, default='./models/llama-3.1-8b-instruct',
                        help='Path to base model')
    parser.add_argument('--lora_path', type=str, default=None,
                        help='Path to LoRA weights')
    parser.add_argument('--system_prompt', type=str, default='configs/system_prompt.md',
                        help='Path to system prompt file')

    args = parser.parse_args()

    print("πŸ§ͺ Textilindo AI Assistant Testing")
    print("=" * 60)

    # Load system prompt
    system_prompt = load_system_prompt(args.system_prompt)
    if not system_prompt:
        print(f"❌ System prompt tidak ditemukan: {args.system_prompt}")
        sys.exit(1)

    # Check if model exists
    if not os.path.exists(args.model_path):
        print(f"❌ Base model tidak ditemukan: {args.model_path}")
        print("Jalankan download_model.py terlebih dahulu")
        sys.exit(1)

    # Loading is handled separately so that only real load failures are
    # reported as such, and so the script exits non-zero like the checks above.
    try:
        print("1️⃣ Loading model...")
        model, tokenizer = load_finetuned_model(args.model_path, args.lora_path, system_prompt)
        print("βœ… Model loaded successfully!")
    except Exception as e:
        logger.error(f"Error: {e}")
        print(f"❌ Error loading model: {e}")
        sys.exit(1)

    # Test cases specific to Textilindo
    test_cases = [
        {
            "prompt": "dimana lokasi textilindo?",
            "expected": "Textilindo berkantor pusat di Jl. Raya Prancis No.39, Kosambi Tim., Kec. Kosambi, Kabupaten Tangerang, Banten 15213"
        },
        {
            "prompt": "Jam berapa textilindo beroperasional?",
            "expected": "Jam operasional Senin-Jumat 08:00-17:00, Sabtu 08:00-12:00."
        },
        {
            "prompt": "Berapa ketentuan pembelian?",
            "expected": "Minimal order 1 roll per jenis kain"
        },
        {
            "prompt": "bagimana dengan pembayarannya?",
            "expected": "Pembayaran dapat dilakukan via transfer bank atau cash on delivery"
        },
        {
            "prompt": "apa ada gratis ongkir?",
            "expected": "Gratis ongkir untuk order minimal 5 roll."
        },
        {
            "prompt": "Apa bisa dikirimkan sample? apa gratis?",
            "expected": "hallo kak untuk sampel kita bisa kirimkan gratis ya kak 😊"
        }
    ]

    # Choose testing mode
    print("\n2️⃣ Pilih mode testing:")
    print("1. Interactive mode (chat)")
    print("2. Batch testing")
    print("3. Custom prompt")

    try:
        choice = input("\nPilihan (1-3): ").strip()

        if choice == "1":
            interactive_test(model, tokenizer, system_prompt)
        elif choice == "2":
            batch_test(model, tokenizer, system_prompt, test_cases)
        elif choice == "3":
            custom_prompt = input("Masukkan prompt custom: ").strip()
            if custom_prompt:
                response = generate_response(model, tokenizer, custom_prompt, system_prompt)
                print(f"\nπŸ€– Response: {response}")
        else:
            print("❌ Pilihan tidak valid")

    except Exception as e:
        # Failures after a successful load (e.g. during generation) are not
        # model-loading errors; report them as plain errors.
        logger.error(f"Error: {e}")
        print(f"❌ Error: {e}")
        sys.exit(1)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()