Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import sqlite3 | |
| import requests | |
| import random | |
| import time | |
| import traceback | |
| from flask import Flask, render_template, request, jsonify, g | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Configuration | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB | |
| app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads') | |
| app.config['DATABASE'] = os.path.join(app.instance_path, 'synapse.db') | |
| SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi" | |
| SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions" | |
| # Ensure instance and upload folders exist | |
| try: | |
| os.makedirs(app.instance_path) | |
| os.makedirs(app.config['UPLOAD_FOLDER']) | |
| except OSError: | |
| pass | |
| # Error Handlers | |
| def page_not_found(e): | |
| return render_template('index.html'), 404 | |
| def internal_server_error(e): | |
| traceback.print_exc() | |
| return jsonify(error="Internal Server Error", message=str(e)), 500 | |
| def request_entity_too_large(e): | |
| return jsonify(error="File too large", message="File exceeds the maximum allowed size of 16MB"), 413 | |
| # Database Setup | |
| def get_db(): | |
| db = getattr(g, '_database', None) | |
| if db is None: | |
| db = g._database = sqlite3.connect(app.config['DATABASE']) | |
| db.row_factory = sqlite3.Row | |
| return db | |
| def close_connection(exception): | |
| db = getattr(g, '_database', None) | |
| if db is not None: | |
| db.close() | |
| def init_db(): | |
| with app.app_context(): | |
| db = get_db() | |
| db.execute(''' | |
| CREATE TABLE IF NOT EXISTS history ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| timestamp TEXT NOT NULL, | |
| subject_id TEXT, | |
| session_type TEXT, | |
| analysis_result TEXT, | |
| raw_metrics TEXT | |
| ) | |
| ''') | |
| # Check if empty and add default data | |
| cur = db.execute('SELECT count(*) FROM history') | |
| if cur.fetchone()[0] == 0: | |
| default_analysis = { | |
| "summary": "**演示数据报告**: 这是一个自动生成的示例报告。\n\n受试者 **DEMO-USER** 在 **专注训练** 中表现优秀。Alpha波与Beta波的交替出现表明受试者能够自如地在放松和专注状态间切换。", | |
| "cognitive_state": "Flow State (心流状态)", | |
| "recommendations": ["继续保持当前的训练频率", "尝试增加训练时长至30分钟", "记录训练后的主观感受"], | |
| "radar_chart": {"专注": 85, "放松": 70, "反应速度": 80, "记忆负荷": 65, "情绪稳定性": 88} | |
| } | |
| db.execute( | |
| 'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)', | |
| (time.strftime('%Y-%m-%d %H:%M:%S'), 'DEMO-USER', 'Focus Training', json.dumps(default_analysis), "demo_data") | |
| ) | |
| print("Default data initialized.") | |
| db.commit() | |
| init_db() | |
| # --- Helpers --- | |
| ALLOWED_EXTENSIONS = {'txt', 'csv', 'json', 'edf', 'bdf'} | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| # --- Routes --- | |
| def index(): | |
| return render_template('index.html') | |
| def upload_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file part'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No selected file'}), 400 | |
| if file and allowed_file(file.filename): | |
| filename = secure_filename(file.filename) | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| # Mock processing of the file to extract metrics | |
| # In a real app, we would parse EDF/CSV here | |
| file_size = os.path.getsize(filepath) | |
| return jsonify({ | |
| 'status': 'success', | |
| 'message': f'File {filename} uploaded successfully ({file_size} bytes)', | |
| 'extracted_metrics': { | |
| 'alpha': random.uniform(8, 12), | |
| 'beta': random.uniform(15, 25), | |
| 'theta': random.uniform(4, 7), | |
| 'delta': random.uniform(1, 3) | |
| } | |
| }) | |
| return jsonify({'error': 'File type not allowed'}), 400 | |
| def mock_signal(): | |
| """Generate mock EEG signal data for visualization.""" | |
| # Generate 4 channels of data (Alpha, Beta, Theta, Delta) | |
| timestamp = time.time() | |
| data = [] | |
| for i in range(50): # 50 points | |
| t = timestamp + i * 0.1 | |
| data.append({ | |
| 'time': i, | |
| 'alpha': 10 + 5 * random.sin(t), | |
| 'beta': 20 + 8 * random.cos(t * 2), | |
| 'theta': 5 + 3 * random.sin(t * 0.5), | |
| 'delta': 2 + 1 * random.cos(t * 0.2) | |
| }) | |
| return jsonify({'status': 'success', 'data': data}) | |
| def analyze_session(): | |
| data = request.json | |
| subject_id = data.get('subject_id', 'Unknown') | |
| session_type = data.get('session_type', 'Focus Training') | |
| # Construct prompt for SiliconFlow | |
| prompt = f""" | |
| 作为神经科学专家,请分析以下脑机接口(BCI)会话数据并生成中文报告。 | |
| 受试者ID: {subject_id} | |
| 会话类型: {session_type} | |
| 模拟采集数据特征: | |
| - Alpha波 (8-12Hz): 活跃度中等偏高 (放松状态) | |
| - Beta波 (12-30Hz): 间歇性峰值 (集中注意力) | |
| - Theta波 (4-8Hz): 低 (无困倦) | |
| - 专注度指数: 75/100 | |
| - 疲劳度指数: 20/100 | |
| 请输出JSON格式,包含以下字段: | |
| - summary: 会话总结 (Markdown格式) | |
| - cognitive_state: 认知状态评估 (Focus, Relaxed, Stressed, Fatigued) | |
| - recommendations: 3条改进建议 (数组) | |
| - radar_chart: 雷达图数据 (5个维度: 专注, 放松, 反应速度, 记忆负荷, 情绪稳定性, 0-100分) | |
| """ | |
| headers = { | |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "model": "Qwen/Qwen2.5-7B-Instruct", | |
| "messages": [ | |
| {"role": "system", "content": "你是一个专业的神经科学和脑机接口数据分析师。请只输出JSON格式。"}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| "response_format": {"type": "json_object"} | |
| } | |
| try: | |
| response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| result_json = response.json() | |
| content = result_json['choices'][0]['message']['content'] | |
| # Parse JSON from content | |
| analysis = json.loads(content) | |
| except Exception as e: | |
| print(f"API Error: {e}") | |
| # Mock Fallback | |
| analysis = { | |
| "summary": f"**模拟分析报告**: 由于API连接受限,这是基于本地规则生成的分析。\n\n受试者 **{subject_id}** 在 **{session_type}** 中表现出良好的认知稳定性。Alpha波活动表明处于放松警觉状态,适合进行高负荷任务前的调整。", | |
| "cognitive_state": "Relaxed & Focused", | |
| "recommendations": ["增加5分钟Theta波诱导训练", "保持当前呼吸节奏", "建议在下午时段进行下一次训练"], | |
| "radar_chart": {"专注": 78, "放松": 85, "反应速度": 60, "记忆负荷": 45, "情绪稳定性": 90} | |
| } | |
| # Save to DB | |
| try: | |
| db = get_db() | |
| db.execute( | |
| 'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)', | |
| (time.strftime('%Y-%m-%d %H:%M:%S'), subject_id, session_type, json.dumps(analysis), "mock_raw_data") | |
| ) | |
| db.commit() | |
| except Exception as e: | |
| print(f"DB Error: {e}") | |
| return jsonify({'status': 'success', 'result': analysis}) | |
| def get_history(): | |
| db = get_db() | |
| rows = db.execute('SELECT * FROM history ORDER BY id DESC LIMIT 10').fetchall() | |
| history = [] | |
| for row in rows: | |
| history.append({ | |
| 'id': row['id'], | |
| 'timestamp': row['timestamp'], | |
| 'subject_id': row['subject_id'], | |
| 'session_type': row['session_type'], | |
| 'analysis': json.loads(row['analysis_result']) | |
| }) | |
| return jsonify({'status': 'success', 'history': history}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |