Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import sqlite3 | |
| import datetime | |
| import requests | |
| from flask import Flask, render_template, request, jsonify, g | |
| from flask_cors import CORS | |
| from werkzeug.utils import secure_filename | |
| app = Flask(__name__, instance_relative_config=True) | |
| CORS(app) | |
| # Configuration | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload | |
| app.config['SECRET_KEY'] = 'dev-key-audit-shield' | |
| DATABASE = os.path.join(app.instance_path, 'audit_shield.db') | |
| SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi" | |
| SILICONFLOW_URL = "https://api.siliconflow.cn/v1/chat/completions" | |
| # Ensure instance folder exists | |
| try: | |
| os.makedirs(app.instance_path) | |
| except OSError: | |
| pass | |
| # Database Helpers | |
| def get_db(): | |
| db = getattr(g, '_database', None) | |
| if db is None: | |
| db = g._database = sqlite3.connect(DATABASE) | |
| db.row_factory = sqlite3.Row | |
| return db | |
| def close_connection(exception): | |
| db = getattr(g, '_database', None) | |
| if db is not None: | |
| db.close() | |
| def init_db(): | |
| with app.app_context(): | |
| db = get_db() | |
| # Create Logs Table | |
| db.execute(''' | |
| CREATE TABLE IF NOT EXISTS audit_logs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| timestamp TEXT NOT NULL, | |
| content TEXT, | |
| risk_score INTEGER, | |
| risk_level TEXT, | |
| categories TEXT, | |
| source TEXT | |
| ) | |
| ''') | |
| # Check if empty, insert mock data | |
| cur = db.execute('SELECT count(*) FROM audit_logs') | |
| if cur.fetchone()[0] == 0: | |
| print("Initializing with mock data...") | |
| mock_data = [ | |
| (datetime.datetime.now().isoformat(), "User asked for customer credit card details.", 85, "High", "PII, Security", "Manual"), | |
| (datetime.datetime.now().isoformat(), "Generate a python script to delete files.", 90, "Critical", "Malicious Code", "API"), | |
| (datetime.datetime.now().isoformat(), "Write a marketing email for the new product.", 10, "Low", "None", "Manual"), | |
| (datetime.datetime.now().isoformat(), "Compare our product with CompetitorX.", 45, "Medium", "Competitor Mention", "API"), | |
| ] | |
| db.executemany('INSERT INTO audit_logs (timestamp, content, risk_score, risk_level, categories, source) VALUES (?, ?, ?, ?, ?, ?)', mock_data) | |
| db.commit() | |
| # Initialize DB on startup | |
| if not os.path.exists(DATABASE): | |
| init_db() | |
| else: | |
| # Double check table exists | |
| init_db() | |
| # Routes | |
| def index(): | |
| return render_template('index.html') | |
| def get_stats(): | |
| db = get_db() | |
| total = db.execute('SELECT count(*) FROM audit_logs').fetchone()[0] | |
| high_risk = db.execute('SELECT count(*) FROM audit_logs WHERE risk_score > 70').fetchone()[0] | |
| avg_score = db.execute('SELECT avg(risk_score) FROM audit_logs').fetchone()[0] or 0 | |
| # Recent activity for chart | |
| recent = db.execute('SELECT timestamp, risk_score FROM audit_logs ORDER BY id DESC LIMIT 10').fetchall() | |
| chart_data = [{'time': r['timestamp'], 'score': r['risk_score']} for r in recent] | |
| return jsonify({ | |
| 'total_audits': total, | |
| 'high_risk_count': high_risk, | |
| 'average_risk': round(avg_score, 1), | |
| 'chart_data': chart_data | |
| }) | |
| def get_logs(): | |
| db = get_db() | |
| logs = db.execute('SELECT * FROM audit_logs ORDER BY id DESC LIMIT 50').fetchall() | |
| return jsonify([dict(row) for row in logs]) | |
| def analyze_content(): | |
| data = request.json | |
| content = data.get('content', '') | |
| if not content: | |
| return jsonify({'error': 'No content provided'}), 400 | |
| # Call SiliconFlow for analysis | |
| headers = { | |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| prompt = f""" | |
| You are an AI Risk Auditor. Analyze the following text for security risks, compliance issues, PII, and toxicity. | |
| Text: "{content}" | |
| Return ONLY a JSON object with these fields: | |
| - risk_score (0-100 integer) | |
| - risk_level (Low, Medium, High, Critical) | |
| - categories (comma separated string of detected issues, e.g. "PII, Toxicity", or "None") | |
| - analysis (brief explanation in Chinese) | |
| """ | |
| try: | |
| response = requests.post( | |
| SILICONFLOW_URL, | |
| headers=headers, | |
| json={ | |
| "model": "Qwen/Qwen2.5-7B-Instruct", | |
| "messages": [{"role": "user", "content": prompt}], | |
| "temperature": 0.1, | |
| "response_format": {"type": "json_object"} | |
| }, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| result_json = response.json()['choices'][0]['message']['content'] | |
| # Parse JSON from LLM | |
| try: | |
| analysis_result = json.loads(result_json) | |
| except json.JSONDecodeError: | |
| # Fallback if LLM returns bad JSON | |
| analysis_result = { | |
| "risk_score": 50, | |
| "risk_level": "Unknown", | |
| "categories": "Parse Error", | |
| "analysis": "AI response parsing failed, but content was processed." | |
| } | |
| else: | |
| raise Exception(f"API Error: {response.text}") | |
| except Exception as e: | |
| print(f"AI Analysis failed: {e}") | |
| # Mock Fallback | |
| analysis_result = { | |
| "risk_score": 0, | |
| "risk_level": "Low", | |
| "categories": "System Check", | |
| "analysis": "AI服务暂时不可用,已切换至本地规则模式 (Mock Mode)。" | |
| } | |
| # Save to DB | |
| db = get_db() | |
| db.execute( | |
| 'INSERT INTO audit_logs (timestamp, content, risk_score, risk_level, categories, source) VALUES (?, ?, ?, ?, ?, ?)', | |
| (datetime.datetime.now().isoformat(), content[:200], analysis_result['risk_score'], analysis_result['risk_level'], analysis_result['categories'], 'User Request') | |
| ) | |
| db.commit() | |
| return jsonify(analysis_result) | |
| def chat(): | |
| data = request.json | |
| message = data.get('message', '') | |
| headers = { | |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| try: | |
| response = requests.post( | |
| SILICONFLOW_URL, | |
| headers=headers, | |
| json={ | |
| "model": "Qwen/Qwen2.5-7B-Instruct", | |
| "messages": [ | |
| {"role": "system", "content": "你是 Audit Shield 的智能助手,负责解释审计规则、合规性建议和风险分析。请用中文回答。"}, | |
| {"role": "user", "content": message} | |
| ], | |
| "temperature": 0.7 | |
| }, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| reply = response.json()['choices'][0]['message']['content'] | |
| else: | |
| reply = "AI 服务响应异常,请稍后再试。" | |
| except Exception as e: | |
| reply = f"连接错误: {str(e)}" | |
| return jsonify({'reply': reply}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |