import os
import json
import sqlite3
import requests
import random
import time
import traceback
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB
app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads')
app.config['DATABASE'] = os.path.join(app.instance_path, 'synapse.db')

# SECURITY(review): this API key is committed in source control. It is kept
# only as a fallback so existing deployments keep working; set the
# SILICONFLOW_API_KEY environment variable, rotate the key, and then remove
# the literal below.
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"

# Ensure instance and upload folders exist.
# The upload folder lives inside the instance folder, so one recursive call
# creates both. exist_ok=True matters: without it, an already-existing
# instance folder raised before the upload folder was ever created.
try:
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
except OSError as exc:
    # Best effort, as before: report the problem instead of failing at import.
    print(f"Could not create upload folder: {exc}")
# Error Handlers
@app.errorhandler(404)
def page_not_found(e):
    """Render the main page template for unknown URLs, with a 404 status."""
    page = render_template('index.html')
    return page, 404
@app.errorhandler(500)
def internal_server_error(e):
    """Print the current traceback and answer with a JSON 500 payload.

    The ``message`` field carries ``str(e)`` for the error passed to the handler.
    """
    traceback.print_exc()
    body = jsonify(error="Internal Server Error", message=str(e))
    return body, 500
@app.errorhandler(413)
def request_entity_too_large(e):
    """Answer HTTP 413 with a JSON body describing the 16MB upload limit."""
    body = jsonify(
        error="File too large",
        message="File exceeds the maximum allowed size of 16MB",
    )
    return body, 413
