Samyuktha24 committed on
Commit
58ace4b
·
1 Parent(s): 15b828c

Add Redis utilities for session management, caching, and task queues

Browse files
.env ADDED
File without changes
Docs/ARCHITECTURE.md ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ src/
2
+ ├── adapters/ # All source adapters
3
+ │ ├── __init__.py # Registry + factory
4
+ │ ├── base.py # BaseSourceAdapter interface
5
+ │ ├── notion.py # NotionAdapter
6
+ │ ├── confluence.py # ConfluenceAdapter
7
+ │ ├── github.py # GitHubAdapter
8
+ │ ├── slack.py # SlackAdapter (NEW)
9
+ │ ├── jira.py # JiraAdapter
10
+ │ ├── logs.py # LogAggregatorAdapter (NEW)
11
+ │ ├── metrics.py # MetricsAdapter (NEW)
12
+ │ ├── error_traces.py # ErrorTraceAdapter (NEW)
13
+ │ ├── business_data.py # BusinessDataAdapter (NEW)
14
+ │ ├── ocr.py # OCRAdapter (NEW)
15
+ │ └── pdf.py # PDFAdapter
16
+
17
+ ├── integrations/ # Webhooks, event handlers, polling tasks
18
+ │ ├── __init__.py
19
+ │ ├── github/
20
+ │ │ ├── webhooks.py # GitHub webhook handler
21
+ │ │ └── tasks.py # GitHub Celery tasks
22
+ │ ├── slack/
23
+ │ │ ├── webhooks.py # Slack event handler (NEW)
24
+ │ │ └── tasks.py # Slack polling tasks (NEW)
25
+ │ ├── jira/
26
+ │ │ ├── webhooks.py # Jira webhook handler (NEW)
27
+ │ │ └── tasks.py # Jira polling tasks (NEW)
28
+ │ ├── logs/
29
+ │ │ ├── webhooks.py # Error log webhook handler (NEW)
30
+ │ │ └── tasks.py # Log polling tasks (NEW)
31
+ │ ├── metrics/
32
+ │ │ └── tasks.py # Metrics polling tasks (NEW)
33
+ │ └── business_data/
34
+ │ └── tasks.py # Business data sync tasks (NEW)
35
+
36
+ ├── orm/ # Database ORM connectors
37
+ │ ├── __init__.py
38
+ │ ├── base.py # BaseORM interface
39
+ │ ├── postgres.py # PostgreSQL connector
40
+ │ ├── salesforce.py # Salesforce REST API
41
+ │ ├── netsuite.py # NetSuite SuiteTalk API
42
+ │ ├── sap.py # SAP OData API
43
+ │ └── generic_rest.py # Generic REST API wrapper
44
+
45
+ ├── tasks/ # Celery task definitions (YOUR DOMAIN)
46
+ │ ├── __init__.py
47
+ │ ├── ingest_tasks.py # Main ingest coordination
48
+ │ ├── sync_tasks.py # Polling/incremental sync (Notion, Confluence, etc.)
49
+ │ ├── webhook_tasks.py # Handle webhook queuing (Redis-backed)
50
+ │ └── state_management.py # Redis state tracking
51
+
52
+ ├── redis/ # Redis utilities (YOUR DOMAIN)
53
+ │ ├── __init__.py
54
+ │ ├── cache.py # Caching layer (credentials, last_sync_times)
55
+ │ ├── queues.py # Task queues (ingest queue, webhook queue)
56
+ │ ├── session_state.py # Webhook/API session state
57
+ │ └── locks.py # Distributed locks (prevent parallel syncs)
58
+
59
+ └── ingestion/
60
+ ├── pipeline.py # Main ingestion pipeline
61
+ ├── enrichment.py # Entity/relationship extraction
62
+ └── orchestrator.py # Routes docs through adapters
63
+
Docs/TODO.md ADDED
File without changes
src/redis/__init__.py ADDED
@@ -0,0 +1,15 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Redis utilities for queuing, caching, locks, and session state."""
2
+
3
+ from .queues import IngestQueue, WebhookQueue, get_queue
4
+ from .cache import RedisCache
5
+ from .locks import DistributedLock
6
+ from .session_state import SessionStateManager
7
+
8
+ __all__ = [
9
+ "IngestQueue",
10
+ "WebhookQueue",
11
+ "get_queue",
12
+ "RedisCache",
13
+ "DistributedLock",
14
+ "SessionStateManager",
15
+ ]
src/redis/cache.py ADDED
@@ -0,0 +1,164 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Redis caching layer for credentials, sync state, and frequently accessed data."""
2
+
3
+ import json
4
+ from datetime import datetime, timedelta
5
+ from typing import Any, Optional
6
+
7
+ import redis
8
+
9
+
10
class RedisCache:
    """Simple Redis cache with TTL support.

    Keys are laid out as ``<key_prefix>:<namespace>:<key>``.

    The methods are declared ``async`` but call the client without
    ``await``, so they expect a synchronous ``redis.Redis`` client and each
    call blocks until Redis answers.
    """

    # Number of keys deleted per DEL call while clearing a namespace.
    _DELETE_BATCH_SIZE = 500

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "cache"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def _make_key(self, namespace: str, key: str) -> str:
        """Build the full cache key for ``key`` inside ``namespace``."""
        return f"{self.key_prefix}:{namespace}:{key}"

    @staticmethod
    def _to_text(raw: Any) -> str:
        """Return a Redis reply as ``str``.

        Replies are ``bytes`` by default but already ``str`` when the client
        was created with ``decode_responses=True``; accept both.
        """
        if isinstance(raw, bytes):
            return raw.decode("utf-8")
        return raw

    async def set(
        self,
        namespace: str,
        key: str,
        value: Any,
        ttl_seconds: Optional[int] = None,
    ) -> bool:
        """
        Set cache value with optional TTL.

        Args:
            namespace: Logical grouping (e.g., "credentials", "sync_state")
            key: Cache key
            value: Value to cache. Strings are stored verbatim; anything else
                is serialized with ``json.dumps``. Note that a string which is
                itself valid JSON (e.g. ``"123"``) is therefore returned as
                the parsed value by ``get`` unless ``deserialize=False``.
            ttl_seconds: Time-to-live; ``None`` or ``0`` stores the value
                without expiration.

        Returns:
            Always True.
        """
        cache_key = self._make_key(namespace, key)
        cache_value = value if isinstance(value, str) else json.dumps(value)

        if ttl_seconds:
            self.redis.setex(cache_key, ttl_seconds, cache_value)
        else:
            self.redis.set(cache_key, cache_value)

        return True

    async def get(
        self,
        namespace: str,
        key: str,
        deserialize: bool = True,
    ) -> Optional[Any]:
        """
        Get cache value.

        Args:
            namespace: Logical grouping
            key: Cache key
            deserialize: If True, attempt JSON deserialization and fall back
                to the raw string when the value is not valid JSON

        Returns:
            The cached value, or None when the key does not exist.
        """
        raw = self.redis.get(self._make_key(namespace, key))

        # Only a missing key is a miss; a cached empty string is a real value.
        if raw is None:
            return None

        text = self._to_text(raw)
        if not deserialize:
            return text

        try:
            return json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return text

    async def delete(self, namespace: str, key: str) -> bool:
        """Delete cache entry. Always returns True, even if the key was absent."""
        self.redis.delete(self._make_key(namespace, key))
        return True

    async def exists(self, namespace: str, key: str) -> bool:
        """Check if cache entry exists."""
        return self.redis.exists(self._make_key(namespace, key)) > 0

    async def clear_namespace(self, namespace: str) -> int:
        """Clear all entries in a namespace and return how many were deleted.

        Keys are discovered with incremental SCAN rather than KEYS, so a large
        keyspace does not stall the Redis server, and are deleted in batches.
        Glob characters in ``namespace`` are not escaped.
        """
        pattern = self._make_key(namespace, "*")
        deleted = 0
        batch = []

        for found in self.redis.scan_iter(match=pattern, count=self._DELETE_BATCH_SIZE):
            batch.append(found)
            if len(batch) >= self._DELETE_BATCH_SIZE:
                deleted += self.redis.delete(*batch)
                batch = []

        if batch:
            deleted += self.redis.delete(*batch)

        return deleted
