# Page header captured with the file: Spaces: Sleeping | File size: 7,716 Bytes | 7bb2782
import os
import json
import uuid
from flask import Flask, render_template, request, jsonify, send_from_directory
# Flask application object; every route in this file is registered on it.
app = Flask(__name__)
# Upper bound on the size of an incoming request body.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB limit for safety
# Configuration: postmortems are stored in a single JSON file inside a
# `data` directory located next to this source file.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
DATA_FILE = os.path.join(DATA_DIR, 'postmortems.json')
# Make sure the data directory exists (runs at import time; no error if it
# is already there).
os.makedirs(DATA_DIR, exist_ok=True)
def _default_postmortems():
    """Return the two demo postmortem records used to seed a fresh data file."""
    return [
        {
            "id": "demo-001",
            "title": "生产环境数据库连接池耗尽导致服务不可用",
            "date": "2023-10-24",
            "severity": "P0",
            "owner": "张三",
            "created_at": "2023-10-24T10:00:00Z",
            "summary": "在双十一预热活动期间,主数据库连接数突然飙升至 100%,导致订单服务无法连接数据库,所有下单请求失败。持续时间约 15 分钟。",
            "impact": "造成约 5000 单交易失败,预估 GMV 损失 100 万。用户投诉激增。",
            "timeline": [
                {"time": "20:00", "description": "流量开始激增,达到平时峰值的 3 倍", "type": "detection"},
                {"time": "20:05", "description": "监控系统报警,DB CPU 飙升至 90%", "type": "detection"},
                {"time": "20:08", "description": "DBA 介入排查,发现活动连接数打满", "type": "investigation"},
                {"time": "20:12", "description": "紧急扩容 Read Replica,并临时调大 Max Connections", "type": "fix"},
                {"time": "20:15", "description": "服务逐步恢复正常", "type": "verification"},
            ],
            "root_cause": {
                "whys": [
                    "数据库连接数打满,拒绝新连接",
                    "后端服务实例扩容后,每个实例的连接池配置未做相应调整",
                    "微服务配置中心下发的连接池参数是静态的,未随实例数动态计算",
                    "在进行压力测试时,只测试了单实例性能,未测试全链路大规模并发下的 DB 瓶颈",
                    "缺乏对数据库总连接数的全局管控机制",
                ],
                "conclusion": "缺乏全局的数据库连接治理机制,且压测覆盖不全。",
            },
            "action_items": [
                {"task": "实施数据库连接代理 (Proxy) 层", "owner": "李四", "deadline": "2023-11-01", "status": "in_progress"},
                {"task": "优化全链路压测模型,包含 DB 极限场景", "owner": "王五", "deadline": "2023-11-15", "status": "pending"},
            ],
        },
        {
            "id": "demo-002",
            "title": "支付回调接口验签逻辑缺陷",
            "date": "2023-09-15",
            "severity": "P2",
            "owner": "赵六",
            "created_at": "2023-09-15T14:00:00Z",
            "summary": "收到用户反馈支付成功但订单状态未更新。排查发现部分特殊字符导致签名验证失败。",
            "impact": "约 20 笔订单卡单,需人工补单。",
            "timeline": [
                {"time": "14:00", "description": "客服收到用户反馈", "type": "detection"},
                {"time": "14:30", "description": "定位到日志中存在大量验签失败错误", "type": "investigation"},
                {"time": "15:00", "description": "修复验签逻辑中的字符编码处理问题", "type": "fix"},
            ],
            "root_cause": {
                "whys": [
                    "签名验证失败",
                    "第三方支付回调参数中包含特殊 Emoji 字符",
                    "验签逻辑未正确处理 UTF-8 编码",
                    "开发测试阶段未覆盖特殊字符场景",
                    # The fifth "why" slot is intentionally left blank in the demo.
                    "",
                ],
                "conclusion": "编码规范执行不到位,测试用例缺失。",
            },
            "action_items": [
                {"task": "修复代码并补充单元测试", "owner": "赵六", "deadline": "2023-09-16", "status": "done"},
            ],
        },
    ]


