File size: 14,298 Bytes
31f0e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed26b37
 
 
 
 
31f0e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed26b37
 
 
 
31f0e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed26b37
 
 
 
 
 
 
 
 
31f0e50
ed26b37
 
 
 
 
 
31f0e50
 
ed26b37
 
31f0e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
"""

Redis Client Module.



Provides session state management with TTL for:

- Active honeypot sessions

- Conversation context caching

- Rate limiting counters



Implements Task 6.2 requirements:

- AC-2.3.1: State persists across API calls

- AC-2.3.2: Session expires after 1 hour

- AC-2.3.4: Redis failure degrades gracefully

"""

from typing import Dict, Optional, Any, Callable, TypeVar
import json
import os
import time
from functools import wraps
import redis
from redis.exceptions import ConnectionError as RedisConnectionError, RedisError

from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)

# Type variable for generic returns
T = TypeVar("T")

# Global Redis client
redis_client: Optional[redis.Redis] = None

# Track if Redis is known to be unavailable (to skip connection attempts)
_redis_unavailable: bool = False
_redis_last_check: float = 0
_REDIS_RECHECK_INTERVAL = 60  # Only try reconnecting every 60 seconds

# In-memory fallback cache when Redis is unavailable
_fallback_cache: Dict[str, Dict[str, Any]] = {}
_fallback_cache_ttl: Dict[str, float] = {}

# Default TTL in seconds (1 hour)
DEFAULT_SESSION_TTL = 3600


def init_redis_client() -> None:
    """

    Initialize Redis client from configuration.

    

    Raises:

        ValueError: If REDIS_URL is not configured

    """
    global redis_client
    
    if redis_client is not None:
        return
    
    redis_url = settings.REDIS_URL
    
    if not redis_url:
        logger.warning("REDIS_URL not configured. Redis operations will fail.")
        return
    
    try:
        redis_client = redis.from_url(
            redis_url,
            decode_responses=True,
            socket_connect_timeout=1,  # Reduced from 5s for faster fallback
            socket_timeout=1,          # Reduced from 5s for faster fallback
            retry_on_timeout=False,    # Don't retry, use fallback cache instead
            health_check_interval=60,
        )
        # Test connection
        redis_client.ping()
        logger.info("Redis client initialized successfully")
    except (RedisConnectionError, RedisError) as e:
        logger.error(f"Failed to initialize Redis client: {e}")
        redis_client = None
        raise


def get_redis_client() -> redis.Redis:
    """

    Get Redis client connection.

    

    Returns:

        Redis client object

        

    Raises:

        ConnectionError: If Redis connection fails

        ValueError: If REDIS_URL is not configured

    """
    global _redis_unavailable, _redis_last_check
    
    # Skip connection attempts if Redis was recently unavailable
    if _redis_unavailable:
        if time.time() - _redis_last_check < _REDIS_RECHECK_INTERVAL:
            raise ConnectionError("Redis unavailable (cached)")
        # Time to recheck
        _redis_unavailable = False
    
    if redis_client is None:
        try:
            init_redis_client()
        except Exception:
            _redis_unavailable = True
            _redis_last_check = time.time()
            raise
    
    if redis_client is None:
        _redis_unavailable = True
        _redis_last_check = time.time()
        raise ConnectionError("Redis client not initialized. Check REDIS_URL configuration.")
    
    return redis_client


def save_session_state(session_id: str, state: Dict[str, Any], ttl: int = 3600) -> bool:
    """

    Save session state to Redis with TTL.

    

    Args:

        session_id: Unique session identifier

        state: Session state dictionary

        ttl: Time-to-live in seconds (default 1 hour)

        

    Returns:

        True if successful, False otherwise

    """
    try:
        client = get_redis_client()
        key = f"session:{session_id}"
        client.setex(key, ttl, json.dumps(state))
        return True
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to save session state: {e}")
        return False


