Spaces:
Sleeping
Sleeping
File size: 7,060 Bytes
6de2f28 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | """
SmartCertify ML — Feature Engineering
All feature transformations: date features, text features, risk scores, hash validation, PCA.
"""
import numpy as np
import pandas as pd
import hashlib
import re
import logging
from pathlib import Path
from typing import Tuple, Dict, Any
from sklearn.decomposition import PCA
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from app.config.settings import PLOTS_DIR
from app.utils.visualization import plot_pca_variance
logger = logging.getLogger(__name__)
def extract_date_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Extract date-based features from issue_date and expiry_date.
Returns: issue_month, issue_year, days_to_expiry, is_expired, weekend_issued
"""
df = df.copy()
if "issue_date" in df.columns:
issue_date = pd.to_datetime(df["issue_date"], errors="coerce")
df["issue_month"] = issue_date.dt.month.fillna(0).astype(int)
df["issue_year"] = issue_date.dt.year.fillna(0).astype(int)
df["issue_dayofweek"] = issue_date.dt.dayofweek.fillna(0).astype(int)
df["weekend_issued"] = (df["issue_dayofweek"] >= 5).astype(int)
if "expiry_date" in df.columns and "issue_date" in df.columns:
expiry_date = pd.to_datetime(df["expiry_date"], errors="coerce")
days_to_expiry = (expiry_date - issue_date).dt.days
df["days_to_expiry"] = days_to_expiry.fillna(0).astype(int)
df["is_expired"] = (days_to_expiry < 0).astype(int)
# Future issue date flag
if "issue_date" in df.columns:
now = pd.Timestamp.now()
df["is_future_issue"] = (issue_date > now).astype(int)
return df
def compute_text_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Extract text-based features from name and course fields.
Returns: name_length, course_word_count, special_char_ratio
"""
df = df.copy()
if "recipient_name" in df.columns:
df["name_length"] = df["recipient_name"].fillna("").str.len()
df["name_word_count"] = df["recipient_name"].fillna("").str.split().str.len()
if "course_name" in df.columns:
df["course_word_count"] = df["course_name"].fillna("").str.split().str.len()
df["course_name_length"] = df["course_name"].fillna("").str.len()
if "issuer_name" in df.columns:
df["issuer_name_length"] = df["issuer_name"].fillna("").str.len()
# Special character ratio — unusual characters can indicate fraud
df["special_char_ratio"] = df["issuer_name"].fillna("").apply(
lambda x: len(re.findall(r"[^a-zA-Z0-9\s]", x)) / max(len(x), 1)
)
return df
def compute_risk_score(df: pd.DataFrame) -> pd.DataFrame:
"""
Compute composite risk score from reputation, completeness, and template match.
Higher score = higher risk of fraud.
"""
df = df.copy()
rep = 1 - df.get("issuer_reputation_score", pd.Series(0.5, index=df.index)).fillna(0.5)
comp = 1 - df.get("metadata_completeness_score", pd.Series(0.5, index=df.index)).fillna(0.5)
tmpl = 1 - df.get("template_match_score", pd.Series(0.5, index=df.index)).fillna(0.5)
domain = 1 - df.get("domain_verification_status", pd.Series(1, index=df.index)).fillna(1)
# Weighted composite risk score
df["risk_score"] = (rep * 0.3 + comp * 0.2 + tmpl * 0.3 + domain * 0.2)
# Risk category
df["risk_category"] = pd.cut(
df["risk_score"],
bins=[0, 0.25, 0.5, 0.75, 1.0],
labels=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
include_lowest=True,
)
return df
def hash_integrity_check(credential_hash: str) -> Dict[str, Any]:
"""
Validate hash format, length, and entropy.
Returns dict with validation results.
"""
result = {
"is_valid_format": False,
"is_valid_length": False,
"entropy": 0.0,
"hash_quality": "INVALID",
}
if not credential_hash or not isinstance(credential_hash, str):
return result
# Check hex format
result["is_valid_format"] = bool(re.match(r"^[0-9a-f]+$", credential_hash.lower()))
# Check length (SHA-256 = 64 hex chars)
result["is_valid_length"] = len(credential_hash) == 64
# Compute Shannon entropy
if len(credential_hash) > 0:
probs = np.array([credential_hash.count(c) / len(credential_hash) for c in set(credential_hash)])
result["entropy"] = float(-np.sum(probs * np.log2(probs + 1e-10)))
# Quality assessment
if result["is_valid_format"] and result["is_valid_length"] and result["entropy"] > 3.5:
result["hash_quality"] = "VALID"
elif result["is_valid_format"] and result["entropy"] > 2.5:
result["hash_quality"] = "SUSPICIOUS"
else:
result["hash_quality"] = "INVALID"
return result
def add_hash_features(df: pd.DataFrame) -> pd.DataFrame:
"""Add hash-derived features to the dataframe."""
df = df.copy()
if "credential_hash" in df.columns:
hash_results = df["credential_hash"].fillna("").apply(hash_integrity_check)
df["hash_valid_format"] = hash_results.apply(lambda x: int(x["is_valid_format"]))
df["hash_valid_length"] = hash_results.apply(lambda x: int(x["is_valid_length"]))
df["hash_entropy"] = hash_results.apply(lambda x: x["entropy"])
return df
def apply_pca(X: np.ndarray, n_components: int = 10) -> Tuple[np.ndarray, PCA]:
"""
Apply PCA dimensionality reduction and save variance plot.
Returns transformed data and fitted PCA object.
"""
n_components = min(n_components, X.shape[1], X.shape[0])
pca = PCA(n_components=n_components, random_state=42)
X_pca = pca.fit_transform(X)
logger.info(f"PCA: {n_components} components explain {pca.explained_variance_ratio_.sum():.2%} variance")
# Save explained variance plot
try:
plot_pca_variance(pca.explained_variance_ratio_)
except Exception as e:
logger.warning(f"Could not save PCA plot: {e}")
return X_pca, pca
def engineer_all_features(df: pd.DataFrame) -> pd.DataFrame:
"""Apply all feature engineering transformations."""
df = extract_date_features(df)
df = compute_text_features(df)
df = compute_risk_score(df)
df = add_hash_features(df)
logger.info(f"Feature engineering complete. Shape: {df.shape}")
return df
def main():
"""Run feature engineering on the dataset."""
from app.config.settings import DATASET_PATH
print("Running feature engineering pipeline...")
df = pd.read_csv(DATASET_PATH)
df = engineer_all_features(df)
# Show new features
new_cols = [c for c in df.columns if c not in pd.read_csv(DATASET_PATH).columns]
print(f"\n✅ Feature engineering complete:")
print(f" • Original columns: {len(pd.read_csv(DATASET_PATH).columns)}")
print(f" • New columns: {len(new_cols)}")
print(f" • Total columns: {len(df.columns)}")
print(f" • New features: {new_cols}")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
|