customs-data / apps /worker /celery_tasks.py
3v324v23's picture
feat: 完成多国家海关数据源接入与核心功能全量迭代
8ca8917
Raw
History Blame Contribute Delete
2.09 kB
import asyncio
from packages.core.celery_app import celery_app
from packages.core.logger import app_logger
from apps.worker.run_brazil import run_brazil_job
from apps.worker.run_chile import run_chile_job
from apps.worker.run_extended import run_extended_mock_jobs
from apps.worker.export_tasks import export_data_task
def _run_async(coro):
"""辅助函数:在同步的 celery task 中运行异步函数"""
loop = asyncio.get_event_loop()
if loop.is_running():
# 如果已经有 running loop(比如在某些测试环境),则创建新 loop 或者用 nest_asyncio
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
return new_loop.run_until_complete(coro)
else:
return asyncio.run(coro)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_brazil(self):
"""巴西海关数据同步任务"""
app_logger.info("Celery Task: Starting sync_brazil")
try:
_run_async(run_brazil_job())
app_logger.info("Celery Task: Finished sync_brazil")
except Exception as exc:
app_logger.error(f"Celery Task: Error in sync_brazil: {exc}")
raise self.retry(exc=exc)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_chile(self):
"""智利海关数据同步任务"""
app_logger.info("Celery Task: Starting sync_chile")
try:
_run_async(run_chile_job())
app_logger.info("Celery Task: Finished sync_chile")
except Exception as exc:
app_logger.error(f"Celery Task: Error in sync_chile: {exc}")
raise self.retry(exc=exc)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=300)
def sync_extended_mock(self):
"""扩展的模拟数据同步任务"""
app_logger.info("Celery Task: Starting sync_extended_mock")
try:
_run_async(run_extended_mock_jobs())
app_logger.info("Celery Task: Finished sync_extended_mock")
except Exception as exc:
app_logger.error(f"Celery Task: Error in sync_extended_mock: {exc}")
raise self.retry(exc=exc)