def get_session_state(session_id: str) -> Optional[Dict[str, Any]]:
    """

    Retrieve session state from Redis.

    

    Args:

        session_id: Session identifier

        

    Returns:

        Session state dictionary or None if not found/expired

    """
    try:
        client = get_redis_client()
        key = f"session:{session_id}"
        data = client.get(key)
        if data:
            return json.loads(data)
        return None
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to get session state: {e}")
        return None
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode session state JSON: {e}")
        return None


def delete_session_state(session_id: str) -> bool:
    """

    Delete session state from Redis.

    

    Args:

        session_id: Session identifier

        

    Returns:

        True if deleted, False if not found

    """
    try:
        client = get_redis_client()
        key = f"session:{session_id}"
        deleted = client.delete(key)
        return deleted > 0
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to delete session state: {e}")
        return False


def update_session_state(session_id: str, updates: Dict[str, Any]) -> bool:
    """

    Update existing session state.

    

    Args:

        session_id: Session identifier

        updates: Fields to update

        

    Returns:

        True if successful, False if session not found

    """
    # TODO: Implement session update
    state = get_session_state(session_id)
    if state is None:
        return False
    
    state.update(updates)
    return save_session_state(session_id, state)


def increment_rate_counter(key: str, window_seconds: int = 60) -> int:
    """

    Increment rate limiting counter.

    

    Args:

        key: Counter key (e.g., IP address)

        window_seconds: Time window for counter

        

    Returns:

        Current count within window

    """
    try:
        client = get_redis_client()
        counter_key = f"rate_limit:{key}"
        count = client.incr(counter_key)
        if count == 1:
            # Set expiration on first increment
            client.expire(counter_key, window_seconds)
        return count
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to increment rate counter: {e}")
        return 0


def check_rate_limit(key: str, limit: int, window_seconds: int = 60) -> bool:
    """

    Check if rate limit is exceeded.

    

    Args:

        key: Counter key

        limit: Maximum allowed requests

        window_seconds: Time window

        

    Returns:

        True if within limit, False if exceeded

    """
    try:
        count = increment_rate_counter(key, window_seconds)
        return count <= limit
    except Exception as e:
        logger.error(f"Failed to check rate limit: {e}")
        # Fail open - allow request if Redis is down
        return True


def health_check() -> bool:
    """

    Check Redis connection health.

    

    Returns:

        True if Redis is responsive, False otherwise

    """
    try:
        client = get_redis_client()
        client.ping()
        return True
    except (ConnectionError, RedisError) as e:
        logger.warning(f"Redis health check failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error in Redis health check: {e}")
        return False


# ============================================================================
# Graceful Degradation with In-Memory Fallback
# ============================================================================

def _cleanup_fallback_cache() -> None:
    """Remove expired entries from fallback cache."""
    now = time.time()
    expired_keys = [
        key for key, expiry in _fallback_cache_ttl.items()
        if expiry < now
    ]
    for key in expired_keys:
        _fallback_cache.pop(key, None)
        _fallback_cache_ttl.pop(key, None)


def save_session_state_with_fallback(

    session_id: str,

    state: Dict[str, Any],

    ttl: int = DEFAULT_SESSION_TTL,

) -> bool:
    """

    Save session state with in-memory fallback.

    

    Implements AC-2.3.4: Redis failure degrades gracefully.

    

    Args:

        session_id: Unique session identifier

        state: Session state dictionary

        ttl: Time-to-live in seconds (default 1 hour per AC-2.3.2)

        

    Returns:

        True if saved (Redis or fallback), False on complete failure

    """
    # Try Redis first
    if save_session_state(session_id, state, ttl):
        return True
    
    # Fall back to in-memory cache
    logger.warning(f"Redis unavailable, using fallback cache for session {session_id}")
    try:
        _cleanup_fallback_cache()
        key = f"session:{session_id}"
        _fallback_cache[key] = state.copy()
        _fallback_cache_ttl[key] = time.time() + ttl
        return True
    except Exception as e:
        logger.error(f"Fallback cache failed: {e}")
        return False


