# NOTE(review): removed non-code extraction artifacts ("Spaces:" / "Sleeping" status lines)
"""
Comprehensive Feature Engineering Pipeline
Implements ALL transformation requirements from Technical Specification
"""
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

# Suppress library warnings (e.g. pandas dtype/chained-assignment chatter)
# so pipeline progress output stays readable.
warnings.filterwarnings('ignore')
class ComprehensiveFeatureEngineer:
    """
    Implements complete feature engineering per technical requirements:
    - Customer Journey Analytics
    - Network Performance Indicators
    - Service Quality Features
    - Customer Behavior Patterns
    - Churn Risk Indicators
    - Geographic and Temporal Features
    - Financial Performance Metrics
    - Competitive Intelligence Integration

    Each feature method takes a dict of DataFrames (as produced by
    ``load_data``) and returns a customer-level (or tower-level) frame;
    ``create_master_feature_table`` merges them into one table.
    """

    def __init__(self):
        print("Initializing Comprehensive Feature Engineering Pipeline...")
        # Populated by run_pipeline(); kept for interactive inspection.
        self.features_df = None

    @staticmethod
    def _series_or_default(df, column, default):
        """Return df[column] if present, else a Series of `default` aligned to df.

        FIX: DataFrame.get() with a scalar default returns the bare scalar,
        which crashes downstream Series calls (.astype, .clip). This always
        yields a Series so vectorised expressions stay valid.
        """
        if column in df.columns:
            return df[column]
        return pd.Series(default, index=df.index)

    def load_data(self):
        """Load all data sources from data/synthetic/ into a dict of DataFrames.

        Core datasets are required (FileNotFoundError propagates if absent);
        enhanced datasets are optional and all their keys are set to None
        when any of the files is missing.
        """
        print("\nπ Loading All Data Sources...")
        data = {}
        # Core data
        data['customers'] = pd.read_csv('data/synthetic/customers.csv')
        data['billing'] = pd.read_csv('data/synthetic/billing.csv')
        data['churn'] = pd.read_csv('data/synthetic/churn_labels.csv')
        data['service'] = pd.read_csv('data/synthetic/customer_service.csv')
        data['quality'] = pd.read_csv('data/synthetic/service_quality.csv')
        data['network'] = pd.read_csv('data/synthetic/network_performance.csv')
        data['towers'] = pd.read_csv('data/synthetic/network_infrastructure.csv')
        data['usage'] = pd.read_csv('data/synthetic/customer_usage.csv')
        # Enhanced data (if exists)
        try:
            data['device'] = pd.read_csv('data/synthetic/device_data.csv')
            data['journey'] = pd.read_csv('data/synthetic/customer_journey.csv')
            data['competitive'] = pd.read_csv('data/synthetic/competitive_intelligence.csv')
            data['weather'] = pd.read_csv('data/synthetic/weather_data.csv')
            data['demographics'] = pd.read_csv('data/synthetic/demographics_data.csv')
            print(" β Enhanced datasets loaded")
        except FileNotFoundError:
            print(" β Enhanced datasets not found - run generate_enhanced_data.py first")
            # FIX: reset every enhanced key so a partially successful load
            # cannot leave some datasets populated and others missing.
            for key in ('device', 'journey', 'competitive', 'weather', 'demographics'):
                data[key] = None
        print(f" β Loaded {len(data['customers']):,} customers")
        return data

    def customer_journey_analytics(self, data):
        """
        Calculate customer lifetime value and lifecycle patterns
        Track customer service interaction history
        Analyze payment behavior
        Create customer segmentation
        """
        print("\nπ€ Customer Journey Analytics...")
        customers = data['customers'].copy()
        billing = data['billing']
        service = data['service']
        # CLV components from billing history
        customer_revenue = billing.groupby('customer_id')['total_amount'].agg([
            ('total_revenue', 'sum'),
            ('avg_monthly_revenue', 'mean'),
            ('revenue_std', 'std'),
            ('billing_cycles', 'count')
        ]).reset_index()
        # Service interaction patterns
        service_stats = service.groupby('customer_id').agg({
            'interaction_id': 'count',
            'was_resolved': 'mean',
            'was_escalated': 'mean',
            'customer_satisfaction_score': ['mean', 'std', 'min'],
            'resolution_time_minutes': 'mean'
        }).reset_index()
        service_stats.columns = ['customer_id', 'total_service_calls', 'resolution_rate',
                                 'escalation_rate', 'avg_csat', 'csat_volatility',
                                 'min_csat', 'avg_resolution_time']
        # Merge journey data onto the customer base
        customers = customers.merge(customer_revenue, on='customer_id', how='left')
        customers = customers.merge(service_stats, on='customer_id', how='left')
        # Customers with no interactions get neutral defaults
        customers['total_service_calls'] = customers['total_service_calls'].fillna(0)
        customers['avg_csat'] = customers['avg_csat'].fillna(7.0)
        # ARPU; clip tenure to avoid division by zero for brand-new customers
        customers['arpu'] = customers['total_revenue'] / customers['tenure_months'].clip(lower=1)
        # Lifecycle stage. FIX: include_lowest so tenure 0 lands in 'New'
        # instead of NaN (pd.cut bins are right-closed, left-open by default).
        customers['lifecycle_stage'] = pd.cut(
            customers['tenure_months'],
            bins=[0, 3, 12, 36, 1000],
            labels=['New', 'Growing', 'Mature', 'Tenured'],
            include_lowest=True
        )
        # Payment behavior score. FIX: _series_or_default replaces the
        # scalar-returning .get(col, 0).astype(int) pattern that crashed
        # when a column was absent.
        customers['payment_score'] = (
            (self._series_or_default(customers, 'autopay_enabled', 0).astype(int) * 3) +
            (self._series_or_default(customers, 'paperless_billing', 0).astype(int) * 2) +
            ((10 - self._series_or_default(customers, 'late_payments', 0).clip(upper=10)) / 2)
        )
        print(f" β Generated {len(customers.columns)} journey features")
        return customers

    def network_performance_indicators(self, data):
        """
        Cell tower load balancing and capacity utilization
        Signal quality metrics
        Network availability calculations
        Coverage analysis
        """
        print("\nπ‘ Network Performance Indicators...")
        network = data['network']
        towers = data['towers']

        def p95(s):
            # FIX: 'p95' is not a valid pandas aggregation string and raised
            # at runtime; a named function keeps the flattened column name
            # 'latency_ms_p95' that the original intended.
            return s.quantile(0.95)

        # Tower-level aggregations
        tower_metrics = network.groupby('tower_id').agg({
            'bandwidth_utilization_pct': ['mean', 'max', 'std'],
            'latency_ms': ['mean', p95, 'max'],
            'packet_loss_pct': ['mean', 'max'],
            'throughput_mbps': ['mean', 'min'],
            'availability_pct': 'mean',
            'active_users': ['mean', 'max'],
            'handover_success_rate_pct': 'mean',
            'call_setup_success_rate_pct': 'mean'
        }).reset_index()
        # Flatten the MultiIndex into 'column_stat' names
        tower_metrics.columns = ['_'.join(col).strip('_') for col in tower_metrics.columns.values]
        tower_features = towers.merge(tower_metrics, on='tower_id', how='left')
        # Mean utilisation relative to rated capacity
        tower_features['capacity_efficiency'] = (
            tower_features['bandwidth_utilization_pct_mean'] /
            tower_features['max_capacity_mbps']
        )
        # Weighted composite of availability / handover / call-setup rates
        tower_features['quality_score'] = (
            (tower_features['availability_pct_mean'] / 100) * 0.4 +
            (tower_features['handover_success_rate_pct_mean'] / 100) * 0.3 +
            (tower_features['call_setup_success_rate_pct_mean'] / 100) * 0.3
        )
        print(f" β Generated {len(tower_features.columns)} network features")
        return tower_features

    def service_quality_features(self, data):
        """
        Customer-reported vs. network-measured quality correlation
        Speed test results by location and device
        Call drop patterns
        Video streaming quality indicators
        """
        print("\nβ‘ Service Quality Features...")
        quality = data['quality']
        customers = data['customers']
        # Customer-level quality metrics
        quality_metrics = quality.groupby('customer_id').agg({
            'call_drop_occurred': 'sum',
            'download_speed_mbps': ['mean', 'std', 'min'],
            'upload_speed_mbps': ['mean', 'min'],
            'mos_score': 'mean',
            'jitter_ms': 'mean',
            'buffering_events': 'sum',
            'connection_time_sec': 'mean'
        }).reset_index()
        quality_metrics.columns = ['_'.join(col).strip('_') for col in quality_metrics.columns.values]
        customer_quality = customers.merge(quality_metrics, on='customer_id', how='left')
        # Composite quality score. FIX: after flattening, the drop-count
        # column is 'call_drop_occurred_sum' - the original looked up
        # 'call_drop_occurred', received the scalar default 0 and crashed
        # on .clip(). Series-valued defaults also guard the other lookups.
        customer_quality['overall_quality_score'] = (
            (self._series_or_default(customer_quality, 'download_speed_mbps_mean', 50) / 100) * 0.3 +
            (self._series_or_default(customer_quality, 'mos_score_mean', 4) / 5) * 0.3 +
            ((20 - self._series_or_default(customer_quality, 'call_drop_occurred_sum', 0).clip(upper=20)) / 20) * 0.4
        )
        print(f" β Generated {quality_metrics.shape[1]} quality features")
        return customer_quality

    def customer_behavior_patterns(self, data):
        """
        Data usage trends and seasonal patterns
        Peak usage times
        Roaming behavior
        Device upgrade cycles
        Plan change patterns
        """
        print("\nπ Customer Behavior Patterns...")
        usage = data['usage']
        customers = data['customers']
        # Usage pattern analysis
        usage_patterns = usage.groupby('customer_id').agg({
            'data_usage_gb': ['mean', 'std', 'max', 'sum'],
            'voice_minutes': ['mean', 'max', 'sum'],
            'sms_count': 'sum',
            'roaming_minutes': 'sum',
            'international_calls_min': 'sum',
            'peak_hour_usage_gb': 'sum',
            'data_session_count': 'mean'
        }).reset_index()
        usage_patterns.columns = ['_'.join(col).strip('_') for col in usage_patterns.columns.values]
        customer_behavior = customers.merge(usage_patterns, on='customer_id', how='left')
        # Normalise mean usage into [0, 1]; 50 GB/month treated as heavy use
        customer_behavior['data_intensity_score'] = (
            self._series_or_default(customer_behavior, 'data_usage_gb_mean', 10) / 50
        ).clip(upper=1)
        # FIX: include_lowest so 0 roaming minutes maps to 'Never', not NaN
        customer_behavior['roaming_frequency'] = pd.cut(
            self._series_or_default(customer_behavior, 'roaming_minutes_sum', 0),
            bins=[0, 1, 100, 500, 10000],
            labels=['Never', 'Rare', 'Occasional', 'Frequent'],
            include_lowest=True
        )
        # Coefficient of variation of data usage; floor the mean to avoid /0
        customer_behavior['usage_volatility'] = (
            self._series_or_default(customer_behavior, 'data_usage_gb_std', 0) /
            self._series_or_default(customer_behavior, 'data_usage_gb_mean', 1).clip(lower=0.1)
        )
        print(f" β Generated {usage_patterns.shape[1]} behavior features")
        return customer_behavior

    def churn_risk_indicators(self, data):
        """
        Service quality degradation trends
        Billing complaint patterns
        Usage pattern changes
        Customer service interaction frequency
        Contract expiration proximity
        """
        print("\nπ― Churn Risk Indicators...")
        customers = data['customers']
        churn = data['churn']
        service = data['service']
        # Merge churn labels
        customers_churn = customers.merge(churn, on='customer_id', how='left')
        # 1. Contract expiration risk
        if 'contract_end_date' in customers_churn.columns:
            customers_churn['contract_end_date'] = pd.to_datetime(
                customers_churn['contract_end_date'], errors='coerce'
            )
            # NOTE(review): fixed snapshot date - presumably matches the
            # synthetic-data generation window; confirm against the generator.
            reference_date = pd.Timestamp('2024-12-31')
            customers_churn['days_to_contract_end'] = (
                (customers_churn['contract_end_date'] - reference_date).dt.days
            ).fillna(999)
            customers_churn['contract_expiring_soon'] = (
                customers_churn['days_to_contract_end'] < 90
            ).astype(int)
        else:
            customers_churn['days_to_contract_end'] = 999
            customers_churn['contract_expiring_soon'] = 0
        # 2. Service complaint intensity
        complaints = service[service['complaint_type'].str.contains('Issue|Problem', na=False, case=False)]
        complaint_counts = complaints.groupby('customer_id').size().reset_index(name='complaint_count')
        customers_churn = customers_churn.merge(complaint_counts, on='customer_id', how='left')
        customers_churn['complaint_count'] = customers_churn['complaint_count'].fillna(0)
        # 3. Tenure risk (early vs late stage)
        customers_churn['early_tenure_risk'] = (customers_churn['tenure_months'] < 6).astype(int)
        customers_churn['mid_tenure_risk'] = (
            (customers_churn['tenure_months'] >= 6) &
            (customers_churn['tenure_months'] <= 12)
        ).astype(int)
        # 4. Price sensitivity (falls back to plan cost when ARPU not yet computed;
        # the Series-valued default makes .get safe here)
        customers_churn['price_to_arpu_ratio'] = (
            customers_churn['monthly_plan_cost'] /
            customers_churn.get('arpu', customers_churn['monthly_plan_cost']).clip(lower=1)
        )
        # 5. Composite churn risk score (weights sum to 1.0).
        # FIX: 'avg_csat' exists only after journey analytics; the original
        # scalar .get(…, 7) fallback crashed on .clip - use a Series default.
        customers_churn['computed_churn_risk_score'] = (
            customers_churn['contract_expiring_soon'] * 0.25 +
            (customers_churn['complaint_count'] / 10).clip(upper=1) * 0.25 +
            customers_churn['early_tenure_risk'] * 0.20 +
            (customers_churn['price_to_arpu_ratio'] - 1).clip(lower=0, upper=1) * 0.15 +
            ((10 - self._series_or_default(customers_churn, 'avg_csat', 7)) / 10).clip(lower=0) * 0.15
        )
        print(f" β Generated churn risk indicators")
        return customers_churn

    def geographic_temporal_features(self, data):
        """
        Location-based service quality
        Urban vs rural performance
        Time-of-day patterns
        Seasonal variations
        Weather impact
        """
        print("\nπ Geographic & Temporal Features...")
        customers = data['customers']
        # City-level aggregations
        city_metrics = customers.groupby('city').agg({
            'customer_id': 'count',
            'tenure_months': 'mean',
            'monthly_plan_cost': 'mean'
        }).reset_index()
        city_metrics.columns = ['city', 'customers_in_city', 'avg_tenure_city', 'avg_price_city']
        # Merge demographics if available
        if data.get('demographics') is not None:
            city_metrics = city_metrics.merge(data['demographics'], on='city', how='left')
        # Merge city features back to customers
        customers_geo = customers.merge(city_metrics, on='city', how='left')
        # Urban/rural classification (only when density data was merged in)
        if 'population_density_per_sqkm' in customers_geo.columns:
            customers_geo['location_type'] = pd.cut(
                customers_geo['population_density_per_sqkm'],
                bins=[0, 500, 2000, 100000],
                labels=['Rural', 'Suburban', 'Urban'],
                include_lowest=True
            )
        else:
            customers_geo['location_type'] = 'Unknown'
        print(f" β Generated geographic/temporal features")
        return customers_geo

    def financial_performance_metrics(self, data):
        """
        ARPU and revenue calculations
        Customer acquisition cost
        Churn cost analysis
        Pricing optimization features
        """
        print("\nπ° Financial Performance Metrics...")
        customers = data['customers']
        billing = data['billing']
        # FIX: the original built billing_sorted but then aggregated the
        # UNSORTED frame, so first/last/revenue_trend depended on file order.
        # Sort chronologically before aggregating.
        billing_sorted = billing.copy()
        billing_sorted['billing_period'] = pd.to_datetime(billing_sorted['billing_period'])
        billing_sorted = billing_sorted.sort_values(['customer_id', 'billing_period'])
        revenue_change = billing_sorted.groupby('customer_id')['total_amount'].agg([
            ('first_month_revenue', 'first'),
            ('last_month_revenue', 'last'),
            ('revenue_trend', lambda x: x.iloc[-1] - x.iloc[0] if len(x) > 1 else 0)
        ]).reset_index()
        customers_financial = customers.merge(revenue_change, on='customer_id', how='left')
        # Relative revenue growth over the observed billing window
        customers_financial['revenue_trend_pct'] = (
            customers_financial['revenue_trend'] /
            customers_financial['first_month_revenue'].clip(lower=1) * 100
        ).fillna(0)
        # FIX: 'arpu' is produced by customer_journey_analytics and is absent
        # when this method runs on the raw customer table (KeyError in the
        # original) - derive it here from billing totals when missing.
        if 'arpu' not in customers_financial.columns:
            totals = billing.groupby('customer_id')['total_amount'].sum()
            customers_financial['arpu'] = (
                customers_financial['customer_id'].map(totals).fillna(0) /
                customers_financial['tenure_months'].clip(lower=1)
            )
        # Quartile-based value segmentation
        arpu_quartiles = customers_financial['arpu'].quantile([0.25, 0.5, 0.75])
        customers_financial['value_segment'] = pd.cut(
            customers_financial['arpu'],
            bins=[0, arpu_quartiles[0.25], arpu_quartiles[0.5], arpu_quartiles[0.75], 1000],
            labels=['Low', 'Medium', 'High', 'Premium']
        )
        print(f" β Generated financial metrics")
        return customers_financial

    def create_master_feature_table(self, data):
        """Combine all feature families into one customer-level table.

        Each family method re-derives the customer table plus its own
        columns; only columns not already present are merged in, which
        avoids pandas _x/_y suffix duplication.
        """
        print("\nπ Creating Master Feature Table...")
        # Journey analytics forms the base table (FIX: the original copied
        # data['customers'] into `features` and immediately discarded it).
        features = self.customer_journey_analytics(data)

        def merge_new(base, extra):
            # Merge only the columns of `extra` that `base` lacks.
            cols = [c for c in extra.columns if c not in base.columns or c == 'customer_id']
            return base.merge(extra[cols], on='customer_id', how='left')

        features = merge_new(features, self.service_quality_features(data))
        features = merge_new(features, self.customer_behavior_patterns(data))
        features = merge_new(features, self.churn_risk_indicators(data))
        features = merge_new(features, self.geographic_temporal_features(data))
        features = merge_new(features, self.financial_performance_metrics(data))
        # Optional enhanced datasets
        if data.get('device') is not None:
            features = merge_new(features, data['device'])
        if data.get('journey') is not None:
            features = merge_new(features, data['journey'])
        print(f" β Master feature table: {features.shape[0]:,} rows Γ {features.shape[1]} columns")
        return features

    def run_pipeline(self):
        """Execute the complete feature engineering pipeline.

        Loads all sources, builds the master feature table, writes it to
        data/processed/comprehensive_features.csv and returns it.
        """
        print("\n" + "="*80)
        print("COMPREHENSIVE FEATURE ENGINEERING PIPELINE")
        print("="*80)
        data = self.load_data()
        features = self.create_master_feature_table(data)
        # FIX: ensure the output directory exists before writing
        import os
        os.makedirs('data/processed', exist_ok=True)
        output_path = 'data/processed/comprehensive_features.csv'
        features.to_csv(output_path, index=False)
        # FIX: keep the result on the instance, as declared in __init__
        self.features_df = features
        print(f"\nβ Saved comprehensive features to: {output_path}")
        print(f"\nπ Feature Summary:")
        print(f" - Total Features: {features.shape[1]}")
        print(f" - Total Records: {features.shape[0]:,}")
        print(f" - Data Quality: {(1 - features.isnull().sum().sum() / (features.shape[0] * features.shape[1])) * 100:.1f}% complete")
        print("\n" + "="*80)
        return features
def main():
    """Entry point: build and return the comprehensive feature table."""
    return ComprehensiveFeatureEngineer().run_pipeline()


if __name__ == "__main__":
    main()