def load_data():
    """Load every postmortem record from ``DATA_FILE``.

    When the file does not exist yet, it is created with the demo records
    and those records are returned.

    Returns:
        list: The stored records. An empty list is returned when the file
        cannot be read, does not contain valid JSON, or its top-level JSON
        value is not a list; each of those cases is logged.
    """
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        # First run: initialise the store with the default demo data.
        default_data = _default_postmortems()
        save_data(default_data)
        return default_data
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Stay best-effort (callers get an empty list) but leave a trace
        # instead of hiding the failure.
        app.logger.exception("Could not read postmortem data from %s", DATA_FILE)
        return []
    if not isinstance(data, list):
        # The routes slice, iterate and append to this value, so any other
        # JSON type would make them fail.
        app.logger.warning(
            "Ignoring %s: expected a JSON list, got %s",
            DATA_FILE,
            type(data).__name__,
        )
        return []
    return data
def save_data(data):
    """Write ``data`` to ``DATA_FILE`` as indented, non-ASCII-escaped JSON.

    The JSON is first written to a uniquely named temporary file next to
    ``DATA_FILE`` and then moved over it with ``os.replace``. Opening
    ``DATA_FILE`` directly in ``'w'`` mode truncates it first, so a failure
    during serialisation (or a read that happens mid-write) would otherwise
    observe an empty or partial file.

    Args:
        data: JSON-serialisable object to persist.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
        TypeError: If ``data`` contains values ``json.dump`` cannot serialise.
        ValueError: If ``data`` contains circular references.
    """
    tmp_path = f"{DATA_FILE}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, DATA_FILE)
    except BaseException:
        # Do not leave a stray temp file behind; the original error is what
        # the caller needs to see, so a failed cleanup is ignored.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
@app.route('/')
def index():
    """Serve the application's main page, rendered from ``index.html``."""
    page = render_template('index.html')
    return page
@app.route('/api/postmortems', methods=['GET'])
def get_postmortems():
    """Return all stored postmortems as a JSON array, last stored first."""
    records = load_data()
    # There is no real sort by modification/creation time here: the stored
    # list is simply reversed, so the most recently appended record leads.
    reversed_records = records[::-1]
    return jsonify(reversed_records)
