Spaces:
Sleeping
Sleeping
| """Celery app configuration and task setup.""" | |
| import logging | |
| from celery import Celery | |
| from kombu import Exchange, Queue | |
| from src.config import settings | |
| logger = logging.getLogger(__name__) | |
# Celery application, named after the configured application name.
app = Celery(settings.app_name)

# Options copied one-to-one from ``settings.celery`` into the Celery config.
_CELERY_OPTION_NAMES = (
    "broker_url",
    "result_backend",
    "task_serializer",
    "result_serializer",
    "accept_content",
    "timezone",
    "enable_utc",
    "task_track_started",
    "task_time_limit",
    "task_soft_time_limit",
)
app.conf.update(
    **{option: getattr(settings.celery, option) for option in _CELERY_OPTION_NAMES}
)

# All queues are bound to a single direct exchange named "tasks".
default_exchange = Exchange("tasks", type="direct")

# (queue name, routing key, priority) for every declared queue.
# NOTE(review): kombu.Queue may not recognise a ``priority`` keyword —
# confirm it has any effect on the broker in use.
_QUEUE_SPECS = (
    ("critical", "tasks.critical", 10),
    ("high", "tasks.high", 7),
    ("default", "tasks.default", 5),
    ("low", "tasks.low", 1),
    ("webhooks", "webhooks", 8),
    ("polling", "polling", 3),
)
app.conf.task_queues = tuple(
    Queue(
        queue_name,
        exchange=default_exchange,
        routing_key=binding_key,
        priority=level,
    )
    for queue_name, binding_key, level in _QUEUE_SPECS
)

# Where tasks go when no entry in ``task_routes`` matches.
app.conf.task_default_queue = "default"
app.conf.task_default_exchange_type = "direct"
app.conf.task_default_routing_key = "tasks.default"

