Spaces:
Paused
Paused
| from datetime import timedelta | |
| from celery import Celery, Task | |
| from celery.schedules import crontab | |
| from flask import Flask | |
| from configs import dify_config | |
def init_app(app: Flask) -> Celery:
    """Build the Celery application for ``app`` and attach it as a Flask extension.

    The returned Celery instance is configured from ``dify_config`` (broker,
    result backend, optional Sentinel transport options, optional broker SSL),
    is made the default Celery app via ``set_default()``, is stored under
    ``app.extensions["celery"]``, and carries the beat schedule for the four
    ``schedule.*`` periodic tasks.

    Args:
        app: The Flask application. Its ``name`` is passed to the Celery
            constructor and its application context wraps every task call.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    def schedule_entry(task_path, when):
        # One beat-schedule record: dotted task path plus its schedule object.
        return {"task": task_path, "schedule": when}

    # Transport options stay empty unless a Sentinel deployment is configured.
    if dify_config.CELERY_USE_SENTINEL:
        transport_options = {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            },
        }
    else:
        transport_options = {}

    celery_app = Celery(
        app.name,
        task_cls=FlaskTask,
        broker=dify_config.CELERY_BROKER_URL,
        backend=dify_config.CELERY_BACKEND,
        task_ignore_result=True,
    )

    celery_app.conf.update(
        result_backend=dify_config.CELERY_RESULT_BACKEND,
        broker_transport_options=transport_options,
        broker_connection_retry_on_startup=True,
    )

    if dify_config.BROKER_USE_SSL:
        # Broker SSL is switched on with every certificate-related field left
        # as None (no CA bundle, client certificate, key or cert requirement
        # is supplied here).
        celery_app.conf.update(
            broker_use_ssl={
                "ssl_cert_reqs": None,
                "ssl_ca_certs": None,
                "ssl_certfile": None,
                "ssl_keyfile": None,
            },
        )

    # Register the instance globally and on the Flask app before the
    # periodic-task configuration is applied.
    celery_app.set_default()
    app.extensions["celery"] = celery_app

    # Modules Celery must import so the scheduled tasks are registered.
    task_modules = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
        "schedule.create_tidb_serverless_task",
        "schedule.update_tidb_serverless_status_task",
    ]

    # Both cleanup jobs share one interval, expressed in days by the config.
    cleanup_interval = timedelta(days=dify_config.CELERY_BEAT_SCHEDULER_TIME)
    # The TiDB jobs run hourly: creation at minute 0, status update at minute 30.
    hourly_at_minute_0 = crontab(minute="0", hour="*")
    hourly_at_minute_30 = crontab(minute="30", hour="*")

    periodic_tasks = {
        "clean_embedding_cache_task": schedule_entry(
            "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            cleanup_interval,
        ),
        "clean_unused_datasets_task": schedule_entry(
            "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            cleanup_interval,
        ),
        "create_tidb_serverless_task": schedule_entry(
            "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
            hourly_at_minute_0,
        ),
        "update_tidb_serverless_status_task": schedule_entry(
            "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
            hourly_at_minute_30,
        ),
    }

    celery_app.conf.update(beat_schedule=periodic_tasks, imports=task_modules)
    return celery_app