File size: 7,456 Bytes
60016b2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
from datetime import datetime, timedelta
from app.utils.logging import log
def clean_expired_stats(api_call_stats):
"""清理过期统计数据的函数"""
now = datetime.now()
# 清理24小时前的数据
# 清理总调用次数
for hour_key in list(api_call_stats['last_24h']['total'].keys()):
try:
hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
if (now - hour_time).total_seconds() > 24 * 3600: # 超过24小时
del api_call_stats['last_24h']['total'][hour_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['last_24h']['total'][hour_key]
# 清理按端点分类的数据
for endpoint in list(api_call_stats['last_24h']['by_endpoint'].keys()):
if not isinstance(api_call_stats['last_24h']['by_endpoint'][endpoint], dict):
del api_call_stats['last_24h']['by_endpoint'][endpoint]
continue
for hour_key in list(api_call_stats['last_24h']['by_endpoint'][endpoint].keys()):
try:
hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
if (now - hour_time).total_seconds() > 24 * 3600: # 超过24小时
del api_call_stats['last_24h']['by_endpoint'][endpoint][hour_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['last_24h']['by_endpoint'][endpoint][hour_key]
# 清理一小时前的小时统计数据
one_hour_ago = now - timedelta(hours=1)
# 清理总调用次数
for hour_key in list(api_call_stats['hourly']['total'].keys()):
try:
hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
if hour_time < one_hour_ago:
del api_call_stats['hourly']['total'][hour_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['hourly']['total'][hour_key]
# 清理按端点分类的数据
for endpoint in list(api_call_stats['hourly']['by_endpoint'].keys()):
if not isinstance(api_call_stats['hourly']['by_endpoint'][endpoint], dict):
del api_call_stats['hourly']['by_endpoint'][endpoint]
continue
for hour_key in list(api_call_stats['hourly']['by_endpoint'][endpoint].keys()):
try:
hour_time = datetime.strptime(hour_key, '%Y-%m-%d %H:00')
if hour_time < one_hour_ago:
del api_call_stats['hourly']['by_endpoint'][endpoint][hour_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['hourly']['by_endpoint'][endpoint][hour_key]
# 清理一分钟前的分钟统计数据
one_minute_ago = now - timedelta(minutes=1)
# 清理总调用次数
for minute_key in list(api_call_stats['minute']['total'].keys()):
try:
minute_time = datetime.strptime(minute_key, '%Y-%m-%d %H:%M')
if minute_time < one_minute_ago:
del api_call_stats['minute']['total'][minute_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['minute']['total'][minute_key]
# 清理按端点分类的数据
for endpoint in list(api_call_stats['minute']['by_endpoint'].keys()):
if not isinstance(api_call_stats['minute']['by_endpoint'][endpoint], dict):
del api_call_stats['minute']['by_endpoint'][endpoint]
continue
for minute_key in list(api_call_stats['minute']['by_endpoint'][endpoint].keys()):
try:
minute_time = datetime.strptime(minute_key, '%Y-%m-%d %H:%M')
if minute_time < one_minute_ago:
del api_call_stats['minute']['by_endpoint'][endpoint][minute_key]
except ValueError:
# 如果键格式不正确,直接删除
del api_call_stats['minute']['by_endpoint'][endpoint][minute_key]
def update_api_call_stats(api_call_stats, endpoint=None):
"""
更新API调用统计的函数
参数:
- api_call_stats: 统计数据字典
- endpoint: APIkey,为None则只更新总调用次数
"""
now = datetime.now()
hour_key = now.strftime('%Y-%m-%d %H:00')
minute_key = now.strftime('%Y-%m-%d %H:%M')
# 检查并清理过期统计
clean_expired_stats(api_call_stats)
# 初始化总调用次数键(如果不存在)
if hour_key not in api_call_stats['last_24h']['total']:
api_call_stats['last_24h']['total'][hour_key] = 0
if hour_key not in api_call_stats['hourly']['total']:
api_call_stats['hourly']['total'][hour_key] = 0
if minute_key not in api_call_stats['minute']['total']:
api_call_stats['minute']['total'][minute_key] = 0
# 更新总调用次数统计
api_call_stats['last_24h']['total'][hour_key] += 1
api_call_stats['hourly']['total'][hour_key] += 1
api_call_stats['minute']['total'][minute_key] += 1
# 如果提供了端点,更新按端点分类的统计
if endpoint:
# 确保端点字典存在
if endpoint not in api_call_stats['last_24h']['by_endpoint']:
api_call_stats['last_24h']['by_endpoint'][endpoint] = {}
if endpoint not in api_call_stats['hourly']['by_endpoint']:
api_call_stats['hourly']['by_endpoint'][endpoint] = {}
if endpoint not in api_call_stats['minute']['by_endpoint']:
api_call_stats['minute']['by_endpoint'][endpoint] = {}
# 初始化端点特定的键(如果不存在)
if hour_key not in api_call_stats['last_24h']['by_endpoint'][endpoint]:
api_call_stats['last_24h']['by_endpoint'][endpoint][hour_key] = 0
if hour_key not in api_call_stats['hourly']['by_endpoint'][endpoint]:
api_call_stats['hourly']['by_endpoint'][endpoint][hour_key] = 0
if minute_key not in api_call_stats['minute']['by_endpoint'][endpoint]:
api_call_stats['minute']['by_endpoint'][endpoint][minute_key] = 0
# 更新端点特定的统计
api_call_stats['last_24h']['by_endpoint'][endpoint][hour_key] += 1
api_call_stats['hourly']['by_endpoint'][endpoint][hour_key] += 1
api_call_stats['minute']['by_endpoint'][endpoint][minute_key] += 1
# 计算总调用次数
total_24h = sum(api_call_stats['last_24h']['total'].values())
total_hourly = sum(api_call_stats['hourly']['total'].values())
total_minute = sum(api_call_stats['minute']['total'].values())
log_message = "API调用统计已更新: 24小时=%s, 1小时=%s, 1分钟=%s" % (
total_24h, total_hourly, total_minute
)
# 如果提供了端点,添加端点特定的统计信息
if endpoint:
endpoint_24h = sum(api_call_stats['last_24h']['by_endpoint'][endpoint].values())
endpoint_hourly = sum(api_call_stats['hourly']['by_endpoint'][endpoint].values())
endpoint_minute = sum(api_call_stats['minute']['by_endpoint'][endpoint].values())
log_message += " | 端点 '%s': 24小时=%s, 1小时=%s, 1分钟=%s" % (
endpoint[:8], endpoint_24h, endpoint_hourly, endpoint_minute
)
log('info', log_message) |