"""Audit Shield: a small Flask service that scores text for risk via an LLM.

The app exposes:

* ``/``            - the dashboard page (``index.html``).
* ``/api/stats``   - aggregate numbers over the ``audit_logs`` table.
* ``/api/logs``    - the 50 most recent audit rows.
* ``/api/analyze`` - send text to SiliconFlow for a risk assessment and log it.
* ``/api/chat``    - a thin chat proxy to the same SiliconFlow endpoint.

Audit rows are stored in a SQLite database inside the Flask instance folder.
"""
import datetime
import json
import os
import sqlite3

import requests
from flask import Flask, render_template, request, jsonify, g
from flask_cors import CORS
from werkzeug.utils import secure_filename

app = Flask(__name__, instance_relative_config=True)
CORS(app)

# Configuration
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max upload
# SECURITY: secrets are read from the environment first. The literal fallbacks
# below are kept only so existing deployments keep working; they have been
# committed to source control and should be rotated and then removed.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-audit-shield')

DATABASE = os.path.join(app.instance_path, 'audit_shield.db')
SILICONFLOW_API_KEY = os.environ.get(
    "SILICONFLOW_API_KEY",
    "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi",
)
SILICONFLOW_URL = "https://api.siliconflow.cn/v1/chat/completions"
SILICONFLOW_MODEL = "Qwen/Qwen2.5-7B-Instruct"
SILICONFLOW_TIMEOUT = 10  # seconds

# Result used when the LLM answers but its payload is not usable JSON.
_PARSE_FAILURE_RESULT = {
    "risk_score": 50,
    "risk_level": "Unknown",
    "categories": "Parse Error",
    "analysis": "AI response parsing failed, but content was processed.",
}

# Result used when the LLM cannot be reached at all.
# NOTE(review): an outage is recorded as score 0 / "Low", which understates
# risk in the audit log. Behavior kept as-is; confirm this is intended.
_SERVICE_DOWN_RESULT = {
    "risk_score": 0,
    "risk_level": "Low",
    "categories": "System Check",
    "analysis": "AI服务暂时不可用,已切换至本地规则模式 (Mock Mode)。",
}

_INSERT_LOG_SQL = (
    'INSERT INTO audit_logs (timestamp, content, risk_score, risk_level, '
    'categories, source) VALUES (?, ?, ?, ?, ?, ?)'
)

# Ensure instance folder exists. exist_ok=True tolerates "already there" but
# still surfaces real failures (e.g. permissions) instead of hiding them.
os.makedirs(app.instance_path, exist_ok=True)


# Database Helpers
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it if needed.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    """
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect(DATABASE)
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def close_connection(exception):
    """Close the per-context SQLite connection, if one was opened."""
    db = getattr(g, '_database', None)
    if db is not None:
        db.close()


def init_db():
    """Create the ``audit_logs`` table and seed it with mock rows when empty."""
    with app.app_context():
        db = get_db()
        # Create Logs Table
        db.execute('''
            CREATE TABLE IF NOT EXISTS audit_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                content TEXT,
                risk_score INTEGER,
                risk_level TEXT,
                categories TEXT,
                source TEXT
            )
        ''')

        # Check if empty, insert mock data
        row_count = db.execute('SELECT count(*) FROM audit_logs').fetchone()[0]
        if row_count == 0:
            app.logger.info("Initializing with mock data...")
            mock_data = [
                (datetime.datetime.now().isoformat(),
                 "User asked for customer credit card details.",
                 85, "High", "PII, Security", "Manual"),
                (datetime.datetime.now().isoformat(),
                 "Generate a python script to delete files.",
                 90, "Critical", "Malicious Code", "API"),
                (datetime.datetime.now().isoformat(),
                 "Write a marketing email for the new product.",
                 10, "Low", "None", "Manual"),
                (datetime.datetime.now().isoformat(),
                 "Compare our product with CompetitorX.",
                 45, "Medium", "Competitor Mention", "API"),
            ]
            db.executemany(_INSERT_LOG_SQL, mock_data)
            db.commit()


# Initialize DB on startup. CREATE TABLE IF NOT EXISTS makes this safe whether
# or not the database file already exists, so no existence check is needed.
init_db()


def _siliconflow_chat(messages, temperature, **extra):
    """POST a chat-completion request to SiliconFlow and return the response.

    Args:
        messages: list of ``{"role": ..., "content": ...}`` dicts.
        temperature: sampling temperature forwarded to the API.
        **extra: additional top-level payload fields (e.g. ``response_format``).

    Returns:
        The ``requests.Response``; the caller checks the status code.

    Raises:
        requests.RequestException: on connection errors or timeout.
    """
    payload = {
        "model": SILICONFLOW_MODEL,
        "messages": messages,
        "temperature": temperature,
        **extra,
    }
    headers = {
        "Authorization": f"Bearer {SILICONFLOW_API_KEY}",
        "Content-Type": "application/json",
    }
    return requests.post(
        SILICONFLOW_URL,
        headers=headers,
        json=payload,
        timeout=SILICONFLOW_TIMEOUT,
    )


def _normalize_analysis(raw):
    """Coerce an LLM-produced analysis into the shape the DB and clients expect.

    The model is asked for specific fields but nothing guarantees it complies:
    keys may be missing, the score may be a string or out of range, and
    categories may come back as a list. Unknown extra keys are preserved.

    Args:
        raw: the value decoded from the model's JSON reply.

    Returns:
        A new dict that always has ``risk_score`` (int, 0-100), ``risk_level``
        (str), ``categories`` (str) and ``analysis``.
    """
    if not isinstance(raw, dict):
        return dict(_PARSE_FAILURE_RESULT)

    result = dict(raw)

    try:
        score = int(result.get("risk_score"))
    except (TypeError, ValueError, OverflowError):
        score = _PARSE_FAILURE_RESULT["risk_score"]
    result["risk_score"] = max(0, min(100, score))

    result["risk_level"] = str(result.get("risk_level") or "Unknown")

    categories = result.get("categories")
    if isinstance(categories, (list, tuple)):
        # SQLite cannot bind a list; flatten to the documented string form.
        categories = ", ".join(str(item) for item in categories)
    result["categories"] = str(categories) if categories else "None"

    result.setdefault("analysis", "")
    return result


def _json_field(name):
    """Return string field *name* from the JSON request body, or ``''``.

    A missing/invalid JSON body, a non-object body, or a non-string value all
    yield ``''`` so the caller can answer with a 400 instead of crashing.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        return ''
    value = body.get(name, '')
    return value if isinstance(value, str) else ''