100
+
101
+
102
class SyncStateCache(RedisCache):
    """Cache of per-source sync timestamps, stored under the "sync_state" prefix.

    Timestamps are written as naive UTC ISO-8601 strings
    (``datetime.utcnow().isoformat()``) and parsed back with
    ``datetime.fromisoformat``.
    """

    def __init__(self, redis_client: redis.Redis):
        super().__init__(redis_client, key_prefix="sync_state")

    async def get_last_sync(self, source_type: str, space_id: str) -> Optional[datetime]:
        """Return when ``source_type``/``space_id`` was last synced, or None if never."""
        stored = await self.get(
            "last_sync", f"{source_type}:{space_id}", deserialize=False
        )
        return datetime.fromisoformat(stored) if stored else None

    async def set_last_sync(self, source_type: str, space_id: str):
        """Record the current UTC time as the last sync of ``source_type``/``space_id``."""
        now_iso = datetime.utcnow().isoformat()
        await self.set("last_sync", f"{source_type}:{space_id}", now_iso)

    async def get_last_full_sync(self, source_type: str, space_id: str) -> Optional[datetime]:
        """Return when the last full (non-incremental) sync ran, or None if never."""
        stored = await self.get(
            "full_sync", f"{source_type}:{space_id}", deserialize=False
        )
        return datetime.fromisoformat(stored) if stored else None

    async def set_last_full_sync(self, source_type: str, space_id: str):
        """Record the current UTC time as the last full sync."""
        now_iso = datetime.utcnow().isoformat()
        await self.set("full_sync", f"{source_type}:{space_id}", now_iso)
