Spaces:
Sleeping
Sleeping
File size: 6,196 Bytes
0493349 451d52a 0493349 451d52a 0493349 451d52a 0493349 451d52a 0493349 451d52a 0493349 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 | """Celery app configuration and task setup."""
import logging
from celery import Celery
from kombu import Exchange, Queue
from src.config import settings
logger = logging.getLogger(__name__)
# Create Celery app
app = Celery(settings.app_name)
# Configure from settings
app.conf.update(
broker_url=settings.celery.broker_url,
result_backend=settings.celery.result_backend,
task_serializer=settings.celery.task_serializer,
result_serializer=settings.celery.result_serializer,
accept_content=settings.celery.accept_content,
timezone=settings.celery.timezone,
enable_utc=settings.celery.enable_utc,
task_track_started=settings.celery.task_track_started,
task_time_limit=settings.celery.task_time_limit,
task_soft_time_limit=settings.celery.task_soft_time_limit,
)
# Define queues with routing
default_exchange = Exchange("tasks", type="direct")
app.conf.task_queues = (
Queue("critical", exchange=default_exchange, routing_key="tasks.critical", priority=10),
Queue("high", exchange=default_exchange, routing_key="tasks.high", priority=7),
Queue("default", exchange=default_exchange, routing_key="tasks.default", priority=5),
Queue("low", exchange=default_exchange, routing_key="tasks.low", priority=1),
Queue("webhooks", exchange=default_exchange, routing_key="webhooks", priority=8),
Queue("polling", exchange=default_exchange, routing_key="polling", priority=3),
)
# Default queue
app.conf.task_default_queue = "default"
app.conf.task_default_exchange_type = "direct"
app.conf.task_default_routing_key = "tasks.default"
# Task routing
app.conf.task_routes = {
"tasks.ingest.*": {"queue": "critical", "routing_key": "tasks.critical"},
"tasks.webhooks.*": {"queue": "webhooks", "routing_key": "webhooks"},
"tasks.polling.*": {"queue": "polling", "routing_key": "polling"},
"tasks.enrichment.*": {"queue": "high", "routing_key": "tasks.high"},
}
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
"""Register periodic tasks with beat scheduler."""
# Slack polling every 15 minutes
sender.add_periodic_task(
settings.sync.slack_poll_interval,
sync_slack_incremental.s(),
name="sync-slack-incremental",
)
# GitHub polling every 1 hour
sender.add_periodic_task(
settings.sync.github_poll_interval,
sync_github_incremental.s(),
name="sync-github-incremental",
)
# Jira polling every 1 hour
sender.add_periodic_task(
settings.sync.jira_poll_interval,
sync_jira_incremental.s(),
name="sync-jira-incremental",
)
# Log polling every 5 minutes
sender.add_periodic_task(
settings.sync.logs_poll_interval,
poll_server_logs.s(),
name="poll-server-logs",
)
# Metrics polling every 15 minutes
sender.add_periodic_task(
settings.sync.metrics_poll_interval,
poll_metrics_anomalies.s(),
name="poll-metrics-anomalies",
)
# Error traces polling every 10 minutes
sender.add_periodic_task(
settings.sync.error_traces_poll_interval,
poll_error_traces.s(),
name="poll-error-traces",
)
# Staleness scoring daily at 03:00 UTC
from celery.schedules import crontab
sender.add_periodic_task(
crontab(hour=3, minute=0),
compute_staleness_scores.s(),
name="compute-staleness-scores",
)
# Dependency risk daily at 03:30 UTC
sender.add_periodic_task(
crontab(hour=3, minute=30),
compute_dependency_risk.s(),
name="compute-dependency-risk",
)
# Task imports (to be implemented in tasks module)
from celery import shared_task
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_slack_incremental(self):
"""Sync new messages from Slack channels incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_github_incremental(self):
"""Sync new events from GitHub incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_jira_incremental(self):
"""Sync new issues from Jira incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_server_logs(self):
"""Poll and ingest ERROR/WARN logs from monitored services."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_metrics_anomalies(self):
"""Z-score spike detection and escalation trend detection — runs every 15 min."""
try:
from src.anomaly.tasks import run_zscore_anomaly_detection
run_zscore_anomaly_detection()
except Exception as exc:
logger.error("poll_metrics_anomalies failed: %s", exc)
raise self.retry(exc=exc, countdown=120)
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_error_traces(self):
"""Poll error traces from APM services."""
pass
@shared_task(queue="critical", bind=True, max_retries=5)
def process_webhook_event(self, source_type: str, event_data: dict, rbac_tags: dict):
"""Route webhook event to appropriate agent for real-time ingestion."""
pass
@shared_task(queue="high", bind=True, max_retries=3)
def enrich_and_index_document(self, document: dict):
"""Extract entities, relationships, and index document."""
pass
@shared_task(queue="low", bind=True, max_retries=2)
def compute_staleness_scores(self):
"""Daily staleness risk scoring for all documents (03:00 UTC)."""
try:
from src.anomaly.tasks import run_staleness_scoring
run_staleness_scoring()
except Exception as exc:
logger.error("compute_staleness_scores failed: %s", exc)
raise self.retry(exc=exc, countdown=300)
@shared_task(queue="low", bind=True, max_retries=2)
def compute_dependency_risk(self):
"""Daily dependency risk modelling from Neo4j graph (03:30 UTC)."""
try:
from src.anomaly.tasks import run_dependency_risk_modeling
run_dependency_risk_modeling()
except Exception as exc:
logger.error("compute_dependency_risk failed: %s", exc)
raise self.retry(exc=exc, countdown=300)
if __name__ == "__main__":
app.start()
|