# (task-name pattern, queue, routing key) for pattern-based routing.
_ROUTE_SPECS = (
    ("tasks.ingest.*", "critical", "tasks.critical"),
    ("tasks.webhooks.*", "webhooks", "webhooks"),
    ("tasks.polling.*", "polling", "polling"),
    ("tasks.enrichment.*", "high", "tasks.high"),
)
app.conf.task_routes = {
    pattern: {"queue": target_queue, "routing_key": binding_key}
    for pattern, target_queue, binding_key in _ROUTE_SPECS
}
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register every periodic task with the beat scheduler.

    Connected to ``app.on_after_configure`` so that Celery actually invokes
    it; an unconnected handler is never called and nothing gets scheduled.

    Polling intervals are read from ``settings.sync``; the two daily jobs use
    fixed crontab schedules (03:00 and 03:30).

    Args:
        sender: Object the signal is sent from; it must provide
            ``add_periodic_task`` (the Celery app, per the Celery periodic
            task guide).
        **kwargs: Additional signal arguments; unused.
    """
    # Function-scope import, as in the original; placed first so the whole
    # schedule can be declared in one table.
    from celery.schedules import crontab

    intervals = settings.sync

    # (schedule, task, beat entry name)
    schedule = (
        (intervals.slack_poll_interval, sync_slack_incremental,
         "sync-slack-incremental"),
        (intervals.github_poll_interval, sync_github_incremental,
         "sync-github-incremental"),
        (intervals.jira_poll_interval, sync_jira_incremental,
         "sync-jira-incremental"),
        (intervals.logs_poll_interval, poll_server_logs,
         "poll-server-logs"),
        (intervals.metrics_poll_interval, poll_metrics_anomalies,
         "poll-metrics-anomalies"),
        (intervals.error_traces_poll_interval, poll_error_traces,
         "poll-error-traces"),
        (crontab(hour=3, minute=0), compute_staleness_scores,
         "compute-staleness-scores"),
        (crontab(hour=3, minute=30), compute_dependency_risk,
         "compute-dependency-risk"),
    )

    for when, task, entry_name in schedule:
        # ``.s()`` builds the task signature; it requires ``task`` to be a
        # registered Celery task, not a plain function.
        sender.add_periodic_task(when, task.s(), name=entry_name)
| # Task imports (to be implemented in tasks module) | |
| from celery import shared_task | |
@shared_task(bind=True)
def sync_slack_incremental(self):
    """Sync new messages from Slack channels incrementally.

    Placeholder: the body is not implemented yet and the task returns
    ``None``. Registered as a bound Celery task so that ``.s()`` (used by
    ``setup_periodic_tasks``) is available.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
    """
@shared_task(bind=True)
def sync_github_incremental(self):
    """Sync new events from GitHub incrementally.

    Placeholder: the body is not implemented yet and the task returns
    ``None``. Registered as a bound Celery task so that ``.s()`` (used by
    ``setup_periodic_tasks``) is available.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
    """
@shared_task(bind=True)
def sync_jira_incremental(self):
    """Sync new issues from Jira incrementally.

    Placeholder: the body is not implemented yet and the task returns
    ``None``. Registered as a bound Celery task so that ``.s()`` (used by
    ``setup_periodic_tasks``) is available.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
    """
@shared_task(bind=True)
def poll_server_logs(self):
    """Poll and ingest ERROR/WARN logs from monitored services.

    Placeholder: the body is not implemented yet and the task returns
    ``None``. Registered as a bound Celery task so that ``.s()`` (used by
    ``setup_periodic_tasks``) is available.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
    """
@shared_task(bind=True)
def poll_metrics_anomalies(self):
    """Run the z-score anomaly detection job.

    Delegates to ``src.anomaly.tasks.run_zscore_anomaly_detection``. It is
    scheduled by ``setup_periodic_tasks`` at
    ``settings.sync.metrics_poll_interval``.

    Must be a bound task (``bind=True``) because the failure path calls
    ``self.retry``.

    Args:
        self: The bound task instance supplied by Celery.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` after any
            exception, asking Celery to re-run the task in 120 seconds
            (subject to the task's retry limit).
    """
    try:
        # Imported lazily inside the task, as in the original.
        from src.anomaly.tasks import run_zscore_anomaly_detection

        run_zscore_anomaly_detection()
    except Exception as exc:
        logger.error("poll_metrics_anomalies failed: %s", exc)
        raise self.retry(exc=exc, countdown=120)
@shared_task(bind=True)
def poll_error_traces(self):
    """Poll error traces from APM services.

    Placeholder: the body is not implemented yet and the task returns
    ``None``. Registered as a bound Celery task so that ``.s()`` (used by
    ``setup_periodic_tasks``) is available.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
    """
@shared_task(bind=True)
def process_webhook_event(self, source_type: str, event_data: dict, rbac_tags: dict):
    """Route webhook event to appropriate agent for real-time ingestion.

    Placeholder: the body is not implemented yet; all arguments are accepted
    but unused and the task returns ``None``. Registered as a bound Celery
    task because the signature takes ``self``.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
        source_type: Identifier of the event source (presumably the
            originating integration — confirm against callers).
        event_data: Webhook payload; schema not visible here.
        rbac_tags: Access-control tags for the event; schema not visible
            here.
    """
@shared_task(bind=True)
def enrich_and_index_document(self, document: dict):
    """Extract entities, relationships, and index document.

    Placeholder: the body is not implemented yet; ``document`` is accepted
    but unused and the task returns ``None``. Registered as a bound Celery
    task because the signature takes ``self``.

    Args:
        self: The bound task instance supplied by Celery (``bind=True``).
        document: The document to process; schema not visible here.
    """
@shared_task(bind=True)
def compute_staleness_scores(self):
    """Run the staleness scoring job.

    Delegates to ``src.anomaly.tasks.run_staleness_scoring``. It is
    scheduled by ``setup_periodic_tasks`` with ``crontab(hour=3, minute=0)``.

    Must be a bound task (``bind=True``) because the failure path calls
    ``self.retry``.

    Args:
        self: The bound task instance supplied by Celery.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` after any
            exception, asking Celery to re-run the task in 300 seconds
            (subject to the task's retry limit).
    """
    try:
        # Imported lazily inside the task, as in the original.
        from src.anomaly.tasks import run_staleness_scoring

        run_staleness_scoring()
    except Exception as exc:
        logger.error("compute_staleness_scores failed: %s", exc)
        raise self.retry(exc=exc, countdown=300)
@shared_task(bind=True)
def compute_dependency_risk(self):
    """Run the dependency risk modelling job.

    Delegates to ``src.anomaly.tasks.run_dependency_risk_modeling``. It is
    scheduled by ``setup_periodic_tasks`` with ``crontab(hour=3, minute=30)``.

    Must be a bound task (``bind=True``) because the failure path calls
    ``self.retry``.

    Args:
        self: The bound task instance supplied by Celery.

    Raises:
        celery.exceptions.Retry: Raised via ``self.retry`` after any
            exception, asking Celery to re-run the task in 300 seconds
            (subject to the task's retry limit).
    """
    try:
        # Imported lazily inside the task, as in the original.
        from src.anomaly.tasks import run_dependency_risk_modeling

        run_dependency_risk_modeling()
    except Exception as exc:
        logger.error("compute_dependency_risk failed: %s", exc)
        raise self.retry(exc=exc, countdown=300)
# Script entry point: hand control to the Celery command-line program.
if __name__ == "__main__":
    app.start()