Spaces:
Sleeping
Sleeping
| """ | |
| Time Series Forecasting Engine for Telecom Analytics | |
| Provides forecasting methods for all four domains: | |
| 1. Seasonal Usage Patterns | |
| 2. Technology Adoption Curves | |
| 3. Competitive Market Dynamics | |
| 4. Economic Impact Forecasting | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| import os | |
| import json | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _moving_average_forecast(series, window=3, forecast_periods=12): | |
| """Weighted moving average forecast.""" | |
| values = series.values.astype(float) | |
| weights = np.arange(1, window + 1, dtype=float) | |
| weights /= weights.sum() | |
| forecasts = list(values) | |
| for _ in range(forecast_periods): | |
| recent = np.array(forecasts[-window:]) | |
| forecasts.append(float(np.dot(recent, weights))) | |
| return np.array(forecasts[-forecast_periods:]) | |
| def _seasonal_decompose_forecast(series, period=12, forecast_periods=12): | |
| """Simple seasonal decomposition + trend extrapolation.""" | |
| values = series.values.astype(float) | |
| n = len(values) | |
| # Trend via centered moving average | |
| if n >= period: | |
| trend = pd.Series(values).rolling(window=period, center=True).mean().values | |
| # Fill edges | |
| for i in range(n): | |
| if np.isnan(trend[i]): | |
| trend[i] = values[i] | |
| else: | |
| trend = values.copy() | |
| # Seasonal component | |
| detrended = values - trend | |
| seasonal = np.zeros(period) | |
| for i in range(period): | |
| indices = list(range(i, n, period)) | |
| seasonal[i] = np.mean(detrended[indices]) | |
| # Extrapolate trend linearly | |
| x = np.arange(n) | |
| valid = ~np.isnan(trend) | |
| if valid.sum() > 1: | |
| coeffs = np.polyfit(x[valid], trend[valid], 1) | |
| else: | |
| coeffs = [0, values[-1]] | |
| forecast_trend = np.polyval(coeffs, np.arange(n, n + forecast_periods)) | |
| forecast_seasonal = np.tile(seasonal, (forecast_periods // period + 2))[:forecast_periods] | |
| forecast = forecast_trend + forecast_seasonal | |
| return forecast | |
| def _logistic_curve_forecast(current_values, k=0.65, forecast_periods=12): | |
| """Forecast S-curve / logistic adoption.""" | |
| values = current_values / 100.0 # convert from pct | |
| n = len(values) | |
| # Fit logistic parameters by least squares grid search | |
| best_err = float('inf') | |
| best_x0 = n // 2 | |
| best_L = 0.2 | |
| for x0_try in range(max(1, n // 4), n + 12): | |
| for L_try in np.arange(0.05, 0.5, 0.05): | |
| t = np.arange(n) | |
| predicted = k / (1 + np.exp(-L_try * (t - x0_try))) | |
| err = np.sum((predicted - values) ** 2) | |
| if err < best_err: | |
| best_err = err | |
| best_x0 = x0_try | |
| best_L = L_try | |
| t_future = np.arange(n, n + forecast_periods) | |
| forecast = k / (1 + np.exp(-best_L * (t_future - best_x0))) | |
| return forecast * 100 # back to pct | |
| def _exponential_smoothing(series, alpha=0.3, forecast_periods=12): | |
| """Simple exponential smoothing forecast.""" | |
| values = series.values.astype(float) | |
| smoothed = [values[0]] | |
| for v in values[1:]: | |
| smoothed.append(alpha * v + (1 - alpha) * smoothed[-1]) | |
| forecasts = [] | |
| last = smoothed[-1] | |
| # Add slight trend | |
| if len(smoothed) > 1: | |
| trend = (smoothed[-1] - smoothed[-6]) / 6 if len(smoothed) >= 6 else 0 | |
| else: | |
| trend = 0 | |
| for i in range(forecast_periods): | |
| next_val = last + trend | |
| forecasts.append(next_val) | |
| last = next_val | |
| return np.array(forecasts) | |
| # --------------------------------------------------------------------------- | |
| # Forecast Functions per Domain | |
| # --------------------------------------------------------------------------- | |
def forecast_seasonal_usage(seasonal_df, forecast_months=12):
    """Forecast seasonal usage patterns.

    Reads the columns ``date``, ``avg_data_usage_gb``, ``avg_voice_minutes``,
    ``network_load_factor`` and ``peak_concurrent_users`` from
    ``seasonal_df``.

    Args:
        seasonal_df: DataFrame holding the historical observations.
        forecast_months: Number of months to forecast.

    Returns:
        dict with ``dates`` (``YYYY-MM`` strings), one
        ``forecast``/``upper``/``lower`` group each for ``data_usage``,
        ``voice_minutes``, ``network_load`` and ``peak_users``,
        ``holiday_months`` (0/1 flag per forecast month) and ``historical``
        (the last 12 rows of the input, for chart context).
    """
    def _month_label(value):
        # One 'YYYY-MM' label per date-like value.
        return pd.to_datetime(value).strftime('%Y-%m')

    last_date = pd.to_datetime(seasonal_df['date']).max()
    forecast_dates = pd.date_range(start=last_date + pd.DateOffset(months=1),
                                   periods=forecast_months, freq='ME')

    # Forecast each metric.  Floors/clips are applied once here so that the
    # reported forecast and its confidence bounds come from the same values;
    # otherwise the band could exclude the reported forecast.
    data_usage_forecast = np.maximum(
        _seasonal_decompose_forecast(
            seasonal_df['avg_data_usage_gb'], period=12, forecast_periods=forecast_months),
        2)
    voice_forecast = np.maximum(
        _seasonal_decompose_forecast(
            seasonal_df['avg_voice_minutes'], period=12, forecast_periods=forecast_months),
        50)
    network_load_forecast = np.clip(
        _moving_average_forecast(
            seasonal_df['network_load_factor'], window=4, forecast_periods=forecast_months),
        0.3, 0.98)
    peak_users_forecast = np.maximum(
        _seasonal_decompose_forecast(
            seasonal_df['peak_concurrent_users'], period=12, forecast_periods=forecast_months),
        10000)

    # Flag the calendar months this module treats as holiday months
    holiday_flags = [1 if d.month in (2, 7, 9, 11, 12) else 0 for d in forecast_dates]

    # Confidence intervals (wider for further out)
    ci_width = np.linspace(0.05, 0.20, forecast_months)

    result = {
        'dates': [d.strftime('%Y-%m') for d in forecast_dates],
        'data_usage': {
            'forecast': np.round(data_usage_forecast, 2).tolist(),
            'upper': np.round(data_usage_forecast * (1 + ci_width), 2).tolist(),
            'lower': np.round(data_usage_forecast * (1 - ci_width), 2).tolist(),
        },
        'voice_minutes': {
            'forecast': np.round(voice_forecast, 1).tolist(),
            'upper': np.round(voice_forecast * (1 + ci_width * 0.8), 1).tolist(),
            'lower': np.round(voice_forecast * (1 - ci_width * 0.8), 1).tolist(),
        },
        'network_load': {
            'forecast': np.round(network_load_forecast, 3).tolist(),
            'upper': np.round(np.clip(network_load_forecast + ci_width * 0.3, 0, 1), 3).tolist(),
            'lower': np.round(np.clip(network_load_forecast - ci_width * 0.3, 0, 1), 3).tolist(),
        },
        'peak_users': {
            'forecast': np.round(peak_users_forecast).astype(int).tolist(),
            'upper': np.round(peak_users_forecast * (1 + ci_width)).astype(int).tolist(),
            'lower': np.round(peak_users_forecast * (1 - ci_width)).astype(int).tolist(),
        },
        'holiday_months': holiday_flags,
    }

    # Historical data for chart context
    hist = seasonal_df.tail(12)
    result['historical'] = {
        'dates': [_month_label(d) for d in hist['date']],
        'data_usage': hist['avg_data_usage_gb'].round(2).tolist(),
        'voice_minutes': hist['avg_voice_minutes'].round(1).tolist(),
        'network_load': hist['network_load_factor'].round(3).tolist(),
        'peak_users': hist['peak_concurrent_users'].tolist(),
    }
    return result
def forecast_tech_adoption(tech_df, forecast_months=12):
    """Build the 5G adoption / technology migration forecast payload.

    Reads ``date``, ``five_g_adoption_pct``, ``five_g_towers_cumulative``,
    ``avg_5g_speed_mbps`` and ``five_g_revenue_premium_pct`` for the
    forecasts, plus ``four_g_pct`` and ``three_g_pct`` for the historical
    section.

    Args:
        tech_df: DataFrame holding the historical observations.
        forecast_months: Number of months to forecast.

    Returns:
        dict with ``dates``, ``five_g_adoption`` (forecast/upper/lower),
        ``four_g_pct``, ``three_g_pct``, ``towers_deployed``, ``avg_speed``,
        ``revenue_premium`` and ``historical`` (last 12 input rows).
    """
    def _month_label(value):
        # One 'YYYY-MM' label per date-like value.
        return pd.to_datetime(value).strftime('%Y-%m')

    def _rounded(array, digits):
        # Round and convert to a plain list for the payload.
        return np.round(array, digits).tolist()

    horizon = forecast_months
    latest = pd.to_datetime(tech_df['date']).max()
    future_index = pd.date_range(
        start=latest + pd.DateOffset(months=1), periods=horizon, freq='ME')

    # 5G share follows a logistic curve
    adoption = _logistic_curve_forecast(
        tech_df['five_g_adoption_pct'].values, k=0.65, forecast_periods=horizon)

    # Tower count: smoothed trend, never below the latest cumulative figure
    towers_history = tech_df['five_g_towers_cumulative']
    towers = _exponential_smoothing(towers_history, alpha=0.4, forecast_periods=horizon)
    towers = np.maximum(towers, towers_history.iloc[-1])

    # Average 5G speed
    speed = _moving_average_forecast(
        tech_df['avg_5g_speed_mbps'], window=4, forecast_periods=horizon)

    # 5G revenue premium
    premium = _exponential_smoothing(
        tech_df['five_g_revenue_premium_pct'], alpha=0.35, forecast_periods=horizon)

    # 4G is derived from the 5G share (floored at 20%); 3G takes the rest
    four_g = np.maximum(100 - adoption - 5, 20)
    three_g = 100 - adoption - four_g

    band = np.linspace(0.03, 0.15, horizon)

    result = {}
    result['dates'] = [stamp.strftime('%Y-%m') for stamp in future_index]
    result['five_g_adoption'] = {
        'forecast': _rounded(adoption, 2),
        'upper': _rounded(adoption * (1 + band), 2),
        'lower': _rounded(np.maximum(adoption * (1 - band), 0), 2),
    }
    result['four_g_pct'] = _rounded(four_g, 2)
    result['three_g_pct'] = _rounded(np.maximum(three_g, 1), 2)
    result['towers_deployed'] = {'forecast': np.round(towers).astype(int).tolist()}
    result['avg_speed'] = {'forecast': _rounded(speed, 1)}
    result['revenue_premium'] = {'forecast': _rounded(premium, 1)}

    # Trailing 12 rows of history for chart context
    recent = tech_df.tail(12)
    result['historical'] = {
        'dates': [_month_label(raw) for raw in recent['date']],
        'five_g_adoption': recent['five_g_adoption_pct'].round(2).tolist(),
        'four_g_pct': recent['four_g_pct'].round(2).tolist(),
        'three_g_pct': recent['three_g_pct'].round(2).tolist(),
        'towers_deployed': recent['five_g_towers_cumulative'].tolist(),
        'avg_speed': recent['avg_5g_speed_mbps'].round(1).tolist(),
    }
    return result
def forecast_competitive_dynamics(comp_df, forecast_months=12):
    """Forecast competitive market dynamics.

    Reads ``date``, ``our_market_share``, ``competitor_a_share``,
    ``competitor_b_share``, ``our_avg_price``, ``market_avg_price``,
    ``net_subscriber_adds`` and ``competitive_churn_pct`` for the forecasts,
    plus ``competitor_c_share`` for the historical section.

    Args:
        comp_df: DataFrame holding the historical observations.
        forecast_months: Number of months to forecast.

    Returns:
        dict with ``dates``, ``market_shares``, ``pricing``, ``net_adds``
        (forecast/upper/lower), ``competitive_churn``, ``pricing_war_risk``
        (0-100 per month) and ``historical`` (last 12 input rows).
    """
    def _month_label(value):
        # One 'YYYY-MM' label per date-like value.
        return pd.to_datetime(value).strftime('%Y-%m')

    last_date = pd.to_datetime(comp_df['date']).max()
    forecast_dates = pd.date_range(start=last_date + pd.DateOffset(months=1),
                                   periods=forecast_months, freq='ME')

    # Market shares via exponential smoothing; competitor C is the remainder
    our_share_fc = _exponential_smoothing(
        comp_df['our_market_share'], alpha=0.35, forecast_periods=forecast_months)
    comp_a_fc = _exponential_smoothing(
        comp_df['competitor_a_share'], alpha=0.35, forecast_periods=forecast_months)
    comp_b_fc = _exponential_smoothing(
        comp_df['competitor_b_share'], alpha=0.35, forecast_periods=forecast_months)
    comp_c_fc = 100 - our_share_fc - comp_a_fc - comp_b_fc

    # Pricing forecast
    our_price_fc = _moving_average_forecast(
        comp_df['our_avg_price'], window=4, forecast_periods=forecast_months)
    market_price_fc = _moving_average_forecast(
        comp_df['market_avg_price'], window=4, forecast_periods=forecast_months)

    # Net subscriber adds
    net_adds_fc = _seasonal_decompose_forecast(
        comp_df['net_subscriber_adds'], period=12, forecast_periods=forecast_months)

    # Competitive churn
    comp_churn_fc = _moving_average_forecast(
        comp_df['competitive_churn_pct'], window=4, forecast_periods=forecast_months)

    # Pricing war risk assessment (based on price convergence): a gap of 0
    # maps to full risk, a gap of 10 or more to none.
    price_gap = np.abs(our_price_fc - market_price_fc)
    pricing_war_risk = np.clip(1 - price_gap / 10, 0, 1)

    ci_width = np.linspace(0.02, 0.12, forecast_months)
    # Net adds can be negative, so the band is sized from the magnitude of
    # the forecast; scaling the signed value would put 'upper' below 'lower'.
    net_adds_band = np.abs(net_adds_fc) * ci_width * 2

    result = {
        'dates': [d.strftime('%Y-%m') for d in forecast_dates],
        'market_shares': {
            'ours': np.round(our_share_fc, 2).tolist(),
            'competitor_a': np.round(comp_a_fc, 2).tolist(),
            'competitor_b': np.round(comp_b_fc, 2).tolist(),
            'competitor_c': np.round(np.maximum(comp_c_fc, 5), 2).tolist(),
        },
        'pricing': {
            'our_price': np.round(our_price_fc, 2).tolist(),
            'market_price': np.round(market_price_fc, 2).tolist(),
        },
        'net_adds': {
            'forecast': np.round(net_adds_fc).astype(int).tolist(),
            'upper': np.round(net_adds_fc + net_adds_band).astype(int).tolist(),
            'lower': np.round(net_adds_fc - net_adds_band).astype(int).tolist(),
        },
        'competitive_churn': {
            'forecast': np.round(np.maximum(comp_churn_fc, 0.3), 2).tolist(),
        },
        'pricing_war_risk': np.round(pricing_war_risk * 100, 1).tolist(),
    }

    # Historical
    hist = comp_df.tail(12)
    result['historical'] = {
        'dates': [_month_label(d) for d in hist['date']],
        'our_share': hist['our_market_share'].round(2).tolist(),
        'competitor_a': hist['competitor_a_share'].round(2).tolist(),
        'competitor_b': hist['competitor_b_share'].round(2).tolist(),
        'competitor_c': hist['competitor_c_share'].round(2).tolist(),
        'our_price': hist['our_avg_price'].round(2).tolist(),
        'market_price': hist['market_avg_price'].round(2).tolist(),
        'net_adds': hist['net_subscriber_adds'].tolist(),
    }
    return result
def forecast_economic_impact(econ_df, forecast_months=12):
    """Build the economic-impact forecast payload.

    Reads ``date``, ``gdp_growth_rate``, ``consumer_confidence_index``,
    ``unemployment_rate``, ``arpu_index``, ``plan_downgrade_rate``,
    ``payment_delinquency_rate``, ``revenue_at_risk_millions`` and
    ``customer_sentiment_index`` from ``econ_df``.

    Args:
        econ_df: DataFrame holding the historical observations.
        forecast_months: Number of months to forecast.

    Returns:
        dict with ``dates``, one entry per forecast indicator
        (``gdp_growth`` and ``revenue_at_risk`` also carry upper/lower
        bounds), ``recession_probability`` and ``historical`` (last 12
        input rows).
    """
    def _month_label(value):
        # One 'YYYY-MM' label per date-like value.
        return pd.to_datetime(value).strftime('%Y-%m')

    def _rounded(array, digits):
        # Round and convert to a plain list for the payload.
        return np.round(array, digits).tolist()

    horizon = forecast_months
    latest = pd.to_datetime(econ_df['date']).max()
    future_index = pd.date_range(
        start=latest + pd.DateOffset(months=1), periods=horizon, freq='ME')

    # Macro indicators and ARPU: exponential smoothing
    gdp = _exponential_smoothing(
        econ_df['gdp_growth_rate'], alpha=0.3, forecast_periods=horizon)
    confidence = _exponential_smoothing(
        econ_df['consumer_confidence_index'], alpha=0.3, forecast_periods=horizon)
    unemployment = _exponential_smoothing(
        econ_df['unemployment_rate'], alpha=0.25, forecast_periods=horizon)
    arpu = _exponential_smoothing(
        econ_df['arpu_index'], alpha=0.35, forecast_periods=horizon)

    # Customer payment behaviour: weighted moving average
    downgrade = _moving_average_forecast(
        econ_df['plan_downgrade_rate'], window=4, forecast_periods=horizon)
    delinquency = _moving_average_forecast(
        econ_df['payment_delinquency_rate'], window=4, forecast_periods=horizon)

    # Revenue at risk and sentiment: exponential smoothing
    revenue_risk = _exponential_smoothing(
        econ_df['revenue_at_risk_millions'], alpha=0.3, forecast_periods=horizon)
    sentiment = _exponential_smoothing(
        econ_df['customer_sentiment_index'], alpha=0.3, forecast_periods=horizon)

    # Heuristic recession probability derived from forecast GDP growth
    recession = np.clip((1.5 - gdp) / 2.0 * 100, 0, 95)

    band = np.linspace(0.03, 0.18, horizon)
    # Revenue at risk is floored before the bounds are derived from it
    floored_risk = np.maximum(revenue_risk, 0.5)

    result = {}
    result['dates'] = [stamp.strftime('%Y-%m') for stamp in future_index]
    result['gdp_growth'] = {
        'forecast': _rounded(gdp, 2),
        'upper': _rounded(gdp + band * 3, 2),
        'lower': _rounded(gdp - band * 3, 2),
    }
    result['consumer_confidence'] = {
        'forecast': _rounded(np.clip(confidence, 20, 100), 1),
    }
    result['unemployment'] = {
        'forecast': _rounded(np.clip(unemployment, 2, 12), 1),
    }
    result['arpu_index'] = {'forecast': _rounded(arpu, 1)}
    result['downgrade_rate'] = {
        'forecast': _rounded(np.maximum(downgrade, 0.5), 2),
    }
    result['delinquency_rate'] = {
        'forecast': _rounded(np.maximum(delinquency, 0.5), 2),
    }
    result['revenue_at_risk'] = {
        'forecast': _rounded(floored_risk, 2),
        'upper': _rounded(floored_risk * (1 + band), 2),
        'lower': _rounded(floored_risk * (1 - band), 2),
    }
    result['sentiment_index'] = {
        'forecast': _rounded(np.clip(sentiment, 10, 100), 1),
    }
    result['recession_probability'] = _rounded(recession, 1)

    # Trailing 12 rows of history for chart context
    recent = econ_df.tail(12)
    result['historical'] = {
        'dates': [_month_label(raw) for raw in recent['date']],
        'gdp_growth': recent['gdp_growth_rate'].round(2).tolist(),
        'consumer_confidence': recent['consumer_confidence_index'].round(1).tolist(),
        'unemployment': recent['unemployment_rate'].round(1).tolist(),
        'arpu_index': recent['arpu_index'].round(1).tolist(),
        'revenue_at_risk': recent['revenue_at_risk_millions'].round(2).tolist(),
        'sentiment_index': recent['customer_sentiment_index'].round(1).tolist(),
    }
    return result
def get_forecast_summary(seasonal_fc, tech_fc, comp_fc, econ_fc):
    """Generate a high-level summary of all forecasts for KPI cards.

    Args:
        seasonal_fc: dict providing ``data_usage.forecast`` and
            ``network_load.forecast`` lists.
        tech_fc: dict providing ``five_g_adoption.forecast`` and
            ``historical.five_g_adoption`` lists.
        comp_fc: dict providing ``historical.our_share``,
            ``market_shares.ours`` and ``pricing_war_risk`` lists.
        econ_fc: dict providing ``recession_probability``,
            ``revenue_at_risk.forecast`` and ``sentiment_index.forecast``
            lists.

    Returns:
        Flat dict of KPI values keyed by metric name.
    """
    summary = {}

    # Seasonal
    usage_trend = seasonal_fc['data_usage']['forecast']
    first_usage, last_usage = usage_trend[0], usage_trend[-1]
    summary['data_usage_next_month'] = first_usage
    # Growth relative to a zero baseline is undefined; report 0.0 rather
    # than raising ZeroDivisionError.
    if first_usage:
        summary['data_usage_growth'] = round((last_usage - first_usage) / first_usage * 100, 1)
    else:
        summary['data_usage_growth'] = 0.0
    summary['peak_network_load'] = max(seasonal_fc['network_load']['forecast'])

    # Tech
    five_g = tech_fc['five_g_adoption']['forecast']
    five_g_now = tech_fc['historical']['five_g_adoption'][-1]
    summary['five_g_current'] = five_g_now
    summary['five_g_forecast_end'] = five_g[-1]
    summary['five_g_growth'] = round(five_g[-1] - five_g_now, 1)

    # Competitive
    summary['market_share_current'] = comp_fc['historical']['our_share'][-1]
    summary['market_share_forecast'] = comp_fc['market_shares']['ours'][-1]
    # float() keeps the KPI a plain Python number instead of a NumPy scalar.
    summary['avg_pricing_war_risk'] = round(float(np.mean(comp_fc['pricing_war_risk'])), 1)

    # Economic (first forecast month)
    summary['recession_probability'] = econ_fc['recession_probability'][0]
    summary['revenue_at_risk'] = econ_fc['revenue_at_risk']['forecast'][0]
    summary['sentiment_forecast'] = econ_fc['sentiment_index']['forecast'][0]
    return summary