"""Marketing attribution demo app.

Generates (or ingests) user journeys and scores each marketing channel
under five standard attribution models, plus aggregates common paths
for an ECharts Sankey diagram.
"""

import csv
import io
import json
import os
import random
from collections import defaultdict

from flask import Flask, jsonify, render_template, request

app = Flask(__name__)
app.secret_key = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# Configuration
CHANNELS = ['Paid Search', 'Social Ads', 'Email', 'Direct', 'Referral', 'Display']
MAX_JOURNEY_LENGTH = 5

# Single source of truth for the supported models (used by both API routes).
ATTRIBUTION_MODELS = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']

# Hard cap on synthetic-data generation so a single request can't exhaust memory.
MAX_SAMPLE_SIZE = 100_000


def generate_mock_data(count=1000):
    """Generate *count* synthetic user journeys.

    Each journey is a dict with:
        path      -- list of 1..MAX_JOURNEY_LENGTH channel names
        converted -- bool, ~20% chance
        value     -- 100 if converted else 0
    """
    journeys = []
    for _ in range(count):
        length = random.randint(1, MAX_JOURNEY_LENGTH)
        path = [random.choice(CHANNELS) for _ in range(length)]
        converted = random.random() < 0.2
        journeys.append({
            'path': path,
            'converted': converted,
            'value': 100 if converted else 0,
        })
    return journeys


def calculate_attribution(journeys, model):
    """Attribute conversion value to channels under the selected model.

    Parameters:
        journeys -- iterable of journey dicts (path/converted/value)
        model    -- one of ATTRIBUTION_MODELS

    Returns a dict with 'breakdown' (channel -> attributed value),
    'total_conversions' and 'total_revenue'. Non-converted or empty-path
    journeys are skipped; an unknown model yields an empty breakdown.
    """
    channel_values = defaultdict(float)
    total_conversions = 0
    total_revenue = 0

    for journey in journeys:
        # Coerce defensively: uploaded data may carry strings/missing keys.
        if not bool(journey.get('converted', False)):
            continue
        path = journey.get('path', [])
        if not path:
            continue
        value = float(journey.get('value', 0))
        total_conversions += 1
        total_revenue += value

        if model == 'last_click':
            channel_values[path[-1]] += value
        elif model == 'first_click':
            channel_values[path[0]] += value
        elif model == 'linear':
            # Equal credit to every touchpoint.
            weight = value / len(path)
            for touch in path:
                channel_values[touch] += weight
        elif model == 'time_decay':
            # Exponential decay 2^(-x), x = distance from the conversion,
            # so the final touch gets the most credit.
            weights = [2 ** -(len(path) - 1 - i) for i in range(len(path))]
            total_weight = sum(weights)
            if total_weight > 0:
                for touch, w in zip(path, weights):
                    channel_values[touch] += w / total_weight * value
        elif model == 'position_based':
            # 40% first, 40% last, remaining 20% spread over the middle.
            if len(path) == 1:
                channel_values[path[0]] += value
            elif len(path) == 2:
                channel_values[path[0]] += value * 0.5
                channel_values[path[1]] += value * 0.5
            else:
                channel_values[path[0]] += value * 0.4
                channel_values[path[-1]] += value * 0.4
                middle_weight = (value * 0.2) / (len(path) - 2)
                for touch in path[1:-1]:
                    channel_values[touch] += middle_weight

    return {
        'breakdown': dict(channel_values),
        'total_conversions': total_conversions,
        'total_revenue': total_revenue,
    }


