File size: 6,196 Bytes
0493349
 
451d52a
 
0493349
 
 
 
451d52a
 
0493349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
451d52a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0493349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
451d52a
 
 
 
 
 
 
0493349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
451d52a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0493349
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Celery app configuration and task setup."""

import logging

from celery import Celery
from kombu import Exchange, Queue
from src.config import settings

logger = logging.getLogger(__name__)

# Create Celery app
app = Celery(settings.app_name)

# Configure from settings
app.conf.update(
    broker_url=settings.celery.broker_url,
    result_backend=settings.celery.result_backend,
    task_serializer=settings.celery.task_serializer,
    result_serializer=settings.celery.result_serializer,
    accept_content=settings.celery.accept_content,
    timezone=settings.celery.timezone,
    enable_utc=settings.celery.enable_utc,
    task_track_started=settings.celery.task_track_started,
    task_time_limit=settings.celery.task_time_limit,
    task_soft_time_limit=settings.celery.task_soft_time_limit,
)

# Define queues with routing
default_exchange = Exchange("tasks", type="direct")

app.conf.task_queues = (
    Queue("critical", exchange=default_exchange, routing_key="tasks.critical", priority=10),
    Queue("high", exchange=default_exchange, routing_key="tasks.high", priority=7),
    Queue("default", exchange=default_exchange, routing_key="tasks.default", priority=5),
    Queue("low", exchange=default_exchange, routing_key="tasks.low", priority=1),
    Queue("webhooks", exchange=default_exchange, routing_key="webhooks", priority=8),
    Queue("polling", exchange=default_exchange, routing_key="polling", priority=3),
)

# Default queue
app.conf.task_default_queue = "default"
app.conf.task_default_exchange_type = "direct"
app.conf.task_default_routing_key = "tasks.default"

# Task routing
app.conf.task_routes = {
    "tasks.ingest.*": {"queue": "critical", "routing_key": "tasks.critical"},
    "tasks.webhooks.*": {"queue": "webhooks", "routing_key": "webhooks"},
    "tasks.polling.*": {"queue": "polling", "routing_key": "polling"},
    "tasks.enrichment.*": {"queue": "high", "routing_key": "tasks.high"},
}


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks with beat scheduler."""

    # Slack polling every 15 minutes
    sender.add_periodic_task(
        settings.sync.slack_poll_interval,
        sync_slack_incremental.s(),
        name="sync-slack-incremental",
    )

    # GitHub polling every 1 hour
    sender.add_periodic_task(
        settings.sync.github_poll_interval,
        sync_github_incremental.s(),
        name="sync-github-incremental",
    )

    # Jira polling every 1 hour
    sender.add_periodic_task(
        settings.sync.jira_poll_interval,
        sync_jira_incremental.s(),
        name="sync-jira-incremental",
    )

    # Log polling every 5 minutes
    sender.add_periodic_task(
        settings.sync.logs_poll_interval,
        poll_server_logs.s(),
        name="poll-server-logs",
    )

    # Metrics polling every 15 minutes
    sender.add_periodic_task(
        settings.sync.metrics_poll_interval,
        poll_metrics_anomalies.s(),
        name="poll-metrics-anomalies",
    )

    # Error traces polling every 10 minutes
    sender.add_periodic_task(
        settings.sync.error_traces_poll_interval,
        poll_error_traces.s(),
        name="poll-error-traces",
    )

    # Staleness scoring daily at 03:00 UTC
    from celery.schedules import crontab
    sender.add_periodic_task(
        crontab(hour=3, minute=0),
        compute_staleness_scores.s(),
        name="compute-staleness-scores",
    )

    # Dependency risk daily at 03:30 UTC
    sender.add_periodic_task(
        crontab(hour=3, minute=30),
        compute_dependency_risk.s(),
        name="compute-dependency-risk",
    )


# Task imports (to be implemented in tasks module)
from celery import shared_task


@shared_task(queue="polling", bind=True, max_retries=3)
def sync_slack_incremental(self):
    """Sync new messages from Slack channels incrementally."""
    pass


@shared_task(queue="polling", bind=True, max_retries=3)
def sync_github_incremental(self):
    """Sync new events from GitHub incrementally."""
    pass


@shared_task(queue="polling", bind=True, max_retries=3)
def sync_jira_incremental(self):
    """Sync new issues from Jira incrementally."""
    pass


@shared_task(queue="polling", bind=True, max_retries=3)
def poll_server_logs(self):
    """Poll and ingest ERROR/WARN logs from monitored services."""
    pass


@shared_task(queue="polling", bind=True, max_retries=3)
def poll_metrics_anomalies(self):
    """Z-score spike detection and escalation trend detection — runs every 15 min."""
    try:
        from src.anomaly.tasks import run_zscore_anomaly_detection
        run_zscore_anomaly_detection()
    except Exception as exc:
        logger.error("poll_metrics_anomalies failed: %s", exc)
        raise self.retry(exc=exc, countdown=120)


@shared_task(queue="polling", bind=True, max_retries=3)
def poll_error_traces(self):
    """Poll error traces from APM services."""
    pass


@shared_task(queue="critical", bind=True, max_retries=5)
def process_webhook_event(self, source_type: str, event_data: dict, rbac_tags: dict):
    """Route webhook event to appropriate agent for real-time ingestion."""
    pass


@shared_task(queue="high", bind=True, max_retries=3)
def enrich_and_index_document(self, document: dict):
    """Extract entities, relationships, and index document."""
    pass


@shared_task(queue="low", bind=True, max_retries=2)
def compute_staleness_scores(self):
    """Daily staleness risk scoring for all documents (03:00 UTC)."""
    try:
        from src.anomaly.tasks import run_staleness_scoring
        run_staleness_scoring()
    except Exception as exc:
        logger.error("compute_staleness_scores failed: %s", exc)
        raise self.retry(exc=exc, countdown=300)


@shared_task(queue="low", bind=True, max_retries=2)
def compute_dependency_risk(self):
    """Daily dependency risk modelling from Neo4j graph (03:30 UTC)."""
    try:
        from src.anomaly.tasks import run_dependency_risk_modeling
        run_dependency_risk_modeling()
    except Exception as exc:
        logger.error("compute_dependency_risk failed: %s", exc)
        raise self.retry(exc=exc, countdown=300)


if __name__ == "__main__":
    app.start()