Instructions for using M-Arjun/SpamShield with libraries, inference providers, notebooks, and local apps. Follow the links below to get started.
- Libraries
  - Scikit-learn
How to use M-Arjun/SpamShield with Scikit-learn:
```python
from huggingface_hub import hf_hub_download
import joblib

# Only load pickle files from sources you trust.
# Read more about it here: https://skops.readthedocs.io/en/stable/persistence.html
model = joblib.load(
    hf_hub_download("M-Arjun/SpamShield", "sklearn_model.joblib")
)
```
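Once loaded, the object behaves like any other scikit-learn estimator. A minimal usage sketch, assuming `sklearn_model.joblib` is a fitted pipeline that accepts raw text (adapt it if the artifact instead expects pre-extracted features):

```python
# Hypothetical usage: assumes the loaded object is a text-in pipeline
# exposing predict(); check the repository files if it differs.
samples = ["Congratulations! You won a free prize, click here to claim."]
print(model.predict(samples))  # e.g. [1] for spam, [0] for normal
```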
- Notebooks
  - Google Colab
  - Kaggle
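In a Colab or Kaggle notebook, install the dependencies first, then run the same loading snippet. A minimal sketch (the package names below are the standard PyPI ones, not pinned by this card):

```python
# In a notebook cell, install dependencies first:
#   !pip install huggingface_hub scikit-learn
from huggingface_hub import hf_hub_download
import joblib

model = joblib.load(
    hf_hub_download("M-Arjun/SpamShield", "sklearn_model.joblib")
)
```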
For reference, the full training script for SpamShield model 0.4 (word- and character-level TF-IDF features feeding two logistic-regression classifiers, with optional ONNX export) follows:

```python
import json
import os
import sys
from collections import Counter
from pathlib import Path

import joblib
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_recall_curve,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline, FeatureUnion

# ONNX conversion is optional: export is skipped if skl2onnx is missing.
try:
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import StringTensorType
    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False

BASE_DIR = Path(__file__).resolve().parent
ROOT_DIR = BASE_DIR.parent.parent  # Project root
sys.path.insert(0, str(BASE_DIR))
sys.path.insert(0, str(BASE_DIR.parent))  # Add SpamShield to path for utils

import config
from utils import preprocess_text

DATASETS_ROOT = ROOT_DIR / "Datasets"
DATASET_CONFIG_PATH = Path(os.getenv("DATASET_CONFIG_PATH", str(DATASETS_ROOT / "0.4.json")))
MODEL_DIR = Path(os.getenv("MODEL_DIR", str(BASE_DIR)))
TARGET_MIN_PRECISION = float(os.getenv("TARGET_MIN_PRECISION", str(config.TARGET_MIN_PRECISION)))
SPAM_THRESHOLD = float(os.getenv("SPAM_THRESHOLD", str(config.SPAM_THRESHOLD)))

def load_all_data(dataset_config_path: Path):
    """Load deduplicated {text, label, category} records from the JSONL files
    listed in the dataset config."""
    records = []
    seen = set()
    if not dataset_config_path.exists():
        print(f"Config file not found: {dataset_config_path}")
        return records
    with dataset_config_path.open("r", encoding="utf-8") as f:
        config_data = json.load(f)
    for relative_path_str in config_data.get("files", []):
        # Entries may be literal paths or glob patterns relative to DATASETS_ROOT.
        if "*" in relative_path_str:
            paths = list(DATASETS_ROOT.glob(relative_path_str))
        else:
            paths = [DATASETS_ROOT / relative_path_str]
        for path in paths:
            if not path.exists():
                print(f"Warning: File not found {path}")
                continue
            with path.open("r", encoding="utf-8") as f:
                for line in f:
                    try:
                        data = json.loads(line)
                        text = preprocess_text(data.get("text", ""))
                        if not text:
                            continue
                        if text in seen:  # skip exact duplicates after preprocessing
                            continue
                        seen.add(text)
                        label = int(data.get("label", 0))
                        category = data.get("category")
                        if not category:
                            category = "spam" if label == 1 else "normal"
                        records.append({"text": text, "label": label, "category": str(category)})
                    except json.JSONDecodeError:
                        continue
    return records

def pick_threshold(y_true, y_prob, target_min_precision):
    """Choose the threshold with the best F1 among those meeting the precision
    floor; fall back to the best unconstrained F1, then to SPAM_THRESHOLD."""
    precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
    best_threshold = SPAM_THRESHOLD
    best_score = -1.0
    # precision[i] and recall[i] correspond to thresholds[i]; the final
    # (precision=1, recall=0) point has no threshold and is excluded.
    for p, r, threshold in zip(precision[:-1], recall[:-1], thresholds):
        if p < target_min_precision:
            continue
        if p + r == 0:
            continue
        f1 = 2 * p * r / (p + r)
        if f1 > best_score:
            best_score = f1
            best_threshold = float(threshold)
    if best_score >= 0:
        return round(best_threshold, 4)
    # No threshold met the precision floor: fall back to the best plain F1.
    f1_scores = []
    for p, r, threshold in zip(precision[:-1], recall[:-1], thresholds):
        f1 = 0.0 if (p + r) == 0 else (2 * p * r / (p + r))
        f1_scores.append((f1, threshold))
    if not f1_scores:
        return SPAM_THRESHOLD
    return round(float(max(f1_scores, key=lambda x: x[0])[1]), 4)

def main():
    print("Starting training for model 0.4...", flush=True)
    records = load_all_data(DATASET_CONFIG_PATH)
    if not records:
        raise SystemExit(f"No records found using config {DATASET_CONFIG_PATH}")

    texts = [r["text"] for r in records]
    labels = [r["label"] for r in records]
    categories = [r["category"] for r in records]
    print(f"Dataset size: {len(records)}")
    print(f"Label distribution: {Counter(labels)}")

    X_train_texts, X_val_texts, y_train, y_val, y_train_cat, y_val_cat = train_test_split(
        texts,
        labels,
        categories,
        test_size=0.2,
        random_state=42,
        stratify=labels,
    )

    # Use FeatureUnion to combine word and char TF-IDF into one pipeline step.
    union = FeatureUnion([
        ("word", TfidfVectorizer(
            ngram_range=(1, 2),
            max_features=config.WORD_MAX_FEATURES,
            min_df=config.MIN_DF,
            sublinear_tf=config.SUBLINEAR_TF,
            dtype=np.float32,
        )),
        ("char", TfidfVectorizer(
            analyzer="char",
            ngram_range=(3, 5),
            max_features=config.CHAR_MAX_FEATURES,
            min_df=config.MIN_DF,
            sublinear_tf=config.SUBLINEAR_TF,
            dtype=np.float32,
        )),
    ])

    binary_pipeline = Pipeline([
        ("union", union),
        ("classifier", LogisticRegression(
            solver="saga",
            max_iter=500,
            tol=1e-3,
            class_weight="balanced",
            random_state=42,
            verbose=1,
        )),
    ])

    print("Fitting binary model pipeline...")
    binary_pipeline.fit(X_train_texts, y_train)
    y_prob = binary_pipeline.predict_proba(X_val_texts)[:, 1]
    threshold = pick_threshold(y_val, y_prob, TARGET_MIN_PRECISION)
    y_pred = (y_prob >= threshold).astype(int)

    print("\n--- TF-IDF Model Results (Full) ---")
    print(f"Threshold: {threshold}")
    print(f"Accuracy: {accuracy_score(y_val, y_pred):.4f}")
    print(f"Precision: {precision_score(y_val, y_pred, zero_division=0):.4f}")
    print(f"Recall: {recall_score(y_val, y_pred, zero_division=0):.4f}")
    print(f"F1-score: {f1_score(y_val, y_pred, zero_division=0):.4f}")
| print("\nTraining spam category classifier...") | |
| train_spam_mask = np.array(y_train) == 1 | |
| val_spam_mask = np.array(y_val) == 1 | |
| if train_spam_mask.any(): | |
| spam_X_train_texts = [X_train_texts[i] for i, m in enumerate(train_spam_mask) if m] | |
| spam_y_train_cat = np.array(y_train_cat)[train_spam_mask] | |
| category_pipeline = Pipeline([ | |
| ("union", union), # Reuse the fitted union | |
| ("classifier", LogisticRegression( | |
| max_iter=20000, | |
| tol=1e-2, | |
| class_weight="balanced", | |
| solver="saga", | |
| random_state=42, | |
| verbose=1, | |
| )) | |
| ]) | |
| category_pipeline.fit(spam_X_train_texts, spam_y_train_cat) | |
| else: | |
| category_pipeline = binary_pipeline | |
| if val_spam_mask.any(): | |
| spam_X_val_texts = [X_val_texts[i] for i, m in enumerate(val_spam_mask) if m] | |
| spam_y_val_cat = np.array(y_val_cat)[val_spam_mask] | |
| y_pred_cat = category_pipeline.predict(spam_X_val_texts) | |
| cat_accuracy = accuracy_score(spam_y_val_cat, y_pred_cat) | |
| cat_f1 = f1_score(spam_y_val_cat, y_pred_cat, average="weighted", zero_division=0) | |
| print("\n--- Spam Category Results (spam-only validation set) ---") | |
| print(f"Accuracy: {cat_accuracy:.4f}") | |
| print(f"Weighted F1: {cat_f1:.4f}") | |
| MODEL_DIR.mkdir(parents=True, exist_ok=True) | |
| # Save scikit-learn models | |
| joblib.dump(union, MODEL_DIR / "vectorizer.pkl", compress=3) | |
| joblib.dump(binary_pipeline.named_steps["classifier"], MODEL_DIR / "binary_model.pkl", compress=3) | |
| joblib.dump(category_pipeline.named_steps["classifier"], MODEL_DIR / "category_model.pkl", compress=3) | |

    # Export to ONNX for production inference.
    if ONNX_AVAILABLE:
        print("\nExporting models to ONNX...")
        initial_type = [("input", StringTensorType([None, 1]))]
        # Convert binary model.
        onnx_binary = convert_sklearn(
            binary_pipeline, initial_types=initial_type,
            target_opset=12, options={"zipmap": False},
        )
        with open(MODEL_DIR / "binary_model.onnx", "wb") as f:
            f.write(onnx_binary.SerializeToString())
        # Convert category model.
        onnx_category = convert_sklearn(
            category_pipeline, initial_types=initial_type,
            target_opset=12, options={"zipmap": False},
        )
        with open(MODEL_DIR / "category_model.onnx", "wb") as f:
            f.write(onnx_category.SerializeToString())
        print("ONNX models saved successfully.")
    else:
        print("\nWarning: skl2onnx not found. Skipping ONNX export.")

    metadata = {
        "spam_threshold": threshold,
        "short_text_word_count": config.SHORT_TEXT_WORD_COUNT,
        "short_text_threshold": config.SHORT_TEXT_THRESHOLD,
        "very_short_text_word_count": config.VERY_SHORT_TEXT_WORD_COUNT,
        "very_short_text_threshold": config.VERY_SHORT_TEXT_THRESHOLD,
        "target_min_precision": TARGET_MIN_PRECISION,
        "vectorizer": "tfidf_word_char",
        "word_max_features": config.WORD_MAX_FEATURES,
        "char_max_features": config.CHAR_MAX_FEATURES,
        "min_df": config.MIN_DF,
    }
    with (MODEL_DIR / "metadata.json").open("w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=2)
    print(f"\nSaved model to {MODEL_DIR}")


if __name__ == "__main__":
    main()
```
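To run inference with the artifacts this script saves, the vectorizer, classifiers, and tuned threshold can be recombined. A minimal sketch, assuming `vectorizer.pkl`, `binary_model.pkl`, `category_model.pkl`, and `metadata.json` were written by the script above and that the same `preprocess_text` helper from `utils` is importable:

```python
import json
from pathlib import Path

import joblib

from utils import preprocess_text  # same helper the training script uses

MODEL_DIR = Path(".")  # wherever the training script wrote its artifacts

vectorizer = joblib.load(MODEL_DIR / "vectorizer.pkl")
binary_model = joblib.load(MODEL_DIR / "binary_model.pkl")
category_model = joblib.load(MODEL_DIR / "category_model.pkl")
threshold = json.loads((MODEL_DIR / "metadata.json").read_text())["spam_threshold"]

def classify(text: str) -> dict:
    """Binary spam decision at the tuned threshold, plus a category for spam."""
    features = vectorizer.transform([preprocess_text(text)])
    spam_prob = float(binary_model.predict_proba(features)[0, 1])
    if spam_prob < threshold:
        return {"label": "normal", "spam_probability": spam_prob}
    category = category_model.predict(features)[0]
    return {"label": "spam", "category": str(category), "spam_probability": spam_prob}

print(classify("Congratulations, you won! Claim your prize now."))
```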