# Trae Assistant
# feat: enhance features, ui, and localization
# 3b0d577
import os
import json
import sqlite3
import datetime
import requests
import pandas as pd
from flask import Flask, render_template, request, jsonify, g
from werkzeug.utils import secure_filename
app = Flask(__name__)

# Secrets are read from the environment first so a deployment can supply its
# own values without editing this file.
# NOTE(review): the literal fallbacks are kept only so existing setups keep
# working; they are committed to source control and should be rotated and
# removed once every deployment sets the environment variables.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'carbon-trace-secret-key')
app.config['DATABASE'] = os.path.join(app.instance_path, 'carbon.db')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload
app.config['SILICONFLOW_API_KEY'] = os.environ.get(
    'SILICONFLOW_API_KEY',
    'sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi',
)
app.config['SILICONFLOW_URL'] = 'https://api.siliconflow.cn/v1/chat/completions'

# Ensure instance folder exists. An already-existing folder is not an error;
# any other failure is logged (not raised) so routes that do not need the
# database can still be served.
try:
    os.makedirs(app.instance_path, exist_ok=True)
except OSError as exc:
    app.logger.warning(
        'Could not create instance folder %s: %s', app.instance_path, exc
    )
def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    The connection is opened against ``app.config['DATABASE']`` and configured
    with ``sqlite3.Row`` as its row factory.
    """
    if 'db' in g:
        return g.db

    g.db = sqlite3.connect(app.config['DATABASE'])
    g.db.row_factory = sqlite3.Row
    return g.db
@app.teardown_appcontext
def close_db(error):
    """Close the connection stored on ``g`` (if any) when the app context ends.

    ``error`` is supplied by the teardown hook and is not used here.
    """
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()
def init_db():
    """Create the ``emissions`` and ``reports`` tables when they are missing."""
    schema = '''
CREATE TABLE IF NOT EXISTS emissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
scope TEXT NOT NULL,
amount REAL NOT NULL,
unit TEXT NOT NULL,
date TEXT NOT NULL,
notes TEXT
);
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL
);
'''
    # get_db() relies on ``g``, so an application context must be active.
    with app.app_context():
        connection = get_db()
        connection.executescript(schema)
        connection.commit()
# --- Helpers ---
def mock_ai_response(prompt_type):
    """Return a canned reply used when the AI service cannot be reached.

    ``'analysis'`` yields a sample Markdown report, ``'chat'`` yields a short
    advisory sentence, and any other value yields a generic "service
    unavailable" message.
    """
    if prompt_type == 'analysis':
        sample_report = """
