# Trae Assistant
# Initial commit
# f7d14bd
import os
import json
import sqlite3
import datetime
import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
from werkzeug.utils import secure_filename
app = Flask(__name__, instance_relative_config=True)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload

# SECURITY: secrets are read from the environment first. The literal
# fallbacks are kept only so existing deployments keep working; they are
# committed to source control and should be treated as compromised --
# set the environment variables and rotate the values.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-audit-shield')
DATABASE = os.path.join(app.instance_path, 'audit_shield.db')
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_URL = "https://api.siliconflow.cn/v1/chat/completions"

# Ensure instance folder exists. exist_ok=True ignores only the
# "directory already exists" case; other failures (e.g. permissions) now
# surface here instead of later as an opaque sqlite3 connection error.
os.makedirs(app.instance_path, exist_ok=True)
# Database Helpers
def get_db():
    """Return the SQLite connection stored on ``flask.g``, opening it on first use.

    The connection is kept in ``g._database``; a newly opened connection
    gets ``sqlite3.Row`` as its row factory so columns can be read by name.
    """
    conn = getattr(g, '_database', None)
    if conn is not None:
        return conn

    conn = sqlite3.connect(DATABASE)
    g._database = conn
    conn.row_factory = sqlite3.Row
    return conn
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored in ``g._database``, if there is one.

    Registered as an app-context teardown handler; ``exception`` is
    accepted to match the handler signature and is not used.
    """
    conn = getattr(g, '_database', None)
    if conn is None:
        return
    conn.close()
def init_db():
    """Create the ``audit_logs`` table if missing and seed it when empty.

    Runs inside an application context so ``get_db()`` can be used. When
    the table has no rows, four sample records are inserted and committed.
    """
    insert_sql = 'INSERT INTO audit_logs (timestamp, content, risk_score, risk_level, categories, source) VALUES (?, ?, ?, ?, ?, ?)'

    with app.app_context():
        conn = get_db()

        # Create Logs Table
        conn.execute('''
            CREATE TABLE IF NOT EXISTS audit_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                content TEXT,
                risk_score INTEGER,
                risk_level TEXT,
                categories TEXT,
                source TEXT
            )
        ''')

        # Nothing more to do when the table already holds data.
        row_count = conn.execute('SELECT count(*) FROM audit_logs').fetchone()[0]
        if row_count != 0:
            return

        print("Initializing with mock data...")
        samples = (
            ("User asked for customer credit card details.", 85, "High", "PII, Security", "Manual"),
            ("Generate a python script to delete files.", 90, "Critical", "Malicious Code", "API"),
            ("Write a marketing email for the new product.", 10, "Low", "None", "Manual"),
            ("Compare our product with CompetitorX.", 45, "Medium", "Competitor Mention", "API"),
        )
        # Each row gets its own timestamp, taken at the moment the row is built.
        seed_rows = [
            (datetime.datetime.now().isoformat(), *fields) for fields in samples
        ]
        conn.executemany(insert_sql, seed_rows)
        conn.commit()
# Initialize DB on startup.
# init_db() only creates the table when it is missing and only seeds rows
# when the table is empty, so it is safe to call unconditionally; the
# previous os.path.exists() check ran the same call in both branches.
init_db()
# Routes
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return aggregate figures over ``audit_logs`` as JSON.

    Response fields:
      - total_audits: number of rows in the table
      - high_risk_count: rows whose risk_score is greater than 70
      - average_risk: mean risk_score rounded to one decimal (0 when empty)
      - chart_data: time/score pairs for up to 10 rows, highest id first
    """
    conn = get_db()

    def scalar(sql):
        # First column of the first row of a single-value query.
        return conn.execute(sql).fetchone()[0]

    total_rows = scalar('SELECT count(*) FROM audit_logs')
    risky_rows = scalar('SELECT count(*) FROM audit_logs WHERE risk_score > 70')
    mean_score = scalar('SELECT avg(risk_score) FROM audit_logs') or 0

    # Recent activity for chart
    latest = conn.execute(
        'SELECT timestamp, risk_score FROM audit_logs ORDER BY id DESC LIMIT 10'
    ).fetchall()
    points = []
    for entry in latest:
        points.append({'time': entry['timestamp'], 'score': entry['risk_score']})

    payload = {
        'total_audits': total_rows,
        'high_risk_count': risky_rows,
        'average_risk': round(mean_score, 1),
        'chart_data': points
    }
    return jsonify(payload)
@app.route('/api/logs', methods=['GET'])
def get_logs():
    """Return up to 50 ``audit_logs`` rows, highest id first, as a JSON list."""
    query = 'SELECT * FROM audit_logs ORDER BY id DESC LIMIT 50'
    rows = get_db().execute(query).fetchall()
    # sqlite3.Row objects are converted to plain dicts for serialization.
    return jsonify(list(map(dict, rows)))
