# Trae Assistant
# feat: upgrade UI, fix delimiters, add file upload, localization
# commit: bb3c41b
import os
import random
import json
import csv
import io
from flask import Flask, render_template, jsonify, request
from collections import defaultdict
app = Flask(__name__)
# Random per-process key: sessions/signing won't survive a restart.
app.secret_key = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload size
# Configuration
# Marketing touchpoints used when generating synthetic journey data.
CHANNELS = ['Paid Search', 'Social Ads', 'Email', 'Direct', 'Referral', 'Display']
# Upper bound on touchpoints per synthetic journey.
MAX_JOURNEY_LENGTH = 5
def generate_mock_data(count=1000, conversion_rate=0.2, conversion_value=100):
    """Generate synthetic user journeys.

    Args:
        count: number of journeys to create.
        conversion_rate: probability in [0, 1] that a journey converts
            (default 0.2, matching the previous hard-coded 20%).
        conversion_value: revenue assigned to a converted journey
            (default 100, matching the previous hard-coded value).

    Returns:
        List of dicts: {'path': [channel, ...], 'converted': bool, 'value': number}.
        Non-converted journeys always carry value 0.
    """
    journeys = []
    for _ in range(count):
        # Journey length is uniform in [1, MAX_JOURNEY_LENGTH].
        length = random.randint(1, MAX_JOURNEY_LENGTH)
        # Channels are drawn independently, so repeats within a path are possible.
        path = [random.choice(CHANNELS) for _ in range(length)]
        converted = random.random() < conversion_rate
        journeys.append({
            'path': path,
            'converted': converted,
            'value': conversion_value if converted else 0,
        })
    return journeys
def calculate_attribution(journeys, model):
    """
    Attribute conversion value to channels under the selected model.

    Args:
        journeys: list of dicts with 'path' (list of channel names),
            'converted' (truthy flag) and 'value' (numeric revenue).
            Journeys that did not convert, or have an empty path, are skipped.
        model: one of 'last_click', 'first_click', 'linear', 'time_decay',
            'position_based'.

    Returns:
        dict with 'breakdown' (channel -> attributed value),
        'total_conversions' and 'total_revenue' (converted journeys only).

    Raises:
        ValueError: if `model` is not a recognized attribution model
            (previously an unknown model silently produced an empty breakdown).
    """
    if model not in ('last_click', 'first_click', 'linear', 'time_decay', 'position_based'):
        raise ValueError(f"Unknown attribution model: {model}")
    channel_values = defaultdict(float)
    total_conversions = 0
    total_revenue = 0.0
    for journey in journeys:
        # Ensure robust data types on possibly user-supplied rows.
        if not bool(journey.get('converted', False)):
            continue
        path = journey.get('path', [])
        if not path:
            continue
        value = float(journey.get('value', 0))
        total_conversions += 1
        total_revenue += value
        if model == 'last_click':
            channel_values[path[-1]] += value
        elif model == 'first_click':
            channel_values[path[0]] += value
        elif model == 'linear':
            # Equal credit to every touchpoint.
            share = value / len(path)
            for touch in path:
                channel_values[touch] += share
        elif model == 'time_decay':
            # Exponential decay: weight 2^(-distance) from the conversion,
            # so the final touch gets weight 1 and earlier touches halve.
            weights = [2.0 ** -(len(path) - 1 - i) for i in range(len(path))]
            total_weight = sum(weights)  # always >= 1 since path is non-empty
            for touch, w in zip(path, weights):
                channel_values[touch] += w / total_weight * value
        else:  # position_based: 40% first, 40% last, 20% spread over the middle
            if len(path) == 1:
                channel_values[path[0]] += value
            elif len(path) == 2:
                channel_values[path[0]] += value * 0.5
                channel_values[path[1]] += value * 0.5
            else:
                channel_values[path[0]] += value * 0.4
                channel_values[path[-1]] += value * 0.4
                middle_share = (value * 0.2) / (len(path) - 2)
                for touch in path[1:-1]:
                    channel_values[touch] += middle_share
    return {
        'breakdown': dict(channel_values),
        'total_conversions': total_conversions,
        'total_revenue': total_revenue
    }
def get_top_paths(journeys, limit=10):
    """
    Aggregate the most common journey paths into ECharts Sankey format.

    Each path is tagged with a terminal 'Conversion'/'Dropoff' node, counted,
    and the top `limit` paths are expanded into step-labelled nodes and links.

    Args:
        journeys: list of dicts with 'path' and 'converted'.
        limit: maximum number of distinct paths to include.

    Returns:
        dict with 'nodes' ([{'name': ...}]) and 'links'
        ([{'source', 'target', 'value'}]) for an ECharts Sankey chart.
        Node order is deterministic (first-seen order).
    """
    path_counts = defaultdict(int)
    for journey in journeys:
        path = journey.get('path', [])
        if not path:
            continue
        outcome = 'Conversion' if journey.get('converted', False) else 'Dropoff'
        # Tuples are hashable, so they can serve as counter keys.
        path_counts[tuple(path) + (outcome,)] += 1
    top = sorted(path_counts.items(), key=lambda item: item[1], reverse=True)[:limit]
    nodes = {}        # insertion-ordered set of node names (dict keys)
    link_values = {}  # (source, target) -> accumulated count; O(1) lookup
    for path, count in top:
        for i in range(len(path) - 1):
            src_node = f"{path[i]} (Step {i+1})"
            nxt = path[i + 1]
            # Terminal nodes keep their bare name so all paths merge into them.
            tgt_node = nxt if nxt in ('Conversion', 'Dropoff') else f"{nxt} (Step {i+2})"
            nodes[src_node] = None
            nodes[tgt_node] = None
            key = (src_node, tgt_node)
            link_values[key] = link_values.get(key, 0) + count
    return {
        'nodes': [{'name': n} for n in nodes],
        'links': [{'source': s, 'target': t, 'value': v}
                  for (s, t), v in link_values.items()],
    }