# Routes
@app.route('/')
def index():
    """Serve the dashboard page."""
    return render_template('index.html')


@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return totals, high-risk count, average score and recent chart points."""
    db = get_db()
    total = db.execute('SELECT count(*) FROM audit_logs').fetchone()[0]
    high_risk = db.execute(
        'SELECT count(*) FROM audit_logs WHERE risk_score > 70'
    ).fetchone()[0]
    # avg() is NULL on an empty table; report 0 in that case.
    avg_score = db.execute(
        'SELECT avg(risk_score) FROM audit_logs'
    ).fetchone()[0] or 0

    # Recent activity for chart (newest first)
    recent = db.execute(
        'SELECT timestamp, risk_score FROM audit_logs ORDER BY id DESC LIMIT 10'
    ).fetchall()
    chart_data = [
        {'time': row['timestamp'], 'score': row['risk_score']}
        for row in recent
    ]

    return jsonify({
        'total_audits': total,
        'high_risk_count': high_risk,
        'average_risk': round(avg_score, 1),
        'chart_data': chart_data
    })


@app.route('/api/logs', methods=['GET'])
def get_logs():
    """Return the 50 most recent audit rows as a JSON list of objects."""
    db = get_db()
    logs = db.execute(
        'SELECT * FROM audit_logs ORDER BY id DESC LIMIT 50'
    ).fetchall()
    return jsonify([dict(row) for row in logs])


@app.route('/api/analyze', methods=['POST'])
def analyze_content():
    """Assess the posted ``content`` for risk, log the result, and return it.

    Expects a JSON object with a non-empty string ``content``; otherwise
    responds 400. If the AI service fails, a mock result is returned and
    logged instead of an error.
    """
    content = _json_field('content')
    if not content:
        return jsonify({'error': 'No content provided'}), 400

    prompt = f"""
    You are an AI Risk Auditor. Analyze the following text for security risks, compliance issues, PII, and toxicity.

    Text: "{content}"

    Return ONLY a JSON object with these fields:
    - risk_score (0-100 integer)
    - risk_level (Low, Medium, High, Critical)
    - categories (comma separated string of detected issues, e.g. "PII, Toxicity", or "None")
    - analysis (brief explanation in Chinese)
    """

    # Call SiliconFlow for analysis
    try:
        response = _siliconflow_chat(
            [{"role": "user", "content": prompt}],
            0.1,
            response_format={"type": "json_object"},
        )
        if response.status_code != 200:
            raise RuntimeError(f"API Error: {response.text}")

        result_json = response.json()['choices'][0]['message']['content']
        # Parse JSON from LLM
        try:
            analysis_result = _normalize_analysis(json.loads(result_json))
        except json.JSONDecodeError:
            # Fallback if LLM returns bad JSON
            analysis_result = dict(_PARSE_FAILURE_RESULT)
    except Exception as e:
        # Deliberate best-effort boundary: any failure talking to the AI
        # service degrades to the mock result rather than a 500.
        app.logger.warning("AI Analysis failed: %s", e)
        analysis_result = dict(_SERVICE_DOWN_RESULT)

    # Save to DB (content is truncated to 200 characters for the log)
    db = get_db()
    db.execute(
        _INSERT_LOG_SQL,
        (datetime.datetime.now().isoformat(),
         content[:200],
         analysis_result['risk_score'],
         analysis_result['risk_level'],
         analysis_result['categories'],
         'User Request')
    )
    db.commit()

    return jsonify(analysis_result)


@app.route('/api/chat', methods=['POST'])
def chat():
    """Forward the posted ``message`` to the assistant and return its reply.

    Expects a JSON object with a non-empty string ``message``; otherwise
    responds 400. Upstream failures are reported inside ``reply`` with a 200.
    """
    message = _json_field('message')
    if not message:
        return jsonify({'error': 'No message provided'}), 400

    try:
        response = _siliconflow_chat(
            [
                {"role": "system", "content": "你是 Audit Shield 的智能助手,负责解释审计规则、合规性建议和风险分析。请用中文回答。"},
                {"role": "user", "content": message}
            ],
            0.7,
        )
        if response.status_code == 200:
            reply = response.json()['choices'][0]['message']['content']
        else:
            reply = "AI 服务响应异常,请稍后再试。"
    except Exception as e:
        # Best-effort boundary: surface the failure to the user as a reply.
        app.logger.warning("Chat request failed: %s", e)
        reply = f"连接错误: {str(e)}"

    return jsonify({'reply': reply})


if __name__ == '__main__':
    # SECURITY: the Werkzeug debugger allows arbitrary code execution, so it
    # must not be on by default while listening on all interfaces. Opt in
    # explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)