Spaces:
Runtime error
Runtime error
| import asyncio | |
| from packages.core.celery_app import celery_app | |
| from packages.core.logger import app_logger | |
| from apps.worker.run_brazil import run_brazil_job | |
| from apps.worker.run_chile import run_chile_job | |
| from apps.worker.run_extended import run_extended_mock_jobs | |
| from apps.worker.export_tasks import export_data_task | |
| def _run_async(coro): | |
| """辅助函数:在同步的 celery task 中运行异步函数""" | |
| loop = asyncio.get_event_loop() | |
| if loop.is_running(): | |
| # 如果已经有 running loop(比如在某些测试环境),则创建新 loop 或者用 nest_asyncio | |
| new_loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(new_loop) | |
| return new_loop.run_until_complete(coro) | |
| else: | |
| return asyncio.run(coro) | |
| def sync_brazil(self): | |
| """巴西海关数据同步任务""" | |
| app_logger.info("Celery Task: Starting sync_brazil") | |
| try: | |
| _run_async(run_brazil_job()) | |
| app_logger.info("Celery Task: Finished sync_brazil") | |
| except Exception as exc: | |
| app_logger.error(f"Celery Task: Error in sync_brazil: {exc}") | |
| raise self.retry(exc=exc) | |
| def sync_chile(self): | |
| """智利海关数据同步任务""" | |
| app_logger.info("Celery Task: Starting sync_chile") | |
| try: | |
| _run_async(run_chile_job()) | |
| app_logger.info("Celery Task: Finished sync_chile") | |
| except Exception as exc: | |
| app_logger.error(f"Celery Task: Error in sync_chile: {exc}") | |
| raise self.retry(exc=exc) | |
| def sync_extended_mock(self): | |
| """扩展的模拟数据同步任务""" | |
| app_logger.info("Celery Task: Starting sync_extended_mock") | |
| try: | |
| _run_async(run_extended_mock_jobs()) | |
| app_logger.info("Celery Task: Finished sync_extended_mock") | |
| except Exception as exc: | |
| app_logger.error(f"Celery Task: Error in sync_extended_mock: {exc}") | |
| raise self.retry(exc=exc) | |