Spaces:
Running
Running
| from sqlalchemy import select, func | |
| from packages.core.database import AsyncSessionLocal | |
| from packages.core.models import StandardTradeRecord | |
| from packages.core.logger import app_logger | |
| from infrastructure.monitoring.alert import send_alert | |
async def check_batch_quality(batch_no: str):
    """
    Data-quality inspection for one ingestion batch.

    Intended to run after a batch has been collected and normalized into
    storage. Counts the batch's records and how many of them lack an HS code,
    an amount or a weight, logs the resulting rates, and sends alerts when the
    batch is empty or a missing-field rate exceeds its threshold.

    Args:
        batch_no: Batch identifier matched against
            ``StandardTradeRecord.batch_no``.
    """
    app_logger.info(f"Starting quality check for batch: {batch_no}")

    record = StandardTradeRecord
    in_batch = record.batch_no == batch_no

    async with AsyncSessionLocal() as session:

        async def count_rows(*extra_conditions):
            # COUNT(*) over this batch, optionally narrowed by more conditions.
            query = (
                select(func.count())
                .select_from(record)
                .where(in_batch, *extra_conditions)
            )
            outcome = await session.execute(query)
            return outcome.scalar() or 0

        # 1. Total number of records in the batch.
        total = await count_rows()
        if total == 0:
            # Nothing was stored for this batch: report it and stop, which
            # also keeps the rate computations below from dividing by zero.
            await send_alert("数据空跑告警", f"批次 {batch_no} 抓取入库的标准化记录数为 0!", level="error")
            return

        # 2. Missing-field counts: HS code (NULL or empty string), then
        #    amount and weight (NULL only).
        missing_hs = await count_rows(
            record.hs_code.is_(None) | (record.hs_code == "")
        )
        missing_rate = missing_hs / total
        missing_amount = await count_rows(record.amount.is_(None))
        missing_weight = await count_rows(record.weight.is_(None))

        app_logger.info(f"Batch {batch_no} metrics - Total: {total}, Missing HS: {missing_rate:.2%}, Missing Amt: {missing_amount/total:.2%}, Missing Wgt: {missing_weight/total:.2%}")

        # 3. Threshold alerts: HS code missing rate above 30%, amount missing
        #    rate above 50%. The weight rate is logged only.
        if missing_rate > 0.3:
            await send_alert(
                "字段大面积缺失告警",
                f"批次 {batch_no} 的 HS 编码缺失率高达 {missing_rate:.2%} (阈值 30%),可能解析器失效或源端改版!",
                level="warning"
            )

        if missing_amount / total > 0.5:
            await send_alert("金额大面积缺失告警", f"批次 {batch_no} 金额缺失率 {missing_amount/total:.2%}", level="warning")
async def check_country_update_delay():
    """
    Check per-country data freshness and alert on stale countries.

    Intended to be run once a day. Queries the newest ``trade_date`` for each
    ``source_country`` and sends a warning alert for every country whose
    newest record is more days behind the current UTC time than its allowed
    publication delay (e.g. T+30). Countries missing from the delay table
    fall back to 60 days; countries without any ``trade_date`` are skipped.
    """
    from datetime import datetime, timezone

    # Placeholder table of the expected publication delay per country, in days.
    expected_delay = {
        "BR": 45,
        "CL": 45,
        "MX": 45,
        "US": 45,
        "IN": 10,
        "VN": 45,
        "ID": 45,
        "EU": 60
    }

    app_logger.info("Starting country update delay check...")

    now = datetime.now(timezone.utc)
    # Naive UTC "now", computed once, for comparing against naive timestamps.
    now_naive = now.replace(tzinfo=None)

    async with AsyncSessionLocal() as session:
        stmt = (
            select(
                StandardTradeRecord.source_country,
                func.max(StandardTradeRecord.trade_date),
            )
            .group_by(StandardTradeRecord.source_country)
        )
        result = await session.execute(stmt)
        # Only plain column values are selected, so the rows remain usable
        # once the session has been closed.
        latest_dates = result.all()

    for country, max_date in latest_dates:
        if not max_date:
            continue

        if isinstance(max_date, datetime):
            if max_date.tzinfo is not None and max_date.utcoffset() is not None:
                # Aware timestamp: convert to UTC first. Subtracting an aware
                # value from a naive one raises TypeError.
                latest = max_date.astimezone(timezone.utc).replace(tzinfo=None)
            else:
                # Naive timestamp: treated as UTC (rough calculation).
                latest = max_date
            days_delayed = (now_naive - latest).days
        else:
            # Plain ``date`` value: datetime - date is not supported, so
            # compare against today's UTC calendar date instead.
            days_delayed = (now_naive.date() - max_date).days

        allowed_delay = expected_delay.get(country, 60)
        if days_delayed > allowed_delay:
            await send_alert(
                "数据更新延迟告警",
                f"国家 {country} 最新数据停留在 {max_date.strftime('%Y-%m-%d')},落后 {days_delayed} 天,超过允许的 {allowed_delay} 天阈值。",
                level="warning"
            )

    app_logger.info("Country update delay check finished.")