# Trae Assistant
# Enhance app with file upload, error handling, and localization
# 888384b
import json
import math
import os
import random
import sqlite3
import time
import traceback

import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB cap; overruns trigger the 413 handler
app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads')
app.config['DATABASE'] = os.path.join(app.instance_path, 'synapse.db')

# SECURITY: prefer the environment variable over the committed literal.
# The fallback keeps existing deployments working, but the key checked in
# here is exposed in version control and should be rotated and removed.
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_API_URL = "https://api.siliconflow.cn/v1/chat/completions"
# Ensure instance and upload folders exist.
# BUGFIX: the old single try/except wrapped both makedirs calls, so when the
# instance dir already existed the first call raised OSError and the uploads
# dir was silently never created. exist_ok=True makes each call idempotent.
os.makedirs(app.instance_path, exist_ok=True)
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Error Handlers
@app.errorhandler(404)
def page_not_found(e):
    """SPA-style fallback: serve the front-end shell for unknown paths."""
    page = render_template('index.html')
    return page, 404
@app.errorhandler(500)
def internal_server_error(e):
    """Log the traceback server-side and return a JSON 500 payload."""
    traceback.print_exc()
    body = jsonify(error="Internal Server Error", message=str(e))
    return body, 500
@app.errorhandler(413)
def request_entity_too_large(e):
    """Reject request bodies over MAX_CONTENT_LENGTH with a JSON 413."""
    payload = jsonify(
        error="File too large",
        message="File exceeds the maximum allowed size of 16MB",
    )
    return payload, 413
# Database Setup
def get_db():
    """Return the per-request SQLite connection, opening it on first use.

    The connection is cached on flask.g so the same handler reuses one
    connection; rows come back as sqlite3.Row for name-based access.
    """
    if getattr(g, '_database', None) is None:
        g._database = sqlite3.connect(app.config['DATABASE'])
        g._database.row_factory = sqlite3.Row
    return g._database
@app.teardown_appcontext
def close_connection(exception):
    """Close the request-scoped DB connection, if one was opened."""
    conn = getattr(g, '_database', None)
    if conn is not None:
        conn.close()
def init_db():
    """Create the history table if needed and seed one demo row.

    Runs inside an app context so get_db() can cache the connection on
    flask.g. Called once at import time (below), so the database is ready
    before the first request.
    """
    with app.app_context():
        db = get_db()
        # analysis_result holds a JSON-encoded report (see /api/analyze);
        # raw_metrics is currently an opaque marker string.
        db.execute('''
            CREATE TABLE IF NOT EXISTS history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                subject_id TEXT,
                session_type TEXT,
                analysis_result TEXT,
                raw_metrics TEXT
            )
        ''')
        # Check if empty and add default data so the UI has something to show
        cur = db.execute('SELECT count(*) FROM history')
        if cur.fetchone()[0] == 0:
            # Demo report; field shape mirrors what /api/analyze produces.
            default_analysis = {
                "summary": "**演示数据报告**: 这是一个自动生成的示例报告。\n\n受试者 **DEMO-USER** 在 **专注训练** 中表现优秀。Alpha波与Beta波的交替出现表明受试者能够自如地在放松和专注状态间切换。",
                "cognitive_state": "Flow State (心流状态)",
                "recommendations": ["继续保持当前的训练频率", "尝试增加训练时长至30分钟", "记录训练后的主观感受"],
                "radar_chart": {"专注": 85, "放松": 70, "反应速度": 80, "记忆负荷": 65, "情绪稳定性": 88}
            }
            db.execute(
                'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)',
                (time.strftime('%Y-%m-%d %H:%M:%S'), 'DEMO-USER', 'Focus Training', json.dumps(default_analysis), "demo_data")
            )
            print("Default data initialized.")
        db.commit()


# Initialize the schema (and demo row) at import time.
init_db()
# --- Helpers ---
ALLOWED_EXTENSIONS = {'txt', 'csv', 'json', 'edf', 'bdf'}


