"""Audience segmentation lab.

A small Flask application that generates synthetic audience records,
clusters them with a pure-Python k-means, and returns per-cluster profiles
with a heuristic marketing strategy and budget suggestion.
"""
import logging
import math
import os
import random
from typing import Dict, List, Tuple

from flask import Flask, jsonify, render_template, request, send_from_directory

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("audience-segmentation-lab")


def ensure_dirs():
    """Create the ``static`` and ``templates`` directories if missing.

    Paths are relative to the current working directory. This is
    best-effort: an ``OSError`` is logged as a warning and not re-raised.
    """
    try:
        os.makedirs('static', exist_ok=True)
        os.makedirs('templates', exist_ok=True)
    except OSError as e:
        logger.warning("Create dir failed: %s", e)


class CustomFlask(Flask):
    """Flask subclass that replaces the default Jinja delimiters.

    Blocks use ``<% %>``, variables use ``[[ ]]`` and comments use
    ``<# #>``, so the default ``{{ }}`` / ``{% %}`` sequences pass through
    templates untouched (presumably for a client-side templating library;
    confirm against the templates).
    """

    jinja_options = Flask.jinja_options.copy()
    jinja_options.update(dict(
        block_start_string='<%',
        block_end_string='%>',
        variable_start_string='[[',
        variable_end_string=']]',
        comment_start_string='<#',
        comment_end_string='#>',
    ))


app = CustomFlask(__name__, static_folder='static', template_folder='templates')
ensure_dirs()
# Reject request bodies larger than 5 MiB.
app.config['MAX_CONTENT_LENGTH'] = 5 * 1024 * 1024

# ---------- Data Utilities ----------
FEATURES = ["age", "income", "clicks", "sessions", "purchase_amount"]

# Defaults and upper bounds applied to client-supplied parameters so a single
# request cannot ask for an arbitrarily large amount of work.
DEFAULT_SAMPLE_SIZE = 240
DEFAULT_K = 3
MAX_SAMPLE_SIZE = 10000
MAX_CLUSTERS = 20

# Inclusive (low, high) ranges per feature for the three synthetic archetypes.
# Record ``i`` uses archetype ``i % 3``.
_ARCHETYPES = (
    {
        "age": (22, 35),
        "income": (6000, 15000),
        "clicks": (10, 40),
        "sessions": (5, 18),
        "purchase_amount": (200, 1200),
    },
    {
        "age": (30, 50),
        "income": (12000, 30000),
        "clicks": (2, 12),
        "sessions": (2, 8),
        "purchase_amount": (1000, 5000),
    },
    {
        "age": (18, 28),
        "income": (3000, 9000),
        "clicks": (20, 60),
        "sessions": (8, 24),
        "purchase_amount": (50, 600),
    },
)


def generate_sample_data(n: int = 240) -> List[Dict]:
    """Return ``n`` deterministic synthetic audience records.

    Each record has an ``id`` (1-based) plus one integer value per entry in
    ``FEATURES``. Records cycle through three archetypes so that visible
    clusters form. A private ``random.Random(42)`` is used, so repeated
    calls return identical data and the global ``random`` state is left
    untouched. A non-positive ``n`` yields an empty list.
    """
    rng = random.Random(42)
    data = []
    # Create 3 archetypes to form visible clusters
    for i in range(n):
        ranges = _ARCHETYPES[i % 3]
        record = {"id": i + 1}
        # Values are drawn in FEATURES order, which fixes the random sequence.
        for feature in FEATURES:
            low, high = ranges[feature]
            record[feature] = rng.randint(low, high)
        data.append(record)
    return data


def to_matrix(items: List[Dict]) -> List[List[float]]:
    """Convert records to rows of floats, one column per ``FEATURES`` entry.

    Raises:
        KeyError: a record lacks one of the features.
        TypeError, ValueError: a record is not subscriptable by feature
            name, or a value cannot be converted with ``float()``.
    """
    return [[float(item[f]) for f in FEATURES] for item in items]