135
+
136
+
137
class CredentialCache(RedisCache):
    """Cache for integration credentials, stored under the "credentials" prefix.

    Credentials are written through ``RedisCache.set`` as plain JSON; this
    class performs no encryption itself.
    """

    def __init__(self, redis_client: redis.Redis):
        # In production, use redis-py with encryption or a separate vault service.
        # For now, the TTL on each entry limits long-term exposure.
        super().__init__(redis_client, key_prefix="credentials")

    async def get_credentials(self, integration_type: str, org_id: str) -> Optional[dict]:
        """Return the cached credentials for an integration/org pair, or None."""
        entry_key = f"{integration_type}:{org_id}"
        cached = await self.get("creds", entry_key, deserialize=True)
        return cached

    async def set_credentials(
        self,
        integration_type: str,
        org_id: str,
        credentials: dict,
        ttl_seconds: int = 3600,  # 1 hour default
    ):
        """Cache credentials with a short TTL (callers should fetch fresh when possible)."""
        entry_key = f"{integration_type}:{org_id}"
        await self.set("creds", entry_key, credentials, ttl_seconds=ttl_seconds)

    async def clear_credentials(self, integration_type: str, org_id: str):
        """Remove the cached credentials immediately."""
        entry_key = f"{integration_type}:{org_id}"
        await self.delete("creds", entry_key)
