"""Celery entry points that run the async customs-data sync jobs."""

import asyncio
import concurrent.futures

from packages.core.celery_app import celery_app
from packages.core.logger import app_logger
from apps.worker.run_brazil import run_brazil_job
from apps.worker.run_chile import run_chile_job
from apps.worker.run_extended import run_extended_mock_jobs
# NOTE(review): not referenced in this module; presumably imported so the
# export task is registered alongside the sync tasks -- confirm before removing.
from apps.worker.export_tasks import export_data_task  # noqa: F401


def _run_async(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        Any exception raised by the coroutine is propagated to the caller.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() can create,
        # drive and close a fresh loop itself. Unlike get_event_loop(), this
        # probe also works in non-main threads and leaves no loop behind.
        return asyncio.run(coro)

    # A loop is already running in this thread (e.g. some test environments).
    # run_until_complete() on *any* loop would raise RuntimeError here, so the
    # coroutine is driven by its own loop in a short-lived worker thread.
    # This call still blocks the current thread until the coroutine finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()


def _run_sync_task(task, task_name, job_factory):
    """Run one async sync job inside a Celery task with logging and retry.

    Args:
        task: The bound Celery task instance (``self`` of the task function).
        task_name: Name used in the log messages.
        job_factory: Zero-argument callable returning the coroutine to run.

    Raises:
        Whatever ``task.retry`` produces; any failure of the job is logged
        and handed to ``task.retry(exc=exc)``.
    """
    app_logger.info(f"Celery Task: Starting {task_name}")
    try:
        _run_async(job_factory())
        app_logger.info(f"Celery Task: Finished {task_name}")
    except Exception as exc:
        app_logger.error(f"Celery Task: Error in {task_name}: {exc}")
        raise task.retry(exc=exc)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_brazil(self):
    """Brazil customs data synchronization task."""
    _run_sync_task(self, "sync_brazil", run_brazil_job)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_chile(self):
    """Chile customs data synchronization task."""
    _run_sync_task(self, "sync_chile", run_chile_job)


@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_extended_mock(self):
    """Extended mock data synchronization task."""
    _run_sync_task(self, "sync_extended_mock", run_extended_mock_jobs)