Spaces:
Sleeping
Sleeping
| """ | |
| SmartCertify ML — Feature Engineering | |
| All feature transformations: date features, text features, risk scores, hash validation, PCA. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import hashlib | |
| import re | |
| import logging | |
| from pathlib import Path | |
| from typing import Tuple, Dict, Any | |
| from sklearn.decomposition import PCA | |
| import sys | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) | |
| from app.config.settings import PLOTS_DIR | |
| from app.utils.visualization import plot_pca_variance | |
| logger = logging.getLogger(__name__) | |
| def extract_date_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Extract date-based features from issue_date and expiry_date. | |
| Returns: issue_month, issue_year, days_to_expiry, is_expired, weekend_issued | |
| """ | |
| df = df.copy() | |
| if "issue_date" in df.columns: | |
| issue_date = pd.to_datetime(df["issue_date"], errors="coerce") | |
| df["issue_month"] = issue_date.dt.month.fillna(0).astype(int) | |
| df["issue_year"] = issue_date.dt.year.fillna(0).astype(int) | |
| df["issue_dayofweek"] = issue_date.dt.dayofweek.fillna(0).astype(int) | |
| df["weekend_issued"] = (df["issue_dayofweek"] >= 5).astype(int) | |
| if "expiry_date" in df.columns and "issue_date" in df.columns: | |
| expiry_date = pd.to_datetime(df["expiry_date"], errors="coerce") | |
| days_to_expiry = (expiry_date - issue_date).dt.days | |
| df["days_to_expiry"] = days_to_expiry.fillna(0).astype(int) | |
| df["is_expired"] = (days_to_expiry < 0).astype(int) | |
| # Future issue date flag | |
| if "issue_date" in df.columns: | |
| now = pd.Timestamp.now() | |
| df["is_future_issue"] = (issue_date > now).astype(int) | |
| return df | |
| def compute_text_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Extract text-based features from name and course fields. | |
| Returns: name_length, course_word_count, special_char_ratio | |
| """ | |
| df = df.copy() | |
| if "recipient_name" in df.columns: | |
| df["name_length"] = df["recipient_name"].fillna("").str.len() | |
| df["name_word_count"] = df["recipient_name"].fillna("").str.split().str.len() | |
| if "course_name" in df.columns: | |
| df["course_word_count"] = df["course_name"].fillna("").str.split().str.len() | |
| df["course_name_length"] = df["course_name"].fillna("").str.len() | |
| if "issuer_name" in df.columns: | |
| df["issuer_name_length"] = df["issuer_name"].fillna("").str.len() | |
| # Special character ratio — unusual characters can indicate fraud | |
| df["special_char_ratio"] = df["issuer_name"].fillna("").apply( | |
| lambda x: len(re.findall(r"[^a-zA-Z0-9\s]", x)) / max(len(x), 1) | |
| ) | |
| return df | |
| def compute_risk_score(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Compute composite risk score from reputation, completeness, and template match. | |
| Higher score = higher risk of fraud. | |
| """ | |
| df = df.copy() | |
| rep = 1 - df.get("issuer_reputation_score", pd.Series(0.5, index=df.index)).fillna(0.5) | |
| comp = 1 - df.get("metadata_completeness_score", pd.Series(0.5, index=df.index)).fillna(0.5) | |
| tmpl = 1 - df.get("template_match_score", pd.Series(0.5, index=df.index)).fillna(0.5) | |
| domain = 1 - df.get("domain_verification_status", pd.Series(1, index=df.index)).fillna(1) | |
| # Weighted composite risk score | |
| df["risk_score"] = (rep * 0.3 + comp * 0.2 + tmpl * 0.3 + domain * 0.2) | |
| # Risk category | |
| df["risk_category"] = pd.cut( | |
| df["risk_score"], | |
| bins=[0, 0.25, 0.5, 0.75, 1.0], | |
| labels=["LOW", "MEDIUM", "HIGH", "CRITICAL"], | |
| include_lowest=True, | |
| ) | |
| return df | |
| def hash_integrity_check(credential_hash: str) -> Dict[str, Any]: | |
| """ | |
| Validate hash format, length, and entropy. | |
| Returns dict with validation results. | |
| """ | |
| result = { | |
| "is_valid_format": False, | |
| "is_valid_length": False, | |
| "entropy": 0.0, | |
| "hash_quality": "INVALID", | |
| } | |
| if not credential_hash or not isinstance(credential_hash, str): | |
| return result | |
| # Check hex format | |
| result["is_valid_format"] = bool(re.match(r"^[0-9a-f]+$", credential_hash.lower())) | |
| # Check length (SHA-256 = 64 hex chars) | |
| result["is_valid_length"] = len(credential_hash) == 64 | |
| # Compute Shannon entropy | |
| if len(credential_hash) > 0: | |
| probs = np.array([credential_hash.count(c) / len(credential_hash) for c in set(credential_hash)]) | |
| result["entropy"] = float(-np.sum(probs * np.log2(probs + 1e-10))) | |
| # Quality assessment | |
| if result["is_valid_format"] and result["is_valid_length"] and result["entropy"] > 3.5: | |
| result["hash_quality"] = "VALID" | |
| elif result["is_valid_format"] and result["entropy"] > 2.5: | |
| result["hash_quality"] = "SUSPICIOUS" | |
| else: | |
| result["hash_quality"] = "INVALID" | |
| return result | |
| def add_hash_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add hash-derived features to the dataframe.""" | |
| df = df.copy() | |
| if "credential_hash" in df.columns: | |
| hash_results = df["credential_hash"].fillna("").apply(hash_integrity_check) | |
| df["hash_valid_format"] = hash_results.apply(lambda x: int(x["is_valid_format"])) | |
| df["hash_valid_length"] = hash_results.apply(lambda x: int(x["is_valid_length"])) | |
| df["hash_entropy"] = hash_results.apply(lambda x: x["entropy"]) | |
| return df | |
| def apply_pca(X: np.ndarray, n_components: int = 10) -> Tuple[np.ndarray, PCA]: | |
| """ | |
| Apply PCA dimensionality reduction and save variance plot. | |
| Returns transformed data and fitted PCA object. | |
| """ | |
| n_components = min(n_components, X.shape[1], X.shape[0]) | |
| pca = PCA(n_components=n_components, random_state=42) | |
| X_pca = pca.fit_transform(X) | |
| logger.info(f"PCA: {n_components} components explain {pca.explained_variance_ratio_.sum():.2%} variance") | |
| # Save explained variance plot | |
| try: | |
| plot_pca_variance(pca.explained_variance_ratio_) | |
| except Exception as e: | |
| logger.warning(f"Could not save PCA plot: {e}") | |
| return X_pca, pca | |
| def engineer_all_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """Apply all feature engineering transformations.""" | |
| df = extract_date_features(df) | |
| df = compute_text_features(df) | |
| df = compute_risk_score(df) | |
| df = add_hash_features(df) | |
| logger.info(f"Feature engineering complete. Shape: {df.shape}") | |
| return df | |
| def main(): | |
| """Run feature engineering on the dataset.""" | |
| from app.config.settings import DATASET_PATH | |
| print("Running feature engineering pipeline...") | |
| df = pd.read_csv(DATASET_PATH) | |
| df = engineer_all_features(df) | |
| # Show new features | |
| new_cols = [c for c in df.columns if c not in pd.read_csv(DATASET_PATH).columns] | |
| print(f"\n✅ Feature engineering complete:") | |
| print(f" • Original columns: {len(pd.read_csv(DATASET_PATH).columns)}") | |
| print(f" • New columns: {len(new_cols)}") | |
| print(f" • Total columns: {len(df.columns)}") | |
| print(f" • New features: {new_cols}") | |
| if __name__ == "__main__": | |
| logging.basicConfig(level=logging.INFO) | |
| main() | |