File size: 30,219 Bytes
88b8fd6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import pandas as pd                                                                                                                
import numpy as np
import requests                                                                                                                 
from sklearn.model_selection import train_test_split                                                                               
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix                                                
from transformers import (                                                                                                         
    AutoTokenizer, AutoModelForSequenceClassification,                                                                             
    TrainingArguments, Trainer, DataCollatorWithPadding                                                                            
)                                                                                                                                  
import torch                                                                                                                       
from datasets import Dataset                                                                                                       
import logging                                                                                                                     
import os                                                                                                                          
                                                                                                                                   
# Module-level logger named after this module; this file attaches no handlers or levels to it.
logger = logging.getLogger(__name__)
                                                                                                                                   
class CBTBinaryClassifier:
    """Binary classifier to distinguish normal conversation from CBT-triggering statements.

    Label convention used by every method: 0 = normal conversation,
    1 = CBT trigger.  The training flow is ``prepare_data`` ->
    ``tokenize_data`` -> ``split_data`` -> ``train_model`` ->
    ``evaluate_model``; inference uses ``load_model`` followed by
    ``predict`` / ``batch_predict``.
    """

    def __init__(self, model_name="distilbert-base-uncased"):
        """Load the tokenizer for ``model_name`` and reset model/inference state.

        Args:
            model_name: Model identifier or local path handed to
                ``AutoTokenizer.from_pretrained`` (and later to
                ``AutoModelForSequenceClassification.from_pretrained``).
                The default is a lightweight model suited to laptop training.
        """
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Populated by train_model() / load_model().
        self.model = None
        self.trainer = None
        self.inference_pipeline = None

        # Remote-API settings (presumably for hosted Hugging Face inference);
        # they start disabled and no method of this class reads them.
        self.use_hf_api = False
        self.api_url = None
        self.api_token = None
        self.headers = None
        self.model_id = None

        # Some tokenizers ship without a padding token; reuse EOS so that
        # padded batches can still be built.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def prepare_data(self, normal_csv_path, cbt_csv_path, text_column="text"):
        """Load both CSV files and return one shuffled, labelled DataFrame.

        Rows whose text is missing are dropped and the remaining text is
        coerced to ``str``, because the tokenizer cannot encode NaN or
        numeric values.

        Args:
            normal_csv_path: CSV of normal conversations (labelled 0).
            cbt_csv_path: CSV of CBT-triggering statements (labelled 1).
            text_column: Name of the column holding the text in both files.

        Returns:
            DataFrame with exactly the columns ``text`` and ``label``,
            shuffled with a fixed seed and re-indexed from 0.

        Raises:
            KeyError: If ``text_column`` is absent from either CSV.
        """

        def load_labelled(csv_path, label, description):
            # Read one CSV and reduce it to clean (text, label) rows.
            logger.info(f"Loading {description} conversations from {csv_path}")
            raw = pd.read_csv(csv_path)
            if text_column not in raw.columns:
                raise KeyError(
                    f"Column '{text_column}' not found in {csv_path}; "
                    f"available columns: {list(raw.columns)}"
                )

            texts = raw[text_column].dropna()
            dropped = len(raw) - len(texts)
            if dropped:
                logger.warning(
                    f"Dropped {dropped} rows with missing '{text_column}' from {csv_path}"
                )

            labelled = pd.DataFrame({'text': texts.astype(str)})
            labelled['label'] = label
            return labelled

        normal_df = load_labelled(normal_csv_path, 0, "normal")  # Normal conversation = 0
        cbt_df = load_labelled(cbt_csv_path, 1, "CBT")  # CBT trigger = 1

        # Combine datasets
        combined_df = pd.concat(
            [normal_df[['text', 'label']], cbt_df[['text', 'label']]],
            ignore_index=True,
        )

        # Shuffle with a fixed seed so repeated runs see the same order
        combined_df = combined_df.sample(frac=1, random_state=42).reset_index(drop=True)

        logger.info(f"Total examples: {len(combined_df)}")
        logger.info(f"Normal conversations: {len(normal_df)}")
        logger.info(f"CBT triggers: {len(cbt_df)}")

        return combined_df

    def tokenize_data(self, df, max_length=128):
        """Convert a ``text``/``label`` DataFrame into a tokenized HF Dataset.

        Args:
            df: DataFrame with a ``text`` column (any other columns, such as
                ``label``, are carried through unchanged).
            max_length: Every example is truncated and padded to exactly
                this many tokens.

        Returns:
            ``datasets.Dataset`` with the tokenizer outputs added and the raw
            ``text`` column removed.
        """

        def tokenize_function(examples):
            return self.tokenizer(
                examples['text'],
                truncation=True,
                padding='max_length',
                max_length=max_length,
                return_tensors=None
            )

        # preserve_index=False keeps a non-default pandas index (e.g. from a
        # filtered frame) from leaking in as an "__index_level_0__" column.
        dataset = Dataset.from_pandas(df, preserve_index=False)
        tokenized_dataset = dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=['text'])

        return tokenized_dataset

    def split_data(self, dataset, test_size=0.2, val_size=0.1):
        """Split a dataset into train/validation/test sets with a fixed seed.

        Args:
            dataset: Object exposing ``train_test_split(test_size=, seed=)``
                that returns a mapping with ``'train'`` and ``'test'`` keys
                (e.g. a ``datasets.Dataset``).
            test_size: Fraction of the whole dataset reserved for testing.
            val_size: Fraction of the whole dataset reserved for validation.

        Returns:
            Tuple ``(train, val, test)``.

        Raises:
            ValueError: If the fractions are not strictly positive or leave
                no data for training.
        """
        if not 0 < test_size < 1:
            raise ValueError(
                f"test_size must be a fraction strictly between 0 and 1, got {test_size!r}"
            )
        if not 0 < val_size < 1 - test_size:
            raise ValueError(
                "val_size must be positive and test_size + val_size must be "
                f"below 1, got test_size={test_size!r}, val_size={val_size!r}"
            )

        # First split: train + val vs test.  Index by key rather than relying
        # on the ordering of the returned mapping.
        holdout_split = dataset.train_test_split(test_size=test_size, seed=42)
        train_val, test = holdout_split['train'], holdout_split['test']

        # Second split: val_size is relative to the full dataset, so rescale
        # it to a fraction of what is left after removing the test set.
        val_ratio = val_size / (1 - test_size)
        val_split = train_val.train_test_split(test_size=val_ratio, seed=42)
        train, val = val_split['train'], val_split['test']

        logger.info(f"Train: {len(train)}, Val: {len(val)}, Test: {len(test)}")
        return train, val, test

    def train_model(self, train_dataset, val_dataset, output_dir="./cbt_classifier"):
        """Fine-tune the two-label classifier with laptop-friendly settings.

        Stores the model on ``self.model`` and the ``Trainer`` on
        ``self.trainer``, then saves the model and tokenizer to
        ``output_dir``.

        Args:
            train_dataset: Tokenized training set.
            val_dataset: Tokenized validation set, evaluated every 200 steps.
            output_dir: Directory for checkpoints, logs and the final model;
                created if it does not exist.
        """

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Initialize model
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            num_labels=2
        )

        # Collator pads each batch to its longest member.  Inputs produced by
        # tokenize_data() are already padded to max_length.
        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)

        # Laptop-friendly training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=2,  # Reduced epochs
            per_device_train_batch_size=8,  # Smaller batch size
            per_device_eval_batch_size=8,
            gradient_accumulation_steps=2,  # Simulate larger batch size
            warmup_steps=100,  # Reduced warmup
            weight_decay=0.01,
            logging_dir=f'{output_dir}/logs',
            logging_steps=50,
            eval_strategy="steps",
            eval_steps=200,
            save_strategy="steps",
            save_steps=200,
            load_best_model_at_end=True,
            metric_for_best_model="eval_accuracy",
            fp16=torch.cuda.is_available(),  # Use mixed precision if GPU available
            dataloader_num_workers=0,  # Reduce CPU usage
            remove_unused_columns=True,
        )

        # Metrics function; the 'accuracy' key is what metric_for_best_model
        # refers to as "eval_accuracy".
        def compute_metrics(eval_pred):
            predictions, labels = eval_pred
            predictions = np.argmax(predictions, axis=1)
            return {
                'accuracy': accuracy_score(labels, predictions),
            }

        # Initialize trainer
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            data_collator=data_collator,
        )

        # Train the model
        logger.info("Starting training...")
        self.trainer.train()

        # Save the model and tokenizer side by side so load_model() can use
        # the same directory for both.
        self.trainer.save_model()
        self.tokenizer.save_pretrained(output_dir)

        logger.info(f"Model saved to {output_dir}")

    def evaluate_model(self, test_dataset):
        """Print accuracy, classification report and confusion matrix.

        Args:
            test_dataset: Tokenized dataset with labels.

        Returns:
            Tuple ``(y_true, y_pred)`` of label arrays.

        Raises:
            ValueError: If ``train_model`` has not been run on this instance.
        """

        if self.trainer is None:
            raise ValueError("Model not trained yet!")

        # Get predictions
        predictions = self.trainer.predict(test_dataset)
        y_pred = np.argmax(predictions.predictions, axis=1)
        y_true = predictions.label_ids

        # Pin both class ids: without `labels`, the report raises when the
        # test split happens to contain a single class, because the two
        # target names no longer match the classes found in the data.
        class_ids = [0, 1]

        # Print results
        print("\n=== Evaluation Results ===")
        print(f"Accuracy: {accuracy_score(y_true, y_pred):.4f}")
        print("\nClassification Report:")
        print(classification_report(y_true, y_pred,
                                    labels=class_ids,
                                    target_names=['Normal', 'CBT Trigger']))
        print("\nConfusion Matrix:")
        print(confusion_matrix(y_true, y_pred, labels=class_ids))

        return y_true, y_pred

    def load_model(self, model_path="./cbt_classifier"):
        """Build the text-classification pipeline used for inference.

        Args:
            model_path: Directory (or model id) holding both the fine-tuned
                model and its tokenizer, as written by ``train_model``.
        """

        from transformers import pipeline

        # top_k=None returns a score for every label; it replaces the
        # deprecated return_all_scores=True.
        self.inference_pipeline = pipeline(
            "text-classification",
            model=model_path,
            tokenizer=model_path,
            top_k=None
        )

        logger.info(f"Model loaded from {model_path}")

    def predict(self, text, threshold=0.7):
        """Predict whether ``text`` is CBT-triggering.

        Args:
            text: Input passed to the inference pipeline.
            threshold: The text counts as a trigger only when the
                ``LABEL_1`` score is strictly greater than this value.

        Returns:
            Dict with ``is_cbt_trigger`` (bool), ``confidence`` (the
            ``LABEL_1`` score) and ``threshold`` (echoed back).

        Raises:
            ValueError: If no pipeline is loaded, or the pipeline output
                carries no ``LABEL_1`` score.
        """

        if self.inference_pipeline is None:
            raise ValueError("Model not loaded! Call load_model() first.")

        result = self.inference_pipeline(text)

        # The pipeline may wrap the per-label scores in an outer list
        # ([[{...}, {...}]]) or return them flat ([{...}, {...}]).
        if isinstance(result, dict):
            label_scores = [result]
        elif result and isinstance(result[0], (list, tuple)):
            label_scores = result[0]
        else:
            label_scores = result

        # Extract confidence for CBT trigger class (LABEL_1)
        cbt_confidence = next(
            (entry['score'] for entry in label_scores if entry['label'] == 'LABEL_1'),
            None,
        )
        if cbt_confidence is None:
            # A bare next() would leak StopIteration to the caller.
            raise ValueError("Pipeline output has no score for 'LABEL_1'.")

        return {
            'is_cbt_trigger': cbt_confidence > threshold,
            'confidence': cbt_confidence,
            'threshold': threshold
        }

    def batch_predict(self, texts, threshold=0.7):
        """Run ``predict`` on each text and return the results in order.

        Args:
            texts: Iterable of texts; a single string is treated as one
                text rather than being iterated character by character.
            threshold: Forwarded to ``predict`` for every text.

        Returns:
            List of ``predict`` result dicts, one per input text.

        Raises:
            ValueError: If no pipeline is loaded.
        """

        if self.inference_pipeline is None:
            raise ValueError("Model not loaded! Call load_model() first.")

        if isinstance(texts, str):
            texts = [texts]

        return [self.predict(text, threshold) for text in texts]