# ESG 分析报告 (模拟生成)
## 1. 排放分析
- **主要来源**: 您的主要碳排放来源于 **Scope 2 (电力消耗)**,占总排放的 60%。
- **趋势**: 相比上月,整体排放量呈 **上升趋势**。
## 2. 减排建议
- **电力优化**: 建议更换为 LED 照明,并考虑采购绿色电力。
- **供应链管理**: 审查 Scope 3 供应商的碳足迹。
## 3. 预计影响
- 实施上述措施后,预计年度碳排放可降低 **15% - 20%**。
"""
        return sample_report

    chat_reply = "(AI 服务暂时不可用,这是自动回复)建议您关注 ISO 14064 标准,并开始收集 Scope 1 和 Scope 2 的基础数据。"
    unavailable = "服务暂不可用。"
    return chat_reply if prompt_type == 'chat' else unavailable
# --- Routes ---
@app.route('/api/seed', methods=['POST'])
def seed_data():
    """Insert five demo emission rows unless the table already holds data.

    Responds with ``status: skipped`` when at least one emission row exists,
    otherwise inserts the demo rows and responds with ``status: success``.
    """
    connection = get_db()

    # Any existing row means the table was already populated; leave it alone.
    existing_rows = connection.execute('SELECT count(*) FROM emissions').fetchone()[0]
    if existing_rows > 0:
        return jsonify({'status': 'skipped', 'message': 'Data already exists'})

    def days_ago(days):
        """Return the ISO date that lies ``days`` days before today."""
        return (datetime.date.today() - datetime.timedelta(days=days)).isoformat()

    # (source, scope, amount, age in days, notes); the unit is always tCO2e.
    samples = (
        ('工厂主电力', 'Scope 2', 120.5, 1, '主要生产线用电'),
        ('车队燃油', 'Scope 1', 45.2, 2, '物流运输'),
        ('员工差旅', 'Scope 3', 12.8, 5, '销售团队出差'),
        ('办公供暖', 'Scope 1', 30.1, 10, '冬季供暖'),
        ('外购蒸汽', 'Scope 2', 80.0, 15, '生产工艺需要'),
    )
    demo_data = [
        (source, scope, amount, 'tCO2e', days_ago(age), notes)
        for source, scope, amount, age, notes in samples
    ]

    insert_sql = 'INSERT INTO emissions (source, scope, amount, unit, date, notes) VALUES (?, ?, ?, ?, ?, ?)'
    connection.executemany(insert_sql, demo_data)
    connection.commit()
    return jsonify({'status': 'success', 'message': 'Demo data added'})
def _cell_or_default(row, column, default):
    """Return ``row[column]`` as text, or ``default`` if the cell is absent or empty."""
    value = row.get(column, default)
    # pandas represents empty cells as NaN/NaT; str() would store them as 'nan'.
    if pd.isna(value):
        return str(default)
    return str(value)


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Import emission rows from an uploaded CSV or Excel file.

    Expects a multipart form field named ``file``. The sheet must contain the
    columns ``source``, ``scope`` and ``amount``; ``unit``, ``date`` and
    ``notes`` are optional and fall back to ``'tCO2e'``, today's date and
    ``'Imported'`` when the column is missing or a cell is empty.

    Returns:
        ``{'status': 'success', 'count': <rows>}`` on success,
        ``{'error': ...}`` with 400 for a missing file, unsupported extension
        or missing required columns, and ``{'error': ...}`` with 500 for any
        failure while parsing or inserting.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    # Covers both an empty string and a missing (None) filename.
    if not file.filename:
        return jsonify({'error': 'No selected file'}), 400

    # The extension is taken from the raw client filename: the upload is never
    # written to disk, and sanitising first would strip non-ASCII names such
    # as "数据.csv" down to "csv", which then fails the extension check.
    extension = os.path.splitext(file.filename)[1].lower()

    try:
        if extension == '.csv':
            df = pd.read_csv(file)
        elif extension in ('.xls', '.xlsx'):
            df = pd.read_excel(file)
        else:
            return jsonify({'error': 'Invalid file type. Please upload CSV or Excel.'}), 400

        # Basic validation and mapping
        # Expect columns: source, scope, amount, unit, date, notes
        required_cols = ['source', 'scope', 'amount']
        if not all(col in df.columns for col in required_cols):
            return jsonify({'error': f'Missing required columns: {", ".join(required_cols)}'}), 400

        default_date = datetime.date.today().isoformat()
        records = [
            (
                _cell_or_default(row, 'source', 'Unknown'),
                _cell_or_default(row, 'scope', 'Scope 1'),
                float(row['amount']),
                _cell_or_default(row, 'unit', 'tCO2e'),
                _cell_or_default(row, 'date', default_date),
                _cell_or_default(row, 'notes', 'Imported'),
            )
            for _, row in df.iterrows()
        ]

        db = get_db()
        try:
            db.executemany(
                'INSERT INTO emissions (source, scope, amount, unit, date, notes) VALUES (?, ?, ?, ?, ?, ?)',
                records,
            )
            db.commit()
        except Exception:
            # Drop partially inserted rows so a failed import stores nothing.
            db.rollback()
            raise
        return jsonify({'status': 'success', 'count': len(records)})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/')
def index():
    """Serve the application's landing page."""
    landing_template = 'index.html'
    return render_template(landing_template)