def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The comparison is case-insensitive and only the last extension counts
    (e.g. 'archive.tar.gz' is judged by 'gz').
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
# --- Routes ---
@app.route('/')
def index():
    """Serve the single-page front-end."""
    page = render_template('index.html')
    return page
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept an EEG data file upload and return mock extracted metrics.

    Expects multipart/form-data with a 'file' part. Responds 400 for a
    missing part, an empty filename, or a disallowed extension; otherwise
    saves the file under UPLOAD_FOLDER and returns randomized band metrics.
    """
    # Guard clauses: validate the request before touching the filesystem.
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not (file and allowed_file(file.filename)):
        return jsonify({'error': 'File type not allowed'}), 400
    # secure_filename strips path separators / unsafe characters, so the
    # saved path cannot escape UPLOAD_FOLDER.
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    # Mock processing of the file to extract metrics.
    # In a real app, we would parse EDF/CSV here.
    file_size = os.path.getsize(filepath)
    return jsonify({
        'status': 'success',
        # BUGFIX: the message previously read 'File (unknown) uploaded ...'
        # instead of interpolating the stored filename.
        'message': f'File {filename} uploaded successfully ({file_size} bytes)',
        'extracted_metrics': {
            'alpha': random.uniform(8, 12),
            'beta': random.uniform(15, 25),
            'theta': random.uniform(4, 7),
            'delta': random.uniform(1, 3)
        }
    })
@app.route('/api/mock/signal')
def mock_signal():
    """Generate mock EEG signal data for visualization.

    Returns 50 samples of four synthetic band traces (alpha, beta, theta,
    delta) as smooth sinusoids anchored at the current wall-clock time.
    """
    # Generate 4 channels of data (Alpha, Beta, Theta, Delta)
    timestamp = time.time()
    data = []
    for i in range(50):  # 50 points
        t = timestamp + i * 0.1
        data.append({
            'time': i,
            # BUGFIX: the `random` module has no sin/cos, so these calls
            # raised AttributeError at runtime. The intended functions are
            # math.sin / math.cos (import added at top of file).
            'alpha': 10 + 5 * math.sin(t),
            'beta': 20 + 8 * math.cos(t * 2),
            'theta': 5 + 3 * math.sin(t * 0.5),
            'delta': 2 + 1 * math.cos(t * 0.2)
        })
    return jsonify({'status': 'success', 'data': data})
@app.route('/api/analyze', methods=['POST'])
def analyze_session():
    """Run an LLM analysis of a BCI session and persist the result.

    Reads 'subject_id' and 'session_type' from the JSON body, asks the
    SiliconFlow chat API for a structured Chinese report, and falls back to
    a locally generated mock report when the API or parsing fails. The
    result is stored in the history table on a best-effort basis.
    """
    # BUGFIX: request.json aborts (or yields None) on a missing/non-JSON
    # body, crashing the .get() calls below. get_json(silent=True) returns
    # None instead, letting us fall back to defaults.
    data = request.get_json(silent=True) or {}
    subject_id = data.get('subject_id', 'Unknown')
    session_type = data.get('session_type', 'Focus Training')
    # Construct prompt for SiliconFlow
    prompt = f"""
作为神经科学专家,请分析以下脑机接口(BCI)会话数据并生成中文报告。
受试者ID: {subject_id}
会话类型: {session_type}
模拟采集数据特征:
- Alpha波 (8-12Hz): 活跃度中等偏高 (放松状态)
- Beta波 (12-30Hz): 间歇性峰值 (集中注意力)
- Theta波 (4-8Hz): 低 (无困倦)
- 专注度指数: 75/100
- 疲劳度指数: 20/100
请输出JSON格式,包含以下字段:
- summary: 会话总结 (Markdown格式)
- cognitive_state: 认知状态评估 (Focus, Relaxed, Stressed, Fatigued)
- recommendations: 3条改进建议 (数组)
- radar_chart: 雷达图数据 (5个维度: 专注, 放松, 反应速度, 记忆负荷, 情绪稳定性, 0-100分)
"""
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "messages": [
            {"role": "system", "content": "你是一个专业的神经科学和脑机接口数据分析师。请只输出JSON格式。"},
            {"role": "user", "content": prompt}
        ],
        # Ask the model to emit strict JSON so json.loads below succeeds.
        "response_format": {"type": "json_object"}
    }
    try:
        response = requests.post(SILICONFLOW_API_URL, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        result_json = response.json()
        content = result_json['choices'][0]['message']['content']
        # Parse JSON from content
        analysis = json.loads(content)
    except Exception as e:
        # Deliberate best-effort: any network/HTTP/parsing failure falls
        # back to a locally generated report rather than surfacing a 500.
        print(f"API Error: {e}")
        # Mock Fallback
        analysis = {
            "summary": f"**模拟分析报告**: 由于API连接受限,这是基于本地规则生成的分析。\n\n受试者 **{subject_id}** 在 **{session_type}** 中表现出良好的认知稳定性。Alpha波活动表明处于放松警觉状态,适合进行高负荷任务前的调整。",
            "cognitive_state": "Relaxed & Focused",
            "recommendations": ["增加5分钟Theta波诱导训练", "保持当前呼吸节奏", "建议在下午时段进行下一次训练"],
            "radar_chart": {"专注": 78, "放松": 85, "反应速度": 60, "记忆负荷": 45, "情绪稳定性": 90}
        }
    # Save to DB (best-effort: a storage failure must not lose the analysis
    # we are about to return to the client).
    try:
        db = get_db()
        db.execute(
            'INSERT INTO history (timestamp, subject_id, session_type, analysis_result, raw_metrics) VALUES (?, ?, ?, ?, ?)',
            (time.strftime('%Y-%m-%d %H:%M:%S'), subject_id, session_type, json.dumps(analysis), "mock_raw_data")
        )
        db.commit()
    except Exception as e:
        print(f"DB Error: {e}")
    return jsonify({'status': 'success', 'result': analysis})
@app.route('/api/history')
def get_history():
    """Return the 10 most recent analysis sessions, newest first."""
    rows = get_db().execute(
        'SELECT * FROM history ORDER BY id DESC LIMIT 10'
    ).fetchall()
    history = [
        {
            'id': row['id'],
            'timestamp': row['timestamp'],
            'subject_id': row['subject_id'],
            'session_type': row['session_type'],
            'analysis': json.loads(row['analysis_result']),
        }
        for row in rows
    ]
    return jsonify({'status': 'success', 'history': history})
if __name__ == '__main__':
    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # Werkzeug interactive debugger to the whole network — confirm this
    # entry point is development-only before deploying.
    app.run(host='0.0.0.0', port=7860, debug=True)