3v324v23's picture
feat: 完成多国家海关数据源接入与核心功能全量迭代
8ca8917
Raw
History Blame Contribute Delete
4.51 kB
from sqlalchemy import select, func
from packages.core.database import AsyncSessionLocal
from packages.core.models import StandardTradeRecord
from packages.core.logger import app_logger
from infrastructure.monitoring.alert import send_alert
async def check_batch_quality(
    batch_no: str,
    *,
    hs_missing_threshold: float = 0.3,
    amount_missing_threshold: float = 0.5,
    weight_missing_threshold: "float | None" = None,
) -> None:
    """Run a data-quality check on one standardized batch.

    Intended to run after a batch has been collected and written to the
    standardized table. Counts the batch's records and how many of them are
    missing an HS code, an amount, or a weight, logs the resulting rates, and
    raises alerts via ``send_alert`` when a rate exceeds its threshold.

    Args:
        batch_no: Batch identifier matched against ``StandardTradeRecord.batch_no``.
        hs_missing_threshold: Warn when the share of records whose HS code is
            NULL or empty exceeds this fraction (default 0.3).
        amount_missing_threshold: Warn when the share of records with a NULL
            amount exceeds this fraction (default 0.5).
        weight_missing_threshold: Warn when the share of records with a NULL
            weight exceeds this fraction. ``None`` (the default) disables the
            weight alert, so weight is only logged, as before.

    An empty batch (zero records) triggers an error-level alert, and the
    function returns without computing any rates.
    """
    # Local import so the module-level import block stays unchanged.
    from sqlalchemy import case

    app_logger.info(f"Starting quality check for batch: {batch_no}")

    record = StandardTradeRecord

    def _count_where(condition):
        # Portable conditional count: SUM(CASE WHEN cond THEN 1 ELSE 0 END).
        # Chosen over COUNT(*) FILTER (WHERE ...) because not every backend
        # supports the FILTER clause.
        return func.sum(case((condition, 1), else_=0))

    # A single aggregate query (one scan, one round trip) replaces four
    # separate COUNT queries over the same batch.
    stmt = (
        select(
            func.count(),
            _count_where(record.hs_code.is_(None) | (record.hs_code == "")),
            _count_where(record.amount.is_(None)),
            _count_where(record.weight.is_(None)),
        )
        .select_from(record)
        .where(record.batch_no == batch_no)
    )

    # Only the query needs the session; alerts are sent after it is released.
    async with AsyncSessionLocal() as session:
        row = (await session.execute(stmt)).one()

    # SUM over zero rows yields NULL, and some drivers return Decimal for SUM,
    # so normalize every column to int.
    total, missing_hs, missing_amount, missing_weight = (int(value or 0) for value in row)

    if total == 0:
        send_alert("数据空跑告警", f"批次 {batch_no} 抓取入库的标准化记录数为 0!", level="error")
        return

    missing_rate = missing_hs / total
    amount_rate = missing_amount / total
    weight_rate = missing_weight / total

    app_logger.info(f"Batch {batch_no} metrics - Total: {total}, Missing HS: {missing_rate:.2%}, Missing Amt: {amount_rate:.2%}, Missing Wgt: {weight_rate:.2%}")

    # Threshold alerts. A large missing rate usually means the parser broke or
    # the source changed its layout.
    if missing_rate > hs_missing_threshold:
        send_alert(
            "字段大面积缺失告警",
            f"批次 {batch_no} 的 HS 编码缺失率高达 {missing_rate:.2%} (阈值 {hs_missing_threshold:.0%}),可能解析器失效或源端改版!",
            level="warning",
        )
    if amount_rate > amount_missing_threshold:
        send_alert("金额大面积缺失告警", f"批次 {batch_no} 金额缺失率 {amount_rate:.2%}", level="warning")
    if weight_missing_threshold is not None and weight_rate > weight_missing_threshold:
        send_alert("重量大面积缺失告警", f"批次 {batch_no} 重量缺失率 {weight_rate:.2%}", level="warning")
def _days_behind(latest, now):
    """Return the whole days elapsed from *latest* (a date or datetime) to the aware *now*."""
    from datetime import datetime, timezone

    if isinstance(latest, datetime):
        # Naive values are treated as UTC, matching the previous rough
        # calculation. Aware values subtract correctly from an aware *now*.
        if latest.tzinfo is None or latest.utcoffset() is None:
            latest = latest.replace(tzinfo=timezone.utc)
        return (now - latest).days
    # Plain date column: `datetime - date` raises TypeError, so compare
    # calendar dates in UTC instead.
    return (now.astimezone(timezone.utc).date() - latest).days


async def check_country_update_delay(
    *,
    expected_delay: "dict[str, int] | None" = None,
    default_delay: int = 60,
):
    """Alert on source countries whose newest trade_date lags too far behind now.

    Meant to run once a day. Queries ``MAX(trade_date)`` per
    ``source_country`` and sends a warning alert for each country whose lag,
    in whole days, exceeds its allowed publication delay (e.g. T+30).
    Countries whose latest date is NULL are skipped.

    Args:
        expected_delay: Optional per-country overrides (country code -> allowed
            days of lag), merged over the built-in defaults below.
        default_delay: Allowed lag for countries absent from the mapping
            (default 60 days).
    """
    from datetime import datetime, timezone

    # Simulated expected publication delays per country, in days.
    delays = {
        "BR": 45,
        "CL": 45,
        "MX": 45,
        "US": 45,
        "IN": 10,
        "VN": 45,
        "ID": 45,
        "EU": 60,
    }
    if expected_delay:
        delays.update(expected_delay)

    app_logger.info("Starting country update delay check...")
    now = datetime.now(timezone.utc)

    stmt = (
        select(StandardTradeRecord.source_country, func.max(StandardTradeRecord.trade_date))
        .group_by(StandardTradeRecord.source_country)
    )
    # Release the DB session before sending alerts.
    async with AsyncSessionLocal() as session:
        latest_dates = (await session.execute(stmt)).all()

    for country, max_date in latest_dates:
        if max_date is None:
            continue
        days_delayed = _days_behind(max_date, now)
        allowed_delay = delays.get(country, default_delay)
        if days_delayed > allowed_delay:
            send_alert(
                "数据更新延迟告警",
                f"国家 {country} 最新数据停留在 {max_date.strftime('%Y-%m-%d')},落后 {days_delayed} 天,超过允许的 {allowed_delay} 天阈值。",
                level="warning",
            )
    app_logger.info("Country update delay check finished.")