src/redis/locks.py ADDED
@@ -0,0 +1,141 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Distributed locks to prevent concurrent syncing of same source."""
2
+
3
+ import uuid
4
+ from datetime import datetime, timedelta
5
+ from typing import Optional
6
+
7
+ import redis
8
+
9
+
10
class DistributedLock:
    """
    Redis-backed distributed lock for preventing concurrent operations on same resource.

    Prevents multiple Celery workers from syncing the same source simultaneously.

    Each instance gets its own random ``lock_id``; that value is stored in the
    lock key so only the instance that acquired a lock can release it. The
    methods are declared ``async`` but call the client without ``await``, so
    they expect a synchronous ``redis.Redis`` client.
    """

    # Atomic check-and-delete: remove the key only when it still holds our id.
    _RELEASE_LUA = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "locks"):
        self.redis = redis_client
        self.key_prefix = key_prefix
        self.lock_id = str(uuid.uuid4())
        # Script handle is created on first release and then reused.
        self._release_script = None

    def _make_lock_key(self, resource: str) -> str:
        """Build lock key."""
        return f"{self.key_prefix}:{resource}"

    async def acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
    ) -> bool:
        """
        Try to acquire a lock on a resource without waiting.

        Args:
            resource: Unique identifier (e.g., "notion:workspace-123")
            timeout_seconds: How long lock lasts before auto-releasing

        Returns:
            True if lock acquired, False if resource already locked
        """
        # SET NX: only set if the key does not exist.
        # SET EX: auto-expire after the timeout so a crashed holder cannot
        # keep the resource locked forever.
        acquired = self.redis.set(
            self._make_lock_key(resource),
            self.lock_id,
            nx=True,
            ex=timeout_seconds,
        )
        return bool(acquired)

    async def release(self, resource: str) -> bool:
        """
        Release a lock (only if we own it).

        Uses a Lua script for atomic check-and-delete, so a lock that expired
        and was re-acquired by someone else is never freed by mistake.

        Returns:
            True if the lock was held by this instance and has been removed.
        """
        if self._release_script is None:
            self._release_script = self.redis.register_script(self._RELEASE_LUA)

        removed = self._release_script(
            keys=[self._make_lock_key(resource)],
            args=[self.lock_id],
        )
        return bool(removed)

    async def is_locked(self, resource: str) -> bool:
        """Check if resource is currently locked (by anyone)."""
        return self.redis.exists(self._make_lock_key(resource)) > 0

    async def wait_and_acquire(
        self,
        resource: str,
        timeout_seconds: int = 3600,
        max_wait_seconds: int = 300,
        poll_interval_seconds: float = 0.1,
    ) -> bool:
        """
        Poll until the lock is available, then acquire it.

        Useful for ensuring sequential processing of the same resource.

        Args:
            resource: Unique identifier of the resource to lock
            timeout_seconds: Expiry of the lock once acquired
            max_wait_seconds: Maximum time to keep retrying
            poll_interval_seconds: Pause between attempts

        Returns:
            True if the lock was acquired, False if the wait ran out. At least
            one attempt is always made, even when ``max_wait_seconds`` is 0.
        """
        # Local imports keep this method self-contained instead of relying on
        # an import placed at the bottom of the module.
        import asyncio
        import time

        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + max_wait_seconds

        while True:
            if await self.acquire(resource, timeout_seconds):
                return True

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False

            # Back off before retrying, but never sleep past the deadline.
            await asyncio.sleep(min(poll_interval_seconds, remaining))
103
+
104
+
105
class LockPool:
    """Keep one ``DistributedLock`` per resource and operate on them in bulk."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        # Maps resource name -> the DistributedLock instance used for it.
        self.locks = {}

    def get_lock(self, resource: str) -> DistributedLock:
        """Return the lock object for ``resource``, creating it on first use."""
        existing = self.locks.get(resource)
        if existing is not None:
            return existing

        created = DistributedLock(self.redis)
        self.locks[resource] = created
        return created

    async def acquire_many(
        self,
        resources: list[str],
        timeout_seconds: int = 3600,
    ) -> dict[str, bool]:
        """Try each resource in order and report which locks were obtained.

        Acquisition is attempted one resource at a time; a failure on one
        resource does not stop the attempts on the others.
        """
        outcome = {}
        for name in resources:
            outcome[name] = await self.get_lock(name).acquire(name, timeout_seconds)
        return outcome

    async def release_all(self):
        """Release every lock known to this pool, then forget them all.

        Release is attempted for each tracked resource, including ones whose
        acquisition failed.
        """
        for name in self.locks:
            await self.locks[name].release(name)

        self.locks.clear()
