# NOTE(review): extraction residue from the hosting page header ("Spaces: Sleeping")
# — not part of the program source.
import os
import random
import json
import csv
import io
from flask import Flask, render_template, jsonify, request
from collections import defaultdict

app = Flask(__name__)
# Random per-process secret: sessions will not survive a restart and will not
# be shared across workers — acceptable for a demo app.
app.secret_key = os.urandom(24)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# Configuration
CHANNELS = ['Paid Search', 'Social Ads', 'Email', 'Direct', 'Referral', 'Display']  # marketing touchpoint labels
MAX_JOURNEY_LENGTH = 5  # max touchpoints in a synthetic journey
def generate_mock_data(count=1000, channels=None, max_length=None,
                       conversion_rate=0.2, conversion_value=100):
    """Generate synthetic user journeys.

    Args:
        count: number of journeys to generate.
        channels: candidate channel names; defaults to module-level CHANNELS.
        max_length: maximum touchpoints per journey; defaults to
            MAX_JOURNEY_LENGTH.
        conversion_rate: probability that a journey converts.
        conversion_value: revenue credited to a converted journey.

    Returns:
        List of dicts with 'path' (list of channels), 'converted' (bool)
        and 'value' (number) keys.
    """
    # Resolve defaults lazily so callers can fully parameterize the generator.
    channels = CHANNELS if channels is None else list(channels)
    max_length = MAX_JOURNEY_LENGTH if max_length is None else max_length
    journeys = []
    for _ in range(count):
        # Random journey length 1..max_length, random path over the channels.
        length = random.randint(1, max_length)
        path = [random.choice(channels) for _ in range(length)]
        converted = random.random() < conversion_rate
        journeys.append({
            'path': path,
            'converted': converted,
            'value': conversion_value if converted else 0,
        })
    return journeys
def calculate_attribution(journeys, model):
    """
    Calculate attribution value for each channel based on the selected model.

    Args:
        journeys: iterable of dicts with 'path' (list of channel names),
            'converted' (truthy on conversion) and 'value' (numeric revenue).
        model: one of 'last_click', 'first_click', 'linear', 'time_decay',
            'position_based'.

    Returns:
        Dict with 'breakdown' (channel -> attributed revenue),
        'total_conversions' and 'total_revenue'.

    Raises:
        ValueError: if *model* is not a recognized attribution model
            (previously an unknown model silently produced an empty breakdown).
    """
    valid_models = {'last_click', 'first_click', 'linear',
                    'time_decay', 'position_based'}
    if model not in valid_models:
        raise ValueError(f"Unknown attribution model: {model!r}")

    channel_values = defaultdict(float)
    total_conversions = 0
    total_revenue = 0
    for journey in journeys:
        # Only converted journeys with a non-empty path carry credit.
        if not bool(journey.get('converted', False)):
            continue
        path = journey.get('path', [])
        if not path:
            continue
        value = float(journey.get('value', 0))
        total_conversions += 1
        total_revenue += value

        if model == 'last_click':
            channel_values[path[-1]] += value
        elif model == 'first_click':
            channel_values[path[0]] += value
        elif model == 'linear':
            # Equal credit to every touchpoint.
            share = value / len(path)
            for touch in path:
                channel_values[touch] += share
        elif model == 'time_decay':
            # Exponential decay 2^(-x), x = distance from the converting touch;
            # weights are normalized so the journey's full value is distributed.
            weights = [2 ** -(len(path) - 1 - i) for i in range(len(path))]
            total_weight = sum(weights)  # always > 0 since path is non-empty
            for i, touch in enumerate(path):
                channel_values[touch] += weights[i] / total_weight * value
        else:  # position_based
            # 40% first, 40% last, remaining 20% split across the middle.
            if len(path) == 1:
                channel_values[path[0]] += value
            elif len(path) == 2:
                channel_values[path[0]] += value * 0.5
                channel_values[path[1]] += value * 0.5
            else:
                channel_values[path[0]] += value * 0.4
                channel_values[path[-1]] += value * 0.4
                middle_share = (value * 0.2) / (len(path) - 2)
                for touch in path[1:-1]:
                    channel_values[touch] += middle_share

    return {
        'breakdown': dict(channel_values),
        'total_conversions': total_conversions,
        'total_revenue': total_revenue,
    }
def get_top_paths(journeys, limit=10):
    """Aggregate the most common journeys into ECharts Sankey nodes/links.

    Args:
        journeys: iterable of dicts with 'path' and 'converted' keys.
        limit: number of distinct (path, outcome) sequences to keep.

    Returns:
        Dict with 'nodes' (list of {'name': ...}, sorted for deterministic
        output) and 'links' (list of {'source', 'target', 'value'}).
    """
    path_counts = defaultdict(int)
    for journey in journeys:
        path = journey.get('path', [])
        if not path:
            continue
        # Append a terminal outcome node; tuples are hashable dict keys.
        terminal = 'Conversion' if journey.get('converted', False) else 'Dropoff'
        path_counts[tuple(path) + (terminal,)] += 1

    top = sorted(path_counts.items(), key=lambda kv: kv[1], reverse=True)[:limit]

    # Format for ECharts Sankey. Steps are encoded into node names so the
    # same channel at different positions stays a distinct node.
    nodes = set()
    link_values = defaultdict(int)  # (source, target) -> aggregated count
    for path, count in top:
        for i in range(len(path) - 1):
            src_node = f"{path[i]} (Step {i+1})"
            if path[i + 1] in ('Conversion', 'Dropoff'):
                tgt_node = path[i + 1]
            else:
                tgt_node = f"{path[i+1]} (Step {i+2})"
            nodes.add(src_node)
            nodes.add(tgt_node)
            # Dict aggregation replaces the original O(links^2) linear scan.
            link_values[(src_node, tgt_node)] += count

    return {
        'nodes': [{'name': n} for n in sorted(nodes)],
        'links': [{'source': s, 'target': t, 'value': v}
                  for (s, t), v in link_values.items()],
    }
