customs-data / apps /worker /celery_tasks.py
3v324v23's picture
Enhances platform with robust monitoring and notifications
4e22b4d
Raw
History Blame Contribute Delete
3.21 kB
import asyncio
from packages.core.celery_app import celery_app
from packages.core.logger import app_logger
from apps.worker.run_brazil import run_brazil_job
from apps.worker.run_chile import run_chile_job
from apps.worker.run_extended import run_extended_mock_jobs
from apps.worker.export_tasks import export_data_task
def _run_async(coro):
    """Run a coroutine to completion from synchronous Celery task code.

    Args:
        coro: The coroutine object to execute.

    Returns:
        The value the coroutine returns.

    Raises:
        Exception: Whatever the coroutine raises is propagated unchanged.
    """
    try:
        # get_running_loop() only inspects state; unlike get_event_loop() it
        # never creates a loop and does not fail merely because the thread
        # has no *current* loop (the state asyncio.run() leaves behind).
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: asyncio.run() creates a fresh
        # loop, runs the coroutine, and closes the loop afterwards.
        return asyncio.run(coro)

    # A loop is already running in this thread (e.g. some test environments).
    # asyncio refuses to drive a second loop from the same thread, so run the
    # coroutine on a helper thread with its own loop and block until it ends.
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_brazil(self):
    """Celery task that synchronises Brazilian customs data.

    Drives ``run_brazil_job`` to completion through ``_run_async``. Any
    ``Exception`` is logged and handed to ``self.retry`` so Celery can
    reschedule the task according to the decorator's retry settings.
    """
    app_logger.info("Celery Task: Starting sync_brazil")
    try:
        job = run_brazil_job()
        _run_async(job)
        app_logger.info("Celery Task: Finished sync_brazil")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_brazil: {exc}")
        # Ask Celery to retry, passing the original exception along.
        raise self.retry(exc=exc)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_chile(self):
    """Celery task that synchronises Chilean customs data.

    Drives ``run_chile_job`` to completion through ``_run_async``. Any
    ``Exception`` is logged and handed to ``self.retry`` so Celery can
    reschedule the task according to the decorator's retry settings.
    """
    app_logger.info("Celery Task: Starting sync_chile")
    try:
        job = run_chile_job()
        _run_async(job)
        app_logger.info("Celery Task: Finished sync_chile")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_chile: {exc}")
        # Ask Celery to retry, passing the original exception along.
        raise self.retry(exc=exc)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_extended_mock(self):
    """Celery task that runs the extended mock-data synchronisation jobs.

    Drives ``run_extended_mock_jobs`` to completion through ``_run_async``.
    Any ``Exception`` is logged and handed to ``self.retry`` so Celery can
    reschedule the task according to the decorator's retry settings.
    """
    app_logger.info("Celery Task: Starting sync_extended_mock")
    try:
        jobs = run_extended_mock_jobs()
        _run_async(jobs)
        app_logger.info("Celery Task: Finished sync_extended_mock")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_extended_mock: {exc}")
        # Ask Celery to retry, passing the original exception along.
        raise self.retry(exc=exc)
@celery_app.task(name="health_check_task")
def health_check_task():
    """Periodic health-check task.

    Returns:
        The value produced by ``run_full_health_check`` on success, or a
        ``{"status": "error", "error": <message>}`` dict when anything in
        the check (including the import) raises an ``Exception``.
    """
    app_logger.info("Celery Task: Starting health_check")
    try:
        # Imported inside the task so an import failure is reported through
        # the error dict below like any other failure.
        from infrastructure.monitoring.health import run_full_health_check

        check = run_full_health_check()
        result = _run_async(check)
        app_logger.info(f"Celery Task: Health check completed. Overall status: {result['overall_status']}")
        return result
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in health_check: {exc}")
        failure = {"status": "error", "error": str(exc)}
        return failure
@celery_app.task(name="data_quality_check_task")
def data_quality_check_task(batch_no: str):
    """Data-quality check task for a single batch.

    Args:
        batch_no: Batch identifier forwarded to ``check_batch_quality``.

    Failures are logged and not re-raised, so the task returns ``None``
    whether or not the check succeeded.
    """
    app_logger.info(f"Celery Task: Starting quality check for batch {batch_no}")
    try:
        # Imported inside the task so an import failure is logged below
        # like any other failure.
        from infrastructure.monitoring.quality import check_batch_quality

        quality_check = check_batch_quality(batch_no)
        _run_async(quality_check)
        app_logger.info(f"Celery Task: Quality check completed for batch {batch_no}")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in quality check: {exc}")