3v324v23's picture
Enhances platform with robust monitoring and notifications
4e22b4d
Raw
History Blame Contribute Delete
4.54 kB
from sqlalchemy import select, func
from packages.core.database import AsyncSessionLocal
from packages.core.models import StandardTradeRecord
from packages.core.logger import app_logger
from infrastructure.monitoring.alert import send_alert
async def check_batch_quality(batch_no: str):
    """Run a data-quality inspection on one ingested batch.

    Intended to run after a batch has been collected, standardized and
    stored. It counts the batch's ``StandardTradeRecord`` rows, measures how
    many of them lack an HS code, an amount or a weight, logs the resulting
    rates and sends alerts when the batch is empty or a missing-rate
    threshold is exceeded.

    Args:
        batch_no: Batch identifier matched against
            ``StandardTradeRecord.batch_no``.

    Returns:
        None. Results are reported through ``app_logger`` and ``send_alert``.
    """
    app_logger.info(f"Starting quality check for batch: {batch_no}")

    async with AsyncSessionLocal() as session:

        async def count_rows(*conditions):
            """Count this batch's rows that also satisfy *conditions*."""
            stmt = (
                select(func.count())
                .select_from(StandardTradeRecord)
                .where(StandardTradeRecord.batch_no == batch_no, *conditions)
            )
            return (await session.execute(stmt)).scalar() or 0

        # 1. Total number of records in the batch.
        total = await count_rows()
        if total == 0:
            # Nothing was stored for this batch: raise an error-level alert
            # and stop, since every rate below would divide by zero.
            await send_alert("数据空跑告警", f"批次 {batch_no} 抓取入库的标准化记录数为 0!", level="error")
            return

        # 2. Missing-field counts. ``.is_(None)`` renders ``IS NULL`` just as
        #    ``== None`` does, without relying on an overloaded comparison
        #    that linters flag.
        missing_hs = await count_rows(
            (StandardTradeRecord.hs_code.is_(None))
            | (StandardTradeRecord.hs_code == "")
        )
        missing_amount = await count_rows(StandardTradeRecord.amount.is_(None))
        missing_weight = await count_rows(StandardTradeRecord.weight.is_(None))

        missing_rate = missing_hs / total
        amount_rate = missing_amount / total
        weight_rate = missing_weight / total

        app_logger.info(f"Batch {batch_no} metrics - Total: {total}, Missing HS: {missing_rate:.2%}, Missing Amt: {amount_rate:.2%}, Missing Wgt: {weight_rate:.2%}")

        # 3. Threshold alerts: HS code missing in more than 30% of rows,
        #    amount missing in more than 50%. The weight rate is only logged.
        if missing_rate > 0.3:
            await send_alert(
                "字段大面积缺失告警",
                f"批次 {batch_no} 的 HS 编码缺失率高达 {missing_rate:.2%} (阈值 30%),可能解析器失效或源端改版!",
                level="warning"
            )
        if amount_rate > 0.5:
            await send_alert("金额大面积缺失告警", f"批次 {batch_no} 金额缺失率 {amount_rate:.2%}", level="warning")
def _days_behind(now, latest):
    """Return the whole number of days elapsed from *latest* up to *now*.

    Both operands are normalised to naive UTC before subtracting, so the
    calculation works whether *latest* is a plain ``date``, a naive
    ``datetime`` or a timezone-aware ``datetime``. Naive values are taken
    as-is (i.e. assumed to already be in UTC); a plain ``date`` is treated
    as midnight of that day.

    Args:
        now: Reference ``datetime`` (aware or naive).
        latest: ``date`` or ``datetime`` of the most recent record.

    Returns:
        ``timedelta.days`` of ``now - latest``.
    """
    from datetime import datetime, timezone

    def as_naive_utc(moment):
        # Aware values are shifted to UTC first; naive ones pass through.
        if moment.utcoffset() is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.replace(tzinfo=None)

    if not isinstance(latest, datetime):
        # ``datetime - date`` is not supported, so promote the date.
        latest = datetime(latest.year, latest.month, latest.day)

    return (as_naive_utc(now) - as_naive_utc(latest)).days


async def check_country_update_delay():
    """Check each country's data for update delays.

    Meant to run on a daily schedule. It queries the latest ``trade_date``
    per ``source_country`` and sends a warning alert for every country whose
    newest data is older than its expected publication delay (for example
    T+30).

    Returns:
        None. Findings are reported through ``send_alert`` and ``app_logger``.
    """
    from datetime import datetime, timezone

    # Placeholder per-country expected publication delay, in days.
    expected_delay = {
        "BR": 45,
        "CL": 45,
        "MX": 45,
        "US": 45,
        "IN": 10,
        "VN": 45,
        "ID": 45,
        "EU": 60,
    }
    # Allowance, in days, for countries not listed above.
    default_delay = 60

    app_logger.info("Starting country update delay check...")
    now = datetime.now(timezone.utc)

    async with AsyncSessionLocal() as session:
        stmt = select(
            StandardTradeRecord.source_country,
            func.max(StandardTradeRecord.trade_date),
        ).group_by(StandardTradeRecord.source_country)
        result = await session.execute(stmt)
        latest_dates = result.all()

        for country, max_date in latest_dates:
            if not max_date:
                # No usable trade_date for this country; nothing to compare.
                continue

            # Rough day count; tolerant of date / naive / aware values.
            days_delayed = _days_behind(now, max_date)
            allowed_delay = expected_delay.get(country, default_delay)
            if days_delayed > allowed_delay:
                await send_alert(
                    "数据更新延迟告警",
                    f"国家 {country} 最新数据停留在 {max_date.strftime('%Y-%m-%d')},落后 {days_delayed} 天,超过允许的 {allowed_delay} 天阈值。",
                    level="warning"
                )

    app_logger.info("Country update delay check finished.")