# Database Setup
def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    A new connection is opened against ``app.config['DATABASE']`` and uses
    ``sqlite3.Row`` so columns can be read by name.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn

    conn = sqlite3.connect(app.config['DATABASE'])
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g`` (if any) when the app context ends."""
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
def init_db():
    """Create the ``history`` table if needed and seed one demo row when empty."""
    create_table_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS history (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "timestamp TEXT NOT NULL,\n"
        "subject_id TEXT,\n"
        "session_type TEXT,\n"
        "analysis_result TEXT,\n"
        "raw_metrics TEXT\n"
        ")\n"
    )
    insert_sql = 'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)'

    with app.app_context():
        conn = get_db()
        conn.execute(create_table_sql)

        # Seed a demo report only when the table has no rows yet.
        row_count = conn.execute('SELECT count(*) FROM history').fetchone()[0]
        if row_count == 0:
            demo_report = {
                "summary": "**演示数据报告**: 这是一个自动生成的示例报告。\n\n受试者 **DEMO-USER** 在 **专注训练** 中表现优秀。Alpha波与Beta波的交替出现表明受试者能够自如地在放松和专注状态间切换。",
                "cognitive_state": "Flow State (心流状态)",
                "recommendations": ["继续保持当前的训练频率", "尝试增加训练时长至30分钟", "记录训练后的主观感受"],
                "radar_chart": {"专注": 85, "放松": 70, "反应速度": 80, "记忆负荷": 65, "情绪稳定性": 88},
            }
            demo_row = (
                time.strftime('%Y-%m-%d %H:%M:%S'),
                'DEMO-USER',
                'Focus Training',
                json.dumps(demo_report),
                "demo_data",
            )
            conn.execute(insert_sql, demo_row)
            print("Default data initialized.")

        conn.commit()


# Runs at import time so the schema exists before the first request.
init_db()
# --- Helpers ---
ALLOWED_EXTENSIONS = {'txt', 'csv', 'json', 'edf', 'bdf'}


def allowed_file(filename):
    """Return True when ``filename`` has a dot and an allowed extension.

    Only the text after the last dot is considered, compared case-insensitively.
    """
    _stem, dot, extension = filename.rpartition('.')
    has_dot = bool(dot)
    return has_dot and extension.lower() in ALLOWED_EXTENSIONS
# --- Routes ---
@app.route('/')
def index():
    """Serve the main page template."""
    page = render_template('index.html')
    return page
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file and return randomly generated band metrics.

    Responds with HTTP 400 when the ``file`` part is missing, has an empty
    filename, or has an extension rejected by ``allowed_file``. On success
    the file is saved under ``UPLOAD_FOLDER`` and a JSON payload is returned.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    upload = request.files['file']
    if upload.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not (upload and allowed_file(upload.filename)):
        return jsonify({'error': 'File type not allowed'}), 400

    safe_name = secure_filename(upload.filename)
    destination = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
    upload.save(destination)

    # Mock processing of the file to extract metrics.
    # In a real app, we would parse EDF/CSV here.
    size_in_bytes = os.path.getsize(destination)
    band_ranges = (
        ('alpha', 8, 12),
        ('beta', 15, 25),
        ('theta', 4, 7),
        ('delta', 1, 3),
    )
    metrics = {band: random.uniform(low, high) for band, low, high in band_ranges}

    return jsonify({
        'status': 'success',
        'message': f'File {safe_name} uploaded successfully ({size_in_bytes} bytes)',
        'extracted_metrics': metrics,
    })
@app.route('/api/mock/signal')
def mock_signal():
    """Generate mock EEG signal data for visualization.

    Returns a JSON payload with 50 samples. Each sample has a ``time`` index
    and four band values (alpha, beta, theta, delta) computed as sinusoids
    of the current wall-clock time, so consecutive calls produce shifted
    waveforms.
    """
    # Function-scope import keeps this fix self-contained. The original
    # called random.sin / random.cos, which do not exist in the random
    # module and raised AttributeError on every request.
    import math

    started_at = time.time()
    samples = []
    for i in range(50):  # 50 points, 0.1 s apart
        t = started_at + i * 0.1
        samples.append({
            'time': i,
            'alpha': 10 + 5 * math.sin(t),
            'beta': 20 + 8 * math.cos(t * 2),
            'theta': 5 + 3 * math.sin(t * 0.5),
            'delta': 2 + 1 * math.cos(t * 0.2),
        })
    return jsonify({'status': 'success', 'data': samples})
def _build_analysis_prompt(subject_id, session_type):
    """Return the prompt text describing one session for the model."""
    lines = [
        "",
        "作为神经科学专家,请分析以下脑机接口(BCI)会话数据并生成中文报告。",
        f"受试者ID: {subject_id}",
        f"会话类型: {session_type}",
        "模拟采集数据特征:",
        "- Alpha波 (8-12Hz): 活跃度中等偏高 (放松状态)",
        "- Beta波 (12-30Hz): 间歇性峰值 (集中注意力)",
        "- Theta波 (4-8Hz): 低 (无困倦)",
        "- 专注度指数: 75/100",
        "- 疲劳度指数: 20/100",
        "请输出JSON格式,包含以下字段:",
        "- summary: 会话总结 (Markdown格式)",
        "- cognitive_state: 认知状态评估 (Focus, Relaxed, Stressed, Fatigued)",
        "- recommendations: 3条改进建议 (数组)",
        "- radar_chart: 雷达图数据 (5个维度: 专注, 放松, 反应速度, 记忆负荷, 情绪稳定性, 0-100分)",
        "",
    ]
    # Leading and trailing "" reproduce the newline right after the opening
    # and right before the closing triple quote of the original literal.
    return "\n".join(lines)


def _fallback_analysis(subject_id, session_type):
    """Return the locally generated report used when the API call fails."""
    return {
        "summary": f"**模拟分析报告**: 由于API连接受限,这是基于本地规则生成的分析。\n\n受试者 **{subject_id}** 在 **{session_type}** 中表现出良好的认知稳定性。Alpha波活动表明处于放松警觉状态,适合进行高负荷任务前的调整。",
        "cognitive_state": "Relaxed & Focused",
        "recommendations": ["增加5分钟Theta波诱导训练", "保持当前呼吸节奏", "建议在下午时段进行下一次训练"],
        "radar_chart": {"专注": 78, "放松": 85, "反应速度": 60, "记忆负荷": 45, "情绪稳定性": 90},
    }


@app.route('/api/analyze', methods=['POST'])
def analyze_session():
    """Analyze a session via the SiliconFlow API and record the result.

    Reads optional ``subject_id`` and ``session_type`` from the JSON body,
    asks the model for a JSON report, and falls back to a local mock report
    when the API call or response parsing fails. The report is stored in the
    ``history`` table (best effort) and returned as JSON.
    """
    # A missing, malformed, or non-object body used to crash on ``data.get``.
    # Treat it as an empty request so the documented defaults apply.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    subject_id = data.get('subject_id', 'Unknown')
    session_type = data.get('session_type', 'Focus Training')

    # Construct prompt for SiliconFlow
    prompt = _build_analysis_prompt(subject_id, session_type)
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": "你是一个专业的神经科学和脑机接口数据分析师。请只输出JSON格式。"},
            {"role": "user", "content": prompt}
        ],
        "response_format": {"type": "json_object"}
    }

    try:
        response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        content = response.json()['choices'][0]['message']['content']
        # Parse JSON from content
        analysis = json.loads(content)
        if not isinstance(analysis, dict):
            # Valid JSON that is not an object cannot hold the report fields.
            raise ValueError("model response is not a JSON object")
    except Exception as e:
        # Deliberately broad: any network, HTTP, or parsing failure falls
        # back to the mock report rather than failing the request.
        print(f"API Error: {e}")
        analysis = _fallback_analysis(subject_id, session_type)

    # Save to DB (best effort: a storage failure must not hide the result)
    try:
        db = get_db()
        db.execute(
            'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)',
            (time.strftime('%Y-%m-%d %H:%M:%S'), subject_id, session_type, json.dumps(analysis), "mock_raw_data")
        )
        db.commit()
    except Exception as e:
        print(f"DB Error: {e}")

    return jsonify({'status': 'success', 'result': analysis})
@app.route('/api/history')
def get_history():
    """Return the 10 most recent history rows, newest first, as JSON.

    Each row's stored ``analysis_result`` text is decoded from JSON and
    exposed under the ``analysis`` key.
    """
    query = 'SELECT * FROM history ORDER BY id DESC LIMIT 10'
    records = get_db().execute(query).fetchall()
    entries = [
        {
            'id': record['id'],
            'timestamp': record['timestamp'],
            'subject_id': record['subject_id'],
            'session_type': record['session_type'],
            'analysis': json.loads(record['analysis_result']),
        }
        for record in records
    ]
    return jsonify({'status': 'success', 'history': entries})
if __name__ == '__main__':
    # The server binds to all interfaces, so the interactive Werkzeug debugger
    # must not be on by default: it lets anyone who reaches it run code.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in {'1', 'true', 'yes', 'on'}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)
|