def _normalize_analysis(result):
    """Coerce the model's parsed JSON into the shape the DB insert needs.

    Returns the "parse error" placeholder when ``result`` is not a JSON
    object. Otherwise returns a copy in which missing/null ``risk_score``,
    ``risk_level`` and ``categories`` are filled from that placeholder,
    ``risk_score`` is an int, and the two text fields are strings (a list
    of categories is joined with ", ").
    """
    fallback = {
        "risk_score": 50,
        "risk_level": "Unknown",
        "categories": "Parse Error",
        "analysis": "AI response parsing failed, but content was processed."
    }
    if not isinstance(result, dict):
        return fallback

    normalized = dict(result)
    for key in ("risk_score", "risk_level", "categories"):
        if normalized.get(key) is None:
            normalized[key] = fallback[key]

    try:
        normalized["risk_score"] = int(normalized["risk_score"])
    except (TypeError, ValueError):
        normalized["risk_score"] = fallback["risk_score"]

    categories = normalized["categories"]
    if isinstance(categories, (list, tuple)):
        normalized["categories"] = ", ".join(str(item) for item in categories)
    elif not isinstance(categories, str):
        normalized["categories"] = str(categories)

    if not isinstance(normalized["risk_level"], str):
        normalized["risk_level"] = str(normalized["risk_level"])

    return normalized


@app.route('/api/analyze', methods=['POST'])
def analyze_content():
    """Score submitted text with the SiliconFlow model and log the result.

    Expects a JSON object body with a non-empty string ``content``;
    anything else yields ``400 {'error': 'No content provided'}``.

    On success the model's JSON answer (normalized) is returned. If the
    model's answer cannot be parsed, a "Parse Error" placeholder is used;
    if the request itself fails, a "System Check" placeholder is used.
    In every non-400 case a row is inserted into ``audit_logs`` with the
    first 200 characters of the content.
    """
    # silent=True: a missing/malformed JSON body gives None instead of raising.
    data = request.get_json(silent=True)
    content = data.get('content', '') if isinstance(data, dict) else ''
    if not content or not isinstance(content, str):
        return jsonify({'error': 'No content provided'}), 400

    # Call SiliconFlow for analysis
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }

    prompt = f"""
    You are an AI Risk Auditor. Analyze the following text for security risks, compliance issues, PII, and toxicity.
    Text: "{content}"
    Return ONLY a JSON object with these fields:
    - risk_score (0-100 integer)
    - risk_level (Low, Medium, High, Critical)
    - categories (comma separated string of detected issues, e.g. "PII, Toxicity", or "None")
    - analysis (brief explanation in Chinese)
    """

    try:
        response = requests.post(
            SILICONFLOW_URL,
            headers=headers,
            json={
                "model": "Qwen/Qwen2.5-7B-Instruct",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"}
            },
            timeout=10
        )
        if response.status_code != 200:
            raise RuntimeError(f"API Error: {response.text}")

        result_json = response.json()['choices'][0]['message']['content']
        # Parse JSON from LLM; bad or non-string content becomes the
        # "Parse Error" placeholder inside _normalize_analysis.
        try:
            parsed = json.loads(result_json)
        except (TypeError, ValueError):
            parsed = None
        analysis_result = _normalize_analysis(parsed)
    except Exception as e:
        # Deliberate best-effort boundary: any transport or response-shape
        # failure falls back to a placeholder so the request still succeeds.
        # NOTE(review): the placeholder records risk 0 / "Low", i.e. the
        # audit fails open when the AI service is down -- confirm intended.
        print(f"AI Analysis failed: {e}")
        # Mock Fallback
        analysis_result = {
            "risk_score": 0,
            "risk_level": "Low",
            "categories": "System Check",
            "analysis": "AI服务暂时不可用,已切换至本地规则模式 (Mock Mode)。"
        }

    # Save to DB
    db = get_db()
    db.execute(
        'INSERT INTO audit_logs (timestamp, content, risk_score, risk_level, categories, source) VALUES (?, ?, ?, ?, ?, ?)',
        (
            datetime.datetime.now().isoformat(),
            content[:200],
            analysis_result['risk_score'],
            analysis_result['risk_level'],
            analysis_result['categories'],
            'User Request',
        )
    )
    db.commit()

    return jsonify(analysis_result)
@app.route('/api/chat', methods=['POST'])
def chat():
    """Relay a user message to the SiliconFlow chat model and return its reply.

    Expects a JSON object body with a non-empty string ``message``;
    anything else yields ``400 {'error': 'No message provided'}``.

    Returns ``{'reply': <text>}``. A non-200 upstream status or any
    exception during the call is reported inside ``reply`` rather than as
    an HTTP error.
    """
    # silent=True: a missing/malformed JSON body gives None instead of raising.
    data = request.get_json(silent=True)
    message = data.get('message', '') if isinstance(data, dict) else ''
    if not message or not isinstance(message, str):
        # Avoid an upstream call that has nothing to answer.
        return jsonify({'error': 'No message provided'}), 400

    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json"
    }

    try:
        response = requests.post(
            SILICONFLOW_URL,
            headers=headers,
            json={
                "model": "Qwen/Qwen2.5-7B-Instruct",
                "messages": [
                    {"role": "system", "content": "你是 Audit Shield 的智能助手,负责解释审计规则、合规性建议和风险分析。请用中文回答。"},
                    {"role": "user", "content": message}
                ],
                "temperature": 0.7
            },
            timeout=10
        )
        if response.status_code == 200:
            reply = response.json()['choices'][0]['message']['content']
        else:
            reply = "AI 服务响应异常,请稍后再试。"
    except Exception as e:
        # Best-effort boundary: transport errors and unexpected response
        # shapes are reported to the caller in the reply text.
        reply = f"连接错误: {str(e)}"

    return jsonify({'reply': reply})
if __name__ == '__main__':
    # SECURITY: the server binds to all interfaces, and debug mode enables
    # the Werkzeug interactive debugger, which allows arbitrary code
    # execution. Debug is therefore opt-in via FLASK_DEBUG instead of
    # always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)