| """ |
| Dataset and feature engine for ScamShield. |
| |
| Feature extraction is deterministic and side-effect free. In particular, it does |
| not call Google Safe Browsing or any network service. Training, evaluation, and |
| inference all import the same functions from this module. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import os |
| import re |
| from pathlib import Path |
| from typing import Iterable, Optional |
|
|
| import numpy as np |
| import pandas as pd |
| import tldextract |
| from sklearn.model_selection import train_test_split |
|
|
|
|
# Directory two levels above this file (parents[0] is this file's folder,
# parents[1] is the folder containing it).
_THIS_FILE = Path(__file__).resolve()
BASE_DIR = _THIS_FILE.parents[1]

# Local dataset files (spam.csv, *.parquet) are looked up here.
DATA_DIR = BASE_DIR.joinpath("data")
|
|
# URL suffixes that set the `suspicious_tld` feature when seen on any URL.
SUSPICIOUS_TLDS = {
    "tk", "ml", "ga", "cf", "gq",
    "xyz", "top", "click", "link", "work", "loan",
    "online", "site", "info", "biz", "club",
}

# Registered domains that set the `has_shortened_url` feature.
URL_SHORTENERS = {
    "bit.ly", "tinyurl.com", "t.co", "goo.gl", "ow.ly", "is.gd",
    "buff.ly", "rebrand.ly", "cutt.ly",
    "fkrt.it", "amzn.in",
}

# Registered domains that set the `has_legit_domain` feature.
LEGIT_DOMAINS = {
    "amazon.com", "amazon.in", "netflix.com", "spotify.com", "apple.com",
    "google.com", "microsoft.com", "paypal.com", "ebay.com", "walmart.com",
    "fedex.com", "ups.com", "usps.com", "shopify.com", "slack.com",
    "zoom.us", "gmail.com", "outlook.com", "yahoo.com",
    "flipkart.com", "myntra.com", "swiggy.com", "zomato.com",
    "makemytrip.com", "yatra.com", "cleartrip.com", "paytm.com",
    "phonepe.com", "hdfcbank.com", "onlinesbi.sbi", "icicibank.com",
    "axisbank.com", "kotak.com", "airindia.in", "goindigo.in",
    "irctc.co.in", "redbus.in", "olacabs.com", "hotstar.com",
    "sonyliv.com", "zee5.com", "jiocinema.com", "moneycontrol.com",
    "bseindia.com", "nseindia.com", "bookmyshow.com", "policybazaar.com",
    "nykaa.com", "ajio.com", "firstcry.com", "bigbasket.com",
    "blinkit.com", "zepto.com", "upstox.com", "groww.in", "zerodha.com",
    "airtel.in", "jio.com", "myvi.in", "vi.com", "bsnl.co.in",
    "tataplay.com", "dishtv.in", "sbicard.com", "bajajfinserv.in",
    "cred.club", "licindia.in", "epfindia.gov.in", "mseb.co.in",
    "bescom.org", "uidai.gov.in", "cibil.com",
}

# URL suffixes that also set `has_legit_domain`, whatever the domain is.
TRUSTED_TLDS = {
    "gov",
    "gov.in",
    "edu",
    "sbi",
}
|
|
# Names of the URL-derived features, in the order they appear in the
# dict built by url_features().
URL_FEATURE_COLS = [
    "has_url",
    "num_urls",
    "has_http",
    "has_https",
    "suspicious_tld",
    "max_url_len",
    "has_ip_url",
    "has_shortened_url",
    "has_legit_domain",
]

# Names of the text-derived features, in the order they appear in the
# dict built by text_features().
TEXT_FEATURE_COLS = [
    "num_chars",
    "num_words",
    "pct_upper",
    "pct_digits",
    "num_special",
    "urgency_count",
    "has_phone",
    "has_currency",
]
|
|
# Single lowercase tokens; each occurrence in a message adds one to
# `urgency_count`. Mixes English and romanized Hindi vocabulary.
URGENCY_WORDS = {
    "urgent", "winner", "won", "free", "prize", "claim", "cash",
    "congratulations", "selected", "reward", "limited", "click",
    "password", "invoice", "crypto", "bitcoin", "wallet", "suspended",
    "blocked", "deactivated", "illegal", "arrested", "cyber", "fraud",
    "hack", "jaldi", "turant", "abhi", "kijiye", "rupaye", "paisa",
    "khata", "band", "inam", "jeeta", "loot", "kyc", "cashback",
    "lucky", "gift", "redeem", "bijli", "officer", "helpline",
    "fir", "giraftari", "arrest",
}

# Multi-word lowercase phrases; each phrase present in a message adds one
# to `urgency_count` (counted once per phrase, not per occurrence).
URGENCY_PHRASES = {
    "act now", "action required", "share your otp", "last chance",
    "court notice", "turant call", "abhi call", "aaj raat",
    "kal subah", "power cut", "connection cut", "band ho jayega",
}

# Currency signs and lowercase crypto tickers that set `has_currency`.
CURRENCY_SYMBOLS = {
    "$", "\u00a3", "\u20ac", "\u20b9",
    "btc", "eth", "usdt",
}
|
|
# Hand-written examples of legitimate Indian transactional SMS. They are
# added to the training data with label 0 by _load_synthetic_indian_legit().
INDIAN_LEGIT_SMS = [
    # Bank credit / debit alerts
    "HDFC Bank: Rs.25,000 credited to a/c XX4521 on 02-May. Avl bal: Rs.1,42,356. -HDFC Bank",
    "Dear Customer, your ICICI a/c ending 7890 debited Rs.1,500 at AMAZON on 01-May. -ICICI Bank",
    "Your SBI a/c XXXX1234 is credited with INR 5,000.00 on 01-May-25. Bal: INR 12,450.00. -SBI",
    # One-time passwords
    "Your OTP for SBI Net Banking login is 483921. Valid for 10 minutes. Do not share. -SBI",
    "PhonePe OTP: 273948 for payment of Rs.150 to Zomato. Do not share. -PhonePe",
    "Aadhaar OTP: 581234 for e-KYC verification. Valid 30 minutes. -UIDAI",
    # Recharges and utility bills
    "Airtel Thanks! Your recharge of Rs.239 is successful. Validity: 28 days. Data: 1.5GB/day. -Airtel",
    "Jio: Your recharge of Rs.299 is done. Validity 28 days, 2GB/day data. Enjoy! -Jio",
    "Your electricity bill of Rs.1,234 for account 98765 is due on 10-May. Pay via BESCOM app.",
    "Bijli Bill: Aapka MSEB bijli bill Rs.1,847 generate ho gaya hai. Due date: 10-Jun-26. Pay at mseb.co.in",
    # Orders, deliveries and travel
    "Your Amazon order #402-9876543 is out for delivery today. Track: amzn.in/track -Amazon",
    "Flipkart: Your order for boAt Earphones has been shipped. Tracking: fkrt.it/xyz -Flipkart",
    "Zomato: Your order from Dominos has been picked up. ETA: 25 mins. -Zomato",
    "Your IRCTC ticket PNR 4567891230 is confirmed. Train 12345 on 05-May. Seat: S4/32. -IRCTC",
    "IndiGo: Your flight 6E-456 on 05-May is confirmed. PNR: ABCDEF. Web check-in open. -IndiGo",
    # Insurance and provident fund
    "LIC: Your premium of Rs.5,000 for policy 123456789 is due on 10-May. Pay at licindia.in.",
    "EPFO: Your PF balance as of 01-May-25 is Rs.2,45,678. Check on epfindia.gov.in. -EPFO",
    # UPI / wallet transfers
    "You received Rs.500 from Priya via UPI. UPI Ref: 123456789012. -Google Pay",
    "PhonePe: Rs.1,200 sent to Ajay Kumar successfully. UPI Ref: 987654321. -PhonePe",
    "Paytm: Rs.250 added to your wallet from HDFC Bank XX1234. Wallet Bal: Rs.430. -Paytm",
]
|
|
# Full URLs: an http(s):// scheme or a leading "www.", then everything up
# to whitespace, an angle bracket or a quote.
URL_RE = re.compile(r"(?:https?://|www\.)[^\s<>'\"]+", re.I)

# Scheme-less host names ending in one of a fixed list of suffixes. Only the
# host is matched; any path that follows is not part of the match.
# NOTE(review): several SUSPICIOUS_TLDS entries (tk, ml, ga, cf, gq, work,
# loan) and shortener suffixes (ly, gl, gd, it) are absent from this list,
# so such domains are only seen when written with a scheme or "www." —
# confirm whether that is intended.
BARE_DOMAIN_RE = re.compile(
    r"\b(?:[a-zA-Z0-9-]+\.)+(?:com|org|net|edu|gov|gov\.in|co\.uk|co\.in|in|io|co|sbi|club|xyz|top|click|link|online|site|info|biz)\b",
    re.I,
)

# Optional +91 prefix, then a run of at least ten digits/spaces/hyphens
# that starts and ends with a digit.
PHONE_RE = re.compile(r"(?:\+91[-\s]?)?\d[\d\s-]{8,}\d")

# Word tokens: runs of lowercase ASCII letters or of Devanagari-block
# characters. Intended to be applied to lowercased text.
TOKEN_RE = re.compile(r"[a-z]+|[\u0900-\u097F]+")
|
|
|
|
def get_feature_columns() -> list[str]:
    """Return all numeric feature names: URL features first, then text features.

    A fresh list is built on every call, so mutating the result does not
    affect URL_FEATURE_COLS or TEXT_FEATURE_COLS.
    """
    return [*URL_FEATURE_COLS, *TEXT_FEATURE_COLS]
|
|
|
|
def clean_text(text: str, remove_urls: bool = False) -> str:
    """Normalize whitespace in a message, optionally stripping URLs first.

    Args:
        text: The message. ``None`` is treated as an empty string; any other
            value is converted with ``str()``.
        remove_urls: When true, every URL_RE match is deleted before the
            whitespace is normalized.

    Returns:
        The text with each run of whitespace collapsed to one space and no
        leading or trailing whitespace.
    """
    raw = str(text) if text is not None else ""
    without_urls = URL_RE.sub("", raw) if remove_urls else raw
    # split() with no argument splits on whitespace runs and ignores the
    # ends, so re-joining with single spaces collapses and trims in one step.
    return " ".join(without_urls.split())
|
|
|
|
def extract_urls(text: str) -> list[str]:
    """Return the distinct URLs and bare domains mentioned in a message.

    Full URLs (URL_RE) are collected first. Bare domains (BARE_DOMAIN_RE) are
    then searched for only in the text that is left after the full URLs are
    blanked out. Scanning the whole text instead would report the host of
    every full URL a second time (``https://a.com/x`` plus ``a.com``) and
    inflate the ``num_urls`` feature.

    Args:
        text: The message. ``None`` is treated as an empty string; any other
            value is converted with ``str()``.

    Returns:
        A sorted list of unique, lowercased matches with surrounding
        punctuation stripped. Empty when nothing matched.
    """
    raw = "" if text is None else str(text)

    candidates = URL_RE.findall(raw)
    # Replace each full URL with a space (not "") so the text on either side
    # cannot fuse into a new, spurious domain.
    remainder = URL_RE.sub(" ", raw)
    candidates.extend(BARE_DOMAIN_RE.findall(remainder))

    # Punctuation that commonly clings to a URL written inside a sentence.
    edge_punctuation = ".,;:!?)\"]}'"
    cleaned = {candidate.strip(edge_punctuation).lower() for candidate in candidates}
    cleaned.discard("")
    return sorted(cleaned)
|
|
|
|
# Extractor that never fetches the public suffix list over HTTP: with no
# suffix_list_urls it falls back to the snapshot bundled with tldextract.
# The default tldextract.extract() may download the list on first use, which
# would break this module's "no network, deterministic" contract.
_OFFLINE_TLD_EXTRACT = tldextract.TLDExtract(suffix_list_urls=())


def _registered_domain(url: str) -> tuple[str, str]:
    """Split a URL or host name into its registered domain and suffix.

    Args:
        url: A URL or bare host name.

    Returns:
        ``(full_domain, suffix)``, both lowercased. ``full_domain`` is
        ``"<domain>.<suffix>"``; when either part is empty the stray dot is
        stripped. ``suffix`` is the public suffix reported by tldextract and
        may be empty.
    """
    parts = _OFFLINE_TLD_EXTRACT(url)
    domain = parts.domain.lower()
    suffix = parts.suffix.lower()
    full_domain = f"{domain}.{suffix}".strip(".")
    return full_domain, suffix
|
|
|
|
def url_features(text: str) -> dict:
    """Compute the URL-based features of a message.

    Args:
        text: The message; passed to extract_urls().

    Returns:
        A dict with these keys, in this order:
        ``has_url`` / ``num_urls``: whether, and how many, URLs were found.
        ``has_http`` / ``has_https``: 1 if any URL starts with that scheme.
        ``suspicious_tld``: 1 if any URL's suffix is in SUSPICIOUS_TLDS.
        ``max_url_len``: length of the longest URL, 0 when there are none.
        ``has_ip_url``: 1 if any URL is ``http(s)://`` followed by a dotted quad.
        ``has_shortened_url``: 1 if any registered domain is in URL_SHORTENERS.
        ``has_legit_domain``: 1 if any registered domain is in LEGIT_DOMAINS
        or any suffix is in TRUSTED_TLDS.
    """
    found = extract_urls(text)

    # Resolve each URL once; the two columns are checked separately below.
    resolved = [_registered_domain(link) for link in found]
    domains = [full_domain for full_domain, _ in resolved]
    suffixes = [suffix for _, suffix in resolved]

    ip_host = re.compile(r"https?://\d{1,3}(?:\.\d{1,3}){3}", re.IGNORECASE)

    legit = any(d in LEGIT_DOMAINS for d in domains) or any(s in TRUSTED_TLDS for s in suffixes)

    return {
        "has_url": int(len(found) > 0),
        "num_urls": len(found),
        "has_http": int(any(link.startswith("http://") for link in found)),
        "has_https": int(any(link.startswith("https://") for link in found)),
        "suspicious_tld": int(any(s in SUSPICIOUS_TLDS for s in suffixes)),
        "max_url_len": max((len(link) for link in found), default=0),
        "has_ip_url": int(any(ip_host.search(link) is not None for link in found)),
        "has_shortened_url": int(any(d in URL_SHORTENERS for d in domains)),
        "has_legit_domain": int(legit),
    }
|
|
|
|
def _contains_whole_phrase(haystack: str, phrase: str) -> bool:
    """Return True if `phrase` occurs in `haystack` without a letter touching either end."""
    start = haystack.find(phrase)
    while start != -1:
        end = start + len(phrase)
        clear_before = start == 0 or not haystack[start - 1].isalpha()
        clear_after = end == len(haystack) or not haystack[end].isalpha()
        if clear_before and clear_after:
            return True
        start = haystack.find(phrase, start + 1)
    return False


def text_features(text: str) -> dict:
    """Compute the text-based features of a message.

    Urgency phrases and alphabetic currency tickers are matched as whole
    words, not raw substrings. Plain substring tests produced false
    positives: "act now" fired inside "contact now", and the ticker "eth"
    fired inside "whether", "method" or "something".

    Args:
        text: The message. ``None`` is treated as an empty string; any other
            value is converted with ``str()``.

    Returns:
        A dict with these keys, in this order:
        ``num_chars``: length of the text.
        ``num_words``: number of TOKEN_RE tokens in the lowercased text.
        ``pct_upper`` / ``pct_digits``: share of characters that are
        uppercase / digits (0.0 for empty text).
        ``num_special``: count of characters from a fixed punctuation set.
        ``urgency_count``: tokens found in URGENCY_WORDS (every occurrence)
        plus URGENCY_PHRASES present in the text (once per phrase).
        ``has_phone``: 1 if PHONE_RE matches anywhere in the text.
        ``has_currency``: 1 if any CURRENCY_SYMBOLS entry is present.
    """
    raw = "" if text is None else str(text)
    lowered = raw.lower()
    tokens = TOKEN_RE.findall(lowered)
    token_set = set(tokens)

    num_chars = len(raw)
    special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"
    upper_count = sum(1 for ch in raw if ch.isupper())
    digit_count = sum(1 for ch in raw if ch.isdigit())
    special_count = sum(1 for ch in raw if ch in special_chars)

    urgency_count = sum(1 for token in tokens if token in URGENCY_WORDS)
    urgency_count += sum(
        1 for phrase in URGENCY_PHRASES if _contains_whole_phrase(lowered, phrase)
    )

    # Alphabetic markers (tickers such as "btc") must be a token of their
    # own; sign characters such as "$" may appear anywhere, e.g. "$500".
    has_currency = any(
        (marker in token_set) if marker.isalpha() else (marker in lowered)
        for marker in CURRENCY_SYMBOLS
    )

    return {
        "num_chars": num_chars,
        "num_words": len(tokens),
        "pct_upper": upper_count / num_chars if num_chars else 0.0,
        "pct_digits": digit_count / num_chars if num_chars else 0.0,
        "num_special": special_count,
        "urgency_count": urgency_count,
        "has_phone": int(bool(PHONE_RE.search(raw))),
        "has_currency": int(has_currency),
    }
|
|
|
|
| def _standardize_label(value) -> Optional[int]: |
| if value is None or (isinstance(value, float) and np.isnan(value)): |
| return None |
| text = str(value).strip().lower() |
| mapping = { |
| "spam": 1, "scam": 1, "phishing": 1, "smishing": 1, "1": 1, "true": 1, |
| "ham": 0, "safe": 0, "legit": 0, "legitimate": 0, "not_spam": 0, |
| "0": 0, "false": 0, |
| } |
| return mapping.get(text) |
|
|
|
|
| def _standardize_frame(df: pd.DataFrame, message_cols: Iterable[str], label_cols: Iterable[str]) -> pd.DataFrame: |
| lower_map = {c.lower().strip(): c for c in df.columns} |
| msg_col = next((lower_map[c] for c in message_cols if c in lower_map), None) |
| lbl_col = next((lower_map[c] for c in label_cols if c in lower_map), None) |
|
|
| if msg_col is None or lbl_col is None: |
| return pd.DataFrame(columns=["message", "label"]) |
|
|
| result = pd.DataFrame({ |
| "message": df[msg_col].astype(str), |
| "label": df[lbl_col].apply(_standardize_label), |
| }) |
| result = result.dropna(subset=["message", "label"]) |
| result["label"] = result["label"].astype(int) |
| result = result[result["message"].str.strip() != ""] |
| return result[["message", "label"]].reset_index(drop=True) |
|
|
|
|
def _load_local_spam_csv() -> pd.DataFrame:
    """Load ``spam.csv`` from DATA_DIR as a ``message`` / ``label`` frame.

    The file is read as latin-1. Returns an empty ``message`` / ``label``
    frame when the file does not exist or cannot be loaded; a load failure
    is reported with a printed warning rather than raised.
    """
    csv_path = DATA_DIR / "spam.csv"
    if not csv_path.exists():
        return pd.DataFrame(columns=["message", "label"])

    message_names = ["v2", "message", "text", "sms"]
    label_names = ["v1", "label", "labels", "category"]
    try:
        raw = pd.read_csv(csv_path, encoding="latin-1")
        loaded = _standardize_frame(raw, message_names, label_names)
        print(f" Local UCI spam.csv: {len(loaded)} messages loaded")
    except Exception as exc:  # best-effort loader: report and continue
        print(f" Warning: failed to load local spam.csv: {exc}")
        return pd.DataFrame(columns=["message", "label"])
    return loaded
|
|
|
|
def _load_local_parquet() -> pd.DataFrame:
    """Load every ``*.parquet`` file in DATA_DIR into one ``message`` / ``label`` frame.

    Files are processed in sorted path order. A file that fails to load is
    reported with a printed warning and skipped; a file that yields no rows
    is skipped silently. Returns an empty ``message`` / ``label`` frame when
    nothing was loaded.
    """
    message_names = ["message", "text", "sms", "email"]
    label_names = ["label", "labels", "category", "class"]

    loaded = []
    for parquet_path in sorted(DATA_DIR.glob("*.parquet")):
        try:
            standardized = _standardize_frame(pd.read_parquet(parquet_path), message_names, label_names)
            if len(standardized) == 0:
                continue
            print(f" Local {parquet_path.name}: {len(standardized)} messages loaded")
            loaded.append(standardized)
        except Exception as exc:  # best-effort loader: report and continue
            print(f" Warning: failed to load {parquet_path.name}: {exc}")

    if loaded:
        return pd.concat(loaded, ignore_index=True)
    return pd.DataFrame(columns=["message", "label"])
|
|
|
|
def _load_synthetic_indian_legit() -> pd.DataFrame:
    """Return the built-in INDIAN_LEGIT_SMS examples, all labelled 0 (safe)."""
    example_count = len(INDIAN_LEGIT_SMS)
    print(f" Synthetic Indian legit SMS: {example_count} messages loaded")
    synthetic = pd.DataFrame({"message": INDIAN_LEGIT_SMS, "label": 0})
    return synthetic
|
|
|
|
def _load_huggingface_dataset(name: str) -> pd.DataFrame:
    """Load every split of a HuggingFace dataset as one ``message`` / ``label`` frame.

    Args:
        name: Dataset identifier passed to ``datasets.load_dataset``.

    Returns:
        The standardized rows of all splits concatenated. Any failure (the
        ``datasets`` package missing, a download error, no splits, ...) is
        reported with a printed warning and an empty ``message`` / ``label``
        frame is returned.
    """
    message_names = ["message", "text", "sms", "email"]
    label_names = ["label", "labels", "category", "class"]
    try:
        # Imported lazily so the module works without `datasets` installed.
        from datasets import load_dataset as hf_load

        splits = hf_load(name)
        per_split = [
            _standardize_frame(splits[split_name].to_pandas(), message_names, label_names)
            for split_name in splits.keys()
        ]
        combined = pd.concat(per_split, ignore_index=True)
        print(f" HuggingFace {name}: {len(combined)} messages loaded")
        return combined
    except Exception as exc:  # best-effort loader: report and continue
        print(f" Warning: failed to load HuggingFace {name}: {exc}")
        return pd.DataFrame(columns=["message", "label"])
|
|
|
|
def _load_multilingual_hf(name: str) -> pd.DataFrame:
    """Load a multilingual HuggingFace dataset, one row per message column.

    All splits are concatenated. Every column whose lowercased name is one of
    a fixed set of English/Hindi text names, or ends in ``_hi``, is treated
    as a message column and paired with the single label column, so one
    source row can contribute several messages with the same label.

    Missing messages are filtered out before the string conversion (after it
    they would survive as the literal text "nan"/"None"), and labels are
    mapped once instead of once per message column.

    Args:
        name: Dataset identifier passed to ``datasets.load_dataset``.

    Returns:
        A ``message`` / ``label`` frame. Empty when no label column exists
        or when loading fails; a failure is reported with a printed warning.
    """
    try:
        # Imported lazily so the module works without `datasets` installed.
        from datasets import load_dataset as hf_load

        ds = hf_load(name)
        raw = pd.concat([ds[split].to_pandas() for split in ds.keys()], ignore_index=True)

        # str() so non-string column names do not crash on .lower().
        lower_map = {str(col).lower().strip(): col for col in raw.columns}
        label_names = ["label", "labels", "v1", "category", "class"]
        label_col = next((lower_map[n] for n in label_names if n in lower_map), None)
        if label_col is None:
            return pd.DataFrame(columns=["message", "label"])

        known_message_names = {"text", "message", "sms", "v2", "text_en", "en", "text_hi", "hi", "hindi"}
        message_cols = []
        for col in raw.columns:
            lowered = str(col).lower().strip()
            if lowered in known_message_names or lowered.endswith("_hi"):
                message_cols.append(col)

        # The label mapping is identical for every message column.
        labels = raw[label_col].apply(_standardize_label)
        label_known = labels.notna()

        frames = []
        for col in message_cols:
            keep = label_known & raw[col].notna()
            frame = pd.DataFrame({
                "message": raw.loc[keep, col].astype(str),
                "label": labels[keep].astype(int),
            })
            frame = frame[frame["message"].str.strip() != ""]
            frames.append(frame[["message", "label"]])

        result = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=["message", "label"])
        print(f" Multilingual {name}: {len(result)} messages loaded")
        return result
    except Exception as exc:  # best-effort loader: report and continue
        print(f" Warning: failed to load multilingual {name}: {exc}")
        return pd.DataFrame(columns=["message", "label"])
|
|
|
|
| def _deduplicate(df: pd.DataFrame) -> pd.DataFrame: |
| df = df.copy() |
| df["_norm"] = df["message"].astype(str).str.lower().str.strip().str.replace(r"\s+", " ", regex=True) |
| df = df.drop_duplicates(subset="_norm", keep="first").drop(columns=["_norm"]) |
| return df.reset_index(drop=True) |
|
|
|
|
def _add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Append the URL and text feature columns to a message frame.

    Features are computed from each original message; afterwards the
    ``message`` column is replaced with its clean_text() form.

    The feature frames are built with explicit column lists, so an empty
    input still yields every feature column. Feature columns already present
    in ``df`` are replaced, not duplicated, so the function can be re-applied
    to its own output.

    Args:
        df: Frame with a ``message`` column; other columns are carried over.

    Returns:
        A new frame with a 0..n-1 index: the input columns followed by
        URL_FEATURE_COLS and then TEXT_FEATURE_COLS.
    """
    url_rows = []
    text_rows = []
    cleaned = []

    for message in df["message"]:
        message = str(message)
        url_rows.append(url_features(message))
        text_rows.append(text_features(message))
        cleaned.append(clean_text(message))

    feature_cols = [*URL_FEATURE_COLS, *TEXT_FEATURE_COLS]
    # Remove stale feature columns first; concat would otherwise produce
    # duplicate column names.
    base = df.reset_index(drop=True).drop(columns=feature_cols, errors="ignore")

    result = pd.concat(
        [
            base,
            pd.DataFrame(url_rows, columns=URL_FEATURE_COLS),
            pd.DataFrame(text_rows, columns=TEXT_FEATURE_COLS),
        ],
        axis=1,
    )
    result["message"] = cleaned
    return result
|
|
|
|
def load_dataset(use_remote: Optional[bool] = None) -> pd.DataFrame:
    """
    Load SMS datasets and return message, label, and feature columns.

    Local sources (spam.csv, parquet files, built-in synthetic examples) are
    always loaded. Remote HuggingFace sources are added unless disabled:
    pass use_remote=False, or leave it as None and set the environment
    variable SCAMSHIELD_USE_REMOTE_DATA to 0, false, or no (any letter case).

    The combined data is deduplicated on the normalized message text before
    features are added. Progress is printed along the way.

    Raises:
        RuntimeError: if every source came back empty.
    """
    if use_remote is None:
        env_flag = os.getenv("SCAMSHIELD_USE_REMOTE_DATA", "1").lower()
        use_remote = env_flag not in {"0", "false", "no"}

    print("Loading datasets...")
    sources = [
        _load_local_spam_csv(),
        _load_local_parquet(),
        _load_synthetic_indian_legit(),
    ]
    if use_remote:
        sources += [
            _load_huggingface_dataset("Deysi/spam-detection-dataset"),
            _load_huggingface_dataset("Ngadou/Spam_SMS"),
            _load_multilingual_hf("dbarbedillo/SMS_Spam_Multilingual_Collection_Dataset"),
        ]

    non_empty = [source for source in sources if len(source) > 0]
    if not non_empty:
        raise RuntimeError("No datasets loaded. Check local data files or enable remote datasets.")

    combined = pd.concat(non_empty, ignore_index=True)
    unique = _deduplicate(combined)
    removed = len(combined) - len(unique)
    if removed:
        print(f" Removed {removed} duplicate messages")

    featured = _add_features(unique)
    spam_count = int(featured["label"].sum())
    ham_count = len(featured) - spam_count
    print(f"\nDataset loaded: {len(featured)} messages ({spam_count} spam, {ham_count} ham)")
    return featured
|
|
|
|
def split_dataset(df: pd.DataFrame, seed: int = 42) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split a labelled frame into train, validation and test parts.

    30% of the rows are held out first and then halved, giving a
    70 / 15 / 15 split. Both steps are stratified on ``label`` and use the
    same random seed.

    Args:
        df: Frame with a ``label`` column.
        seed: Random state passed to both ``train_test_split`` calls.

    Returns:
        ``(train, validation, test)``, each with a fresh 0..n-1 index.
    """
    train_part, holdout = train_test_split(
        df, test_size=0.30, stratify=df["label"], random_state=seed
    )
    val_part, test_part = train_test_split(
        holdout, test_size=0.50, stratify=holdout["label"], random_state=seed
    )

    train_out = train_part.reset_index(drop=True)
    val_out = val_part.reset_index(drop=True)
    test_out = test_part.reset_index(drop=True)
    return train_out, val_out, test_out
|
|