@app.route('/api/emissions', methods=['GET', 'POST'])
def handle_emissions():
    """List all emissions (GET) or create a single emission record (POST).

    GET returns every row of ``emissions`` as a JSON array ordered by ``date``
    descending.

    POST expects a JSON object with ``source``, ``scope``, ``amount``, ``unit``
    and ``date``; ``notes`` is optional and defaults to an empty string.
    Malformed input is answered with ``{'error': ...}`` and status 400 instead
    of surfacing as an unhandled exception.
    """
    db = get_db()

    if request.method != 'POST':
        rows = db.execute('SELECT * FROM emissions ORDER BY date DESC').fetchall()
        return jsonify([dict(row) for row in rows])

    data = request.json
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # These columns are declared NOT NULL, so absent and null values are both
    # rejected before touching the database.
    required_fields = ('source', 'scope', 'amount', 'unit', 'date')
    missing = [field for field in required_fields if data.get(field) is None]
    if missing:
        return jsonify({'error': f'Missing required fields: {", ".join(missing)}'}), 400

    try:
        amount = float(data['amount'])
    except (TypeError, ValueError):
        return jsonify({'error': 'amount must be a number'}), 400

    db.execute(
        'INSERT INTO emissions (source, scope, amount, unit, date, notes) VALUES (?, ?, ?, ?, ?, ?)',
        (data['source'], data['scope'], amount, data['unit'], data['date'], data.get('notes', ''))
    )
    db.commit()
    return jsonify({'status': 'success'})
@app.route('/api/emissions/<int:id>', methods=['DELETE'])
def delete_emission(id):
    """Delete the emission row whose primary key equals ``id``.

    The response is ``{'status': 'success'}`` whether or not a row matched.
    """
    connection = get_db()
    delete_sql = 'DELETE FROM emissions WHERE id = ?'
    connection.execute(delete_sql, (id,))
    connection.commit()
    return jsonify({'status': 'success'})
@app.route('/api/reports', methods=['GET'])
def get_reports():
    """Return all stored reports as a JSON array ordered by ``created_at`` descending."""
    query = 'SELECT * FROM reports ORDER BY created_at DESC'
    reports = [dict(record) for record in get_db().execute(query).fetchall()]
    return jsonify(reports)
@app.route('/api/analyze', methods=['POST'])
def analyze_emissions():
    """Ask SiliconFlow to analyze emission data and store the result as a report.

    Expects a JSON object with an optional ``emissions`` list. The data is
    embedded in a prompt and sent to the chat-completions endpoint; if the
    call fails or the reply cannot be parsed, the canned text from
    ``mock_ai_response('analysis')`` is used instead. Either way the text is
    saved to the ``reports`` table.

    Returns:
        ``{'report': <markdown>, 'title': <title>}``, or ``{'error': ...}``
        with status 400 when the body is not a JSON object.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    emissions = data.get('emissions', [])

    # Prepare context for LLM. ensure_ascii=False keeps non-ASCII source names
    # readable in the prompt instead of expanding them to \uXXXX escapes.
    context = f"Emission Data: {json.dumps(emissions, ensure_ascii=False)}"
    prompt = f"""