@app.route('/api/postmortems', methods=['POST'])
def create_postmortem():
    """Create a postmortem from the JSON object in the request body.

    A missing or empty ``id`` is replaced by a fresh UUID4 string, and
    ``created_at`` / ``updated_at`` default to an empty string when absent.

    Returns:
        The stored record as JSON with status 201, or a JSON error with
        status 400 when the body is not a JSON object.
    """
    new_item = request.json
    # Valid JSON is not necessarily an object: `null`, a list or a string
    # would otherwise fail on the `.get()` / item assignment below.
    if not isinstance(new_item, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Basic validation and defaults.
    if not new_item.get('id'):
        new_item['id'] = str(uuid.uuid4())
    else:
        # The other routes compare ids against the `<item_id>` URL segment,
        # which is always a string, so a numeric id could never be matched.
        new_item['id'] = str(new_item['id'])
    new_item['created_at'] = new_item.get('created_at', '')
    new_item['updated_at'] = new_item.get('updated_at', '')

    data = load_data()
    data.append(new_item)
    save_data(data)
    return jsonify(new_item), 201
@app.route('/api/postmortems/<item_id>', methods=['PUT'])
def update_postmortem(item_id):
    """Replace the postmortem whose id equals ``item_id`` with the request body.

    Args:
        item_id: Id of the record to replace, taken from the URL.

    Returns:
        The stored record as JSON; a JSON error with status 400 when the
        body is not a JSON object, or with status 404 when no record has
        that id.
    """
    updated_item = request.json
    # Storing a non-object (e.g. `null` or a list) would make every later
    # id lookup on the stored data fail.
    if not isinstance(updated_item, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The URL identifies the record. Pin the id so the body can neither drop
    # it nor change it, which would leave the record unreachable.
    updated_item['id'] = item_id

    data = load_data()
    for i, item in enumerate(data):
        # Tolerate malformed stored entries instead of raising on them.
        if isinstance(item, dict) and item.get('id') == item_id:
            data[i] = updated_item
            save_data(data)
            return jsonify(updated_item)
    return jsonify({'error': 'Not found'}), 404
@app.route('/api/postmortems/<item_id>', methods=['DELETE'])
def delete_postmortem(item_id):
    """Delete every postmortem whose id equals ``item_id``.

    Args:
        item_id: Id of the record(s) to remove, taken from the URL.

    Returns:
        ``{"success": true}`` as JSON, whether or not a record matched.
    """
    data = load_data()
    # Entries that are not objects or have no id are kept rather than
    # raising on them.
    remaining = [
        item for item in data
        if not (isinstance(item, dict) and item.get('id') == item_id)
    ]
    # Only rewrite the file when something was actually removed.
    if len(remaining) != len(data):
        save_data(remaining)
    return jsonify({'success': True})
def _md_cell(value):
    """Format ``value`` for a Markdown table cell (escape ``|``, join lines)."""
    text = ' '.join(str(value).splitlines())
    return text.replace('|', '\\|')


@app.route('/api/export/<item_id>', methods=['GET'])
def export_markdown(item_id):
    """Render the postmortem with id ``item_id`` as a Markdown document.

    The document has a title line, a date/severity/owner line and five
    numbered sections: summary, impact, timeline, 5-Whys root cause and an
    action-item table.

    Args:
        item_id: Id of the record to export, taken from the URL.

    Returns:
        ``{"markdown": <text>}`` as JSON, or the plain text ``"Not Found"``
        with status 404 when no record has that id.
    """
    data = load_data()
    item = next(
        (i for i in data if isinstance(i, dict) and i.get('id') == item_id),
        None,
    )
    if item is None:
        return "Not Found", 404

    # A field stored as JSON `null` is present in the dict, so a `.get()`
    # default would not apply; fall back explicitly for container fields.
    root_cause = item.get('root_cause')
    if not isinstance(root_cause, dict):
        root_cause = {}

    # Build the Markdown content.
    parts = [
        f"# 事故复盘: {item.get('title', '无标题')}\n\n",
        f"**日期**: {item.get('date', '')} | **级别**: {item.get('severity', '')} | **负责人**: {item.get('owner', '')}\n\n",
        "## 1. 事故摘要 (Summary)\n",
        f"{item.get('summary', '无')}\n\n",
        "## 2. 影响范围 (Impact)\n",
        f"{item.get('impact', '无')}\n\n",
        "## 3. 时间线 (Timeline)\n",
    ]
    for t in item.get('timeline') or []:
        if not isinstance(t, dict):
            continue
        parts.append(
            f"- **{t.get('time', '')}**: {t.get('description', '')} ({t.get('type', '')})\n"
        )
    parts.append("\n")

    parts.append("## 4. 根因分析 (5 Whys)\n")
    for idx, why in enumerate(root_cause.get('whys') or []):
        # Blank slots are skipped; the number shown is the slot position.
        if why:
            parts.append(f"{idx+1}. Why? {why}\n")
    parts.append(f"\n**结论**: {root_cause.get('conclusion', '')}\n\n")

    parts.append("## 5. 改进措施 (Action Items)\n")
    parts.append("| 任务 | 负责人 | 截止日期 | 状态 |\n")
    parts.append("| --- | --- | --- | --- |\n")
    for action in item.get('action_items') or []:
        if not isinstance(action, dict):
            continue
        # A raw `|` or a line break inside a cell would break the table row.
        parts.append(
            f"| {_md_cell(action.get('task', ''))} | {_md_cell(action.get('owner', ''))} | {_md_cell(action.get('deadline', ''))} | {_md_cell(action.get('status', ''))} |\n"
        )

    md = ''.join(parts)
    return jsonify({'markdown': md})
if __name__ == '__main__':
    # Security: the server listens on every interface (0.0.0.0), and Flask's
    # debug mode enables the interactive debugger, which lets anyone who can
    # reach the port run arbitrary Python code. Debug mode is therefore off
    # unless explicitly enabled, e.g. FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)
|