File size: 6,991 Bytes
1dfcad5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
from flask import Blueprint, render_template, request, flash, redirect, url_for
import pandas as pd
import numpy as np
import json
# Blueprint that groups the campaign analysis routes under the /campaign URL prefix.
campaign_analysis_bp = Blueprint('campaign_analysis', __name__, url_prefix='/campaign')
# Module-level holders for the uploaded sales and campaign DataFrames.
# Both start as None and are assigned by the upload route.
_current_sales_df = None
_current_campaign_df = None
def normalize(s):
    """Return *s* as text with surrounding whitespace removed and lower-cased.

    Any value is accepted; it is converted with ``str()`` first.
    """
    text = str(s)
    trimmed = text.strip()
    return trimmed.lower()
# Optional lookup from a (lower-cased) campaign channel to the sales channel
# it should be matched against.
CHANNEL_MAP = {
    **dict.fromkeys(('whatsapp', 'email', 'mobile app'), 'online'),
    'tv': 'offline',
    # add more as needed
}
@campaign_analysis_bp.route('/', methods=['GET'])
def show_campaign_analysis():
    """Render the campaign analysis page with only its title set."""
    page_title = "Campaign Analysis Engine"
    return render_template('campaign_analysis.html', title=page_title)
@campaign_analysis_bp.route('/upload', methods=['POST'])
def upload_files():
    """Read the uploaded sales and campaign CSVs, analyse them and render the results.

    Expects two uploaded files in the request: ``sales_file`` (parsed with a
    ``date`` column) and ``campaign_file`` (parsed with ``start_date`` and
    ``end_date`` columns). Dates are parsed day-first.

    On a missing file, a CSV parsing error or an analysis error, a message is
    flashed and the user is redirected back to the analysis page. Otherwise
    the analysis page is rendered with the per-campaign results, the summary,
    the campaign column names and the chart data.
    """
    global _current_sales_df, _current_campaign_df

    back_to_form = url_for('campaign_analysis.show_campaign_analysis')

    sales_file = request.files.get('sales_file')
    campaign_file = request.files.get('campaign_file')
    if not sales_file or not campaign_file:
        flash('Please upload both sales and campaign CSV files.')
        return redirect(back_to_form)

    # Parse into locals first: if the second CSV fails to parse, the module
    # globals must not be left holding a new sales frame next to a stale
    # campaign frame.
    try:
        sales_df = pd.read_csv(sales_file, parse_dates=['date'], dayfirst=True)
        campaign_df = pd.read_csv(
            campaign_file, parse_dates=['start_date', 'end_date'], dayfirst=True
        )
    except Exception as e:  # request boundary: report any read failure to the user
        flash(f'Error reading CSVs: {e}')
        return redirect(back_to_form)

    # Both files parsed, so publish them together.
    _current_sales_df = sales_df
    _current_campaign_df = campaign_df

    try:
        analysis = analyze_campaigns(sales_df, campaign_df)
        plot_data = make_plot_data(analysis['campaigns'])
    except Exception as e:  # request boundary: report any analysis failure to the user
        flash(f'Error during analysis: {e}')
        return redirect(back_to_form)

    return render_template(
        'campaign_analysis.html',
        title="Campaign Analysis Engine",
        campaigns=analysis['campaigns'],
        summary=analysis['summary'],
        columns=campaign_df.columns.tolist(),
        # Pass the plain dict; do NOT json.dumps it here.
        # NOTE(review): presumably the template serialises it itself - confirm.
        plot_data=plot_data,
    )
def make_plot_data(campaigns):
    """Assemble the chart payload from the per-campaign result dicts.

    Each entry of *campaigns* is read for the keys ``name``, ``id`` (only
    when ``name`` is falsy), ``roi``, ``uplift_pct``, ``total_revenue``,
    ``start_date`` and ``region``.

    Returns a dict with five chart specs: ``bar_uplift`` and ``bar_roi``
    (x = campaign labels, y = values with NaN replaced by 0),
    ``line_revenue`` (x = start dates as strings, y = revenue, plus the
    labels), and ``pie_type`` / ``pie_region`` (label and count lists).
    """
    from collections import Counter

    def column(key):
        # One value per campaign, in campaign order.
        return [entry[key] for entry in campaigns]

    def nan_to_zero(values):
        # NaN is the only value that does not compare equal to itself.
        return [value if value == value else 0 for value in values]

    # Label each campaign by its name, falling back to its id.
    labels = [entry['name'] or entry['id'] for entry in campaigns]
    roi_values = nan_to_zero(column('roi'))
    uplift_values = nan_to_zero(column('uplift_pct'))
    revenue_values = column('total_revenue')
    start_labels = [str(day) for day in column('start_date')]

    # The 'name' field doubles as the category for the type pie chart.
    type_values = column('name')
    region_values = column('region')
    type_tally = Counter(type_values)
    region_tally = Counter(region_values)

    plot_data = {}
    plot_data["bar_uplift"] = {"x": labels, "y": uplift_values}
    plot_data["bar_roi"] = {"x": labels, "y": roi_values}
    plot_data["line_revenue"] = {"x": start_labels, "y": revenue_values, "names": labels}
    plot_data["pie_type"] = {
        "labels": list(type_tally.keys()),
        "values": list(type_tally.values()),
    }
    plot_data["pie_region"] = {
        "labels": list(region_tally.keys()),
        "values": list(region_tally.values()),
    }
    return plot_data