def get_top_paths(journeys, limit=10):
    """Aggregate the *limit* most common paths into ECharts Sankey data.

    Each path is suffixed with a terminal 'Conversion'/'Dropoff' node;
    intermediate nodes are disambiguated by step number so the diagram
    stays acyclic. Returns {'nodes': [...], 'links': [...]}.
    """
    path_counts = defaultdict(int)
    for journey in journeys:
        path = journey.get('path', [])
        if not path:
            continue
        terminal = 'Conversion' if journey.get('converted', False) else 'Dropoff'
        # Tuples are hashable, so they can key the counter.
        path_counts[tuple(path + [terminal])] += 1

    sorted_paths = sorted(path_counts.items(), key=lambda x: x[1], reverse=True)[:limit]

    nodes = set()
    # Aggregate link weights in a dict keyed by (source, target) —
    # O(1) per lookup instead of the O(links) linear scan.
    link_totals = {}
    for path, count in sorted_paths:
        for i in range(len(path) - 1):
            src_node = f"{path[i]} (Step {i+1})"
            if path[i + 1] in ('Conversion', 'Dropoff'):
                tgt_node = path[i + 1]  # terminal nodes are not step-qualified
            else:
                tgt_node = f"{path[i+1]} (Step {i+2})"
            nodes.add(src_node)
            nodes.add(tgt_node)
            key = (src_node, tgt_node)
            link_totals[key] = link_totals.get(key, 0) + count

    return {
        'nodes': [{'name': n} for n in nodes],
        'links': [
            {'source': src, 'target': tgt, 'value': total}
            for (src, tgt), total in link_totals.items()
        ],
    }


def parse_uploaded_file(file):
    """Parse an uploaded CSV or JSON file into the standard journey format.

    CSV rows need a 'path'/'touchpoints'/'channels' column ("A > B > C" or
    "A,B,C"), plus optional 'converted' and 'value' columns. JSON must be a
    list of journey objects. Raises ValueError on any parse failure or if
    no usable rows are found.
    """
    filename = file.filename.lower()
    journeys = []
    try:
        if filename.endswith('.json'):
            content = json.load(file)
            if isinstance(content, list):
                journeys = content
            else:
                raise ValueError("JSON must be a list of journey objects")
        elif filename.endswith('.csv'):
            stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
            reader = csv.DictReader(stream)
            for row in reader:
                # Heuristic: accept any of the common column names for the path.
                path_str = row.get('path') or row.get('touchpoints') or row.get('channels')
                if not path_str:
                    continue
                # Support "A > B > C" and "A,B,C" delimiters.
                delimiter = '>' if '>' in path_str else ','
                path = [p.strip() for p in path_str.split(delimiter)]

                conv_str = str(row.get('converted', '0')).lower()
                converted = conv_str in ['true', '1', 'yes', 'on']

                try:
                    value = float(row.get('value', 0))
                except (TypeError, ValueError):
                    # Missing/blank/non-numeric value column -> no revenue.
                    value = 0

                journeys.append({
                    'path': path,
                    'converted': converted,
                    'value': value,
                })
        else:
            raise ValueError("Unsupported file type. Please upload .csv or .json")
    except Exception as e:
        # Surface any parse failure as a single, user-facing ValueError.
        raise ValueError(f"Error parsing file: {str(e)}")

    if not journeys:
        raise ValueError("No valid journey data found in file")
    return journeys


def _analyze_journeys(journeys, sankey_limit):
    """Run every attribution model plus Sankey aggregation over *journeys*."""
    results = {m: calculate_attribution(journeys, m) for m in ATTRIBUTION_MODELS}
    return {
        'attribution_results': results,
        'sankey_data': get_top_paths(journeys, limit=sankey_limit),
        'journey_count': len(journeys),
    }


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Generate synthetic journeys and return attribution + Sankey results."""
    try:
        # get_json(silent=True) returns None (not an exception / not a 400
        # short-circuit) for a missing or malformed body.
        data = request.get_json(silent=True) or {}
        sample_size = int(data.get('sample_size', 1000))
        # Clamp so a single request can't ask for an unbounded dataset.
        sample_size = max(1, min(sample_size, MAX_SAMPLE_SIZE))

        journeys = generate_mock_data(sample_size)
        return jsonify(_analyze_journeys(journeys, sankey_limit=20))
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Parse an uploaded journey file and return attribution + Sankey results."""
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        journeys = parse_uploaded_file(file)
        # Cap processing for performance on very large uploads.
        if len(journeys) > 50000:
            journeys = journeys[:50000]

        return jsonify(_analyze_journeys(journeys, sankey_limit=30))
    except ValueError as e:
        # Client-side data problems (bad file type, unparseable rows).
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': f"Internal error: {str(e)}"}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860)