def parse_uploaded_file(file):
    """
    Parse an uploaded CSV or JSON file into the standard journey format.

    Args:
        file: FileStorage-like object exposing `.filename`, `.read()` (JSON)
            and `.stream` (CSV) — assumed to come from `request.files`.

    Returns:
        List of dicts: {'path': [...], 'converted': bool, 'value': float}.

    Raises:
        ValueError: unsupported extension, malformed content, or no usable rows.
    """
    filename = (file.filename or '').lower()
    journeys = []
    try:
        if filename.endswith('.json'):
            content = json.load(file)
            # Expect a list of journey objects.
            if not isinstance(content, list):
                raise ValueError("JSON must be a list of journey objects")
            journeys = content
        elif filename.endswith('.csv'):
            # utf-8-sig transparently strips the BOM that Excel CSV exports add.
            stream = io.StringIO(file.stream.read().decode("utf-8-sig"), newline=None)
            for row in csv.DictReader(stream):
                # Heuristic: accept any of the common column names for the path.
                path_str = row.get('path') or row.get('touchpoints') or row.get('channels')
                if not path_str:
                    continue
                # Paths may be "A > B > C" or "A,B,C".
                delimiter = '>' if '>' in path_str else ','
                path = [p.strip() for p in path_str.split(delimiter)]
                converted = str(row.get('converted', '0')).lower() in ('true', '1', 'yes', 'on')
                try:
                    # `or 0` maps an empty cell to 0 without raising.
                    value = float(row.get('value', 0) or 0)
                except (TypeError, ValueError):
                    value = 0.0
                journeys.append({
                    'path': path,
                    'converted': converted,
                    'value': value
                })
        else:
            raise ValueError("Unsupported file type. Please upload .csv or .json")
    except ValueError:
        # Re-raise our own user-facing messages without double-wrapping them.
        raise
    except Exception as e:
        raise ValueError(f"Error parsing file: {str(e)}")
    if not journeys:
        raise ValueError("No valid journey data found in file")
    return journeys
@app.route('/')
def index():
    """Serve the single-page dashboard UI (templates/index.html)."""
    return render_template('index.html')
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """
    Generate mock journeys and return attribution results for every model.

    Request JSON body (optional): {"sample_size": int} — defaults to 1000,
    clamped to [1, 100000].

    Returns:
        JSON with 'attribution_results' (per-model breakdowns), 'sankey_data'
        and 'journey_count'; or {'error': ...} with HTTP 500 on failure.
    """
    try:
        # request.json raises/returns None on a missing or non-JSON body,
        # which previously surfaced as an AttributeError -> confusing 500.
        data = request.get_json(silent=True) or {}
        sample_size = int(data.get('sample_size', 1000))
        # Clamp so a hostile/buggy client can't stall the server.
        sample_size = max(1, min(sample_size, 100000))
        # Generate data
        journeys = generate_mock_data(sample_size)
        # Calculate for all models
        results = {}
        models = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']
        for m in models:
            results[m] = calculate_attribution(journeys, m)
        # Get Sankey data
        sankey_data = get_top_paths(journeys, limit=20)
        return jsonify({
            'attribution_results': results,
            'sankey_data': sankey_data,
            'journey_count': len(journeys)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """
    Accept an uploaded CSV/JSON journey file and return attribution results.

    Responds with 400 for a missing/empty file or a parse failure (ValueError),
    500 for anything unexpected; otherwise the same payload shape as /api/analyze.
    """
    try:
        uploaded = request.files.get('file')
        if uploaded is None:
            return jsonify({'error': 'No file part'}), 400
        if uploaded.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        journeys = parse_uploaded_file(uploaded)
        # Cap the dataset so very large uploads stay responsive.
        if len(journeys) > 50000:
            journeys = journeys[:50000]
        # Run every attribution model over the uploaded journeys.
        model_names = ('last_click', 'first_click', 'linear', 'time_decay', 'position_based')
        results = {name: calculate_attribution(journeys, name) for name in model_names}
        return jsonify({
            'attribution_results': results,
            'sankey_data': get_top_paths(journeys, limit=30),
            'journey_count': len(journeys)
        })
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': f"Internal error: {str(e)}"}), 500
if __name__ == '__main__':
    # Bind on all interfaces for containerized deployment.
    # NOTE(review): port 7860 suggests a Hugging Face Spaces target — confirm.
    app.run(host='0.0.0.0', port=7860)