def parse_uploaded_file(file):
    """Parse an uploaded CSV or JSON file into the standard journey format.

    Args:
        file: uploaded file object exposing ``filename``; JSON is read via
            ``json.load(file)``, CSV via ``file.stream``.

    Returns:
        List of dicts with 'path' (list of channel names), 'converted' (bool)
        and 'value' (number) keys.

    Raises:
        ValueError: unsupported extension, malformed content, or no usable rows.
    """
    # A missing filename previously crashed with AttributeError; treat it as
    # an unsupported file type instead.
    filename = (file.filename or '').lower()
    try:
        if filename.endswith('.json'):
            journeys = _parse_json_journeys(file)
        elif filename.endswith('.csv'):
            journeys = _parse_csv_journeys(file)
        else:
            raise ValueError("Unsupported file type. Please upload .csv or .json")
    except ValueError:
        raise  # already a user-facing message; don't double-wrap it
    except Exception as e:
        raise ValueError(f"Error parsing file: {str(e)}")
    if not journeys:
        raise ValueError("No valid journey data found in file")
    return journeys


def _parse_json_journeys(file):
    """Load a JSON upload; it must be a list of journey objects."""
    content = json.load(file)
    if not isinstance(content, list):
        raise ValueError("JSON must be a list of journey objects")
    return content


def _parse_csv_journeys(file):
    """Parse a CSV upload row by row into journey dicts."""
    stream = io.StringIO(file.stream.read().decode("UTF8"), newline=None)
    journeys = []
    for row in csv.DictReader(stream):
        # Heuristic: accept any of the common column names for the path.
        path_str = row.get('path') or row.get('touchpoints') or row.get('channels')
        if not path_str:
            continue
        # Path strings look like "A > B > C" or "A,B,C".
        sep = '>' if '>' in path_str else ','
        path = [p.strip() for p in path_str.split(sep)]
        converted = str(row.get('converted', '0')).lower() in ['true', '1', 'yes', 'on']
        try:
            value = float(row.get('value', 0))
        except (TypeError, ValueError):
            # Blank or garbage revenue cells count as zero, not an error.
            value = 0
        journeys.append({'path': path, 'converted': converted, 'value': value})
    return journeys
def index():
    """Render the single-page dashboard template.

    NOTE(review): no @app.route decorator is visible on this view — confirm
    the route is registered elsewhere or was lost in formatting.
    """
    return render_template('index.html')
def analyze():
    """Generate mock journeys and return attribution + Sankey data as JSON.

    Expects a JSON body with an optional integer 'sample_size'. Returns
    attribution results for every model plus Sankey data, or a 500 JSON
    error payload on failure.

    NOTE(review): no @app.route decorator is visible on this view — confirm
    the route is registered elsewhere or was lost in formatting.
    """
    try:
        # get_json(silent=True) tolerates a missing/invalid JSON body instead
        # of raising; request.json would abort on a wrong content type.
        payload = request.get_json(silent=True) or {}
        # Clamp the requested size so a hostile or buggy client cannot
        # trigger an unbounded amount of work (matches the upload cap).
        sample_size = int(payload.get('sample_size', 1000))
        sample_size = max(1, min(sample_size, 50000))
        journeys = generate_mock_data(sample_size)
        # Calculate for all models
        models = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']
        results = {m: calculate_attribution(journeys, m) for m in models}
        # Get Sankey data
        sankey_data = get_top_paths(journeys, limit=20)
        return jsonify({
            'attribution_results': results,
            'sankey_data': sankey_data,
            'journey_count': len(journeys)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def upload_file():
    """Accept an uploaded CSV/JSON journey file and return analysis JSON.

    Responds 400 for a missing/empty file or a parse failure, 500 for
    anything unexpected.

    NOTE(review): no @app.route decorator is visible on this view — confirm
    the route is registered elsewhere or was lost in formatting.
    """
    try:
        # Guard clauses: reject requests without a usable file early.
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        uploaded = request.files['file']
        if uploaded.filename == '':
            return jsonify({'error': 'No selected file'}), 400

        journeys = parse_uploaded_file(uploaded)
        # Limit processing for performance if too large.
        journeys = journeys[:50000]

        # Run every attribution model over the parsed journeys.
        model_names = ['last_click', 'first_click', 'linear', 'time_decay', 'position_based']
        results = {name: calculate_attribution(journeys, name) for name in model_names}
        sankey_data = get_top_paths(journeys, limit=30)

        return jsonify({
            'attribution_results': results,
            'sankey_data': sankey_data,
            'journey_count': len(journeys)
        })
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        return jsonify({'error': f"Internal error: {str(e)}"}), 500
if __name__ == '__main__':
    # Binds to all interfaces on port 7860 — presumably a Hugging Face Space
    # deployment (confirm); debug mode is intentionally left off.
    app.run(host='0.0.0.0', port=7860)