""" Comprehensive Feature Engineering Pipeline Implements ALL transformation requirements from Technical Specification """ import pandas as pd import numpy as np from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') class ComprehensiveFeatureEngineer: """ Implements complete feature engineering per technical requirements: - Customer Journey Analytics - Network Performance Indicators - Service Quality Features - Customer Behavior Patterns - Churn Risk Indicators - Geographic and Temporal Features - Financial Performance Metrics - Competitive Intelligence Integration """ def __init__(self): print("Initializing Comprehensive Feature Engineering Pipeline...") self.features_df = None def load_data(self): """Load all data sources""" print("\nšŸ“‚ Loading All Data Sources...") data = {} # Core data data['customers'] = pd.read_csv('data/synthetic/customers.csv') data['billing'] = pd.read_csv('data/synthetic/billing.csv') data['churn'] = pd.read_csv('data/synthetic/churn_labels.csv') data['service'] = pd.read_csv('data/synthetic/customer_service.csv') data['quality'] = pd.read_csv('data/synthetic/service_quality.csv') data['network'] = pd.read_csv('data/synthetic/network_performance.csv') data['towers'] = pd.read_csv('data/synthetic/network_infrastructure.csv') data['usage'] = pd.read_csv('data/synthetic/customer_usage.csv') # Enhanced data (if exists) try: data['device'] = pd.read_csv('data/synthetic/device_data.csv') data['journey'] = pd.read_csv('data/synthetic/customer_journey.csv') data['competitive'] = pd.read_csv('data/synthetic/competitive_intelligence.csv') data['weather'] = pd.read_csv('data/synthetic/weather_data.csv') data['demographics'] = pd.read_csv('data/synthetic/demographics_data.csv') print(" āœ… Enhanced datasets loaded") except FileNotFoundError: print(" ⚠ Enhanced datasets not found - run generate_enhanced_data.py first") data['device'] = None data['journey'] = None print(f" āœ… Loaded {len(data['customers']):,} customers") return data def customer_journey_analytics(self, data): """ Calculate customer lifetime value and lifecycle patterns Track customer service interaction history Analyze payment behavior Create customer segmentation """ print("\nšŸ‘¤ Customer Journey Analytics...") customers = data['customers'].copy() billing = data['billing'] service = data['service'] # Calculate CLV customer_revenue = billing.groupby('customer_id')['total_amount'].agg([ ('total_revenue', 'sum'), ('avg_monthly_revenue', 'mean'), ('revenue_std', 'std'), ('billing_cycles', 'count') ]).reset_index() # Service interaction patterns service_stats = service.groupby('customer_id').agg({ 'interaction_id': 'count', 'was_resolved': 'mean', 'was_escalated': 'mean', 'customer_satisfaction_score': ['mean', 'std', 'min'], 'resolution_time_minutes': 'mean' }).reset_index() service_stats.columns = ['customer_id', 'total_service_calls', 'resolution_rate', 'escalation_rate', 'avg_csat', 'csat_volatility', 'min_csat', 'avg_resolution_time'] # Merge journey data customers = customers.merge(customer_revenue, on='customer_id', how='left') customers = customers.merge(service_stats, on='customer_id', how='left') # Fill NaNs customers['total_service_calls'] = customers['total_service_calls'].fillna(0) customers['avg_csat'] = customers['avg_csat'].fillna(7.0) # Calculate ARPU customers['arpu'] = customers['total_revenue'] / customers['tenure_months'].clip(lower=1) # Customer lifecycle stage customers['lifecycle_stage'] = pd.cut( customers['tenure_months'], bins=[0, 3, 12, 36, 1000], labels=['New', 'Growing', 'Mature', 'Tenured'] ) # Payment behavior score customers['payment_score'] = ( (customers.get('autopay_enabled', 0).astype(int) * 3) + (customers.get('paperless_billing', 0).astype(int) * 2) + ((10 - customers.get('late_payments', 0).clip(upper=10)) / 2) ) print(f" āœ… Generated {len(customers.columns)} journey features") return customers def network_performance_indicators(self, data): """ Cell tower load balancing and capacity utilization Signal quality metrics Network availability calculations Coverage analysis """ print("\nšŸ“” Network Performance Indicators...") network = data['network'] towers = data['towers'] # Tower-level aggregations tower_metrics = network.groupby('tower_id').agg({ 'bandwidth_utilization_pct': ['mean', 'max', 'std'], 'latency_ms': ['mean', 'p95', 'max'], 'packet_loss_pct': ['mean', 'max'], 'throughput_mbps': ['mean', 'min'], 'availability_pct': 'mean', 'active_users': ['mean', 'max'], 'handover_success_rate_pct': 'mean', 'call_setup_success_rate_pct': 'mean' }).reset_index() # Flatten column names tower_metrics.columns = ['_'.join(col).strip('_') for col in tower_metrics.columns.values] # Merge with tower data tower_features = towers.merge(tower_metrics, on='tower_id', how='left') # Calculate efficiency metrics tower_features['capacity_efficiency'] = ( tower_features['bandwidth_utilization_pct_mean'] / tower_features['max_capacity_mbps'] ) tower_features['quality_score'] = ( (tower_features['availability_pct_mean'] / 100) * 0.4 + (tower_features['handover_success_rate_pct_mean'] / 100) * 0.3 + (tower_features['call_setup_success_rate_pct_mean'] / 100) * 0.3 ) print(f" āœ… Generated {len(tower_features.columns)} network features") return tower_features def service_quality_features(self, data): """ Customer-reported vs. network-measured quality correlation Speed test results by location and device Call drop patterns Video streaming quality indicators """ print("\n⚔ Service Quality Features...") quality = data['quality'] customers = data['customers'] # Customer-level quality metrics quality_metrics = quality.groupby('customer_id').agg({ 'call_drop_occurred': 'sum', 'download_speed_mbps': ['mean', 'std', 'min'], 'upload_speed_mbps': ['mean', 'min'], 'mos_score': 'mean', 'jitter_ms': 'mean', 'buffering_events': 'sum', 'connection_time_sec': 'mean' }).reset_index() quality_metrics.columns = ['_'.join(col).strip('_') for col in quality_metrics.columns.values] # Merge with customers customer_quality = customers.merge(quality_metrics, on='customer_id', how='left') # Quality score customer_quality['overall_quality_score'] = ( (customer_quality.get('download_speed_mbps_mean', 50) / 100) * 0.3 + (customer_quality.get('mos_score_mean', 4) / 5) * 0.3 + ((20 - customer_quality.get('call_drop_occurred', 0).clip(upper=20)) / 20) * 0.4 ) print(f" āœ… Generated {quality_metrics.shape[1]} quality features") return customer_quality def customer_behavior_patterns(self, data): """ Data usage trends and seasonal patterns Peak usage times Roaming behavior Device upgrade cycles Plan change patterns """ print("\nšŸ“Š Customer Behavior Patterns...") usage = data['usage'] customers = data['customers'] # Usage pattern analysis usage_patterns = usage.groupby('customer_id').agg({ 'data_usage_gb': ['mean', 'std', 'max', 'sum'], 'voice_minutes': ['mean', 'max', 'sum'], 'sms_count': 'sum', 'roaming_minutes': 'sum', 'international_calls_min': 'sum', 'peak_hour_usage_gb': 'sum', 'data_session_count': 'mean' }).reset_index() usage_patterns.columns = ['_'.join(col).strip('_') for col in usage_patterns.columns.values] # Merge with customers customer_behavior = customers.merge(usage_patterns, on='customer_id', how='left') # Calculate behavior scores customer_behavior['data_intensity_score'] = ( customer_behavior.get('data_usage_gb_mean', 10) / 50 # Normalize to 0-1 ).clip(upper=1) customer_behavior['roaming_frequency'] = pd.cut( customer_behavior.get('roaming_minutes_sum', 0), bins=[0, 1, 100, 500, 10000], labels=['Never', 'Rare', 'Occasional', 'Frequent'] ) customer_behavior['usage_volatility'] = ( customer_behavior.get('data_usage_gb_std', 0) / customer_behavior.get('data_usage_gb_mean', 1).clip(lower=0.1) ) print(f" āœ… Generated {usage_patterns.shape[1]} behavior features") return customer_behavior def churn_risk_indicators(self, data): """ Service quality degradation trends Billing complaint patterns Usage pattern changes Customer service interaction frequency Contract expiration proximity """ print("\nšŸŽÆ Churn Risk Indicators...") customers = data['customers'] churn = data['churn'] service = data['service'] # Merge churn labels customers_churn = customers.merge(churn, on='customer_id', how='left') # Calculate risk factors # 1. Contract expiration risk if 'contract_end_date' in customers_churn.columns: customers_churn['contract_end_date'] = pd.to_datetime( customers_churn['contract_end_date'], errors='coerce' ) reference_date = pd.Timestamp('2024-12-31') customers_churn['days_to_contract_end'] = ( (customers_churn['contract_end_date'] - reference_date).dt.days ).fillna(999) customers_churn['contract_expiring_soon'] = ( customers_churn['days_to_contract_end'] < 90 ).astype(int) else: customers_churn['days_to_contract_end'] = 999 customers_churn['contract_expiring_soon'] = 0 # 2. Service complaint intensity complaints = service[service['complaint_type'].str.contains('Issue|Problem', na=False, case=False)] complaint_counts = complaints.groupby('customer_id').size().reset_index(name='complaint_count') customers_churn = customers_churn.merge(complaint_counts, on='customer_id', how='left') customers_churn['complaint_count'] = customers_churn['complaint_count'].fillna(0) # 3. Tenure risk (early vs late stage) customers_churn['early_tenure_risk'] = (customers_churn['tenure_months'] < 6).astype(int) customers_churn['mid_tenure_risk'] = ( (customers_churn['tenure_months'] >= 6) & (customers_churn['tenure_months'] <= 12) ).astype(int) # 4. Price sensitivity indicator customers_churn['price_to_arpu_ratio'] = ( customers_churn['monthly_plan_cost'] / customers_churn.get('arpu', customers_churn['monthly_plan_cost']).clip(lower=1) ) # 5. Composite churn risk score customers_churn['computed_churn_risk_score'] = ( customers_churn['contract_expiring_soon'] * 0.25 + (customers_churn['complaint_count'] / 10).clip(upper=1) * 0.25 + customers_churn['early_tenure_risk'] * 0.20 + (customers_churn['price_to_arpu_ratio'] - 1).clip(lower=0, upper=1) * 0.15 + ((10 - customers_churn.get('avg_csat', 7)) / 10).clip(lower=0) * 0.15 ) print(f" āœ… Generated churn risk indicators") return customers_churn def geographic_temporal_features(self, data): """ Location-based service quality Urban vs rural performance Time-of-day patterns Seasonal variations Weather impact """ print("\nšŸŒ Geographic & Temporal Features...") customers = data['customers'] network = data['network'] towers = data['towers'] # City-level aggregations city_metrics = customers.groupby('city').agg({ 'customer_id': 'count', 'tenure_months': 'mean', 'monthly_plan_cost': 'mean' }).reset_index() city_metrics.columns = ['city', 'customers_in_city', 'avg_tenure_city', 'avg_price_city'] # Merge demographics if available if 'demographics' in data and data['demographics'] is not None: demo = data['demographics'] city_metrics = city_metrics.merge(demo, on='city', how='left') # Merge city features back to customers customers_geo = customers.merge(city_metrics, on='city', how='left') # Urban/rural classification if 'population_density_per_sqkm' in customers_geo.columns: customers_geo['location_type'] = pd.cut( customers_geo['population_density_per_sqkm'], bins=[0, 500, 2000, 100000], labels=['Rural', 'Suburban', 'Urban'] ) else: customers_geo['location_type'] = 'Unknown' print(f" āœ… Generated geographic/temporal features") return customers_geo def financial_performance_metrics(self, data): """ ARPU and revenue calculations Customer acquisition cost Churn cost analysis Pricing optimization features """ print("\nšŸ’° Financial Performance Metrics...") customers = data['customers'] billing = data['billing'] # Revenue metrics already calculated in journey analytics # Additional financial features # Revenue growth billing_sorted = billing.sort_values(['customer_id', 'billing_period']) billing_sorted['billing_period'] = pd.to_datetime(billing_sorted['billing_period']) # Calculate month-over-month change (simplified) revenue_change = billing.groupby('customer_id')['total_amount'].agg([ ('first_month_revenue', 'first'), ('last_month_revenue', 'last'), ('revenue_trend', lambda x: x.iloc[-1] - x.iloc[0] if len(x) > 1 else 0) ]).reset_index() customers_financial = customers.merge(revenue_change, on='customer_id', how='left') # Profitability indicators customers_financial['revenue_trend_pct'] = ( customers_financial['revenue_trend'] / customers_financial['first_month_revenue'].clip(lower=1) * 100 ).fillna(0) # Value segment arpu_quartiles = customers_financial['arpu'].quantile([0.25, 0.5, 0.75]) customers_financial['value_segment'] = pd.cut( customers_financial['arpu'], bins=[0, arpu_quartiles[0.25], arpu_quartiles[0.5], arpu_quartiles[0.75], 1000], labels=['Low', 'Medium', 'High', 'Premium'] ) print(f" āœ… Generated financial metrics") return customers_financial def create_master_feature_table(self, data): """Combine all features into master table""" print("\nšŸ”— Creating Master Feature Table...") # Start with base customers features = data['customers'].copy() # Add journey analytics features = self.customer_journey_analytics(data) # Add quality features quality_features = self.service_quality_features(data) quality_cols = [c for c in quality_features.columns if c not in features.columns or c == 'customer_id'] features = features.merge(quality_features[quality_cols], on='customer_id', how='left') # Add behavior patterns behavior_features = self.customer_behavior_patterns(data) behavior_cols = [c for c in behavior_features.columns if c not in features.columns or c == 'customer_id'] features = features.merge(behavior_features[behavior_cols], on='customer_id', how='left') # Add churn indicators churn_features = self.churn_risk_indicators(data) churn_cols = [c for c in churn_features.columns if c not in features.columns or c == 'customer_id'] features = features.merge(churn_features[churn_cols], on='customer_id', how='left') # Add geographic features geo_features = self.geographic_temporal_features(data) geo_cols = [c for c in geo_features.columns if c not in features.columns or c == 'customer_id'] features = features.merge(geo_features[geo_cols], on='customer_id', how='left') # Add financial metrics financial_features = self.financial_performance_metrics(data) financial_cols = [c for c in financial_features.columns if c not in features.columns or c == 'customer_id'] features = features.merge(financial_features[financial_cols], on='customer_id', how='left') # Add device data if available if data.get('device') is not None: device_cols = [c for c in data['device'].columns if c not in features.columns or c == 'customer_id'] features = features.merge(data['device'][device_cols], on='customer_id', how='left') # Add journey data if available if data.get('journey') is not None: journey_cols = [c for c in data['journey'].columns if c not in features.columns or c == 'customer_id'] features = features.merge(data['journey'][journey_cols], on='customer_id', how='left') print(f" āœ… Master feature table: {features.shape[0]:,} rows Ɨ {features.shape[1]} columns") return features def run_pipeline(self): """Execute complete feature engineering pipeline""" print("\n" + "="*80) print("COMPREHENSIVE FEATURE ENGINEERING PIPELINE") print("="*80) # Load data data = self.load_data() # Create master feature table features = self.create_master_feature_table(data) # Save output_path = 'data/processed/comprehensive_features.csv' features.to_csv(output_path, index=False) print(f"\nāœ… Saved comprehensive features to: {output_path}") print(f"\nšŸ“Š Feature Summary:") print(f" - Total Features: {features.shape[1]}") print(f" - Total Records: {features.shape[0]:,}") print(f" - Data Quality: {(1 - features.isnull().sum().sum() / (features.shape[0] * features.shape[1])) * 100:.1f}% complete") print("\n" + "="*80) return features def main(): """Run comprehensive feature engineering""" engineer = ComprehensiveFeatureEngineer() features = engineer.run_pipeline() return features if __name__ == "__main__": main()