"""Data-quality monitoring jobs for standardized trade records.

``check_batch_quality`` inspects one ingested batch for empty loads and
large-scale field loss; ``check_country_update_delay`` flags source countries
whose newest ``trade_date`` lags behind the expected publication delay.
"""
from collections.abc import Mapping
from datetime import date, datetime, timezone
from typing import Optional, Union

from sqlalchemy import case, func, select

from packages.core.database import AsyncSessionLocal
from packages.core.models import StandardTradeRecord
from packages.core.logger import app_logger
from infrastructure.monitoring.alert import send_alert

# Expected publication delay, in days, per source country code (simulated
# values). A country is reported as late once its newest trade_date is older
# than this many days.
EXPECTED_COUNTRY_DELAY_DAYS = {
    "BR": 45,
    "CL": 45,
    "MX": 45,
    "US": 45,
    "IN": 10,
    "VN": 45,
    "ID": 45,
    "EU": 60,
}
# Delay (days) applied to countries not listed in EXPECTED_COUNTRY_DELAY_DAYS.
DEFAULT_COUNTRY_DELAY_DAYS = 60


async def check_batch_quality(
    batch_no: str,
    *,
    hs_missing_threshold: float = 0.3,
    amount_missing_threshold: float = 0.5,
) -> None:
    """Run the data-quality check for one standardized ingestion batch.

    Meant to run after a batch has been collected and written to
    ``StandardTradeRecord``. Counts the batch's records and how many of them
    lack an HS code (NULL or empty string), an amount (NULL) or a weight
    (NULL), logs the resulting rates, and calls ``send_alert`` when:

    * the batch has no records at all (level ``"error"``; nothing else runs);
    * the HS-code missing rate is above ``hs_missing_threshold``;
    * the amount missing rate is above ``amount_missing_threshold``.

    The weight missing rate is only logged, not alerted on.

    Args:
        batch_no: Identifier matched against ``StandardTradeRecord.batch_no``.
        hs_missing_threshold: Fraction (0-1) of records without an HS code
            above which a warning is sent. Defaults to 0.3 (30%).
        amount_missing_threshold: Fraction (0-1) of records without an
            amount above which a warning is sent. Defaults to 0.5 (50%).
    """
    app_logger.info(f"Starting quality check for batch: {batch_no}")

    hs_code = StandardTradeRecord.hs_code
    # One aggregate query instead of four separate COUNT round trips.
    # COUNT(CASE WHEN <cond> THEN 1 END) counts only matching rows: the CASE
    # yields NULL for non-matching rows and COUNT skips NULLs.
    stmt = (
        select(
            func.count(),
            func.count(case((hs_code.is_(None) | (hs_code == ""), 1))),
            func.count(case((StandardTradeRecord.amount.is_(None), 1))),
            func.count(case((StandardTradeRecord.weight.is_(None), 1))),
        )
        .select_from(StandardTradeRecord)
        .where(StandardTradeRecord.batch_no == batch_no)
    )

    async with AsyncSessionLocal() as session:
        # An aggregate without GROUP BY always yields exactly one row.
        row = (await session.execute(stmt)).one()
    total, missing_hs, missing_amount, missing_weight = (int(v or 0) for v in row)

    if total == 0:
        send_alert("数据空跑告警", f"批次 {batch_no} 抓取入库的标准化记录数为 0!", level="error")
        return

    hs_rate = missing_hs / total
    amount_rate = missing_amount / total
    weight_rate = missing_weight / total

    app_logger.info(
        f"Batch {batch_no} metrics - Total: {total}, Missing HS: {hs_rate:.2%}, "
        f"Missing Amt: {amount_rate:.2%}, Missing Wgt: {weight_rate:.2%}"
    )

    # Threshold alerts: a high HS-code missing rate usually means the parser
    # broke or the source changed its layout.
    if hs_rate > hs_missing_threshold:
        send_alert(
            "字段大面积缺失告警",
            f"批次 {batch_no} 的 HS 编码缺失率高达 {hs_rate:.2%} (阈值 {hs_missing_threshold:.0%}),可能解析器失效或源端改版!",
            level="warning",
        )
    if amount_rate > amount_missing_threshold:
        send_alert("金额大面积缺失告警", f"批次 {batch_no} 金额缺失率 {amount_rate:.2%}", level="warning")


def _days_behind(latest: Union[date, datetime], now: datetime) -> int:
    """Return the whole number of days ``latest`` lies before ``now``.

    ``now`` must be a timezone-aware UTC datetime. ``latest`` may be:

    * a plain ``date`` (DATE column): compared with today's UTC date;
    * a naive ``datetime``: interpreted as UTC;
    * an aware ``datetime``: subtracted directly.
    """
    # datetime is a subclass of date, so it must be checked first.
    if isinstance(latest, datetime):
        if latest.tzinfo is None:
            return (now.replace(tzinfo=None) - latest).days
        return (now - latest).days
    return (now.date() - latest).days


async def check_country_update_delay(
    *,
    expected_delays: Optional[Mapping[str, int]] = None,
    default_delay: int = DEFAULT_COUNTRY_DELAY_DAYS,
) -> None:
    """Alert on source countries whose data is older than expected.

    Intended to run once a day. Queries the newest ``trade_date`` per
    ``source_country`` and sends a warning via ``send_alert`` for every
    country whose newest date is more than its allowed delay (in days) in
    the past. Countries with no dated records are skipped.

    Args:
        expected_delays: Allowed delay in days per country code. Defaults to
            ``EXPECTED_COUNTRY_DELAY_DAYS``.
        default_delay: Allowed delay for countries missing from
            ``expected_delays``. Defaults to ``DEFAULT_COUNTRY_DELAY_DAYS``.
    """
    delays = EXPECTED_COUNTRY_DELAY_DAYS if expected_delays is None else expected_delays

    app_logger.info("Starting country update delay check...")
    now = datetime.now(timezone.utc)

    stmt = select(
        StandardTradeRecord.source_country,
        func.max(StandardTradeRecord.trade_date),
    ).group_by(StandardTradeRecord.source_country)

    async with AsyncSessionLocal() as session:
        latest_dates = (await session.execute(stmt)).all()

    for country, max_date in latest_dates:
        if max_date is None:
            continue
        days_delayed = _days_behind(max_date, now)
        allowed_delay = delays.get(country, default_delay)
        if days_delayed > allowed_delay:
            send_alert(
                "数据更新延迟告警",
                f"国家 {country} 最新数据停留在 {max_date.strftime('%Y-%m-%d')},落后 {days_delayed} 天,超过允许的 {allowed_delay} 天阈值。",
                level="warning",
            )

    app_logger.info("Country update delay check finished.")