def min_max_scale(X: List[List[float]]) -> Tuple[List[List[float]], List[float], List[float]]:
    """Scale every column of ``X`` by its own minimum and maximum.

    Each value becomes ``(x - min) / (max - min + 1e-9)``. The small epsilon
    avoids division by zero for constant columns (which therefore scale to
    0.0) and means a column maximum maps to just under 1.0.

    Returns:
        ``(scaled, mins, maxs)``; three empty lists when ``X`` is empty.
    """
    if not X:
        return [], [], []
    d = len(X[0])
    mins = [min(row[j] for row in X) for j in range(d)]
    maxs = [max(row[j] for row in X) for j in range(d)]
    scaled = [
        [(row[j] - mins[j]) / (maxs[j] - mins[j] + 1e-9) for j in range(d)]
        for row in X
    ]
    return scaled, mins, maxs


def euclidean(a: List[float], b: List[float]) -> float:
    """Return the Euclidean distance between ``a`` and ``b``.

    Coordinates are paired with ``zip``, so extra trailing coordinates of
    the longer vector are ignored.
    """
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def kmeans(X: List[List[float]], k: int, max_iters: int = 50) -> Tuple[List[int], List[List[float]]]:
    """Cluster the rows of ``X`` with Lloyd's k-means.

    Initial centroids are ``min(k, len(X))`` distinct rows picked by a
    private ``random.Random(123)``, so results are reproducible and the
    global ``random`` state is not modified. Iteration stops after
    ``max_iters`` rounds or as soon as a round changes no assignment.

    Returns:
        ``(assignments, centroids)`` where ``assignments[i]`` is the centroid
        index of ``X[i]``. Both are empty when ``X`` is empty or ``k <= 0``.
    """
    if not X or k <= 0:
        return [], []
    rng = random.Random(123)
    n = len(X)
    d = len(X[0])
    # Initialize centroids as random points
    centroids = [X[idx][:] for idx in rng.sample(range(n), min(k, n))]
    assignments = [0] * n
    for _ in range(max_iters):
        changed = False
        # Assign each point to its nearest centroid (first one wins on ties).
        for i, point in enumerate(X):
            distances = [euclidean(point, c) for c in centroids]
            new_c = min(range(len(centroids)), key=distances.__getitem__)
            if assignments[i] != new_c:
                assignments[i] = new_c
                changed = True
        # Update: group points per cluster in one pass, then average.
        members_by_cluster = [[] for _ in centroids]
        for point, c_idx in zip(X, assignments):
            members_by_cluster[c_idx].append(point)
        new_centroids = []
        for members in members_by_cluster:
            if members:
                new_centroids.append(
                    [sum(m[j] for m in members) / len(members) for j in range(d)]
                )
            else:
                # Reinitialize empty cluster centroid
                new_centroids.append(X[rng.randint(0, n - 1)][:])
        centroids = new_centroids
        if not changed:
            break
    return assignments, centroids


def cluster_profiles(items: List[Dict], assignments: List[int], k: int) -> List[Dict]:
    """Summarise each of the ``k`` clusters.

    For every cluster index ``0..k-1`` the profile holds the cluster size,
    the mean of each feature, a heuristic strategy label and a recommended
    budget percentage clamped to 5..50. Clusters with no members get zero
    means, the "insufficient data" strategy label and a budget of 0, so
    every profile has the same keys. Assignments outside ``0..k-1`` are
    ignored.
    """
    members_by_cluster = [[] for _ in range(k)]
    for item, c in zip(items, assignments):
        if 0 <= c < k:
            members_by_cluster[c].append(item)

    profiles = []
    for c, members in enumerate(members_by_cluster):
        if not members:
            profiles.append({
                "cluster": c,
                "size": 0,
                "means": {f: 0 for f in FEATURES},
                "strategy": "数据不足",
                "recommended_budget_pct": 0,
            })
            continue
        # float() keeps this consistent with to_matrix(), which accepts any
        # value convertible to float (e.g. numeric strings).
        means = {f: sum(float(m[f]) for m in members) / len(members) for f in FEATURES}
        # Simple heuristic strategy suggestion
        engagement = (means["clicks"] + means["sessions"]) / 2.0
        value = means["purchase_amount"]
        income = means["income"]
        if value > 1500 and engagement >= 8:
            strategy = "主推高客单价产品 + 专属折扣"
        elif value > 800 and engagement >= 5:
            strategy = "提升复购率:会员积分、订阅优惠"
        elif engagement < 6 and income >= 10000:
            strategy = "教育型内容 + 品牌信任建设(提高点击与会话)"
        else:
            strategy = "引导新手:新手礼包、首次下单优惠"
        budget_pct = max(5, min(50, int((value / 2000.0 + engagement / 20.0) * 50)))
        profiles.append({
            "cluster": c,
            "size": len(members),
            "means": means,
            "strategy": strategy,
            "recommended_budget_pct": budget_pct,
        })
    return profiles


