Trae Assistant
Fix HF Spaces permissions: use /app/data for DB and set correct Docker permissions
3f9ecc8
import os
import time
import random
import json
import sqlite3
import threading
from flask import Flask, render_template, jsonify, request
from flask_cors import CORS
from openai import OpenAI
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app)
# 配置
app.config['SECRET_KEY'] = 'fleet-commander-secret'
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB limit
app.config['ALLOWED_EXTENSIONS'] = {'txt', 'json', 'csv', 'log', 'md'}
# SiliconFlow API Configuration
SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi"
SILICONFLOW_BASE_URL = "https://api.siliconflow.cn/v1"
# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# Database Setup
DATA_DIR = 'data'
os.makedirs(DATA_DIR, exist_ok=True)
DB_PATH = os.path.join(DATA_DIR, 'fleet.db')
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS robots
(id TEXT PRIMARY KEY, type TEXT, status TEXT, battery REAL, x REAL, y REAL, load REAL)''')
c.execute('''CREATE TABLE IF NOT EXISTS alerts
(id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT, message TEXT, level TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS tasks
(id TEXT PRIMARY KEY, bot_id TEXT, description TEXT, status TEXT)''')
# Check if robots exist, if not, initialize
c.execute("SELECT count(*) FROM robots")
if c.fetchone()[0] == 0:
for i in range(1, 11):
bot_id = f'BOT-{i:03d}'
bot_type = 'Drone' if i <= 5 else 'AMR'
c.execute("INSERT INTO robots VALUES (?, ?, ?, ?, ?, ?, ?)",
(bot_id, bot_type, 'idle', random.randint(60, 100), random.uniform(10, 90), random.uniform(10, 90), 0))
conn.commit()
conn.close()
init_db()
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# Fleet State Management (Hybrid: In-memory for speed, DB for persistence)
class FleetState:
def __init__(self):
self.last_update = time.time()
self.lock = threading.Lock()
def update(self):
with self.lock:
current_time = time.time()
raw_dt = current_time - self.last_update
self.last_update = current_time
dt = min(raw_dt, 1.0)
conn = get_db_connection()
c = conn.cursor()
# Fetch all robots
robots = [dict(row) for row in conn.execute("SELECT * FROM robots").fetchall()]
for bot in robots:
# Logic (same as before)
if bot['status'] == 'active':
bot['x'] += random.uniform(-1, 1) * 10 * dt
bot['y'] += random.uniform(-1, 1) * 10 * dt
bot['x'] = max(0, min(100, bot['x']))
bot['y'] = max(0, min(100, bot['y']))
bot['battery'] -= 0.5 * dt
elif bot['status'] == 'idle':
bot['battery'] -= 0.05 * dt
elif bot['status'] == 'charging':
bot['battery'] += 5 * dt
# Mock AI Logic
if bot['battery'] < 20 and bot['status'] != 'charging':
bot['status'] = 'charging'
self.add_alert(conn, f"警告: {bot['id']} 电量过低,自动返回充电。", "warning")
elif bot['battery'] > 95 and bot['status'] == 'charging':
bot['status'] = 'idle'
self.add_alert(conn, f"通知: {bot['id']} 充电完成,等待任务。", "success")
if bot['status'] != 'error' and random.random() < 0.0005: # Reduced probability
bot['status'] = 'error'
self.add_alert(conn, f"紧急: {bot['id']} 发生传感器故障!", "error")
bot['battery'] = max(0, min(100, bot['battery']))
# Update DB
c.execute("UPDATE robots SET status=?, battery=?, x=?, y=? WHERE id=?",
(bot['status'], bot['battery'], bot['x'], bot['y'], bot['id']))
conn.commit()
conn.close()
def add_alert(self, conn, message, level):
conn.execute("INSERT INTO alerts (time, message, level) VALUES (?, ?, ?)",
(time.strftime('%H:%M:%S'), message, level))
# Keep only last 50
conn.execute("DELETE FROM alerts WHERE id NOT IN (SELECT id FROM alerts ORDER BY id DESC LIMIT 50)")
def assign_task(self, task_desc):
with self.lock:
conn = get_db_connection()
# Find idle bot
bot_row = conn.execute("SELECT * FROM robots WHERE status='idle' ORDER BY RANDOM() LIMIT 1").fetchone()
if not bot_row:
conn.close()
return False, "没有可用的空闲机器人"
bot = dict(bot_row)
bot_id = bot['id']
task_id = f'TASK-{int(time.time())}'
conn.execute("UPDATE robots SET status='active' WHERE id=?", (bot_id,))
conn.execute("INSERT INTO tasks (id, bot_id, description, status) VALUES (?, ?, ?, ?)",
(task_id, bot_id, task_desc, 'in_progress'))
self.add_alert(conn, f"任务: 已分配任务 '{task_desc}' 给 {bot_id}", "info")
conn.commit()
conn.close()
return True, f"任务已分配给 {bot_id}"
def fix_bot(self, bot_id):
with self.lock:
conn = get_db_connection()
bot = conn.execute("SELECT * FROM robots WHERE id=?", (bot_id,)).fetchone()
if bot and bot['status'] == 'error':
conn.execute("UPDATE robots SET status='idle' WHERE id=?", (bot_id,))
self.add_alert(conn, f"修复: {bot_id} 远程修复成功,恢复待命。", "success")
conn.commit()
conn.close()
return True
conn.close()
return False
fleet = FleetState()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/telemetry')
def get_telemetry():
fleet.update()
conn = get_db_connection()
robots = [dict(row) for row in conn.execute("SELECT * FROM robots").fetchall()]
alerts = [dict(row) for row in conn.execute("SELECT * FROM alerts ORDER BY id DESC LIMIT 50").fetchall()]
conn.close()
return jsonify({
'robots': robots,
'alerts': alerts,
'stats': {
'total': len(robots),
'active': len([r for r in robots if r['status'] == 'active']),
'charging': len([r for r in robots if r['status'] == 'charging']),
'error': len([r for r in robots if r['status'] == 'error'])
}
})
@app.route('/api/command', methods=['POST'])
def send_command():
data = request.json
cmd_type = data.get('type')
if cmd_type == 'task':
success, msg = fleet.assign_task(data.get('description', '巡逻任务'))
elif cmd_type == 'fix':
success = fleet.fix_bot(data.get('bot_id'))
msg = "修复指令已发送" if success else "修复失败或机器人未故障"
else:
success = False
msg = "未知指令"
return jsonify({'success': success, 'message': msg})
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in app.config['ALLOWED_EXTENSIONS']
@app.route('/api/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'success': False, 'message': '没有文件部分'})
file = request.files['file']
if file.filename == '':
return jsonify({'success': False, 'message': '未选择文件'})
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
# 简单解析逻辑 (如果是 mission.json,自动下发任务)
if filename.endswith('.json'):
try:
with open(os.path.join(app.config['UPLOAD_FOLDER'], filename), 'r') as f:
content = json.load(f)
if isinstance(content, list):
count = 0
for item in content:
if 'task' in item:
fleet.assign_task(item['task'])
count += 1
return jsonify({'success': True, 'message': f'文件上传成功,已自动解析并下发 {count} 个任务'})
except Exception as e:
return jsonify({'success': True, 'message': f'文件上传成功,但解析失败: {str(e)}'})
return jsonify({'success': True, 'message': f'文件 {filename} 上传成功'})
return jsonify({'success': False, 'message': '不支持的文件类型'})
@app.route('/api/chat', methods=['POST'])
def chat_ai():
data = request.json
user_message = data.get('message', '')
# 获取当前状态作为上下文
conn = get_db_connection()
robots = [dict(row) for row in conn.execute("SELECT * FROM robots").fetchall()]
conn.close()
system_prompt = f"""
你是一个智能机队指挥官助手。当前机队状态如下:
总数: {len(robots)}
活跃: {len([r for r in robots if r['status'] == 'active'])}
充电: {len([r for r in robots if r['status'] == 'charging'])}
故障: {len([r for r in robots if r['status'] == 'error'])}
详细列表: {json.dumps(robots[:5])} (仅显示前5个)
请根据以上信息回答用户问题。如果是下发任务的请求,请建议用户使用左侧的任务下发面板,或者告诉我你希望我如何协助。请用中文回答,简练专业。
"""
try:
client = OpenAI(api_key=SILICONFLOW_API_KEY, base_url=SILICONFLOW_BASE_URL)
response = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3", # 尝试使用 DeepSeek V3,如果不行可以换 Qwen
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
],
stream=False
)
ai_reply = response.choices[0].message.content
return jsonify({'success': True, 'reply': ai_reply})
except Exception as e:
print(f"SiliconFlow API Error: {e}")
return jsonify({'success': False, 'reply': "抱歉,AI 通信暂时中断,请检查网络或 Key 配置。"})
if __name__ == '__main__':
# 确保 app.run 在主线程
app.run(host='0.0.0.0', port=7860, debug=False) # Hugging Face usually expects port 7860