def get_session_state_with_fallback(session_id: str) -> Optional[Dict[str, Any]]:
    """

    Get session state with in-memory fallback.

    

    Implements AC-2.3.4: Redis failure degrades gracefully.

    

    Args:

        session_id: Session identifier

        

    Returns:

        Session state or None if not found/expired

    """
    # Try Redis first
    state = get_session_state(session_id)
    if state is not None:
        logger.debug(f"Session {session_id} found in Redis")
        return state
    
    # Try fallback cache
    _cleanup_fallback_cache()
    key = f"session:{session_id}"
    
    if key in _fallback_cache:
        expiry = _fallback_cache_ttl.get(key, 0)
        if expiry > time.time():
            logger.debug(f"Session {session_id} retrieved from fallback cache")
            return _fallback_cache[key].copy()
        else:
            # Expired
            _fallback_cache.pop(key, None)
            _fallback_cache_ttl.pop(key, None)
    
    return None


def delete_session_state_with_fallback(session_id: str) -> bool:
    """

    Delete session state from Redis and fallback cache.

    

    Args:

        session_id: Session identifier

        

    Returns:

        True if deleted from either location

    """
    redis_deleted = delete_session_state(session_id)
    
    # Also remove from fallback
    key = f"session:{session_id}"
    fallback_deleted = key in _fallback_cache
    _fallback_cache.pop(key, None)
    _fallback_cache_ttl.pop(key, None)
    
    return redis_deleted or fallback_deleted


def extend_session_ttl(session_id: str, additional_seconds: int = DEFAULT_SESSION_TTL) -> bool:
    """

    Extend session TTL.

    

    Useful for keeping active sessions alive beyond initial TTL.

    

    Args:

        session_id: Session identifier

        additional_seconds: Additional time in seconds

        

    Returns:

        True if extended, False otherwise

    """
    try:
        client = get_redis_client()
        key = f"session:{session_id}"
        
        # Get current TTL
        current_ttl = client.ttl(key)
        
        if current_ttl > 0:
            # Extend by adding to current TTL
            new_ttl = current_ttl + additional_seconds
            client.expire(key, new_ttl)
            logger.debug(f"Session {session_id} TTL extended by {additional_seconds}s")
            return True
        
        return False
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to extend session TTL: {e}")
        return False


def get_session_ttl(session_id: str) -> int:
    """

    Get remaining TTL for a session.

    

    Args:

        session_id: Session identifier

        

    Returns:

        Remaining TTL in seconds, -2 if key doesn't exist, -1 if no expiry

    """
    try:
        client = get_redis_client()
        key = f"session:{session_id}"
        return client.ttl(key)
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to get session TTL: {e}")
        return -2


def get_active_session_count() -> int:
    """

    Get count of active sessions.

    

    Returns:

        Number of active sessions

    """
    try:
        client = get_redis_client()
        keys = client.keys("session:*")
        return len(keys)
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to get active session count: {e}")
        # Return fallback count
        _cleanup_fallback_cache()
        return len([k for k in _fallback_cache if k.startswith("session:")])


def clear_all_sessions() -> int:
    """

    Clear all session data (for testing/admin purposes).

    

    Returns:

        Number of sessions cleared

    """
    try:
        client = get_redis_client()
        keys = client.keys("session:*")
        if keys:
            deleted = client.delete(*keys)
            logger.info(f"Cleared {deleted} sessions from Redis")
            return deleted
        return 0
    except (ConnectionError, RedisError) as e:
        logger.error(f"Failed to clear sessions: {e}")
        return 0


def reset_fallback_cache() -> None:
    """Reset the in-memory fallback cache (for testing)."""
    global _fallback_cache, _fallback_cache_ttl
    _fallback_cache = {}
    _fallback_cache_ttl = {}


def get_fallback_cache_stats() -> Dict[str, Any]:
    """

    Get fallback cache statistics.

    

    Returns:

        Dictionary with cache stats

    """
    _cleanup_fallback_cache()
    return {
        "entries": len(_fallback_cache),
        "total_size_bytes": sum(
            len(json.dumps(v)) for v in _fallback_cache.values()
        ),
    }


def is_redis_available() -> bool:
    """

    Check if Redis is available without raising exceptions.

    

    Returns:

        True if Redis is available, False otherwise

    """
    return health_check()