# ---------- Routes ----------
@app.route('/')
def index():
    """Render the single-page UI."""
    return render_template('index.html')


# The rule must declare the ``path`` variable; without it the view is called
# with no arguments and fails with a TypeError.
@app.route('/static/<path:path>')
def serve_static(path):
    """Serve a file from the ``static`` directory."""
    return send_from_directory('static', path)


@app.route('/health')
def health():
    """Liveness probe."""
    return jsonify(status='ok'), 200


@app.errorhandler(404)
def page_not_found(e):
    """Answer unknown URLs with the index page and status 200."""
    return render_template('index.html'), 200


@app.errorhandler(500)
def internal_error(e):
    """Return the error description as JSON with status 500."""
    return jsonify(error=str(e)), 500


@app.route('/api/sample', methods=['GET'])
def api_sample():
    """Return synthetic sample data.

    Query parameter ``n`` (default 240) is the number of records. It is
    clamped to ``0..MAX_SAMPLE_SIZE``; a non-integer value yields HTTP 400.
    """
    raw_n = request.args.get('n', DEFAULT_SAMPLE_SIZE)
    try:
        n = int(raw_n)
    except (TypeError, ValueError):
        return jsonify(ok=False, error="'n' must be an integer"), 400
    n = max(0, min(n, MAX_SAMPLE_SIZE))
    return jsonify(data=generate_sample_data(n))


@app.route('/api/segment', methods=['POST'])
def api_segment():
    """Cluster the posted records and return assignments and profiles.

    The JSON body may contain ``data`` (a list of records holding every
    ``FEATURES`` key; the built-in sample is used when absent or empty) and
    ``k`` (number of clusters, default 3, allowed range
    ``1..MAX_CLUSTERS``). Invalid input yields HTTP 400 with an ``error``
    message instead of an unhandled exception.
    """
    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        return jsonify(ok=False, error="request body must be a JSON object"), 400

    items = payload.get('data') or generate_sample_data(DEFAULT_SAMPLE_SIZE)
    if not isinstance(items, list):
        return jsonify(ok=False, error="'data' must be a list of records"), 400

    try:
        k = int(payload.get('k') or DEFAULT_K)
    except (TypeError, ValueError):
        return jsonify(ok=False, error="'k' must be an integer"), 400
    if not 1 <= k <= MAX_CLUSTERS:
        return jsonify(
            ok=False, error="'k' must be between 1 and %d" % MAX_CLUSTERS
        ), 400

    try:
        X = to_matrix(items)
    except (KeyError, TypeError, ValueError):
        return jsonify(
            ok=False,
            error="each record must provide numeric values for: " + ", ".join(FEATURES),
        ), 400

    X_scaled, mins, maxs = min_max_scale(X)
    assignments, centroids = kmeans(X_scaled, k)
    profiles = cluster_profiles(items, assignments, k)
    return jsonify(
        ok=True,
        k=k,
        assignments=assignments,
        centroids=centroids,
        profiles=profiles,
        mins=mins,
        maxs=maxs,
    )


if __name__ == '__main__':
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port)