def map_campaign_channel_to_sales(campaign_channel):
    """Translate a campaign channel into the sales channel used for matching.

    The channel is normalised first. If CHANNEL_MAP has an entry for the
    normalised value, that entry is returned; otherwise the normalised
    channel itself is returned.
    """
    key = normalize(campaign_channel)
    if key in CHANNEL_MAP:
        return CHANNEL_MAP[key]
    return key
def _mean_of_valid(values):
    """Return the mean of the non-NaN entries of *values*, or 0 if there are none."""
    valid = [float(value) for value in values if pd.notna(value)]
    return sum(valid) / len(valid) if valid else 0


def analyze_campaigns(sales_df, campaign_df):
    """
    For each campaign, compute total units/revenue during the campaign period,
    estimate uplift vs. pre-campaign, and simple ROI.

    Args:
        sales_df: DataFrame read for the columns ``sku``, ``region``,
            ``channel``, ``date``, ``sales_units`` and ``sales_revenue``.
            It is not modified.
        campaign_df: DataFrame read for the columns ``target_skus``
            (comma-separated), ``region``, ``channel``, ``start_date`` and
            ``end_date``; ``budget``, ``campaign_id`` and ``type`` are
            optional.

    Returns:
        A dict with ``campaigns`` (one result dict per campaign row) and
        ``summary`` (campaign count, total revenue, mean uplift and mean
        ROI). The means ignore NaN entries and are 0 when no campaign has
        a valid value.

    Sales rows belong to a campaign when their SKU is one of the target
    SKUs, the region is equal, and the normalised channel equals the mapped
    campaign channel. Uplift compares average daily units in the campaign
    with the 14 days before it. ROI is
    ``(incremental revenue - budget) / budget * 100``, where incremental
    revenue is campaign revenue minus the pre-campaign daily revenue scaled
    to the campaign length.
    """
    pre_window = pd.Timedelta(days=14)

    # Normalise the sales channel once, into a separate Series, so the
    # caller's frame is left untouched. Stripping as well as lower-casing
    # mirrors what is done to the campaign channel before comparison.
    sales_channel = sales_df['channel'].astype(str).str.strip().str.lower()
    sales_dates = sales_df['date']

    campaign_results = []
    for _, camp in campaign_df.iterrows():
        target_skus = [sku.strip() for sku in str(camp['target_skus']).split(',')]
        region = camp['region']
        campaign_channel = camp['channel']
        mapped_channel = map_campaign_channel_to_sales(campaign_channel)
        start_date = camp['start_date']
        end_date = camp['end_date']

        # Rows for this campaign's SKUs/region/channel, regardless of date.
        in_scope = (
            sales_df['sku'].isin(target_skus)
            & (sales_df['region'] == region)
            & (sales_channel == mapped_channel)
        )

        sales_during = sales_df[
            in_scope & (sales_dates >= start_date) & (sales_dates <= end_date)
        ]
        total_units = int(sales_during['sales_units'].sum())
        total_revenue = float(sales_during['sales_revenue'].sum())

        # Baseline: the same scope in the 14 days before the campaign starts.
        sales_pre = sales_df[
            in_scope
            & (sales_dates >= (start_date - pre_window))
            & (sales_dates < start_date)
        ]

        days_campaign = (end_date - start_date).days + 1
        if sales_pre.empty:
            avg_daily_pre = 0
            pre_revenue_per_day = 0.0
        else:
            # Span actually covered by the pre-campaign rows (at least 1 day).
            days_pre = (sales_pre['date'].max() - sales_pre['date'].min()).days + 1
            avg_daily_pre = sales_pre['sales_units'].sum() / days_pre
            pre_revenue_per_day = float(sales_pre['sales_revenue'].sum()) / days_pre

        avg_daily_campaign = total_units / days_campaign if days_campaign > 0 else 0
        uplift_pct = (
            (avg_daily_campaign - avg_daily_pre) / avg_daily_pre * 100
            if avg_daily_pre > 0
            else np.nan
        )

        # ROI: (Incremental revenue - budget) / budget
        budget = camp.get('budget', np.nan)
        try:
            budget = float(budget)
        except (TypeError, ValueError):
            budget = np.nan

        # Scale the baseline to the campaign's length: subtracting the raw
        # pre-period total would compare windows of different durations.
        baseline_revenue = pre_revenue_per_day * days_campaign if days_campaign > 0 else 0.0
        incremental_revenue = total_revenue - baseline_revenue
        roi = (
            (incremental_revenue - budget) / budget * 100
            if pd.notna(budget) and budget > 0
            else np.nan
        )

        campaign_results.append({
            'id': camp.get('campaign_id', ''),
            'name': camp.get('type', ''),
            'sku': ','.join(target_skus),
            'region': region,
            'channel': campaign_channel,
            'start_date': start_date.date() if not pd.isnull(start_date) else '',
            'end_date': end_date.date() if not pd.isnull(end_date) else '',
            'budget': budget if not pd.isnull(budget) else '-',
            'total_units': total_units,
            'total_revenue': total_revenue,
            'uplift_pct': uplift_pct,
            'roi': roi,
        })

    # Simple summary for display. The means fall back to 0 both when there
    # are no campaigns and when every campaign's value is NaN.
    summary = {
        'total_campaigns': len(campaign_results),
        'total_revenue': float(np.nansum([c['total_revenue'] for c in campaign_results])),
        'avg_uplift_pct': _mean_of_valid([c['uplift_pct'] for c in campaign_results]),
        'avg_roi': _mean_of_valid([c['roi'] for c in campaign_results]),
    }
    return {'campaigns': campaign_results, 'summary': summary}
|