SmartCertify-ML / app /data /feature_engineering.py
Harsh Yadav
CREATE : Trained the model
6de2f28
"""
SmartCertify ML — Feature Engineering
All feature transformations: date features, text features, risk scores, hash validation, PCA.
"""
import numpy as np
import pandas as pd
import hashlib
import re
import logging
from pathlib import Path
from typing import Tuple, Dict, Any
from sklearn.decomposition import PCA
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from app.config.settings import PLOTS_DIR
from app.utils.visualization import plot_pca_variance
logger = logging.getLogger(__name__)
def extract_date_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Extract date-based features from issue_date and expiry_date.
Returns: issue_month, issue_year, days_to_expiry, is_expired, weekend_issued
"""
df = df.copy()
if "issue_date" in df.columns:
issue_date = pd.to_datetime(df["issue_date"], errors="coerce")
df["issue_month"] = issue_date.dt.month.fillna(0).astype(int)
df["issue_year"] = issue_date.dt.year.fillna(0).astype(int)
df["issue_dayofweek"] = issue_date.dt.dayofweek.fillna(0).astype(int)
df["weekend_issued"] = (df["issue_dayofweek"] >= 5).astype(int)
if "expiry_date" in df.columns and "issue_date" in df.columns:
expiry_date = pd.to_datetime(df["expiry_date"], errors="coerce")
days_to_expiry = (expiry_date - issue_date).dt.days
df["days_to_expiry"] = days_to_expiry.fillna(0).astype(int)
df["is_expired"] = (days_to_expiry < 0).astype(int)
# Future issue date flag
if "issue_date" in df.columns:
now = pd.Timestamp.now()
df["is_future_issue"] = (issue_date > now).astype(int)
return df
def compute_text_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Extract text-based features from name and course fields.
Returns: name_length, course_word_count, special_char_ratio
"""
df = df.copy()
if "recipient_name" in df.columns:
df["name_length"] = df["recipient_name"].fillna("").str.len()
df["name_word_count"] = df["recipient_name"].fillna("").str.split().str.len()
if "course_name" in df.columns:
df["course_word_count"] = df["course_name"].fillna("").str.split().str.len()
df["course_name_length"] = df["course_name"].fillna("").str.len()
if "issuer_name" in df.columns:
df["issuer_name_length"] = df["issuer_name"].fillna("").str.len()
# Special character ratio — unusual characters can indicate fraud
df["special_char_ratio"] = df["issuer_name"].fillna("").apply(
lambda x: len(re.findall(r"[^a-zA-Z0-9\s]", x)) / max(len(x), 1)
)
return df
def compute_risk_score(df: pd.DataFrame) -> pd.DataFrame:
"""
Compute composite risk score from reputation, completeness, and template match.
Higher score = higher risk of fraud.
"""
df = df.copy()
rep = 1 - df.get("issuer_reputation_score", pd.Series(0.5, index=df.index)).fillna(0.5)
comp = 1 - df.get("metadata_completeness_score", pd.Series(0.5, index=df.index)).fillna(0.5)
tmpl = 1 - df.get("template_match_score", pd.Series(0.5, index=df.index)).fillna(0.5)
domain = 1 - df.get("domain_verification_status", pd.Series(1, index=df.index)).fillna(1)
# Weighted composite risk score
df["risk_score"] = (rep * 0.3 + comp * 0.2 + tmpl * 0.3 + domain * 0.2)
# Risk category
df["risk_category"] = pd.cut(
df["risk_score"],
bins=[0, 0.25, 0.5, 0.75, 1.0],
labels=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
include_lowest=True,
)
return df
def hash_integrity_check(credential_hash: str) -> Dict[str, Any]:
"""
Validate hash format, length, and entropy.
Returns dict with validation results.
"""
result = {
"is_valid_format": False,
"is_valid_length": False,
"entropy": 0.0,
"hash_quality": "INVALID",
}
if not credential_hash or not isinstance(credential_hash, str):
return result
# Check hex format
result["is_valid_format"] = bool(re.match(r"^[0-9a-f]+$", credential_hash.lower()))
# Check length (SHA-256 = 64 hex chars)
result["is_valid_length"] = len(credential_hash) == 64
# Compute Shannon entropy
if len(credential_hash) > 0:
probs = np.array([credential_hash.count(c) / len(credential_hash) for c in set(credential_hash)])
result["entropy"] = float(-np.sum(probs * np.log2(probs + 1e-10)))
# Quality assessment
if result["is_valid_format"] and result["is_valid_length"] and result["entropy"] > 3.5:
result["hash_quality"] = "VALID"
elif result["is_valid_format"] and result["entropy"] > 2.5:
result["hash_quality"] = "SUSPICIOUS"
else:
result["hash_quality"] = "INVALID"
return result
def add_hash_features(df: pd.DataFrame) -> pd.DataFrame:
"""Add hash-derived features to the dataframe."""
df = df.copy()
if "credential_hash" in df.columns:
hash_results = df["credential_hash"].fillna("").apply(hash_integrity_check)
df["hash_valid_format"] = hash_results.apply(lambda x: int(x["is_valid_format"]))
df["hash_valid_length"] = hash_results.apply(lambda x: int(x["is_valid_length"]))
df["hash_entropy"] = hash_results.apply(lambda x: x["entropy"])
return df
def apply_pca(X: np.ndarray, n_components: int = 10) -> Tuple[np.ndarray, PCA]:
"""
Apply PCA dimensionality reduction and save variance plot.
Returns transformed data and fitted PCA object.
"""
n_components = min(n_components, X.shape[1], X.shape[0])
pca = PCA(n_components=n_components, random_state=42)
X_pca = pca.fit_transform(X)
logger.info(f"PCA: {n_components} components explain {pca.explained_variance_ratio_.sum():.2%} variance")
# Save explained variance plot
try:
plot_pca_variance(pca.explained_variance_ratio_)
except Exception as e:
logger.warning(f"Could not save PCA plot: {e}")
return X_pca, pca
def engineer_all_features(df: pd.DataFrame) -> pd.DataFrame:
"""Apply all feature engineering transformations."""
df = extract_date_features(df)
df = compute_text_features(df)
df = compute_risk_score(df)
df = add_hash_features(df)
logger.info(f"Feature engineering complete. Shape: {df.shape}")
return df
def main():
"""Run feature engineering on the dataset."""
from app.config.settings import DATASET_PATH
print("Running feature engineering pipeline...")
df = pd.read_csv(DATASET_PATH)
df = engineer_all_features(df)
# Show new features
new_cols = [c for c in df.columns if c not in pd.read_csv(DATASET_PATH).columns]
print(f"\n✅ Feature engineering complete:")
print(f" • Original columns: {len(pd.read_csv(DATASET_PATH).columns)}")
print(f" • New columns: {len(new_cols)}")
print(f" • Total columns: {len(df.columns)}")
print(f" • New features: {new_cols}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()