# Alalay / models / random_forest.py
# Jandayl's picture
# testing, removed obsolete files.
# 2867a3c
# THIS FILE IS NOT CRUCIAL TO THE CURRENT SYSTEM. USED ONLY AS A BENCHMARK.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import StratifiedKFold, GridSearchCV, RandomizedSearchCV, train_test_split
from sklearn.metrics import f1_score, make_scorer, classification_report, confusion_matrix
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, ExtraTreesClassifier, VotingClassifier
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from scipy.stats import randint
# Load the corpus CSV (path is relative to the current working directory).
df = pd.read_csv("Feature_Extracted_Corpus.csv")

# Fold one label per categorical feature into the shared 'Other' bucket.
for column, folded_label in (
    ("sentence_construction_type", "Unknown"),
    ("sentence_type", "Compound-Complex"),
):
    df[column] = df[column].replace([folded_label], "Other")

# Show category frequencies after the clean-up, plus the target distribution.
construction_counts = df["sentence_construction_type"].value_counts()
print("Sentence Construction Type Counts:\n", construction_counts)
sentence_type_counts = df["sentence_type"].value_counts()
print("\nSentence Type Counts:\n", sentence_type_counts)
target_counts = df["group"].value_counts()
print("\nTarget Distribution:\n", target_counts)

# Encode the 'group' target as integers and print the class -> index mapping.
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(df["group"])
class_to_index = {
    label: index for index, label in enumerate(label_encoder.classes_)
}
print("\nEncoded classes:", class_to_index)
# Feature matrix: every column except id, text, grade and the target 'group'.
excluded_columns = ["id", "text", "group", "grade"]
X = df.drop(columns=excluded_columns)

# Numeric columns are detected by dtype; the categorical ones are listed by hand.
numeric_frame = X.select_dtypes(include=[np.number])
numeric_cols = numeric_frame.columns.tolist()
categorical_cols = ["sentence_construction_type", "sentence_type"]
print(f"\nNumeric features: {numeric_cols}")
print(f"Categorical features: {categorical_cols}")

# Preprocessor: one-hot encode the categoricals, standardise the numerics and
# pass any column not listed in either group through unchanged.
categorical_step = (
    "cat",
    OneHotEncoder(handle_unknown="ignore", sparse_output=False),
    categorical_cols,
)
numeric_step = ("num", StandardScaler(), numeric_cols)
preprocessor = ColumnTransformer(
    transformers=[categorical_step, numeric_step],
    remainder="passthrough",
)

# Stratified 80/20 split with a fixed seed.
split_parts = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
X_train, X_test, y_train, y_test = split_parts

print(f"\nTraining set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")
print(f"Training set class distribution: {np.bincount(y_train)}")
print(f"Test set class distribution: {np.bincount(y_test)}")
# Train a model with best params from previous run, then report which features
# the fitted forest relies on most.
print("\n=== ANALYZING FEATURE IMPORTANCE ===")
best_params = {
    'max_depth': 10,
    'max_features': 'sqrt',
    'min_samples_leaf': 4,
    'min_samples_split': 2,
    'n_estimators': 300,
}

# Create and train pipeline (shared preprocessor + class-balanced random forest)
pipeline = Pipeline([
    ("preprocessing", preprocessor),
    ("classifier", RandomForestClassifier(**best_params, random_state=42, class_weight='balanced')),
])
pipeline.fit(X_train, y_train)

# The importances do not depend on feature names, so read them once here;
# both the named report and the index-only fallback below use them.
importances = pipeline.named_steps['classifier'].feature_importances_

# Only the name lookup is guarded. Errors while building the table or drawing
# the plot are not feature-name problems and must not be reported as such.
try:
    raw_feature_names = (pipeline.named_steps['preprocessing']
                         .get_feature_names_out()
                         .tolist())
except (AttributeError, ValueError) as e:
    print(f"Could not get feature names: {e}")
    # Alternative: just get importance values without names
    print(f"\nTop 10 feature importances (indices): {importances.argsort()[-10:][::-1]}")
