import asyncio
from concurrent.futures import ThreadPoolExecutor

from packages.core.celery_app import celery_app
from packages.core.logger import app_logger
from apps.worker.run_brazil import run_brazil_job
from apps.worker.run_chile import run_chile_job
from apps.worker.run_extended import run_extended_mock_jobs
from apps.worker.export_tasks import export_data_task


def _run_async(coro):
    """Run a coroutine to completion from synchronous Celery task code.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated to the caller.
    """
    try:
        # get_running_loop() only reports a loop that is actually running in
        # this thread. Unlike get_event_loop(), it never creates a loop and
        # does not start failing once an earlier asyncio.run() has reset the
        # thread's current loop to None.
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() can create a
        # fresh loop, run the coroutine and close the loop afterwards.
        return asyncio.run(coro)

    # A loop is already running in this thread (e.g. some test environments).
    # asyncio refuses to run a second loop in the same thread, so execute the
    # coroutine on its own loop in a short-lived worker thread and block until
    # it finishes. The caller's loop is left untouched.
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_brazil(self):
    """Brazil customs data synchronisation task.

    Runs ``run_brazil_job``; on any exception the error is logged and the
    task is re-queued through ``self.retry`` (max_retries=3,
    default_retry_delay=300, as configured on the decorator).
    """
    app_logger.info("Celery Task: Starting sync_brazil")
    try:
        _run_async(run_brazil_job())
        app_logger.info("Celery Task: Finished sync_brazil")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_brazil: {exc}")
        raise self.retry(exc=exc)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_chile(self):
    """Chile customs data synchronisation task.

    Runs ``run_chile_job``; on any exception the error is logged and the
    task is re-queued through ``self.retry`` (max_retries=3,
    default_retry_delay=300, as configured on the decorator).
    """
    app_logger.info("Celery Task: Starting sync_chile")
    try:
        _run_async(run_chile_job())
        app_logger.info("Celery Task: Finished sync_chile")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_chile: {exc}")
        raise self.retry(exc=exc)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_extended_mock(self):
    """Extended mock data synchronisation task.

    Runs ``run_extended_mock_jobs``; on any exception the error is logged and
    the task is re-queued through ``self.retry`` (max_retries=3,
    default_retry_delay=300, as configured on the decorator).
    """
    app_logger.info("Celery Task: Starting sync_extended_mock")
    try:
        _run_async(run_extended_mock_jobs())
        app_logger.info("Celery Task: Finished sync_extended_mock")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in sync_extended_mock: {exc}")
        raise self.retry(exc=exc)


@celery_app.task(name="health_check_task")
def health_check_task():
    """Periodic health check task.

    Returns:
        The result of ``run_full_health_check`` on success. On any failure
        (including a result without an ``overall_status`` key) the error is
        logged and ``{"status": "error", "error": <message>}`` is returned
        instead of raising.
    """
    app_logger.info("Celery Task: Starting health_check")
    try:
        # Imported inside the task body rather than at module level, so an
        # import failure is caught and reported by the handler below.
        from infrastructure.monitoring.health import run_full_health_check

        result = _run_async(run_full_health_check())
        app_logger.info(f"Celery Task: Health check completed. Overall status: {result['overall_status']}")
        return result
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in health_check: {exc}")
        return {"status": "error", "error": str(exc)}


@celery_app.task(name="data_quality_check_task")
def data_quality_check_task(batch_no: str):
    """Data quality check task for a single batch.

    Args:
        batch_no: Identifier of the batch, passed to ``check_batch_quality``.

    Failures are logged and swallowed; the task always returns ``None`` and
    is never retried.
    """
    app_logger.info(f"Celery Task: Starting quality check for batch {batch_no}")
    try:
        # Imported inside the task body rather than at module level, so an
        # import failure is caught and logged by the handler below.
        from infrastructure.monitoring.quality import check_batch_quality

        _run_async(check_batch_quality(batch_no))
        app_logger.info(f"Celery Task: Quality check completed for batch {batch_no}")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in quality check: {exc}")