File size: 4,487 Bytes
9c400b9
d068566
9c400b9
 
 
 
 
 
 
 
 
 
 
 
a3ce5eb
9c400b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a3ce5eb
9c400b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a3ce5eb
9c400b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import functools
import json
import os

from dotenv import load_dotenv
from upstash_redis import Redis

from src.libs.helper_functions import chunk_data, create_uuid_from_string
from src.libs.logger import logger
load_dotenv()

# Upstash Redis REST credentials, read from the environment (.env loaded above).
url = os.getenv("UPSTASH_REDIS_REST_URL")
token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

# Module-level Redis client shared by the caching decorator below.
REDIS = Redis(url=url, token=token)
# Pre-created pipeline; NOTE(review): not referenced anywhere in this file — confirm external use.
REDIS_PIPELINE = REDIS.pipeline()


@logger.instrument()
def args_to_key(*args, **kwargs):
    """
    Generate a unique key string from the provided arguments.

    Callable positional arguments contribute their ``__name__`` (and are
    placed first in the key); every other positional argument is reduced to
    a short string token; keyword argument *values* — not their names — are
    appended as plain strings.

    Args:
        *args: A variable number of positional arguments.
        **kwargs: A variable number of keyword arguments.

    Returns:
        str: The key parts joined with the '_' character.

    Example:
        >>> args_to_key("hello", "world", a=1, b=2)
        'hello_world_1_2'
    """
    params = []

    logger.debug(f"args: {args}")
    logger.debug(f"kwargs: {kwargs}")

    # Callable names go first so the function identity leads the key.
    # Two separate passes (rather than one) are deliberate: they fix the
    # ordering of key parts, which existing cached keys depend on.
    for arg in args:
        if callable(arg):
            params.append(arg.__name__)

    # Then the remaining positional arguments, reduced to string tokens.
    for arg in args:
        if not callable(arg):
            params.append(cast_args_to_string_and_return_first_index(arg))

    # Finally keyword values; kwarg names are deliberately not included.
    for kwarg in kwargs.values():
        params.append(str(kwarg))

    return "_".join(params)


@logger.instrument()
def REDIS_CACHED(ttl: int = 3600, chunk: bool = False):
    """
    Decorator factory that caches a function's result in Redis.

    Args:
        ttl (int): Default time-to-live in seconds for the cached result.
            Can be overridden per call by passing a ``cache_ttl`` or ``ttl``
            keyword argument to the wrapped function. Defaults to 3600 (1 hour).
        chunk (bool): When True, the result's ``.data`` attribute is split into
            chunks of 100 items and stored as a Redis list instead of a single
            JSON string. Defaults to False.

    Returns:
        A decorator that wraps the target function with Redis caching.

    Example:
        >>> @REDIS_CACHED(ttl=60)
        ... def example_function(arg1, arg2):
        ...     return arg1 + arg2
        ...
        >>> example_function(3, 4)  # computed, then cached
        7
        >>> example_function(3, 4)  # served from cache
        7
    """
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            r = REDIS

            # Derive a deterministic cache key from the function and its arguments.
            cache_key = args_to_key(func, *args, **kwargs)
            logger.debug(f"Cache key: {cache_key}")
            cache_key = str(create_uuid_from_string(cache_key))
            logger.debug(f"Cache key: {cache_key}")

            if chunk:
                # BUG FIX: chunked results are stored as a Redis list (RPUSH
                # below), so they must be read back with LRANGE, not GET.
                cached_chunks = r.lrange(cache_key, 0, -1)
                if cached_chunks:
                    # Cache hit: return the decoded list of chunks.
                    return [json.loads(c) for c in cached_chunks]
            else:
                cached = r.get(cache_key)
                if cached:
                    # Cache hit: the stored value is a JSON string.
                    return json.loads(cached)

            # Cache miss: call the downstream function.
            result = func(*args, **kwargs)

            # Per-call TTL override: 'cache_ttl' wins, then 'ttl', then the default.
            cache_ttl = kwargs.get('cache_ttl')
            ttl_seconds = cache_ttl or kwargs.get('ttl', ttl)

            if chunk:
                # assumes result exposes a .data sequence — TODO confirm against callers
                chunked_result = chunk_data(result.data, 100)
                for piece in chunked_result:
                    r.rpush(cache_key, json.dumps(piece))
                # BUG FIX: PEXPIRE takes milliseconds; EXPIRE takes seconds,
                # matching ttl_seconds.
                r.expire(cache_key, ttl_seconds)
            else:
                # BUG FIX: serialize to JSON before storing, since the read
                # path above decodes with json.loads. Storing the raw object
                # made every cache hit fail to round-trip.
                r.setex(cache_key, ttl_seconds, json.dumps(result))

            # Return the freshly computed result transparently.
            return result
        return wrapper
    return decorator


@logger.instrument()
def cast_args_to_string_and_return_first_index(args):
    """
    Reduce an arbitrary value to a short string token.

    The value is stringified, stripped of any leading/trailing '<' and '>'
    characters, and split on single spaces; the first piece is returned.

    Args:
        args (any): The value to reduce.

    Returns:
        str or None: The first space-delimited piece after processing, or
        None if splitting produced no pieces.

    Example:
        >>> cast_args_to_string_and_return_first_index("hello <world>")
        'hello'
    """
    pieces = str(args).strip('<>').split(' ')
    if not pieces:
        return None
    return pieces[0]