"""Model training utilities."""
from __future__ import annotations
from pathlib import Path
from typing import Dict, Tuple
import joblib
import numpy as np
from sklearn import metrics
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score, train_test_split
from sklearn.pipeline import Pipeline
from .config import Config
from .logging_utils import get_logger
from .preprocessing import preprocess_dataframe
LOGGER = get_logger(__name__)
MetricsDict = Dict[str, float]


def build_pipeline(config: Config) -> Pipeline:
    """Build a TF-IDF + logistic-regression pipeline from the model config."""
    model_settings = config.model
    vectorizer = TfidfVectorizer(
        max_features=model_settings.get("max_features", 5000),
        ngram_range=tuple(model_settings.get("ngram_range", (1, 1))),
        stop_words="english",
    )
    # multi_class is omitted: "auto" has long been the default and the
    # parameter is deprecated in recent scikit-learn releases.
    estimator = LogisticRegression(
        class_weight=model_settings.get("class_weight"),
        max_iter=1000,
        solver="lbfgs",
    )
    pipeline = Pipeline(
        steps=[
            ("tfidf", vectorizer),
            ("clf", estimator),
        ]
    )
    return pipeline
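
# Illustrative `model` config section consumed by build_pipeline. The keys and
# defaults mirror the .get() calls above; the values shown are made up:
#
#   model:
#     max_features: 5000
#     ngram_range: [1, 2]
#     class_weight: balanced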


def _resolve_test_size(config: Config, target) -> float | int:
    """Return a test_size large enough for every class to appear in the test split."""
    test_size = config.model.get("test_size", 0.2)
    n_samples = len(target)
    # Works for both pandas Series (via .nunique) and plain sequences.
    n_classes = int(getattr(target, "nunique", lambda: len(set(target)))())
    if isinstance(test_size, float):
        computed = max(1, int(round(test_size * n_samples)))
        if computed < n_classes and n_samples > 0:
            # Grow the fraction until the split can hold at least one sample
            # per class, but never claim more than half the data.
            adjusted = max(n_classes / n_samples, test_size)
            test_size = min(adjusted, 0.5)
            LOGGER.warning(
                "Adjusted test_size to %.2f to ensure each class appears in the test split",
                test_size,
            )
    elif test_size < n_classes:
        # An integer test_size is an absolute sample count.
        test_size = n_classes
        LOGGER.warning(
            "Adjusted test_size to %d to ensure each class appears in the test split",
            test_size,
        )
    return test_size
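
# Worked example for _resolve_test_size (illustrative numbers): with 30
# samples, 5 classes, and test_size=0.1, the split would hold
# round(0.1 * 30) = 3 samples, fewer than the 5 classes, so the fraction is
# raised to 5 / 30 ≈ 0.17 (capped at 0.5) and a warning is logged.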


def train_and_evaluate(config: Config) -> Tuple[Pipeline, MetricsDict]:
    """Train the pipeline on a stratified split and return it with evaluation metrics."""
    df = preprocess_dataframe(load_dataset(config), config)
    data_settings = config.data
    text_column = data_settings.get("text_column", "text")
    target_column = data_settings.get("target_column", "sentiment")
    y_full = df[target_column].astype(str)
    test_size = _resolve_test_size(config, y_full)
    X_train, X_test, y_train, y_test = train_test_split(
        df[text_column],
        y_full,
        test_size=test_size,
        random_state=config.model.get("random_state", 42),
        stratify=y_full,
    )
    pipeline = build_pipeline(config)
    LOGGER.info("Starting model training on %d samples", X_train.shape[0])
    pipeline.fit(X_train, y_train)
    predictions = pipeline.predict(X_test)
    # `report` avoids shadowing the sklearn function of the same name.
    report = metrics.classification_report(y_test, predictions, output_dict=True)
    metrics_summary = {
        "accuracy": metrics.accuracy_score(y_test, predictions),
        "f1_macro": metrics.f1_score(y_test, predictions, average="macro"),
        "precision_macro": metrics.precision_score(y_test, predictions, average="macro"),
        "recall_macro": metrics.recall_score(y_test, predictions, average="macro"),
    }
    LOGGER.info("Evaluation metrics: %s", metrics_summary)

    cv_folds = int(config.training.get("cv_folds", 0))
    if cv_folds > 1:
        # Cap the fold count at the smallest class size so StratifiedKFold can
        # place every class in every fold.
        value_counts = getattr(y_full, "value_counts", lambda: [])()
        min_class_count = int(value_counts.min()) if len(value_counts) else 0
        if min_class_count < cv_folds:
            cv_folds = max(min_class_count, 1)
            LOGGER.warning("Reduced cv_folds to %d based on smallest class size", cv_folds)
    if cv_folds > 1:
        cv = StratifiedKFold(
            n_splits=cv_folds,
            shuffle=True,
            random_state=config.model.get("random_state", 42),
        )
        cv_scores = cross_val_score(
            pipeline,
            df[text_column],
            y_full,
            cv=cv,
            scoring=config.training.get("scoring", "f1_macro"),
        )
        metrics_summary["cv_mean"] = float(np.mean(cv_scores))
        metrics_summary["cv_std"] = float(np.std(cv_scores))
        LOGGER.info(
            "Cross-validation %s: mean=%.4f std=%.4f",
            config.training.get("scoring", "f1_macro"),
            metrics_summary["cv_mean"],
            metrics_summary["cv_std"],
        )

    # Per-class F1 for any labels the config asks to track explicitly.
    class_order = config.data.get("class_order", [])
    for label in class_order:
        if label in report:
            metrics_summary[f"{label}_f1"] = report[label]["f1-score"]
    return pipeline, metrics_summary
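
# Illustrative `data` and `training` config sections consumed by
# train_and_evaluate. The keys mirror the .get() calls above; values are
# made up:
#
#   data:
#     text_column: text
#     target_column: sentiment
#     class_order: [negative, neutral, positive]
#   training:
#     cv_folds: 5
#     scoring: f1_macro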


def persist_artifacts(pipeline: Pipeline, config: Config, metrics_summary: MetricsDict) -> Path:
    """Serialize the fitted pipeline and its metrics; return the artifact path."""
    artifact_dir = Path(config.model.get("artifact_dir", "artifacts"))
    artifact_dir.mkdir(parents=True, exist_ok=True)
    artifact_path = artifact_dir / config.model.get("pipeline_filename", "sentiment_pipeline.joblib")
    joblib.dump({"pipeline": pipeline, "metrics": metrics_summary}, artifact_path)
    LOGGER.info("Saved pipeline and metrics to %s", artifact_path)
    return artifact_path


# Imported at the bottom of the module to avoid a circular import with
# .data_loader; the name is bound before any function here is called.
from .data_loader import load_dataset  # noqa: E402  pylint: disable=wrong-import-position

__all__ = ["build_pipeline", "train_and_evaluate", "persist_artifacts"]
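
# Minimal usage sketch. Config.from_yaml and the import path are assumptions
# for illustration, not part of this module; adapt them to however Config is
# actually constructed in this project:
#
#     from mypackage.training import train_and_evaluate, persist_artifacts  # hypothetical path
#
#     config = Config.from_yaml("config.yaml")  # hypothetical loader
#     pipeline, metrics_summary = train_and_evaluate(config)
#     artifact_path = persist_artifacts(pipeline, config, metrics_summary)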