139
+
140
+
141
+ import asyncio
src/redis/queues.py ADDED
@@ -0,0 +1,234 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Redis-backed task queues for document ingestion and webhook events."""
2
+
3
+ import json
4
+ from datetime import datetime
5
+ from typing import Optional
6
+ from enum import Enum
7
+
8
+ import redis
9
+ from pydantic import BaseModel
10
+
11
+
12
class Priority(str, Enum):
    """Task priority levels, listed from most to least urgent."""

    # Real-time (errors, alerts)
    CRITICAL = "critical"
    # Fast track (key decisions, breaking changes)
    HIGH = "high"
    # Standard (routine data, logs)
    NORMAL = "normal"
    # Background (metrics, non-urgent)
    LOW = "low"
18
+
19
+
20
class QueuedTask(BaseModel):
    """A unit of work taken from (or destined for) a queue."""

    # Identifier generated when the task is enqueued.
    id: str
    # Name of the source the document came from.
    source_type: str
    # The document/event data to process.
    payload: dict
    # Access-control tags carried alongside the payload.
    rbac_tags: dict
    # Urgency level used to order the queue.
    priority: Priority
    # When the task was enqueued.
    created_at: datetime
    # Number of retries performed so far.
    attempt: int = 0
    # Retry limit before the task is dead-lettered.
    max_retries: int = 3
30
+
31
+
32
class IngestQueue:
    """Queue for documents ready for ingestion pipeline.

    Pending tasks live in the sorted set ``<key_prefix>:queue`` (score =
    priority rank) and exhausted tasks in the list ``<key_prefix>:deadletter``.
    The methods are declared ``async`` but call the client without ``await``,
    so they expect a synchronous ``redis.Redis`` client.
    """

    # Sorted-set score per priority: lower score is popped first.
    _PRIORITY_SCORES = {
        Priority.CRITICAL: 0,
        Priority.HIGH: 1,
        Priority.NORMAL: 2,
        Priority.LOW: 3,
    }

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "ingest"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    @staticmethod
    def _to_text(raw) -> str:
        """Return a Redis reply as ``str`` whether or not the client decodes responses."""
        return raw.decode("utf-8") if isinstance(raw, bytes) else raw

    @staticmethod
    def _task_to_dict(task: QueuedTask) -> dict:
        """Flatten a task into JSON-safe primitives (datetime -> ISO string, enum -> value)."""
        return {
            "id": task.id,
            "source_type": task.source_type,
            "payload": task.payload,
            "rbac_tags": task.rbac_tags,
            "priority": Priority(task.priority).value,
            "created_at": task.created_at.isoformat(),
            "attempt": task.attempt,
            "max_retries": task.max_retries,
        }

    async def add(
        self,
        source_type: str,
        payload: dict,
        rbac_tags: dict,
        priority: Priority = Priority.NORMAL,
        attempt: int = 0,
        max_retries: int = 3,
    ) -> str:
        """
        Add document to ingest queue.

        Uses sorted set with priority as score for ordering:
        - CRITICAL (0): real-time processing
        - HIGH (1): priority batch
        - NORMAL (2): standard batch
        - LOW (3): low-priority batch

        Members with the same score are ordered by Redis lexicographically by
        member string, so ordering inside one priority level is not FIFO.

        Args:
            source_type: Name of the originating source
            payload: Document data; its ``uri`` entry (if any) is used in the id
            rbac_tags: Access-control tags stored with the task
            priority: Urgency level
            attempt: Retry count to record (0 for a fresh task)
            max_retries: Retry limit to record with the task

        Returns:
            The generated task id.
        """
        now = datetime.utcnow()
        task_id = f"{source_type}:{payload.get('uri', 'unknown')}:{now.timestamp()}"

        task = {
            "id": task_id,
            "source_type": source_type,
            "payload": json.dumps(payload),
            "rbac_tags": json.dumps(rbac_tags),
            "priority": priority.value,
            "created_at": now.isoformat(),
            # Persist retry bookkeeping so it survives a requeue.
            "attempt": attempt,
            "max_retries": max_retries,
        }

        # Add to sorted set: score = priority (lower = higher priority)
        self.redis.zadd(
            f"{self.key_prefix}:queue",
            {json.dumps(task): self._PRIORITY_SCORES[priority]},
        )

        return task_id

    async def pop(self, batch_size: int = 10) -> list[QueuedTask]:
        """
        Pop highest-priority tasks from queue.
        Returns up to batch_size tasks, prioritizing CRITICAL > HIGH > NORMAL > LOW.

        The tasks are removed with a single ZPOPMIN, which reads and removes
        atomically, so two workers popping concurrently never receive the
        same task.
        """
        if batch_size <= 0:
            return []

        popped = self.redis.zpopmin(f"{self.key_prefix}:queue", batch_size)

        tasks = []
        for member, _score in popped:
            task_dict = json.loads(self._to_text(member))
            tasks.append(
                QueuedTask(
                    id=task_dict["id"],
                    source_type=task_dict["source_type"],
                    payload=json.loads(task_dict["payload"]),
                    rbac_tags=json.loads(task_dict["rbac_tags"]),
                    priority=Priority(task_dict["priority"]),
                    created_at=datetime.fromisoformat(task_dict["created_at"]),
                    attempt=task_dict.get("attempt", 0),
                    # Entries written before max_retries was stored lack the key.
                    max_retries=task_dict.get("max_retries", 3),
                )
            )

        return tasks

    async def requeue_on_failure(self, task: QueuedTask) -> bool:
        """
        Re-queue task if it hasn't exceeded max retries.
        Returns True if re-queued, False if max retries exceeded.

        ``task.attempt`` is incremented in place and carried into the new
        queue entry, so the retry limit is actually reached after
        ``max_retries`` requeues. The requeued entry gets a new task id.
        """
        if task.attempt >= task.max_retries:
            # Send to deadletter queue
            await self.send_to_deadletter(task, "max_retries_exceeded")
            return False

        task.attempt += 1
        await self.add(
            task.source_type,
            task.payload,
            task.rbac_tags,
            task.priority,
            attempt=task.attempt,
            max_retries=task.max_retries,
        )
        return True

    async def send_to_deadletter(self, task: QueuedTask, reason: str):
        """Send task to deadletter queue for manual inspection.

        The task is serialized field by field because ``created_at`` is a
        ``datetime``, which ``json.dumps`` cannot encode directly.
        """
        dlq_key = f"{self.key_prefix}:deadletter"

        dlq_item = {
            "task": json.dumps(self._task_to_dict(task)),
            "reason": reason,
            "failed_at": datetime.utcnow().isoformat(),
        }

        self.redis.lpush(dlq_key, json.dumps(dlq_item))
        # Keep deadletter for 30 days; the expiry applies to the whole list
        # and is reset on every push.
        self.redis.expire(dlq_key, 86400 * 30)

    async def get_stats(self) -> dict:
        """Get queue statistics: pending count, deadletter count and their sum."""
        queue_length = self.redis.zcard(f"{self.key_prefix}:queue")
        dlq_length = self.redis.llen(f"{self.key_prefix}:deadletter")

        return {
            "queue_length": queue_length,
            "deadletter_length": dlq_length,
            "total": queue_length + dlq_length,
        }
161
+
162
+
163
class WebhookQueue:
    """Queue for raw webhook events (before processing).

    Pending events live in the sorted set ``<key_prefix>:pending`` with the
    priority rank as score. The methods are declared ``async`` but call the
    client without ``await``, so they expect a synchronous ``redis.Redis``
    client.
    """

    # Sorted-set score per priority: lower score is popped first.
    _PRIORITY_SCORES = {
        Priority.CRITICAL: 0,
        Priority.HIGH: 1,
        Priority.NORMAL: 2,
        Priority.LOW: 3,
    }

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "webhooks"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    async def add(
        self,
        event_type: str,
        payload: dict,
        priority: Priority = Priority.NORMAL,
    ) -> str:
        """Queue a raw webhook event for async processing.

        Args:
            event_type: Kind of event; also used as the id prefix
            payload: Raw event body, stored as a JSON string
            priority: Urgency level used as the sorted-set score

        Returns:
            The generated event id (``<event_type>:<unix timestamp>``).
        """
        now = datetime.utcnow()
        event_id = f"{event_type}:{now.timestamp()}"

        event = {
            "id": event_id,
            "event_type": event_type,
            "payload": json.dumps(payload),
            "priority": priority.value,
            "received_at": now.isoformat(),
        }

        # Add to sorted set by priority
        self.redis.zadd(
            f"{self.key_prefix}:pending",
            {json.dumps(event): self._PRIORITY_SCORES[priority]},
        )

        return event_id

    async def pop(self, batch_size: int = 5) -> list[dict]:
        """Pop up to ``batch_size`` highest-priority webhook events.

        Events are removed with a single ZPOPMIN, which reads and removes
        atomically, so concurrent consumers never receive the same event.
        Each returned dict still carries ``payload`` as a JSON string.
        """
        if batch_size <= 0:
            return []

        popped = self.redis.zpopmin(f"{self.key_prefix}:pending", batch_size)

        events = []
        for member, _score in popped:
            # Replies are bytes unless the client uses decode_responses=True.
            text = member.decode("utf-8") if isinstance(member, bytes) else member
            events.append(json.loads(text))

        return events
221
+
222
+
223
def get_queue(queue_type: str, redis_client: redis.Redis) -> Optional[object]:
    """Build the queue registered under ``queue_type``.

    Recognized names are "ingest" (``IngestQueue``) and "webhook"
    (``WebhookQueue``); any other name yields None. The queue is created with
    its default key prefix.
    """
    factory = {
        "ingest": IngestQueue,
        "webhook": WebhookQueue,
    }.get(queue_type)

    return factory(redis_client) if factory else None
src/redis/session_state.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Session state management for API requests, webhook processing, and user interactions."""
2
+
3
+ import json
4
+ from datetime import datetime, timedelta
5
+ from typing import Any, Optional
6
+ from uuid import uuid4
7
+
8
+ import redis
9
+
10
+
11
class SessionStateManager:
    """Store and retrieve session records in Redis as JSON blobs.

    Each session lives under ``<key_prefix>:<session_id>`` with an expiry.
    The methods are declared ``async`` but call the client without ``await``.
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "session"):
        self.redis = redis_client
        self.key_prefix = key_prefix

    def _make_session_key(self, session_id: str) -> str:
        """Return the Redis key under which ``session_id`` is stored."""
        return f"{self.key_prefix}:{session_id}"

    async def create_session(
        self,
        user_id: str,
        org_id: str,
        ttl_seconds: int = 3600,
    ) -> str:
        """Create a session for a user/org pair and return its new id.

        The record expires after ``ttl_seconds`` unless it is updated.
        """
        new_id = str(uuid4())

        record = {
            "session_id": new_id,
            "user_id": user_id,
            "org_id": org_id,
            "created_at": datetime.utcnow().isoformat(),
            "last_activity": datetime.utcnow().isoformat(),
        }

        self.redis.setex(self._make_session_key(new_id), ttl_seconds, json.dumps(record))
        return new_id

    async def get_session(self, session_id: str) -> Optional[dict]:
        """Return the stored session record, or None when it is absent."""
        raw = self.redis.get(self._make_session_key(session_id))
        if raw:
            return json.loads(raw.decode('utf-8'))
        return None

    async def update_session(self, session_id: str, updates: dict, ttl_seconds: int = 3600):
        """Merge ``updates`` into an existing session and refresh its expiry.

        Returns False when the session does not exist, True otherwise. The
        record's ``last_activity`` is set to the current UTC time and the key's
        expiry is reset to ``ttl_seconds``.
        """
        record = await self.get_session(session_id)
        if not record:
            return False

        record.update(updates)
        record["last_activity"] = datetime.utcnow().isoformat()

        self.redis.setex(
            self._make_session_key(session_id),
            ttl_seconds,
            json.dumps(record),
        )
        return True

    async def delete_session(self, session_id: str):
        """Remove the session record so the session is no longer valid."""
        self.redis.delete(self._make_session_key(session_id))

    async def session_exists(self, session_id: str) -> bool:
        """Report whether a record for ``session_id`` is still present."""
        found = self.redis.exists(self._make_session_key(session_id))
        return found > 0
86
+
87
+
88
class WebhookProcessingState:
    """Track webhook processing state (for idempotency and retry logic).

    One JSON record per webhook is kept under ``webhook_state:<webhook_id>``;
    each ``mark_*`` call replaces the previous record. The methods are
    declared ``async`` but call the client without ``await``.
    """

    # How long completed/failed records are retained (7 days).
    _FINAL_STATE_TTL_SECONDS = 86400 * 7

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.key_prefix = "webhook_state"

    def _make_key(self, webhook_id: str) -> str:
        """Build key for webhook state."""
        return f"{self.key_prefix}:{webhook_id}"

    async def mark_processing(self, webhook_id: str, ttl_seconds: int = 86400):
        """Mark webhook as being processed; the record expires after ``ttl_seconds``."""
        state = {
            "webhook_id": webhook_id,
            "status": "processing",
            "started_at": datetime.utcnow().isoformat(),
        }
        self.redis.setex(self._make_key(webhook_id), ttl_seconds, json.dumps(state))

    async def mark_completed(self, webhook_id: str, result: dict):
        """Mark webhook as completed and store ``result`` (must be JSON-serializable)."""
        state = {
            "webhook_id": webhook_id,
            "status": "completed",
            "completed_at": datetime.utcnow().isoformat(),
            "result": result,
        }
        # Keep for 7 days after completion
        self.redis.setex(
            self._make_key(webhook_id),
            self._FINAL_STATE_TTL_SECONDS,
            json.dumps(state),
        )

    async def mark_failed(self, webhook_id: str, error: str, retry_at: Optional[datetime] = None):
        """Mark webhook as failed, recording the error and optional retry time."""
        state = {
            "webhook_id": webhook_id,
            "status": "failed",
            "failed_at": datetime.utcnow().isoformat(),
            "error": error,
            "retry_at": retry_at.isoformat() if retry_at else None,
        }
        self.redis.setex(
            self._make_key(webhook_id),
            self._FINAL_STATE_TTL_SECONDS,
            json.dumps(state),
        )

    async def get_state(self, webhook_id: str) -> Optional[dict]:
        """Get current state record of a webhook, or None when nothing is stored."""
        data = self.redis.get(self._make_key(webhook_id))

        if not data:
            return None

        # Replies are bytes unless the client uses decode_responses=True.
        text = data.decode('utf-8') if isinstance(data, bytes) else data
        return json.loads(text)

    async def _has_status(self, webhook_id: str, status: str) -> bool:
        """Return True only when a record exists and its status equals ``status``."""
        state = await self.get_state(webhook_id)
        return state is not None and state.get("status") == status

    async def is_processing(self, webhook_id: str) -> bool:
        """Check if webhook is currently being processed.

        Always returns a real bool (False when no record exists).
        """
        return await self._has_status(webhook_id, "processing")

    async def is_completed(self, webhook_id: str) -> bool:
        """Check if webhook has been completed (idempotency check).

        Always returns a real bool (False when no record exists).
        """
        return await self._has_status(webhook_id, "completed")
155
+
156
+
157
class AgentJobTracker:
    """Track agent job execution state (for orchestrator coordination).

    One JSON record per job is kept under ``agent_jobs:<job_id>``. The
    methods are declared ``async`` but call the client without ``await``.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.key_prefix = "agent_jobs"

    def _make_job_key(self, job_id: str) -> str:
        """Build key for agent job."""
        return f"{self.key_prefix}:{job_id}"

    def _load_job(self, job_id: str) -> dict:
        """Read a job record that must exist.

        Raises:
            KeyError: if no record is stored for ``job_id`` (never created,
                or already expired).
        """
        data = self.redis.get(self._make_job_key(job_id))
        if data is None:
            raise KeyError(f"agent job {job_id!r} not found (missing or expired)")

        # Replies are bytes unless the client uses decode_responses=True.
        text = data.decode('utf-8') if isinstance(data, bytes) else data
        return json.loads(text)

    def _save_job(self, job_id: str, job: dict) -> None:
        """Overwrite a job record while keeping its remaining TTL.

        A plain SET would drop the expiry given at creation and leave the
        record in Redis forever; KEEPTTL (Redis 6.0+) preserves it.
        """
        self.redis.set(self._make_job_key(job_id), json.dumps(job), keepttl=True)

    async def create_job(
        self,
        job_id: str,
        agent_name: str,
        input_data: dict,
        ttl_seconds: int = 3600,
    ):
        """Create a new agent job in the "queued" state.

        Args:
            job_id: Caller-chosen identifier for the job
            agent_name: Name of the agent that will run it
            input_data: Job input, stored as a JSON string in the record
            ttl_seconds: Lifetime of the record
        """
        job = {
            "job_id": job_id,
            "agent_name": agent_name,
            "status": "queued",
            "input": json.dumps(input_data),
            "created_at": datetime.utcnow().isoformat(),
            "output": None,
        }

        self.redis.setex(self._make_job_key(job_id), ttl_seconds, json.dumps(job))

    async def mark_job_running(self, job_id: str):
        """Mark job as running and stamp ``started_at``.

        Raises:
            KeyError: if the job record does not exist.
        """
        job = self._load_job(job_id)

        job["status"] = "running"
        job["started_at"] = datetime.utcnow().isoformat()

        self._save_job(job_id, job)

    async def mark_job_completed(self, job_id: str, output: dict):
        """Mark job as completed and store ``output`` as a JSON string.

        Raises:
            KeyError: if the job record does not exist.
        """
        job = self._load_job(job_id)

        job["status"] = "completed"
        job["output"] = json.dumps(output)
        job["completed_at"] = datetime.utcnow().isoformat()

        self._save_job(job_id, job)

    async def get_job(self, job_id: str) -> Optional[dict]:
        """Get job status and output, or None when no record is stored."""
        data = self.redis.get(self._make_job_key(job_id))

        if not data:
            return None

        text = data.decode('utf-8') if isinstance(data, bytes) else data
        return json.loads(text)

    async def is_job_complete(self, job_id: str) -> bool:
        """Check if job is completed.

        Always returns a real bool (False when the job does not exist).
        """
        job = await self.get_job(job_id)
        return job is not None and job.get("status") == "completed"