Trae Assistant
Enhanced Hydro Logic Agent with improved UI, file upload, and robust error handling
1fe2fbd
import os
import json
import sqlite3
import datetime
import requests
import random
import time
from flask import Flask, render_template, jsonify, request, g, send_from_directory
from werkzeug.utils import secure_filename
from werkzeug.exceptions import RequestEntityTooLarge
app = Flask(__name__)
app.config['SECRET_KEY'] = 'hydro-logic-secret-key-2026'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB 限制
app.config['UPLOAD_FOLDER'] = 'uploads'
# 确保上传目录存在
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# 允许的文件扩展名
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'csv', 'json'}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# Database Configuration
DB_NAME = "hydro.db"
def get_db_path():
return os.path.join(app.instance_path, DB_NAME)
def get_db():
db = getattr(g, '_database', None)
if db is None:
db_path = get_db_path()
os.makedirs(os.path.dirname(db_path), exist_ok=True)
db = g._database = sqlite3.connect(db_path)
db.row_factory = sqlite3.Row
return db
@app.teardown_appcontext
def close_connection(exception):
db = getattr(g, '_database', None)
if db is not None:
db.close()
# 全局错误处理
@app.errorhandler(404)
def page_not_found(e):
return render_template('index.html'), 200 # SPA 模式下返回主页或错误页
@app.errorhandler(500)
def internal_server_error(e):
return jsonify({"error": "Internal Server Error", "message": str(e)}), 500
@app.errorhandler(413)
@app.errorhandler(RequestEntityTooLarge)
def request_entity_too_large(e):
return jsonify({"error": "File Too Large", "message": "文件大小超过限制 (最大 16MB)"}), 413
def init_db():
with app.app_context():
db = get_db()
cursor = db.cursor()
# Incidents Table
cursor.execute('''
CREATE TABLE IF NOT EXISTS incidents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
type TEXT NOT NULL,
location TEXT NOT NULL,
severity TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'Open',
analysis TEXT,
plan TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Sensors/Assets Table (Mock Data Source)
cursor.execute('''
CREATE TABLE IF NOT EXISTS sensors (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL,
location TEXT NOT NULL,
status TEXT NOT NULL,
value REAL,
unit TEXT
)
''')
# Check if sensors empty, seed data
cursor.execute('SELECT count(*) FROM sensors')
if cursor.fetchone()[0] == 0:
seed_sensors = [
('泵站 Alpha (Pump Station Alpha)', '水泵', 'A区 (Zone A)', '运行中 (Active)', 85.5, '%'),
('阀门节点 102 (Valve Node 102)', '阀门', 'B区 (Zone B)', '运行中 (Active)', 100.0, '%'),
('水质传感器 Q1 (Quality Sensor Q1)', '水质', 'A区 (Zone A)', '正常 (Normal)', 7.2, 'pH'),
('压力表 P4 (Pressure Gauge P4)', '压力', 'C区 (Zone C)', '警告 (Warning)', 120.0, 'PSI'),
('流量计 F7 (Flow Meter F7)', '流量', 'B区 (Zone B)', '运行中 (Active)', 450.0, 'L/min'),
(' turbidity T3 (Turbidity T3)', '浊度', 'D区 (Zone D)', '正常 (Normal)', 1.5, 'NTU'),
('液位计 L9 (Level Meter L9)', '液位', '储水池 (Reservoir)', '正常 (Normal)', 4.2, 'm')
]
cursor.executemany('INSERT INTO sensors (name, type, location, status, value, unit) VALUES (?, ?, ?, ?, ?, ?)', seed_sensors)
db.commit()
# SiliconFlow API Configuration
SILICON_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi"
SILICON_API_URL = "https://api.siliconflow.cn/v1/chat/completions"
def call_ai_agent(prompt, system_role="You are an expert Water Management AI assistant."):
"""Calls SiliconFlow API with fallback."""
if not SILICON_API_KEY:
return mock_ai_response(prompt)
headers = {
"Authorization": f"Bearer {SILICON_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "Qwen/Qwen2.5-7B-Instruct",
"messages": [
{"role": "system", "content": system_role},
{"role": "user", "content": prompt}
],
"temperature": 0.7
}
try:
# 设置较短的超时,以便快速回退到 mock
response = requests.post(SILICON_API_URL, json=payload, headers=headers, timeout=5)
if response.status_code == 200:
data = response.json()
return data['choices'][0]['message']['content']
else:
print(f"API Error: {response.status_code} - {response.text}")
return mock_ai_response(prompt)
except Exception as e:
print(f"API Exception: {str(e)}")
return mock_ai_response(prompt)
def mock_ai_response(prompt):
"""Fallback mock response with rich formatting."""
time.sleep(1) # Simulate thinking
base_response = """**[AI 智能诊断 (模拟模式)]**
根据传感器数据的实时监测,系统检测到以下情况:
### 1. 异常分析 (Diagnosis)
- **C区压力异常**: 压力表 P4 读数显示 120.0 PSI,超过正常阈值 (100 PSI)。可能存在管道堵塞或局部压力激增。
- **水质正常**: pH 值保持在 7.2,符合饮用水标准。
### 2. 风险等级 (Risk Level)
- **等级**: <span style="color:orange">**中等 (Medium)**</span>
- 潜在影响:如果不及时处理,可能导致 C 区管道破裂。
### 3. 建议操作 (Recommended Action)
1. 立即检查 C 区减压阀状态。
2. 派遣维修人员前往 C 区进行现场勘查。
3. 暂时降低 A 区泵站输出功率以缓解管网压力。
### 4. 执行指令 (Output Command)
```json
{
"commands": [
{"target": "valve_c_relief", "action": "open", "value": 50},
{"target": "pump_station_alpha", "action": "set_speed", "value": 70}
]
}
```
"""
return base_response
# Routes
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/stats')
def get_stats():
db = get_db()
cursor = db.cursor()
cursor.execute('SELECT count(*) FROM incidents WHERE status="Open"')
open_incidents = cursor.fetchone()[0]
cursor.execute('SELECT count(*) FROM sensors WHERE status="运行中 (Active)" OR status="正常 (Normal)"')
active_sensors = cursor.fetchone()[0]
cursor.execute('SELECT count(*) FROM sensors WHERE status="警告 (Warning)" OR status="严重 (Critical)"')
alerts = cursor.fetchone()[0]
# Mock real-time efficiency
efficiency = random.randint(85, 99)
return jsonify({
"open_incidents": open_incidents,
"active_sensors": active_sensors,
"alerts": alerts,
"efficiency": efficiency
})
@app.route('/api/sensors')
def get_sensors():
db = get_db()
cursor = db.cursor()
cursor.execute('SELECT * FROM sensors')
rows = cursor.fetchall()
return jsonify([dict(row) for row in rows])
@app.route('/api/incidents', methods=['GET', 'POST'])
def handle_incidents():
db = get_db()
cursor = db.cursor()
if request.method == 'POST':
data = request.json
cursor.execute(
'INSERT INTO incidents (title, type, location, severity, analysis, plan) VALUES (?, ?, ?, ?, ?, ?)',
(data['title'], data['type'], data['location'], data['severity'], data.get('analysis', ''), data.get('plan', ''))
)
db.commit()
return jsonify({"status": "success", "message": "工单已创建"})
cursor.execute('SELECT * FROM incidents ORDER BY created_at DESC')
rows = cursor.fetchall()
return jsonify([dict(row) for row in rows])
@app.route('/api/analyze', methods=['POST'])
def analyze_data():
data = request.json
sensor_data = data.get('sensor_data', {})
prompt = f"""
分析以下水务基础设施传感器数据并寻找异常:
{json.dumps(sensor_data)}
请提供 Markdown 格式的结构化报告:
1. **诊断**: 发生了什么?
2. **风险等级**: 低/中/高.
3. **建议操作**: 分步计划.
4. **输出指令**: 包含推荐机器指令的 JSON 块 (例如 valve_close, pump_speed_up).
"""
analysis = call_ai_agent(prompt)
return jsonify({"analysis": analysis})
@app.route('/api/execute_action', methods=['POST'])
def execute_action():
# Simulation of action execution
action = request.json.get('action')
# Logic to update sensor state would go here
time.sleep(0.5) # Simulate network delay
return jsonify({"status": "executed", "timestamp": datetime.datetime.now().isoformat()})
@app.route('/api/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
return jsonify({"message": f"文件 '{filename}' 上传成功", "path": filepath}), 200
else:
return jsonify({"error": "File type not allowed"}), 400
# Initialize DB on start
init_db()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)