"""Flask backend for a demo brain-computer-interface (BCI) session dashboard.

The service exposes:

* ``/``                 - the single-page front end (``index.html``).
* ``/api/upload``       - accepts a recording file and returns mock band metrics.
* ``/api/mock/signal``  - returns 50 points of synthetic sinusoidal band data.
* ``/api/analyze``      - asks the SiliconFlow chat API for a session report,
  falling back to a canned report when the call fails, and stores the result.
* ``/api/history``      - returns the ten most recent stored reports.
"""
import json
import math
import os
import random
import sqlite3
import time
import traceback

import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
from werkzeug.utils import secure_filename

app = Flask(__name__)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB
app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads')
app.config['DATABASE'] = os.path.join(app.instance_path, 'synapse.db')

# NOTE(security): the literal below is a credential committed to source. It is
# kept only as a backward-compatible fallback; set the SILICONFLOW_API_KEY
# environment variable instead, then rotate and delete this literal.
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"

_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

_INSERT_HISTORY_SQL = (
    'INSERT INTO history '
    '(timestamp, subject_id, session_type, analysis_result, raw_metrics) '
    'VALUES (?, ?, ?, ?, ?)'
)


def _ensure_folders():
    """Create the instance and upload folders, tolerating ones that exist.

    Each folder is created independently so that an already-existing instance
    folder does not prevent the upload folder from being created. Other OS
    errors stay non-fatal (as before) but are reported instead of hidden.
    """
    for folder in (app.instance_path, app.config['UPLOAD_FOLDER']):
        try:
            os.makedirs(folder, exist_ok=True)
        except OSError as exc:
            print(f"Could not create folder {folder}: {exc}")


_ensure_folders()


# Error Handlers
@app.errorhandler(404)
def page_not_found(e):
    """Serve the front-end page (with a 404 status) for unknown URLs."""
    return render_template('index.html'), 404


@app.errorhandler(500)
def internal_server_error(e):
    """Print the active traceback and return a JSON description of the error."""
    traceback.print_exc()
    return jsonify(error="Internal Server Error", message=str(e)), 500


@app.errorhandler(413)
def request_entity_too_large(e):
    """Return a JSON error when an upload exceeds ``MAX_CONTENT_LENGTH``."""
    return jsonify(
        error="File too large",
        message="File exceeds the maximum allowed size of 16MB",
    ), 413


# Database Setup
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it if needed.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    """
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect(app.config['DATABASE'])
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def close_connection(exception):
    """Close the cached connection, if any, when the app context ends."""
    db = getattr(g, '_database', None)
    if db is not None:
        db.close()


def _insert_history(db, subject_id, session_type, analysis, raw_metrics):
    """Insert one report row stamped with the current local time.

    ``analysis`` is stored as a JSON string. The caller is responsible for
    committing the transaction.
    """
    db.execute(
        _INSERT_HISTORY_SQL,
        (
            time.strftime(_TIMESTAMP_FORMAT),
            subject_id,
            session_type,
            json.dumps(analysis),
            raw_metrics,
        ),
    )


def init_db():
    """Create the ``history`` table and seed one demo row if it is empty."""
    with app.app_context():
        db = get_db()
        db.execute('''
            CREATE TABLE IF NOT EXISTS history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                subject_id TEXT,
                session_type TEXT,
                analysis_result TEXT,
                raw_metrics TEXT
            )
        ''')

        # Check if empty and add default data
        row_count = db.execute('SELECT count(*) FROM history').fetchone()[0]
        if row_count == 0:
            default_analysis = {
                "summary": "**演示数据报告**: 这是一个自动生成的示例报告。\n\n受试者 **DEMO-USER** 在 **专注训练** 中表现优秀。Alpha波与Beta波的交替出现表明受试者能够自如地在放松和专注状态间切换。",
                "cognitive_state": "Flow State (心流状态)",
                "recommendations": ["继续保持当前的训练频率", "尝试增加训练时长至30分钟", "记录训练后的主观感受"],
                "radar_chart": {"专注": 85, "放松": 70, "反应速度": 80, "记忆负荷": 65, "情绪稳定性": 88},
            }
            _insert_history(
                db, 'DEMO-USER', 'Focus Training', default_analysis, "demo_data"
            )
            print("Default data initialized.")
        db.commit()


init_db()

# --- Helpers ---
ALLOWED_EXTENSIONS = {'txt', 'csv', 'json', 'edf', 'bdf'}


def allowed_file(filename):
    """Return True if ``filename`` has an extension in ``ALLOWED_EXTENSIONS``.

    The comparison is case-insensitive and uses the text after the last dot.
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def _build_analysis_prompt(subject_id, session_type):
    """Return the user prompt sent to the chat API for one session.

    The segments are concatenated as-is; each one carries its own leading
    single-space separator.
    """
    return (
        " 作为神经科学专家,请分析以下脑机接口(BCI)会话数据并生成中文报告。"
        f" 受试者ID: {subject_id}"
        f" 会话类型: {session_type}"
        " 模拟采集数据特征:"
        " - Alpha波 (8-12Hz): 活跃度中等偏高 (放松状态)"
        " - Beta波 (12-30Hz): 间歇性峰值 (集中注意力)"
        " - Theta波 (4-8Hz): 低 (无困倦)"
        " - 专注度指数: 75/100"
        " - 疲劳度指数: 20/100"
        " 请输出JSON格式,包含以下字段:"
        " - summary: 会话总结 (Markdown格式)"
        " - cognitive_state: 认知状态评估 (Focus, Relaxed, Stressed, Fatigued)"
        " - recommendations: 3条改进建议 (数组)"
        " - radar_chart: 雷达图数据 (5个维度: 专注, 放松, 反应速度, 记忆负荷, 情绪稳定性, 0-100分)"
        " "
    )