else:
    # Clean up feature names for better display. ColumnTransformer prefixes
    # each output with "<transformer name>__"; strip it only at the start of
    # the name, and also handle the "remainder__" prefix given to columns
    # that reach the model through remainder="passthrough".
    transformer_prefixes = ("cat__", "num__", "remainder__")
    feature_names = []
    for raw_name in raw_feature_names:
        for prefix in transformer_prefixes:
            if raw_name.startswith(prefix):
                raw_name = raw_name[len(prefix):]
                break
        feature_names.append(raw_name)

    # Create feature importance dataframe, most important first
    feature_importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': importances,
    }).sort_values('importance', ascending=False)
    print("\nTop 10 most important features:")
    print(feature_importance_df.head(10))

    # Plot top 20 features
    plt.figure(figsize=(12, 8))
    top_20 = feature_importance_df.head(20)
    plt.barh(range(len(top_20)), top_20['importance'])
    plt.yticks(range(len(top_20)), top_20['feature'])
    plt.xlabel('Feature Importance')
    plt.title('Top 20 Most Important Features')
    plt.gca().invert_yaxis()  # Most important at the top
    plt.tight_layout()
    plt.show()
# Score the fixed-parameter pipeline on the held-out test split.
y_pred = pipeline.predict(X_test)
current_f1 = f1_score(y_test, y_pred, average='macro')

print("\n=== CURRENT MODEL PERFORMANCE ===")
print(f"Test F1 Macro: {current_f1:.4f}")
print("\nClassification Report:")
class_names = label_encoder.classes_
current_report = classification_report(y_test, y_pred, target_names=class_names)
print(current_report)

# Confusion matrix heatmap, with the original class labels on both axes.
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
heatmap_options = {
    "annot": True,
    "fmt": 'd',
    "cmap": 'Blues',
    "xticklabels": label_encoder.classes_,
    "yticklabels": label_encoder.classes_,
}
sns.heatmap(cm, **heatmap_options)
plt.title('Confusion Matrix - Current Model')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
# Option 1: grid search in the neighbourhood of the previously found best params.
print("\n=== OPTION 1: FOCUSED GRID SEARCH ===")

# Wider grid, defined but not searched: the run below uses smaller_grid.
focused_param_grid = {
    "classifier__n_estimators": [200, 300, 400],
    "classifier__max_depth": [8, 10, 12],
    "classifier__min_samples_split": [2, 3, 4],
    "classifier__min_samples_leaf": [3, 4, 5, 6],
    "classifier__max_features": ["sqrt", "log2", 0.3, 0.4],
    "classifier__class_weight": ["balanced", "balanced_subsample"],
}

# Reduced grid that is actually searched, chosen for faster execution.
smaller_grid = {
    "classifier__n_estimators": [200, 300],
    "classifier__max_depth": [8, 10],
    "classifier__min_samples_split": [2, 4],
    "classifier__min_samples_leaf": [3, 4, 5],
    "classifier__max_features": ["sqrt", 0.4],
    "classifier__class_weight": ["balanced"],
}

# Used 5-fold based on planned methodology
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
f1_macro_scorer = make_scorer(f1_score, average="macro")

# The searched estimator is the shared preprocessor followed by a seeded forest;
# class_weight is supplied through the grid rather than the constructor.
search_pipeline = Pipeline([
    ("preprocessing", preprocessor),
    ("classifier", RandomForestClassifier(random_state=42)),
])
grid_search = GridSearchCV(
    estimator=search_pipeline,
    param_grid=smaller_grid,  # Using smaller grid for speed
    scoring=f1_macro_scorer,
    cv=cv,
    n_jobs=-1,
    verbose=1,
)

print("Running grid search (this may take a few minutes)...")
grid_search.fit(X_train, y_train)

print("\n=== BEST MODEL FROM GRID SEARCH ===")
print(f"Best CV Score: {grid_search.best_score_:.4f}")
print("Best Params:")
best_grid_params = grid_search.best_params_
for param in best_grid_params:
    value = best_grid_params[param]
    print(f" {param}: {value}")

# Evaluate the refitted best estimator on the held-out test split.
y_pred_grid = grid_search.predict(X_test)
grid_f1 = f1_score(y_test, y_pred_grid, average='macro')
print(f"\nTest F1 Macro: {grid_f1:.4f}")
print("\nClassification Report - Grid Search Model:")
grid_report = classification_report(
    y_test, y_pred_grid, target_names=label_encoder.classes_
)
print(grid_report)
# Option 2: compare several tree ensembles, each with its default hyperparameters.
print("\n=== OPTION 2: COMPARING DIFFERENT ALGORITHMS ===")
algorithms = {
    'Random Forest': RandomForestClassifier(random_state=42, class_weight='balanced'),
    'Extra Trees': ExtraTreesClassifier(random_state=42, class_weight='balanced'),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
}

