# pranit_churn_application/src/data_engineering/comprehensive_feature_engineering.py
# rajkhanke's picture
# Upload 45 files
# 1b70843 verified
"""
Comprehensive Feature Engineering Pipeline
Implements ALL transformation requirements from Technical Specification
"""
import warnings
from datetime import datetime, timedelta
from pathlib import Path

import numpy as np
import pandas as pd
warnings.filterwarnings('ignore')
class ComprehensiveFeatureEngineer:
    """
    Build a customer-level feature table from the telecom CSV extracts.

    Feature groups implemented by the methods of this class:
    - Customer Journey Analytics
    - Network Performance Indicators (tower-level; not merged into the
      customer master table)
    - Service Quality Features
    - Customer Behavior Patterns
    - Churn Risk Indicators
    - Geographic and Temporal Features
    - Financial Performance Metrics

    Competitive-intelligence and weather extracts are loaded when present
    but no method in this class derives features from them.
    """

    # Dataset key -> CSV file name. Every core file must exist in data_dir.
    _CORE_DATASETS = {
        'customers': 'customers.csv',
        'billing': 'billing.csv',
        'churn': 'churn_labels.csv',
        'service': 'customer_service.csv',
        'quality': 'service_quality.csv',
        'network': 'network_performance.csv',
        'towers': 'network_infrastructure.csv',
        'usage': 'customer_usage.csv',
    }
    # Optional extracts; a missing file yields ``None`` for that key.
    _OPTIONAL_DATASETS = {
        'device': 'device_data.csv',
        'journey': 'customer_journey.csv',
        'competitive': 'competitive_intelligence.csv',
        'weather': 'weather_data.csv',
        'demographics': 'demographics_data.csv',
    }

    def __init__(self, data_dir='data/synthetic',
                 output_path='data/processed/comprehensive_features.csv'):
        """
        Args:
            data_dir: Directory containing the input CSV files.
            output_path: Destination CSV written by ``run_pipeline``.
        """
        print("Initializing Comprehensive Feature Engineering Pipeline...")
        self.data_dir = Path(data_dir)
        self.output_path = output_path
        # Populated by run_pipeline() with the last master feature table.
        self.features_df = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _column_or_default(frame, column, default):
        """Return ``frame[column]``, or a constant Series aligned to ``frame``.

        ``DataFrame.get(column, scalar)`` hands back the bare scalar when the
        column is absent, which breaks any following ``.clip``/``.astype``.
        """
        if column in frame.columns:
            return frame[column]
        return pd.Series(default, index=frame.index)

    @staticmethod
    def _flatten_columns(frame):
        """Join two-level column labels as ``<column>_<aggregation>``."""
        return ['_'.join(col).strip('_') for col in frame.columns.values]

    @staticmethod
    def _merge_new_columns(features, extra):
        """Left-join onto ``features`` only the columns it does not have yet."""
        new_cols = [
            c for c in extra.columns
            if c not in features.columns or c == 'customer_id'
        ]
        return features.merge(extra[new_cols], on='customer_id', how='left')

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def load_data(self):
        """Load all data sources.

        Returns:
            dict mapping dataset key to DataFrame. Optional datasets whose
            file is missing map to ``None``.

        Raises:
            FileNotFoundError: if any core CSV is missing.
        """
        print("\n📂 Loading All Data Sources...")
        data = {
            key: pd.read_csv(self.data_dir / filename)
            for key, filename in self._CORE_DATASETS.items()
        }

        # Load each optional file independently so that one missing extract
        # does not discard the ones that are present.
        any_missing = False
        for key, filename in self._OPTIONAL_DATASETS.items():
            try:
                data[key] = pd.read_csv(self.data_dir / filename)
            except FileNotFoundError:
                data[key] = None
                any_missing = True

        if any_missing:
            print(" ⚠ Enhanced datasets not found - run generate_enhanced_data.py first")
        else:
            print(" ✅ Enhanced datasets loaded")
        print(f" ✅ Loaded {len(data['customers']):,} customers")
        return data

    # ------------------------------------------------------------------
    # Feature groups
    # ------------------------------------------------------------------
    def customer_journey_analytics(self, data):
        """
        Revenue, service-interaction and payment features per customer.

        Reads ``data['customers']``, ``data['billing']`` and
        ``data['service']``. Returns a copy of customers with revenue
        aggregates, service-call statistics, ``arpu``, ``lifecycle_stage``
        and ``payment_score`` added.
        """
        print("\n👤 Customer Journey Analytics...")
        customers = data['customers'].copy()
        billing = data['billing']
        service = data['service']

        # Revenue aggregates per customer
        customer_revenue = billing.groupby('customer_id')['total_amount'].agg(
            total_revenue='sum',
            avg_monthly_revenue='mean',
            revenue_std='std',
            billing_cycles='count',
        ).reset_index()

        # Service interaction patterns
        service_stats = service.groupby('customer_id').agg(
            total_service_calls=('interaction_id', 'count'),
            resolution_rate=('was_resolved', 'mean'),
            escalation_rate=('was_escalated', 'mean'),
            avg_csat=('customer_satisfaction_score', 'mean'),
            csat_volatility=('customer_satisfaction_score', 'std'),
            min_csat=('customer_satisfaction_score', 'min'),
            avg_resolution_time=('resolution_time_minutes', 'mean'),
        ).reset_index()

        customers = customers.merge(customer_revenue, on='customer_id', how='left')
        customers = customers.merge(service_stats, on='customer_id', how='left')

        # Customers with no service rows: zero calls, default satisfaction 7.0
        customers['total_service_calls'] = customers['total_service_calls'].fillna(0)
        customers['avg_csat'] = customers['avg_csat'].fillna(7.0)

        # ARPU; tenure is floored at 1 month to avoid dividing by zero
        customers['arpu'] = customers['total_revenue'] / customers['tenure_months'].clip(lower=1)

        # include_lowest keeps tenure == 0 in 'New' instead of NaN
        customers['lifecycle_stage'] = pd.cut(
            customers['tenure_months'],
            bins=[0, 3, 12, 36, np.inf],
            labels=['New', 'Growing', 'Mature', 'Tenured'],
            include_lowest=True,
        )

        # Payment behavior score; absent columns count as 0
        autopay = self._column_or_default(customers, 'autopay_enabled', 0)
        paperless = self._column_or_default(customers, 'paperless_billing', 0)
        late_payments = self._column_or_default(customers, 'late_payments', 0)
        customers['payment_score'] = (
            autopay.fillna(0).astype(int) * 3
            + paperless.fillna(0).astype(int) * 2
            + (10 - late_payments.clip(upper=10)) / 2
        )

        print(f" ✅ Generated {len(customers.columns)} journey features")
        return customers

    def network_performance_indicators(self, data):
        """
        Tower-level utilisation, latency and reliability aggregates.

        Reads ``data['network']`` and ``data['towers']``. Returns the tower
        table with flattened ``<metric>_<aggregation>`` columns plus
        ``capacity_efficiency`` and ``quality_score``.
        """
        print("\n📡 Network Performance Indicators...")
        network = data['network']
        towers = data['towers']

        def p95(values):
            # The function name becomes the column suffix (latency_ms_p95).
            return values.quantile(0.95)

        tower_metrics = network.groupby('tower_id').agg({
            'bandwidth_utilization_pct': ['mean', 'max', 'std'],
            'latency_ms': ['mean', p95, 'max'],
            'packet_loss_pct': ['mean', 'max'],
            'throughput_mbps': ['mean', 'min'],
            'availability_pct': 'mean',
            'active_users': ['mean', 'max'],
            'handover_success_rate_pct': 'mean',
            'call_setup_success_rate_pct': 'mean',
        }).reset_index()
        tower_metrics.columns = self._flatten_columns(tower_metrics)

        tower_features = towers.merge(tower_metrics, on='tower_id', how='left')

        # A zero capacity would produce inf; treat it as unknown instead
        tower_features['capacity_efficiency'] = (
            tower_features['bandwidth_utilization_pct_mean']
            / tower_features['max_capacity_mbps'].replace(0, np.nan)
        )
        tower_features['quality_score'] = (
            (tower_features['availability_pct_mean'] / 100) * 0.4
            + (tower_features['handover_success_rate_pct_mean'] / 100) * 0.3
            + (tower_features['call_setup_success_rate_pct_mean'] / 100) * 0.3
        )

        print(f" ✅ Generated {len(tower_features.columns)} network features")
        return tower_features

    def service_quality_features(self, data):
        """
        Per-customer speed, call-drop, MOS, jitter and buffering aggregates.

        Reads ``data['quality']`` and ``data['customers']``. Returns customers
        with the flattened aggregates and ``overall_quality_score`` (weights
        0.3 download speed, 0.3 MOS, 0.4 call drops).
        """
        print("\n⚡ Service Quality Features...")
        quality = data['quality']
        customers = data['customers']

        quality_metrics = quality.groupby('customer_id').agg({
            'call_drop_occurred': 'sum',
            'download_speed_mbps': ['mean', 'std', 'min'],
            'upload_speed_mbps': ['mean', 'min'],
            'mos_score': 'mean',
            'jitter_ms': 'mean',
            'buffering_events': 'sum',
            'connection_time_sec': 'mean',
        }).reset_index()
        quality_metrics.columns = self._flatten_columns(quality_metrics)

        customer_quality = customers.merge(quality_metrics, on='customer_id', how='left')

        download = self._column_or_default(customer_quality, 'download_speed_mbps_mean', 50)
        mos = self._column_or_default(customer_quality, 'mos_score_mean', 4)
        # After flattening, the call-drop total is 'call_drop_occurred_sum'
        call_drops = self._column_or_default(customer_quality, 'call_drop_occurred_sum', 0)
        customer_quality['overall_quality_score'] = (
            (download / 100) * 0.3
            + (mos / 5) * 0.3
            + ((20 - call_drops.clip(upper=20)) / 20) * 0.4
        )

        print(f" ✅ Generated {quality_metrics.shape[1]} quality features")
        return customer_quality

    def customer_behavior_patterns(self, data):
        """
        Per-customer usage aggregates and derived behaviour scores.

        Reads ``data['usage']`` and ``data['customers']``. Adds
        ``data_intensity_score`` (mean GB / 50, capped at 1),
        ``roaming_frequency`` and ``usage_volatility`` (std / mean of data
        usage, with the mean floored at 0.1).
        """
        print("\n📊 Customer Behavior Patterns...")
        usage = data['usage']
        customers = data['customers']

        usage_patterns = usage.groupby('customer_id').agg({
            'data_usage_gb': ['mean', 'std', 'max', 'sum'],
            'voice_minutes': ['mean', 'max', 'sum'],
            'sms_count': 'sum',
            'roaming_minutes': 'sum',
            'international_calls_min': 'sum',
            'peak_hour_usage_gb': 'sum',
            'data_session_count': 'mean',
        }).reset_index()
        usage_patterns.columns = self._flatten_columns(usage_patterns)

        customer_behavior = customers.merge(usage_patterns, on='customer_id', how='left')

        mean_usage = self._column_or_default(customer_behavior, 'data_usage_gb_mean', 10)
        usage_std = self._column_or_default(customer_behavior, 'data_usage_gb_std', 0)
        roaming = self._column_or_default(customer_behavior, 'roaming_minutes_sum', 0)

        customer_behavior['data_intensity_score'] = (mean_usage / 50).clip(upper=1)

        # include_lowest puts exactly-zero roaming in 'Never' (it was NaN);
        # the open upper edge keeps very heavy roamers in 'Frequent'.
        customer_behavior['roaming_frequency'] = pd.cut(
            roaming,
            bins=[0, 1, 100, 500, np.inf],
            labels=['Never', 'Rare', 'Occasional', 'Frequent'],
            include_lowest=True,
        )
        customer_behavior['usage_volatility'] = usage_std / mean_usage.clip(lower=0.1)

        print(f" ✅ Generated {usage_patterns.shape[1]} behavior features")
        return customer_behavior

    def churn_risk_indicators(self, data, reference_date='2024-12-31'):
        """
        Contract, complaint, tenure and price risk flags plus a composite score.

        Reads ``data['customers']``, ``data['churn']`` and ``data['service']``.
        Uses the customers' ``arpu`` and ``avg_csat`` columns when present;
        otherwise falls back to the plan cost and a satisfaction of 7.

        Args:
            data: dict of DataFrames as returned by ``load_data``.
            reference_date: Date against which ``days_to_contract_end`` is
                measured.
        """
        print("\n🎯 Churn Risk Indicators...")
        customers = data['customers']
        churn = data['churn']
        service = data['service']

        customers_churn = customers.merge(churn, on='customer_id', how='left')

        # 1. Contract expiration risk
        if 'contract_end_date' in customers_churn.columns:
            customers_churn['contract_end_date'] = pd.to_datetime(
                customers_churn['contract_end_date'], errors='coerce'
            )
            days_left = (
                customers_churn['contract_end_date'] - pd.Timestamp(reference_date)
            ).dt.days
            # Unparseable or missing end dates are treated as far in the future
            customers_churn['days_to_contract_end'] = days_left.fillna(999)
            customers_churn['contract_expiring_soon'] = (
                customers_churn['days_to_contract_end'] < 90
            ).astype(int)
        else:
            customers_churn['days_to_contract_end'] = 999
            customers_churn['contract_expiring_soon'] = 0

        # 2. Service complaint intensity
        is_complaint = service['complaint_type'].str.contains(
            'Issue|Problem', na=False, case=False
        )
        complaint_counts = (
            service[is_complaint]
            .groupby('customer_id')
            .size()
            .reset_index(name='complaint_count')
        )
        customers_churn = customers_churn.merge(complaint_counts, on='customer_id', how='left')
        customers_churn['complaint_count'] = customers_churn['complaint_count'].fillna(0)

        # 3. Tenure risk (early vs late stage)
        tenure = customers_churn['tenure_months']
        customers_churn['early_tenure_risk'] = (tenure < 6).astype(int)
        customers_churn['mid_tenure_risk'] = ((tenure >= 6) & (tenure <= 12)).astype(int)

        # 4. Price sensitivity; without ARPU the plan cost stands in for it
        plan_cost = customers_churn['monthly_plan_cost']
        if 'arpu' in customers_churn.columns:
            arpu = customers_churn['arpu'].fillna(plan_cost)
        else:
            arpu = plan_cost
        customers_churn['price_to_arpu_ratio'] = plan_cost / arpu.clip(lower=1)

        # 5. Composite churn risk score
        avg_csat = self._column_or_default(customers_churn, 'avg_csat', 7)
        customers_churn['computed_churn_risk_score'] = (
            customers_churn['contract_expiring_soon'] * 0.25
            + (customers_churn['complaint_count'] / 10).clip(upper=1) * 0.25
            + customers_churn['early_tenure_risk'] * 0.20
            + (customers_churn['price_to_arpu_ratio'] - 1).clip(lower=0, upper=1) * 0.15
            + ((10 - avg_csat) / 10).clip(lower=0) * 0.15
        )

        print(" ✅ Generated churn risk indicators")
        return customers_churn

    def geographic_temporal_features(self, data):
        """
        City-level aggregates and an urban/rural classification.

        Reads ``data['customers']`` and, when available,
        ``data['demographics']`` (joined on ``city``). ``location_type`` is
        'Unknown' when no ``population_density_per_sqkm`` column is available.
        """
        print("\n🌍 Geographic & Temporal Features...")
        customers = data['customers']

        city_metrics = customers.groupby('city').agg(
            customers_in_city=('customer_id', 'count'),
            avg_tenure_city=('tenure_months', 'mean'),
            avg_price_city=('monthly_plan_cost', 'mean'),
        ).reset_index()

        demo = data.get('demographics')
        if demo is not None and 'city' in demo.columns:
            # One row per city so the join cannot multiply customer rows
            city_metrics = city_metrics.merge(
                demo.drop_duplicates(subset='city'), on='city', how='left'
            )

        customers_geo = customers.merge(city_metrics, on='city', how='left')

        if 'population_density_per_sqkm' in customers_geo.columns:
            customers_geo['location_type'] = pd.cut(
                customers_geo['population_density_per_sqkm'],
                bins=[0, 500, 2000, np.inf],
                labels=['Rural', 'Suburban', 'Urban'],
                include_lowest=True,
            )
        else:
            customers_geo['location_type'] = 'Unknown'

        print(" ✅ Generated geographic/temporal features")
        return customers_geo

    def financial_performance_metrics(self, data):
        """
        Revenue trend and value-segment features.

        Reads ``data['customers']`` and ``data['billing']``. First and last
        revenue are taken in ``billing_period`` order. If customers carries no
        ``arpu`` column it is derived from billing (total revenue / tenure
        floored at 1). ``value_segment`` splits ARPU at its quartiles.
        """
        print("\n💰 Financial Performance Metrics...")
        customers = data['customers']
        billing = data['billing']

        # Order chronologically before taking first/last per customer
        billing_sorted = billing.assign(
            billing_period=pd.to_datetime(billing['billing_period'])
        ).sort_values(['customer_id', 'billing_period'])

        def revenue_delta(amounts):
            return amounts.iloc[-1] - amounts.iloc[0] if len(amounts) > 1 else 0

        revenue_change = billing_sorted.groupby('customer_id')['total_amount'].agg(
            first_month_revenue='first',
            last_month_revenue='last',
            revenue_trend=revenue_delta,
        ).reset_index()

        customers_financial = customers.merge(revenue_change, on='customer_id', how='left')

        if 'arpu' not in customers_financial.columns:
            total_revenue = billing.groupby('customer_id')['total_amount'].sum()
            customers_financial['arpu'] = (
                customers_financial['customer_id'].map(total_revenue)
                / customers_financial['tenure_months'].clip(lower=1)
            )

        customers_financial['revenue_trend_pct'] = (
            customers_financial['revenue_trend']
            / customers_financial['first_month_revenue'].clip(lower=1) * 100
        ).fillna(0)

        # Count how many quartile cut points each ARPU exceeds. Unlike bin
        # edges passed to pd.cut, this tolerates tied quartiles and has no
        # fixed upper bound.
        arpu = customers_financial['arpu']
        q25, q50, q75 = arpu.quantile([0.25, 0.5, 0.75]).tolist()
        codes = (
            (arpu > q25).astype(int)
            + (arpu > q50).astype(int)
            + (arpu > q75).astype(int)
        )
        codes = codes.where(arpu.notna(), -1)  # -1 marks a missing segment
        customers_financial['value_segment'] = pd.Categorical.from_codes(
            codes.to_numpy(),
            categories=['Low', 'Medium', 'High', 'Premium'],
            ordered=True,
        )

        print(" ✅ Generated financial metrics")
        return customers_financial

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def create_master_feature_table(self, data):
        """Combine all customer-level feature groups into one table.

        Each group is left-joined on ``customer_id``; columns already present
        in the table are not added again.
        """
        print("\n🔗 Creating Master Feature Table...")
        features = self.customer_journey_analytics(data)

        # The churn and financial steps read journey-derived columns. The
        # journey table is a left join against unique keys, so its rows line
        # up one-to-one with data['customers'].
        enriched_customers = data['customers'].assign(
            arpu=features['arpu'].to_numpy(),
            avg_csat=features['avg_csat'].to_numpy(),
        )
        enriched = {**data, 'customers': enriched_customers}

        features = self._merge_new_columns(features, self.service_quality_features(data))
        features = self._merge_new_columns(features, self.customer_behavior_patterns(data))
        features = self._merge_new_columns(features, self.churn_risk_indicators(enriched))
        features = self._merge_new_columns(features, self.geographic_temporal_features(data))
        features = self._merge_new_columns(features, self.financial_performance_metrics(enriched))

        # Optional extracts are joined as-is.
        # NOTE(review): assumes one row per customer_id in these files - confirm.
        for key in ('device', 'journey'):
            extra = data.get(key)
            if extra is not None:
                features = self._merge_new_columns(features, extra)

        print(f" ✅ Master feature table: {features.shape[0]:,} rows × {features.shape[1]} columns")
        return features

    def run_pipeline(self):
        """Load the data, build the master table, write it to CSV, return it."""
        print("\n" + "=" * 80)
        print("COMPREHENSIVE FEATURE ENGINEERING PIPELINE")
        print("=" * 80)

        data = self.load_data()
        features = self.create_master_feature_table(data)
        self.features_df = features

        output_path = self.output_path
        # to_csv does not create missing parent directories
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        features.to_csv(output_path, index=False)

        total_cells = features.shape[0] * features.shape[1]
        if total_cells:
            completeness = (1 - features.isnull().sum().sum() / total_cells) * 100
        else:
            completeness = 0.0

        print(f"\n✅ Saved comprehensive features to: {output_path}")
        print("\n📊 Feature Summary:")
        print(f" - Total Features: {features.shape[1]}")
        print(f" - Total Records: {features.shape[0]:,}")
        print(f" - Data Quality: {completeness:.1f}% complete")
        print("\n" + "=" * 80)
        return features
def main():
    """Build a feature engineer, run the full pipeline and return its result."""
    return ComprehensiveFeatureEngineer().run_pipeline()


# Execute the pipeline only when this file is run as a script.
if __name__ == "__main__":
    main()