Spaces:
Sleeping
Sleeping
| import random | |
| import math | |
| import numpy as np | |
| import pandas as pd | |
| import io | |
| from flask import Flask, jsonify, request, render_template | |
| app = Flask(__name__) | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB limit | |
| # 模拟退火算法参数 | |
| INITIAL_TEMP = 1000 | |
| COOLING_RATE = 0.995 | |
| MIN_TEMP = 1 | |
| MAX_ITERATIONS = 5000 | |
| class SeatingSolver: | |
| def __init__(self, guests, tables, constraints): | |
| """ | |
| guests: list of {id, name, group, tags} | |
| tables: list of {id, name, capacity} | |
| constraints: list of {guest_id_1, guest_id_2, type: 'must'|'cannot', weight} | |
| """ | |
| self.guests = guests | |
| self.tables = tables | |
| self.constraints = constraints | |
| self.guest_map = {g['id']: g for g in guests} | |
| # 预处理关系矩阵 | |
| self.relation_matrix = {} # (id1, id2) -> score | |
| for c in constraints: | |
| key = tuple(sorted([c['g1'], c['g2']])) | |
| w = c.get('weight', 10) | |
| if c['type'] == 'must': | |
| self.relation_matrix[key] = w | |
| elif c['type'] == 'cannot': | |
| self.relation_matrix[key] = -w | |
| def solve(self): | |
| # 初始解:随机分配 | |
| current_solution = self._initial_solution() | |
| current_score = self._calculate_score(current_solution) | |
| best_solution = current_solution.copy() | |
| best_score = current_score | |
| temp = INITIAL_TEMP | |
| for i in range(MAX_ITERATIONS): | |
| if temp < MIN_TEMP: | |
| break | |
| # 产生新解:随机移动或交换 | |
| new_solution = self._neighbor_solution(current_solution) | |
| new_score = self._calculate_score(new_solution) | |
| # 接受准则 | |
| delta = new_score - current_score | |
| if delta > 0 or math.exp(delta / temp) > random.random(): | |
| current_solution = new_solution | |
| current_score = new_score | |
| if current_score > best_score: | |
| best_solution = current_solution.copy() | |
| best_score = current_score | |
| temp *= COOLING_RATE | |
| return self._format_result(best_solution, best_score) | |
| def _initial_solution(self): | |
| # 简单的随机分配,尽量填满桌子 | |
| solution = {} # guest_id -> table_id | |
| table_slots = [] | |
| for t in self.tables: | |
| for _ in range(t['capacity']): | |
| table_slots.append(t['id']) | |
| random.shuffle(table_slots) | |
| for i, guest in enumerate(self.guests): | |
| if i < len(table_slots): | |
| solution[guest['id']] = table_slots[i] | |
| else: | |
| # 没座位的放在 unassigned (None) | |
| solution[guest['id']] = None | |
| return solution | |
| def _neighbor_solution(self, solution): | |
| new_sol = solution.copy() | |
| guest_ids = list(self.guests) | |
| if not guest_ids: | |
| return new_sol | |
| action = random.choice(['move', 'swap']) | |
| if action == 'move': | |
| # 移动一个客人到另一张桌子(如果有空位) | |
| g1 = random.choice(self.guests)['id'] | |
| t_target = random.choice(self.tables)['id'] | |
| # 检查容量 | |
| current_count = sum(1 for gid, tid in new_sol.items() if tid == t_target) | |
| table_cap = next(t['capacity'] for t in self.tables if t['id'] == t_target) | |
| if current_count < table_cap: | |
| new_sol[g1] = t_target | |
| elif action == 'swap': | |
| # 交换两个客人的位置 | |
| g1 = random.choice(self.guests)['id'] | |
| g2 = random.choice(self.guests)['id'] | |
| if g1 != g2: | |
| new_sol[g1], new_sol[g2] = new_sol[g2], new_sol[g1] | |
| return new_sol | |
| def _calculate_score(self, solution): | |
| score = 0 | |
| # 1. 约束分数 | |
| for (g1, g2), weight in self.relation_matrix.items(): | |
| t1 = solution.get(g1) | |
| t2 = solution.get(g2) | |
| if t1 is not None and t2 is not None and t1 == t2: | |
| score += weight * 10 # 权重放大 | |
| # 2. 组别分数 (同一组的尽量坐一起) | |
| # 遍历每张桌子 | |
| table_guests = {} | |
| for gid, tid in solution.items(): | |
| if tid: | |
| if tid not in table_guests: table_guests[tid] = [] | |
| table_guests[tid].append(self.guest_map[gid]) | |
| for tid, guests in table_guests.items(): | |
| groups = [g.get('group') for g in guests if g.get('group')] | |
| # 计算组别一致性 | |
| # 如果桌子上大部分人是同一组,加分 | |
| if groups: | |
| # 简单做法:每有一对同组人,加 2 分 | |
| for i in range(len(groups)): | |
| for j in range(i+1, len(groups)): | |
| if groups[i] == groups[j]: | |
| score += 2 | |
| return score | |
| def _format_result(self, solution, score): | |
| # 转换为前端易用的格式 | |
| tables_res = [] | |
| unassigned = [] | |
| table_guests = {t['id']: [] for t in self.tables} | |
| for gid, tid in solution.items(): | |
| if tid: | |
| table_guests[tid].append(self.guest_map[gid]) | |
| else: | |
| unassigned.append(self.guest_map[gid]) | |
| for t in self.tables: | |
| tables_res.append({ | |
| 'id': t['id'], | |
| 'name': t['name'], | |
| 'capacity': t['capacity'], | |
| 'guests': table_guests[t['id']] | |
| }) | |
| return { | |
| 'tables': tables_res, | |
| 'unassigned': unassigned, | |
| 'score': score | |
| } | |
| def index(): | |
| return render_template('index.html') | |
| def solve_seating(): | |
| data = request.json | |
| guests = data.get('guests', []) | |
| tables = data.get('tables', []) | |
| constraints = data.get('constraints', []) # [{g1: id, g2: id, type: 'must'|'cannot'}] | |
| if not guests or not tables: | |
| return jsonify({'error': 'Missing guests or tables'}), 400 | |
| solver = SeatingSolver(guests, tables, constraints) | |
| result = solver.solve() | |
| return jsonify(result) | |
| def upload_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}), 400 | |
| try: | |
| # Determine file type | |
| if file.filename.endswith('.csv'): | |
| df = pd.read_csv(file) | |
| elif file.filename.endswith(('.xls', '.xlsx')): | |
| df = pd.read_excel(file) | |
| else: | |
| return jsonify({'error': 'Unsupported file type'}), 400 | |
| # Normalize columns | |
| # Expected columns: Name (required), Group (optional), ID (optional) | |
| # Rename columns to standard names if possible | |
| df.columns = [str(c).strip().lower() for c in df.columns] | |
| # Map common Chinese headers to English keys | |
| col_map = { | |
| '姓名': 'name', 'name': 'name', | |
| '组别': 'group', 'group': 'group', '部门': 'group', | |
| 'id': 'id', '编号': 'id' | |
| } | |
| # Rename columns | |
| new_cols = {} | |
| for col in df.columns: | |
| if col in col_map: | |
| new_cols[col] = col_map[col] | |
| df = df.rename(columns=new_cols) | |
| if 'name' not in df.columns: | |
| return jsonify({'error': '缺少必要列: 姓名 (Name)'}), 400 | |
| guests = [] | |
| for i, row in df.iterrows(): | |
| guest_id = str(row.get('id', f'g_{i}_{random.randint(1000,9999)}')) | |
| if pd.isna(guest_id) or guest_id == 'nan': | |
| guest_id = f'g_{i}_{random.randint(1000,9999)}' | |
| name = str(row['name']) | |
| if pd.isna(name) or name == 'nan': | |
| continue | |
| group = str(row.get('group', 'Default')) | |
| if pd.isna(group) or group == 'nan': | |
| group = 'Default' | |
| guests.append({ | |
| 'id': guest_id, | |
| 'name': name, | |
| 'group': group, | |
| 'avatar': f'https://api.dicebear.com/7.x/avataaars/svg?seed={guest_id}' | |
| }) | |
| return jsonify({'guests': guests}) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def demo_data(): | |
| # 生成演示数据 | |
| groups = ['家人', '同事', '大学同学', '高中同学', '合作伙伴'] | |
| guests = [] | |
| for i in range(50): | |
| group = random.choice(groups) | |
| guests.append({ | |
| 'id': f'g_{i}', | |
| 'name': f'Guest {i+1}', | |
| 'group': group, | |
| 'avatar': f'https://api.dicebear.com/7.x/avataaars/svg?seed={i}' | |
| }) | |
| tables = [] | |
| for i in range(6): | |
| tables.append({ | |
| 'id': f't_{i}', | |
| 'name': f'Table {i+1}', | |
| 'capacity': 10 | |
| }) | |
| # 随机生成一些约束 | |
| constraints = [] | |
| # 必须坐一起 | |
| for _ in range(5): | |
| g1 = random.choice(guests)['id'] | |
| g2 = random.choice(guests)['id'] | |
| if g1 != g2: | |
| constraints.append({'g1': g1, 'g2': g2, 'type': 'must', 'weight': 10}) | |
| # 不能坐一起 | |
| for _ in range(3): | |
| g1 = random.choice(guests)['id'] | |
| g2 = random.choice(guests)['id'] | |
| if g1 != g2: | |
| constraints.append({'g1': g1, 'g2': g2, 'type': 'cannot', 'weight': 10}) | |
| return jsonify({ | |
| 'guests': guests, | |
| 'tables': tables, | |
| 'constraints': constraints | |
| }) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |