# Provenance: exported from a Hugging Face upload ("Upload 54 files", revision ff0e79e, verified).
"""
LightGBM classifier for forgery type classification
Implements Critical Fix #8: Configurable Confidence Threshold
"""
import numpy as np
import lightgbm as lgb
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from typing import Dict, List, Tuple, Optional
import joblib
from pathlib import Path
import json
class ForgeryClassifier:
    """
    LightGBM classifier for region-wise forgery classification.

    Target classes:
    - 0: copy_move
    - 1: splicing
    - 2: text_substitution
    """

    # Index -> label mapping; list order defines the class indices above.
    CLASS_NAMES = ['copy_move', 'splicing', 'text_substitution']

    def __init__(self, config):
        """
        Initialize classifier.

        Args:
            config: Configuration object exposing ``get(key, default)``.
        """
        self.config = config

        # LightGBM parameters (overridable via 'classifier.params')
        self.params = config.get('classifier.params', {
            'objective': 'multiclass',
            'num_class': 3,
            'boosting_type': 'gbdt',
            'num_leaves': 31,
            'learning_rate': 0.05,
            'n_estimators': 200,
            'max_depth': 7,
            'min_child_samples': 20,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'reg_alpha': 0.1,
            'reg_lambda': 0.1,
            'random_state': 42,
            'verbose': -1
        })

        # Critical Fix #8: Configurable confidence threshold
        self.confidence_threshold = config.get('classifier.confidence_threshold', 0.6)

        # Model and feature scaler; populated by train() or load()
        self.model = None
        self.scaler = StandardScaler()

        # Gain-based feature importances, filled after training/loading
        self.feature_importance = None
        self.feature_names = None

    def train(self,
              features: np.ndarray,
              labels: np.ndarray,
              feature_names: Optional[List[str]] = None,
              validation_split: float = 0.2) -> Dict:
        """
        Train classifier with early stopping on a held-out split.

        Args:
            features: Feature matrix (N, D)
            labels: Class labels (N,)
            feature_names: Optional feature names
            validation_split: Validation split ratio

        Returns:
            Training metrics dict with train/val accuracy, dataset sizes,
            and the best boosting iteration.
        """
        print(f"Training LightGBM classifier")
        print(f"Features shape: {features.shape}")
        print(f"Labels distribution: {np.bincount(labels)}")

        # Replace NaN/Inf so the scaler and LightGBM see finite values only
        features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)

        # Normalize features (scaler is fit here and reused at predict time)
        features_scaled = self.scaler.fit_transform(features)

        # Split data (Critical Fix #7: Image-level splitting should be done upstream)
        X_train, X_val, y_train, y_val = train_test_split(
            features_scaled, labels,
            test_size=validation_split,
            random_state=42,
            stratify=labels
        )

        # Create LightGBM datasets
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

        # Train with early stopping on the validation fold
        self.model = lgb.train(
            self.params,
            train_data,
            valid_sets=[train_data, val_data],
            valid_names=['train', 'val'],
            num_boost_round=self.params.get('n_estimators', 200),
            callbacks=[
                lgb.early_stopping(stopping_rounds=20),
                lgb.log_evaluation(period=10)
            ]
        )

        # Store gain-based feature importance for later inspection
        self.feature_names = feature_names
        self.feature_importance = self.model.feature_importance(importance_type='gain')

        # Evaluate on both folds
        train_pred = self.model.predict(X_train)
        train_acc = (train_pred.argmax(axis=1) == y_train).mean()
        val_pred = self.model.predict(X_val)
        val_acc = (val_pred.argmax(axis=1) == y_val).mean()

        # Cast numpy scalars to plain Python types so metrics are JSON-serializable
        metrics = {
            'train_accuracy': float(train_acc),
            'val_accuracy': float(val_acc),
            'num_features': int(features.shape[1]),
            'num_samples': int(len(labels)),
            'best_iteration': self.model.best_iteration
        }

        print(f"Training complete!")
        print(f"Train accuracy: {train_acc:.4f}")
        print(f"Val accuracy: {val_acc:.4f}")
        return metrics

    def predict(self, features: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Predict forgery types.

        Args:
            features: Feature matrix (N, D)

        Returns:
            predictions: Predicted class indices (N,)
            confidences: Prediction confidences (N,) — max class probability

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if self.model is None:
            raise ValueError("Model not trained. Call train() first.")

        # Replace NaN/Inf, mirroring the sanitation applied during training
        features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)

        # Normalize with the scaler fitted at train time
        features_scaled = self.scaler.transform(features)

        # Predict class probabilities of shape (N, num_class)
        probabilities = self.model.predict(features_scaled)

        predictions = probabilities.argmax(axis=1)
        confidences = probabilities.max(axis=1)
        return predictions, confidences

    def predict_with_filtering(self,
                               features: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Predict with confidence filtering.

        Args:
            features: Feature matrix (N, D)

        Returns:
            predictions: Predicted class indices (N,)
            confidences: Prediction confidences (N,)
            valid_mask: Boolean mask for predictions meeting the threshold (N,)
        """
        predictions, confidences = self.predict(features)

        # Critical Fix #8: Apply confidence threshold
        valid_mask = confidences >= self.confidence_threshold
        return predictions, confidences, valid_mask

    def get_class_name(self, class_idx: int) -> str:
        """Get class name from index (raises IndexError for out-of-range indices)."""
        return self.CLASS_NAMES[class_idx]

    def get_feature_importance(self, top_k: int = 20) -> List[Tuple[str, float]]:
        """
        Get top-k most important features by gain.

        Args:
            top_k: Number of features to return

        Returns:
            List of (feature_name, importance) tuples, highest gain first.
            Empty list when no importance data is available.
        """
        # Guard both the untrained case (None) and a model loaded without
        # importance data (empty array) — np.argsort on those would misbehave.
        if self.feature_importance is None or len(self.feature_importance) == 0:
            return []

        # Indices of the top_k largest importances, descending
        indices = np.argsort(self.feature_importance)[::-1][:top_k]
        result = []
        for idx in indices:
            name = self.feature_names[idx] if self.feature_names else f'feature_{idx}'
            # float() converts numpy scalars so callers can JSON-serialize directly
            result.append((name, float(self.feature_importance[idx])))
        return result

    def save(self, save_dir: str):
        """
        Save model, scaler, and metadata.

        Args:
            save_dir: Directory to save model

        Raises:
            ValueError: If the model has not been trained yet.
        """
        # Fail with the same explicit error predict() uses, instead of an
        # AttributeError on None.save_model
        if self.model is None:
            raise ValueError("Model not trained. Call train() first.")

        save_path = Path(save_dir)
        save_path.mkdir(parents=True, exist_ok=True)

        # Save LightGBM model
        model_path = save_path / 'lightgbm_model.txt'
        self.model.save_model(str(model_path))

        # Save scaler
        scaler_path = save_path / 'scaler.joblib'
        joblib.dump(self.scaler, str(scaler_path))

        # Save metadata
        metadata = {
            'confidence_threshold': self.confidence_threshold,
            'class_names': self.CLASS_NAMES,
            'feature_names': self.feature_names,
            'feature_importance': self.feature_importance.tolist() if self.feature_importance is not None else None
        }
        metadata_path = save_path / 'classifier_metadata.json'
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"Classifier saved to {save_path}")

    def load(self, load_dir: str):
        """
        Load model, scaler, and metadata saved by save().

        Args:
            load_dir: Directory to load from
        """
        load_path = Path(load_dir)

        # Load LightGBM model
        model_path = load_path / 'lightgbm_model.txt'
        self.model = lgb.Booster(model_file=str(model_path))

        # Load scaler
        scaler_path = load_path / 'scaler.joblib'
        self.scaler = joblib.load(str(scaler_path))

        # Load metadata
        metadata_path = load_path / 'classifier_metadata.json'
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)

        self.confidence_threshold = metadata.get('confidence_threshold', 0.6)
        self.feature_names = metadata.get('feature_names')
        # save() may have stored None; np.array(None) would yield a 0-d object
        # array, so only convert when a real list was persisted.
        stored_importance = metadata.get('feature_importance')
        self.feature_importance = np.array(stored_importance) if stored_importance is not None else None

        print(f"Classifier loaded from {load_path}")
def get_classifier(config) -> ForgeryClassifier:
    """Factory entry point: build a ForgeryClassifier from *config*."""
    classifier = ForgeryClassifier(config)
    return classifier