GodSpeed / src /celery_app.py
Ananth Shyam
Implement anomaly detection and forecasting features
451d52a
"""Celery app configuration and task setup."""
import logging
from celery import Celery
from kombu import Exchange, Queue
from src.config import settings
logger = logging.getLogger(__name__)
# Create Celery app
app = Celery(settings.app_name)
# Configure from settings
app.conf.update(
broker_url=settings.celery.broker_url,
result_backend=settings.celery.result_backend,
task_serializer=settings.celery.task_serializer,
result_serializer=settings.celery.result_serializer,
accept_content=settings.celery.accept_content,
timezone=settings.celery.timezone,
enable_utc=settings.celery.enable_utc,
task_track_started=settings.celery.task_track_started,
task_time_limit=settings.celery.task_time_limit,
task_soft_time_limit=settings.celery.task_soft_time_limit,
)
# Define queues with routing
default_exchange = Exchange("tasks", type="direct")
app.conf.task_queues = (
Queue("critical", exchange=default_exchange, routing_key="tasks.critical", priority=10),
Queue("high", exchange=default_exchange, routing_key="tasks.high", priority=7),
Queue("default", exchange=default_exchange, routing_key="tasks.default", priority=5),
Queue("low", exchange=default_exchange, routing_key="tasks.low", priority=1),
Queue("webhooks", exchange=default_exchange, routing_key="webhooks", priority=8),
Queue("polling", exchange=default_exchange, routing_key="polling", priority=3),
)
# Default queue
app.conf.task_default_queue = "default"
app.conf.task_default_exchange_type = "direct"
app.conf.task_default_routing_key = "tasks.default"
# Task routing
app.conf.task_routes = {
"tasks.ingest.*": {"queue": "critical", "routing_key": "tasks.critical"},
"tasks.webhooks.*": {"queue": "webhooks", "routing_key": "webhooks"},
"tasks.polling.*": {"queue": "polling", "routing_key": "polling"},
"tasks.enrichment.*": {"queue": "high", "routing_key": "tasks.high"},
}
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
"""Register periodic tasks with beat scheduler."""
# Slack polling every 15 minutes
sender.add_periodic_task(
settings.sync.slack_poll_interval,
sync_slack_incremental.s(),
name="sync-slack-incremental",
)
# GitHub polling every 1 hour
sender.add_periodic_task(
settings.sync.github_poll_interval,
sync_github_incremental.s(),
name="sync-github-incremental",
)
# Jira polling every 1 hour
sender.add_periodic_task(
settings.sync.jira_poll_interval,
sync_jira_incremental.s(),
name="sync-jira-incremental",
)
# Log polling every 5 minutes
sender.add_periodic_task(
settings.sync.logs_poll_interval,
poll_server_logs.s(),
name="poll-server-logs",
)
# Metrics polling every 15 minutes
sender.add_periodic_task(
settings.sync.metrics_poll_interval,
poll_metrics_anomalies.s(),
name="poll-metrics-anomalies",
)
# Error traces polling every 10 minutes
sender.add_periodic_task(
settings.sync.error_traces_poll_interval,
poll_error_traces.s(),
name="poll-error-traces",
)
# Staleness scoring daily at 03:00 UTC
from celery.schedules import crontab
sender.add_periodic_task(
crontab(hour=3, minute=0),
compute_staleness_scores.s(),
name="compute-staleness-scores",
)
# Dependency risk daily at 03:30 UTC
sender.add_periodic_task(
crontab(hour=3, minute=30),
compute_dependency_risk.s(),
name="compute-dependency-risk",
)
# Task imports (to be implemented in tasks module)
from celery import shared_task
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_slack_incremental(self):
"""Sync new messages from Slack channels incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_github_incremental(self):
"""Sync new events from GitHub incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def sync_jira_incremental(self):
"""Sync new issues from Jira incrementally."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_server_logs(self):
"""Poll and ingest ERROR/WARN logs from monitored services."""
pass
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_metrics_anomalies(self):
"""Z-score spike detection and escalation trend detection — runs every 15 min."""
try:
from src.anomaly.tasks import run_zscore_anomaly_detection
run_zscore_anomaly_detection()
except Exception as exc:
logger.error("poll_metrics_anomalies failed: %s", exc)
raise self.retry(exc=exc, countdown=120)
@shared_task(queue="polling", bind=True, max_retries=3)
def poll_error_traces(self):
"""Poll error traces from APM services."""
pass
@shared_task(queue="critical", bind=True, max_retries=5)
def process_webhook_event(self, source_type: str, event_data: dict, rbac_tags: dict):
"""Route webhook event to appropriate agent for real-time ingestion."""
pass
@shared_task(queue="high", bind=True, max_retries=3)
def enrich_and_index_document(self, document: dict):
"""Extract entities, relationships, and index document."""
pass
@shared_task(queue="low", bind=True, max_retries=2)
def compute_staleness_scores(self):
"""Daily staleness risk scoring for all documents (03:00 UTC)."""
try:
from src.anomaly.tasks import run_staleness_scoring
run_staleness_scoring()
except Exception as exc:
logger.error("compute_staleness_scores failed: %s", exc)
raise self.retry(exc=exc, countdown=300)
@shared_task(queue="low", bind=True, max_retries=2)
def compute_dependency_risk(self):
"""Daily dependency risk modelling from Neo4j graph (03:30 UTC)."""
try:
from src.anomaly.tasks import run_dependency_risk_modeling
run_dependency_risk_modeling()
except Exception as exc:
logger.error("compute_dependency_risk failed: %s", exc)
raise self.retry(exc=exc, countdown=300)
if __name__ == "__main__":
app.start()