Spaces:
Sleeping
Sleeping
File size: 9,066 Bytes
bb3c41b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
import os
import random
import json
import csv
import io
from flask import Flask, render_template, jsonify, request
from collections import defaultdict
# Flask application setup.
app = Flask(__name__)
# NOTE: secret key is regenerated on every process start, so any session
# data will not survive a restart (fine here: sessions are unused).
app.secret_key = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload

# Configuration
# Marketing channels used by the mock-data generator and attribution models.
CHANNELS = ['Paid Search', 'Social Ads', 'Email', 'Direct', 'Referral', 'Display']
# Maximum number of touchpoints in a synthetic journey.
MAX_JOURNEY_LENGTH = 5
def generate_mock_data(count=1000):
    """Generate `count` synthetic user journeys.

    Each journey dict contains:
        path:      1..MAX_JOURNEY_LENGTH randomly chosen channels
        converted: True roughly 20% of the time
        value:     flat 100 on conversion, otherwise 0
    """
    def _random_journey():
        # Draw in the same order every time: length, touches, conversion.
        n_touches = random.randint(1, MAX_JOURNEY_LENGTH)
        touchpoints = [random.choice(CHANNELS) for _ in range(n_touches)]
        won = random.random() < 0.2
        return {
            'path': touchpoints,
            'converted': won,
            'value': 100 if won else 0,
        }

    return [_random_journey() for _ in range(count)]
def calculate_attribution(journeys, model):
    """
    Calculate attribution value for each channel based on the selected model.

    Args:
        journeys: list of dicts with 'path' (list of channel names),
                  'converted' (bool) and 'value' (numeric revenue).
        model: one of 'last_click', 'first_click', 'linear', 'time_decay',
               'position_based'.

    Returns:
        dict with 'breakdown' (channel -> attributed value),
        'total_conversions' and 'total_revenue'. Only converted journeys
        with a non-empty path contribute.

    Raises:
        ValueError: if `model` is not a recognized model name (the previous
        behavior silently returned an empty breakdown, which looked like
        "zero attribution" rather than a caller bug).
    """
    valid_models = ('last_click', 'first_click', 'linear', 'time_decay', 'position_based')
    if model not in valid_models:
        raise ValueError(f"Unknown attribution model: {model!r}")

    channel_values = defaultdict(float)
    total_conversions = 0
    total_revenue = 0
    for journey in journeys:
        # Ensure robust data types; skip non-converting or empty journeys.
        converted = bool(journey.get('converted', False))
        if not converted:
            continue
        path = journey.get('path', [])
        if not path:
            continue
        value = float(journey.get('value', 0))
        total_conversions += 1
        total_revenue += value
        # `path` is guaranteed non-empty here, so indexing is safe.
        if model == 'last_click':
            channel_values[path[-1]] += value
        elif model == 'first_click':
            channel_values[path[0]] += value
        elif model == 'linear':
            # Equal credit to every touchpoint.
            weight = value / len(path)
            for touch in path:
                channel_values[touch] += weight
        elif model == 'time_decay':
            # Exponential decay: 2^(-x) where x is distance from conversion,
            # so the last touch gets weight 1 and earlier touches halve.
            weights = [2 ** -(len(path) - 1 - i) for i in range(len(path))]
            total_weight = sum(weights)
            if total_weight > 0:
                normalized_weights = [w / total_weight * value for w in weights]
                for i, touch in enumerate(path):
                    channel_values[touch] += normalized_weights[i]
        elif model == 'position_based':
            # 40% first, 40% last, remaining 20% spread over the middle.
            if len(path) == 1:
                channel_values[path[0]] += value
            elif len(path) == 2:
                channel_values[path[0]] += value * 0.5
                channel_values[path[1]] += value * 0.5
            else:
                channel_values[path[0]] += value * 0.4
                channel_values[path[-1]] += value * 0.4
                middle_weight = (value * 0.2) / (len(path) - 2)
                for touch in path[1:-1]:
                    channel_values[touch] += middle_weight
    return {
        'breakdown': dict(channel_values),
        'total_conversions': total_conversions,
        'total_revenue': total_revenue
    }
def get_top_paths(journeys, limit=10):
    """Aggregate the most common journey paths into ECharts Sankey format.

    Each path is terminated with a synthetic 'Conversion' or 'Dropoff' node.
    Channel nodes are labelled with their step position ("Email (Step 2)") so
    the same channel at different depths renders as distinct Sankey columns.

    Args:
        journeys: list of journey dicts with 'path' and 'converted' keys.
        limit: number of most-frequent distinct paths to keep.

    Returns:
        {'nodes': [{'name': ...}, ...],
         'links': [{'source': ..., 'target': ..., 'value': ...}, ...]}
    """
    path_counts = defaultdict(int)
    for journey in journeys:
        path = journey.get('path', [])
        if not path:
            continue
        terminal = 'Conversion' if journey.get('converted', False) else 'Dropoff'
        # Tuples are hashable, unlike lists.
        path_counts[tuple(path) + (terminal,)] += 1
    top_paths = sorted(path_counts.items(), key=lambda kv: kv[1], reverse=True)[:limit]

    # Format for ECharts Sankey. Aggregate link weights in a dict keyed by
    # (source, target): O(1) lookup instead of the previous O(n) linear scan
    # over the links list for every edge.
    nodes = set()
    link_values = {}
    for path, count in top_paths:
        for i in range(len(path) - 1):
            src_node = f"{path[i]} (Step {i+1})"
            if path[i + 1] in ('Conversion', 'Dropoff'):
                tgt_node = path[i + 1]  # terminal nodes are not step-labelled
            else:
                tgt_node = f"{path[i+1]} (Step {i+2})"
            nodes.add(src_node)
            nodes.add(tgt_node)
            key = (src_node, tgt_node)
            link_values[key] = link_values.get(key, 0) + count
    return {
        # Sorted so the JSON payload is deterministic (set order is not).
        'nodes': [{'name': n} for n in sorted(nodes)],
        'links': [{'source': s, 'target': t, 'value': v}
                  for (s, t), v in link_values.items()],
    }
