|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import requests |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
|
|
from transformers import ( |
|
|
AutoTokenizer, AutoModelForSequenceClassification, |
|
|
TrainingArguments, Trainer, DataCollatorWithPadding |
|
|
) |
|
|
import torch |
|
|
from datasets import Dataset |
|
|
import logging |
|
|
import os |
|
|
|
|
|
# Module-level logger named after this module's import path (PEP 282 convention).
logger = logging.getLogger(__name__)
|
|
|
|
|
class CBTBinaryClassifier:
    """Binary classifier separating normal conversation from CBT-triggering statements.

    Wraps a HuggingFace sequence-classification model (DistilBERT by default)
    with helpers for data loading, tokenization, train/val/test splitting,
    training, evaluation, and threshold-based inference.

    Label convention: 0 = normal conversation, 1 = CBT trigger.
    """

    def __init__(self, model_name="distilbert-base-uncased"):
        """Load the tokenizer and initialize placeholder state.

        Args:
            model_name: HuggingFace model id or local path; used for the
                tokenizer here and for the model in train_model().
        """
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = None               # populated by train_model()
        self.trainer = None             # populated by train_model()
        self.inference_pipeline = None  # populated by load_model()
        # Placeholders for an optional HuggingFace Inference API backend.
        # NOTE(review): these fields are never read anywhere in this file —
        # presumably consumed by callers elsewhere; confirm before removing.
        self.use_hf_api = False
        self.api_url = None
        self.api_token = None
        self.headers = None
        self.model_id = None

        # Some tokenizers ship without a pad token (e.g. GPT-style ones);
        # fall back to EOS so batch padding works.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def prepare_data(self, normal_csv_path, cbt_csv_path, text_column="text"):
        """Load and prepare training data from CSV files.

        Args:
            normal_csv_path: CSV with normal conversations (labeled 0).
            cbt_csv_path: CSV with CBT-triggering statements (labeled 1).
            text_column: Name of the column holding the raw text in both files.

        Returns:
            A shuffled DataFrame with exactly two columns: 'text' and 'label'.

        Raises:
            ValueError: If text_column is missing from either CSV.
        """
        def _load(path, label):
            # One CSV -> normalized two-column frame with a constant label.
            df = pd.read_csv(path)
            if text_column not in df.columns:
                raise ValueError(f"Column '{text_column}' not found in {path}")
            return pd.DataFrame({'text': df[text_column], 'label': label})

        logger.info("Loading normal conversations from %s", normal_csv_path)
        normal_df = _load(normal_csv_path, 0)

        logger.info("Loading CBT conversations from %s", cbt_csv_path)
        cbt_df = _load(cbt_csv_path, 1)

        combined_df = pd.concat([normal_df, cbt_df], ignore_index=True)

        # Shuffle so batches mix both classes; fixed seed for reproducibility.
        combined_df = combined_df.sample(frac=1, random_state=42).reset_index(drop=True)

        logger.info("Total examples: %d", len(combined_df))
        logger.info("Normal conversations: %d", len(normal_df))
        logger.info("CBT triggers: %d", len(cbt_df))

        return combined_df

    def tokenize_data(self, df, max_length=128):
        """Tokenize the 'text' column into a HuggingFace Dataset.

        Sequences are truncated to max_length but NOT padded here; padding is
        applied per-batch by DataCollatorWithPadding during training, which is
        cheaper than padding every row to the full max_length up front.

        Args:
            df: DataFrame with at least a 'text' column (and usually 'label').
            max_length: Truncation length in tokens.

        Returns:
            A datasets.Dataset with token columns; the raw 'text' is dropped.
        """
        def tokenize_function(examples):
            return self.tokenizer(
                examples['text'],
                truncation=True,
                max_length=max_length,
            )

        dataset = Dataset.from_pandas(df)
        return dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=['text'],
        )

    def split_data(self, dataset, test_size=0.2, val_size=0.1):
        """Split data into train/validation/test sets.

        Both test_size and val_size are fractions of the FULL dataset; the
        validation fraction is rescaled against the post-test remainder.

        Args:
            dataset: A datasets.Dataset (e.g. from tokenize_data()).
            test_size: Fraction of the whole dataset held out for testing.
            val_size: Fraction of the whole dataset held out for validation.

        Returns:
            (train, val, test) datasets.
        """
        # Carve off the test set first. Index the DatasetDict by key rather
        # than relying on .values() ordering.
        first_split = dataset.train_test_split(test_size=test_size, seed=42)
        train_val, test = first_split['train'], first_split['test']

        # Rescale val_size from a whole-dataset fraction to a fraction of
        # the remaining (1 - test_size) portion.
        val_ratio = val_size / (1 - test_size)
        second_split = train_val.train_test_split(test_size=val_ratio, seed=42)
        train, val = second_split['train'], second_split['test']

        logger.info("Train: %d, Val: %d, Test: %d", len(train), len(val), len(test))
        return train, val, test

    def train_model(self, train_dataset, val_dataset, output_dir="./cbt_classifier"):
        """Fine-tune the binary classifier with laptop-friendly settings.

        Args:
            train_dataset: Tokenized training split.
            val_dataset: Tokenized validation split (drives best-checkpoint selection).
            output_dir: Directory for checkpoints, logs, and the final model.

        Side effects:
            Sets self.model and self.trainer; writes the best model and the
            tokenizer to output_dir.
        """
        os.makedirs(output_dir, exist_ok=True)

        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name,
            num_labels=2,
        )
        # Keep the model config in sync when __init__ fell back to EOS padding.
        if self.model.config.pad_token_id is None:
            self.model.config.pad_token_id = self.tokenizer.pad_token_id

        # Pads each batch to its longest member (tokenize_data leaves rows unpadded).
        data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=2,
            per_device_train_batch_size=8,
            per_device_eval_batch_size=8,
            gradient_accumulation_steps=2,   # effective train batch size of 16
            warmup_steps=100,
            weight_decay=0.01,
            logging_dir=f'{output_dir}/logs',
            logging_steps=50,
            eval_strategy="steps",
            eval_steps=200,
            save_strategy="steps",
            save_steps=200,
            load_best_model_at_end=True,
            metric_for_best_model="eval_accuracy",
            fp16=torch.cuda.is_available(),  # mixed precision only when a GPU exists
            dataloader_num_workers=0,        # avoid multiprocessing overhead on laptops
            remove_unused_columns=True,
        )

        def compute_metrics(eval_pred):
            # Logits -> argmax class ids; accuracy drives best-model selection.
            predictions, labels = eval_pred
            predictions = np.argmax(predictions, axis=1)
            return {
                'accuracy': accuracy_score(labels, predictions),
            }

        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=compute_metrics,
            data_collator=data_collator,
        )

        logger.info("Starting training...")
        self.trainer.train()

        # Persist both model and tokenizer so output_dir is self-contained
        # for pipeline() loading in load_model().
        self.trainer.save_model()
        self.tokenizer.save_pretrained(output_dir)

        logger.info("Model saved to %s", output_dir)

    def evaluate_model(self, test_dataset):
        """Evaluate the trained model on a held-out split.

        Prints accuracy, a per-class classification report, and the confusion
        matrix to stdout.

        Args:
            test_dataset: Tokenized test split with labels.

        Returns:
            (y_true, y_pred) integer label arrays.

        Raises:
            ValueError: If train_model() has not been run yet.
        """
        if self.trainer is None:
            raise ValueError("Model not trained yet!")

        predictions = self.trainer.predict(test_dataset)
        y_pred = np.argmax(predictions.predictions, axis=1)
        y_true = predictions.label_ids

        print("\n=== Evaluation Results ===")
        print(f"Accuracy: {accuracy_score(y_true, y_pred):.4f}")
        print("\nClassification Report:")
        print(classification_report(y_true, y_pred,
                                    target_names=['Normal', 'CBT Trigger']))
        print("\nConfusion Matrix:")
        print(confusion_matrix(y_true, y_pred))

        return y_true, y_pred

    def load_model(self, model_path="./cbt_classifier"):
        """Load a trained model directory for inference.

        Args:
            model_path: Directory containing a saved model and tokenizer
                (as written by train_model()).

        Side effects:
            Sets self.inference_pipeline.
        """
        from transformers import pipeline

        # top_k=None returns scores for every label — the modern replacement
        # for the deprecated return_all_scores=True, with the same output shape.
        self.inference_pipeline = pipeline(
            "text-classification",
            model=model_path,
            tokenizer=model_path,
            top_k=None,
        )

        logger.info("Model loaded from %s", model_path)

    def predict(self, text, threshold=0.7):
        """Classify a single text as CBT-triggering or not.

        Args:
            text: Input string.
            threshold: Minimum LABEL_1 confidence required to flag a trigger.

        Returns:
            Dict with 'is_cbt_trigger' (bool), 'confidence' (LABEL_1 score),
            and 'threshold' (the value used).

        Raises:
            ValueError: If no model is loaded, or the pipeline output
                contains no LABEL_1 entry.
        """
        if self.inference_pipeline is None:
            raise ValueError("Model not loaded! Call load_model() first.")

        scores = self.inference_pipeline(text)[0]

        # Pull the positive-class score; fail loudly with a clear error
        # instead of leaking a bare StopIteration when the label is absent.
        cbt_confidence = next(
            (entry['score'] for entry in scores if entry['label'] == 'LABEL_1'),
            None,
        )
        if cbt_confidence is None:
            raise ValueError(f"No LABEL_1 score in pipeline output: {scores}")

        return {
            'is_cbt_trigger': cbt_confidence > threshold,
            'confidence': cbt_confidence,
            'threshold': threshold
        }

    def batch_predict(self, texts, threshold=0.7):
        """Classify multiple texts; see predict() for the per-item schema.

        Args:
            texts: Iterable of input strings.
            threshold: Decision threshold forwarded to predict().

        Returns:
            List of result dicts, one per input, in order.

        Raises:
            ValueError: If no model is loaded.
        """
        if self.inference_pipeline is None:
            raise ValueError("Model not loaded! Call load_model() first.")

        return [self.predict(text, threshold) for text in texts]