Role: You are an expert ESG (Environmental, Social, and Governance) Consultant and Carbon Footprint Analyst.
Task: Analyze the provided carbon emission data.
1. Identify the largest sources of emissions.
2. Suggest concrete, actionable strategies to reduce these emissions.
3. Estimate potential percentage reduction for each strategy.
Data:
{context}
Format: Provide the response in Markdown format, with clear headings for 'Analysis', 'Reduction Strategies', and 'Estimated Impact'.
"""
    try:
        response = requests.post(
            app.config['SILICONFLOW_URL'],
            headers={
                "Authorization": f"Bearer {app.config['SILICONFLOW_API_KEY']}",
                "Content-Type": "application/json"
            },
            json={
                "model": "Qwen/Qwen2.5-7B-Instruct",
                "messages": [
                    {"role": "system", "content": "You are a helpful ESG assistant."},
                    {"role": "user", "content": prompt}
                ],
                "stream": False,
                "max_tokens": 1024
            },
            timeout=30
        )
        # Anything other than 200 counts as a failure.
        if response.status_code != 200:
            raise RuntimeError(f"API Error: {response.status_code}")
        ai_content = response.json()['choices'][0]['message']['content']
    except Exception as e:
        # Deliberately broad: any transport, HTTP or parsing problem must fall
        # back to the canned report rather than fail the request.
        app.logger.warning("AI API failed: %s. Using mock response.", e)
        ai_content = mock_ai_response('analysis')

    # Save report. One timestamp feeds both the title and created_at so the
    # two can never disagree.
    created = datetime.datetime.now()
    title = f"ESG Analysis Report - {created.strftime('%Y-%m-%d %H:%M')}"
    db = get_db()
    db.execute(
        'INSERT INTO reports (title, content, created_at) VALUES (?, ?, ?)',
        (title, ai_content, created.isoformat())
    )
    db.commit()
    return jsonify({'report': ai_content, 'title': title})
@app.route('/api/chat', methods=['POST'])
def chat():
    """Relay a chat message (plus recent history) to SiliconFlow.

    Expects a JSON object with a non-empty string ``message`` and an optional
    ``history`` list of ``{'role': ..., 'content': ...}`` entries, of which at
    most the last five are forwarded. If the upstream call fails or its reply
    cannot be parsed, the canned text from ``mock_ai_response('chat')`` is
    returned instead.

    Returns:
        ``{'reply': <text>}``, or ``{'error': ...}`` with status 400 when the
        body is not a JSON object or ``message`` is missing or blank.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    message = data.get('message')
    if not isinstance(message, str) or not message.strip():
        return jsonify({'error': 'message is required'}), 400

    history = data.get('history', [])
    if not isinstance(history, list):
        # e.g. "history": null -- treat as no history instead of crashing.
        history = []

    messages = [
        {"role": "system", "content": "You are CarbonTrace AI, an expert in Carbon Management, ESG Compliance (Scope 1, 2, 3), and Sustainability. Provide concise, professional advice."}
    ]
    # Add history (last 5 messages), skipping entries without a string role
    # and string content.
    # NOTE(review): roles are client-supplied and forwarded as-is, so a client
    # can inject extra "system" messages -- confirm whether that is acceptable.
    messages.extend(
        msg for msg in history[-5:]
        if isinstance(msg, dict)
        and isinstance(msg.get('role'), str)
        and isinstance(msg.get('content'), str)
    )
    messages.append({"role": "user", "content": message})

    try:
        response = requests.post(
            app.config['SILICONFLOW_URL'],
            headers={
                "Authorization": f"Bearer {app.config['SILICONFLOW_API_KEY']}",
                "Content-Type": "application/json"
            },
            json={
                "model": "Qwen/Qwen2.5-7B-Instruct",
                "messages": messages,
                "stream": False,
                "max_tokens": 512
            },
            timeout=30
        )
        # Anything other than 200 counts as a failure.
        if response.status_code != 200:
            raise RuntimeError(f"API Error: {response.status_code}")
        reply = response.json()['choices'][0]['message']['content']
        return jsonify({'reply': reply})
    except Exception as e:
        # Deliberately broad: any transport, HTTP or parsing problem must fall
        # back to the canned reply rather than fail the request.
        app.logger.warning("AI API failed: %s. Using mock response.", e)
        return jsonify({'reply': mock_ai_response('chat')})
if __name__ == '__main__':
    # Create the tables before the server starts accepting requests.
    init_db()
    # Listen on every network interface, port 7860.
    server_options = {'host': '0.0.0.0', 'port': 7860}
    app.run(**server_options)