def parse_uploaded_file(file):
    """Parse an uploaded CSV or JSON file into the standard journey format.

    Args:
        file: a Werkzeug FileStorage-like object with `.filename` and,
              for CSV, a `.stream` attribute.

    Returns:
        list of dicts: {'path': [...], 'converted': bool, 'value': float}.
        JSON files are expected to already be a list of such objects.

    Raises:
        ValueError: on unsupported file types, malformed content, or when
        no usable journey rows are found.
    """
    filename = file.filename.lower()
    journeys = []
    try:
        if filename.endswith('.json'):
            content = json.load(file)
            # Expect list of dicts
            if isinstance(content, list):
                journeys = content
            else:
                raise ValueError("JSON must be a list of journey objects")
        elif filename.endswith('.csv'):
            # Read CSV
            stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
            reader = csv.DictReader(stream)
            for row in reader:
                # Heuristic to find path column
                path_str = row.get('path') or row.get('touchpoints') or row.get('channels')
                if not path_str:
                    continue
                # Try to parse path string (e.g. "A > B > C" or "A,B,C")
                sep = '>' if '>' in path_str else ','
                path = [p.strip() for p in path_str.split(sep)]
                # Conversion flag: common truthy spellings.
                conv_str = str(row.get('converted', '0')).lower()
                converted = conv_str in ('true', '1', 'yes', 'on')
                # Value: tolerate missing column, empty cell, or short rows
                # (DictReader yields None for missing trailing fields).
                try:
                    value = float(row.get('value') or 0)
                except (TypeError, ValueError):
                    value = 0
                journeys.append({
                    'path': path,
                    'converted': converted,
                    'value': value
                })
        else:
            raise ValueError("Unsupported file type. Please upload .csv or .json")
    except ValueError:
        # Our own messages (and json decode errors) are already user-facing;
        # re-raise instead of double-wrapping them.
        raise
    except Exception as e:
        raise ValueError(f"Error parsing file: {str(e)}") from e
    if not journeys:
        raise ValueError("No valid journey data found in file")
    return journeys
@app.route('/')
def index():
    """Serve the single-page dashboard UI (templates/index.html)."""
    return render_template('index.html')
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Generate mock journeys and return attribution + Sankey results.

    Request JSON (optional): {"sample_size": int}, default 1000.
    Responses:
        200: attribution_results per model, sankey_data, journey_count.
        400: sample_size is not an integer.
        500: unexpected internal error.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None instead
        # of raising (the old `request.json` surfaced that as a 500).
        data = request.get_json(silent=True) or {}
        try:
            sample_size = int(data.get('sample_size', 1000))
        except (TypeError, ValueError):
            return jsonify({'error': 'sample_size must be an integer'}), 400
        # Clamp to a sane range so a client cannot request unbounded work.
        sample_size = max(1, min(sample_size, 100000))
        # Generate data
        journeys = generate_mock_data(sample_size)
        # Calculate for all models
        models = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']
        results = {m: calculate_attribution(journeys, m) for m in models}
        # Get Sankey data
        sankey_data = get_top_paths(journeys, limit=20)
        return jsonify({
            'attribution_results': results,
            'sankey_data': sankey_data,
            'journey_count': len(journeys)
        })
    except Exception as e:
        # Top-level boundary: convert anything unexpected into a JSON 500.
        return jsonify({'error': str(e)}), 500
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded CSV/JSON journey file and return analysis results.

    Responses:
        200: attribution_results per model, sankey_data, journey_count.
        400: missing file or a parse error from `parse_uploaded_file`.
        500: unexpected internal error.
    """
    try:
        # Guard clauses: reject requests without a usable file early.
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        uploaded = request.files['file']
        if uploaded.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        journeys = parse_uploaded_file(uploaded)
        # Cap the row count so huge uploads stay responsive.
        row_cap = 50000
        if len(journeys) > row_cap:
            journeys = journeys[:row_cap]

        # Run every attribution model over the parsed journeys.
        model_names = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']
        results = {name: calculate_attribution(journeys, name) for name in model_names}

        payload = {
            'attribution_results': results,
            'sankey_data': get_top_paths(journeys, limit=30),
            'journey_count': len(journeys),
        }
        return jsonify(payload)
    except ValueError as e:
        # Parse errors are the client's problem.
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': f"Internal error: {str(e)}"}), 500
if __name__ == '__main__':
    # Development server; 0.0.0.0 exposes it on all interfaces and 7860 is
    # the conventional Hugging Face Spaces port.
    app.run(host='0.0.0.0', port=7860)
|