# Test-set macro F1 per algorithm, keyed by display name.
results = {}
for name in algorithms:
    print(f"\nTraining {name}...")
    # Default params are used for a quick comparison.
    pipe = Pipeline([
        ("preprocessing", preprocessor),
        ("classifier", algorithms[name]),
    ])
    pipe.fit(X_train, y_train)
    y_pred = pipe.predict(X_test)
    f1 = f1_score(y_test, y_pred, average='macro')
    results[name] = f1
    print(f"{name} F1 Macro: {f1:.4f}")

# Bar chart of the scores, each bar annotated with its value just above it.
plt.figure(figsize=(10, 6))
plt.bar(results.keys(), results.values())
plt.ylabel('F1 Macro Score')
plt.title('Algorithm Comparison')
for bar_position, score in enumerate(results.values()):
    plt.text(bar_position, score + 0.01, f'{score:.3f}', ha='center')
plt.ylim(0, 1)
plt.show()
# Option 3: soft-voting ensemble of three differently configured random forests.
print("\n=== OPTION 3: ENSEMBLE MODEL ===")

# Each member uses its own size, depth, leaf size and seed.
rf1 = RandomForestClassifier(
    n_estimators=300,
    max_depth=10,
    min_samples_leaf=4,
    random_state=42,
    class_weight='balanced',
)
rf2 = RandomForestClassifier(
    n_estimators=200,
    max_depth=8,
    min_samples_leaf=5,
    random_state=43,
    class_weight='balanced_subsample',
)
rf3 = RandomForestClassifier(
    n_estimators=400,
    max_depth=12,
    min_samples_leaf=3,
    random_state=44,
    class_weight='balanced',
)

# 'soft' voting combines the members' predicted probabilities.
ensemble_members = [('rf1', rf1), ('rf2', rf2), ('rf3', rf3)]
voting_classifier = VotingClassifier(estimators=ensemble_members, voting='soft')
ensemble = Pipeline([
    ("preprocessing", preprocessor),
    ("classifier", voting_classifier),
])

print("Training ensemble...")
ensemble.fit(X_train, y_train)

# Evaluate the ensemble on the held-out test split.
ensemble_pred = ensemble.predict(X_test)
ensemble_f1 = f1_score(y_test, ensemble_pred, average='macro')
print(f"Ensemble Test F1 Macro: {ensemble_f1:.4f}")
print("\nClassification Report - Ensemble:")
ensemble_report = classification_report(
    y_test, ensemble_pred, target_names=label_encoder.classes_
)
print(ensemble_report)
# Summary of all results gathered above.
print("\n" + "=" * 50)
print("=== SUMMARY OF RESULTS ===")
print(f"Current Model F1: {current_f1:.4f}")
print(f"Grid Search Model F1: {grid_f1:.4f}")
print(f"Ensemble Model F1: {ensemble_f1:.4f}")
print("\nAlgorithm Comparison:")
for name in results:
    score = results[name]
    print(f" {name}: {score:.4f}")

# Highest test-set macro F1 across every candidate.
all_scores = [current_f1, grid_f1, ensemble_f1, *results.values()]
best_f1 = max(all_scores)
print(f"\nBest Overall F1 Macro: {best_f1:.4f}")

# Already-fitted candidates in tie-break order: the first entry whose score
# equals the best score wins.
fitted_candidates = (
    (ensemble_f1, ensemble, "Best model: Ensemble"),
    (grid_f1, grid_search.best_estimator_, "Best model: Grid Search"),
    (current_f1, pipeline, "Best model: Current Model"),
)
for candidate_f1, candidate_model, announcement in fitted_candidates:
    if best_f1 == candidate_f1:
        best_model = candidate_model
        print(announcement)
        break
else:
    # None of the three matched, so the winner is one of the compared
    # algorithms; rebuild its pipeline and fit it on the training split.
    best_algo_name = max(results, key=results.get)
    best_model = Pipeline([
        ("preprocessing", preprocessor),
        ("classifier", algorithms[best_algo_name]),
    ])
    best_model.fit(X_train, y_train)
    print(f"Best model: {best_algo_name}")

# Persist the selected model to the current working directory.
import joblib
joblib.dump(best_model, 'best_model.pkl')
print("\nBest model saved as 'best_model.pkl'")