def _fallback_analysis(subject_id, session_type):
    """Return the canned report used when the remote analysis call fails."""
    return {
        "summary": f"**模拟分析报告**: 由于API连接受限,这是基于本地规则生成的分析。\n\n受试者 **{subject_id}** 在 **{session_type}** 中表现出良好的认知稳定性。Alpha波活动表明处于放松警觉状态,适合进行高负荷任务前的调整。",
        "cognitive_state": "Relaxed & Focused",
        "recommendations": ["增加5分钟Theta波诱导训练", "保持当前呼吸节奏", "建议在下午时段进行下一次训练"],
        "radar_chart": {"专注": 78, "放松": 85, "反应速度": 60, "记忆负荷": 45, "情绪稳定性": 90},
    }


def _request_remote_analysis(subject_id, session_type):
    """Call the SiliconFlow chat API and return the parsed JSON report.

    Raises whatever ``requests`` or ``json`` raise on transport errors, non-2xx
    responses, an unexpected response shape, or content that is not JSON.
    """
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": "你是一个专业的神经科学和脑机接口数据分析师。请只输出JSON格式。"},
            {"role": "user", "content": _build_analysis_prompt(subject_id, session_type)},
        ],
        "response_format": {"type": "json_object"},
    }
    response = requests.post(
        SILICONFLOW_API_URL, json=payload, headers=headers, timeout=10
    )
    response.raise_for_status()
    content = response.json()['choices'][0]['message']['content']
    # The model is asked to reply with a JSON document; parse it here.
    return json.loads(content)


# --- Routes ---
@app.route('/')
def index():
    """Serve the front-end page."""
    return render_template('index.html')


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Store an uploaded recording and return mock band metrics.

    Responds with 400 when the ``file`` part is missing, has no filename, or
    has an extension outside ``ALLOWED_EXTENSIONS``.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not (file and allowed_file(file.filename)):
        return jsonify({'error': 'File type not allowed'}), 400

    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)

    # Mock processing of the file to extract metrics
    # In a real app, we would parse EDF/CSV here
    file_size = os.path.getsize(filepath)

    return jsonify({
        'status': 'success',
        'message': f'File {filename} uploaded successfully ({file_size} bytes)',
        'extracted_metrics': {
            'alpha': random.uniform(8, 12),
            'beta': random.uniform(15, 25),
            'theta': random.uniform(4, 7),
            'delta': random.uniform(1, 3),
        },
    })


def _signal_point(index, t):
    """Return one synthetic sample for the four bands at phase time ``t``."""
    return {
        'time': index,
        'alpha': 10 + 5 * math.sin(t),
        'beta': 20 + 8 * math.cos(t * 2),
        'theta': 5 + 3 * math.sin(t * 0.5),
        'delta': 2 + 1 * math.cos(t * 0.2),
    }


@app.route('/api/mock/signal')
def mock_signal():
    """Generate mock EEG signal data for visualization.

    Returns 50 points, spaced 0.1 s apart starting at the current time, each
    holding sinusoidal alpha/beta/theta/delta values.
    """
    # The trig functions live in ``math``; the ``random`` module has no
    # ``sin``/``cos`` attributes.
    start = time.time()
    data = [_signal_point(i, start + i * 0.1) for i in range(50)]
    return jsonify({'status': 'success', 'data': data})


@app.route('/api/analyze', methods=['POST'])
def analyze_session():
    """Produce a session report, persist it, and return it as JSON.

    Reads optional ``subject_id`` and ``session_type`` from the JSON body; a
    missing, invalid, or non-object body is treated as an empty object so the
    defaults apply. Any failure of the remote call falls back to a canned
    report, and a database failure is logged without failing the request.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    subject_id = data.get('subject_id', 'Unknown')
    session_type = data.get('session_type', 'Focus Training')

    try:
        analysis = _request_remote_analysis(subject_id, session_type)
    except Exception as exc:
        # Deliberately broad: whatever goes wrong remotely, the endpoint must
        # still answer with the local mock report.
        app.logger.warning("API Error: %s", exc)
        analysis = _fallback_analysis(subject_id, session_type)

    # Save to DB
    try:
        db = get_db()
        _insert_history(db, subject_id, session_type, analysis, "mock_raw_data")
        db.commit()
    except sqlite3.Error as exc:
        # Persistence is best-effort; the report is still returned.
        app.logger.error("DB Error: %s", exc)

    return jsonify({'status': 'success', 'result': analysis})


@app.route('/api/history')
def get_history():
    """Return the ten most recent stored reports, newest first."""
    rows = get_db().execute(
        'SELECT * FROM history ORDER BY id DESC LIMIT 10'
    ).fetchall()
    history = [
        {
            'id': row['id'],
            'timestamp': row['timestamp'],
            'subject_id': row['subject_id'],
            'session_type': row['session_type'],
            'analysis': json.loads(row['analysis_result']),
        }
        for row in rows
    ]
    return jsonify({'status': 'success', 'history': history})


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must not
    # be on by default while listening on all interfaces. Opt in explicitly
    # with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)