File size: 4,487 Bytes
9c400b9 d068566 9c400b9 a3ce5eb 9c400b9 a3ce5eb 9c400b9 a3ce5eb 9c400b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
import functools
import json
import os

from dotenv import load_dotenv
from upstash_redis import Redis

from src.libs.logger import logger
from src.libs.helper_functions import chunk_data, create_uuid_from_string
# Load variables from a .env file (if present) into the process environment
# before the Upstash credentials are read below.
load_dotenv()
# REST endpoint and access token for the Upstash Redis database.
# os.getenv returns None when a variable is unset; no validation is done here.
url = os.getenv("UPSTASH_REDIS_REST_URL")
token = os.getenv("UPSTASH_REDIS_REST_TOKEN")
# Module-level client shared by the REDIS_CACHED decorator in this module.
REDIS = Redis(url=url, token=token)
# Pipeline created from the shared client; it is not referenced by any other
# code visible in this file.
REDIS_PIPELINE = REDIS.pipeline()
@logger.instrument()
def args_to_key(*args, **kwargs):
    """
    This function generates a cache-key string from the provided arguments and keyword arguments.

    The key is built from three groups, joined with '_':
      1. the name of every callable positional argument,
      2. every non-callable positional argument, reduced by
         cast_args_to_string_and_return_first_index,
      3. every keyword argument as its name followed by str(value), in call order.

    Keyword names are part of the key so that calls which differ only in which
    keyword carries a value (e.g. f(a=1) vs. f(b=1)) do not share a key.

    Args:
        *args: A variable number of positional arguments.
        **kwargs: A variable number of keyword arguments.

    Returns:
        str: A key string generated from the provided arguments and keyword arguments.

    Example:
        >>> args_to_key("hello", "world", a=1, b=2)
        'hello_world_a_1_b_2'
    """
    logger.debug(f"args: {args}")
    logger.debug(f"kwargs: {kwargs}")

    # Callables go first, identified by name. Objects that are callable but
    # carry no __name__ (e.g. functools.partial) fall back to their type name
    # instead of raising AttributeError.
    callable_names = [
        getattr(arg, "__name__", type(arg).__name__)
        for arg in args
        if callable(arg)
    ]

    # Remaining positional arguments, each reduced to a short string token.
    positional_tokens = [
        cast_args_to_string_and_return_first_index(arg)
        for arg in args
        if not callable(arg)
    ]

    # Keyword arguments contribute both their name and their value.
    keyword_tokens = []
    for name, value in kwargs.items():
        keyword_tokens.extend((name, str(value)))

    # Join the elements using the '_' character as a separator
    return "_".join([*callable_names, *positional_tokens, *keyword_tokens])
@logger.instrument()
def REDIS_CACHED(ttl: int = 3600, chunk: bool = False):
    """
    This decorator caches the result of a function call in Redis.

    The cache key is derived from the wrapped function and its call arguments
    (via args_to_key) and then converted with create_uuid_from_string.

    Args:
        ttl (int): The time-to-live (in seconds) for the cached result. Defaults to 3600 seconds (1 hour).
            A call may override it by passing a truthy ``cache_ttl`` keyword, or a ``ttl`` keyword;
            note that these keywords are also forwarded to the wrapped function unchanged.
        chunk (bool): Whether to chunk the result of the original function call. Defaults to False.
            When True, ``result.data`` is split with ``chunk_data(..., 100)`` and stored as a
            Redis list of JSON-encoded chunks.

    Returns:
        A decorator whose wrapper returns the cached value when one exists and otherwise
        calls the original function, stores its result, and returns it.

        NOTE(review): in chunk mode a cache hit returns the reassembled chunk contents
        (a list), while a cache miss returns the original result object untouched —
        confirm callers accept both shapes.

    Example:
        >>> @REDIS_CACHED(ttl=60)
        ... def example_function(arg1, arg2):
        ...     return arg1 + arg2
        ...
        >>> example_function(3, 4)  # computed, then stored for 60 seconds
        7
        >>> example_function(3, 4)  # served from the cache
        7
    """

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            r = REDIS
            cache_key = args_to_key(func, *args, **kwargs)
            logger.debug(f"Cache key: {cache_key}")
            cache_key = str(create_uuid_from_string(cache_key))
            logger.debug(f"Cache key: {cache_key}")

            # Test if a matching cache key exists. Chunked results are stored
            # as a Redis list, so they must be read with LRANGE; GET on a list
            # key is a type error.
            if chunk:
                stored_chunks = r.lrange(cache_key, 0, -1)
                if stored_chunks:
                    # NOTE(review): assumes chunk_data yields list-like chunks,
                    # so concatenating them restores the original sequence.
                    rows = []
                    for raw_chunk in stored_chunks:
                        rows.extend(json.loads(raw_chunk))
                    return rows
            else:
                cached = r.get(cache_key)
                if cached is not None:
                    try:
                        # Found in cache, return it
                        return json.loads(cached)
                    except json.JSONDecodeError:
                        # Entry was not stored as JSON (e.g. written before
                        # values were JSON-encoded): treat it as a miss and
                        # let the fresh result overwrite it below.
                        logger.debug(f"Ignoring undecodable cache entry for key: {cache_key}")

            # Otherwise, pass everything to the downstream function
            result = func(*args, **kwargs)

            # Set cache time-to-live duration
            # Use the default TTL if not provided as an argument
            cache_ttl = kwargs.get('cache_ttl')
            ttl_seconds = cache_ttl or kwargs.get('ttl', ttl)

            if chunk:
                encoded_chunks = [
                    json.dumps(part) for part in chunk_data(result.data, 100)
                ]
                if encoded_chunks:
                    # One RPUSH for all chunks instead of one request per chunk.
                    r.rpush(cache_key, *encoded_chunks)
                    # EXPIRE takes seconds; PEXPIRE would read the TTL as milliseconds.
                    r.expire(cache_key, ttl_seconds)
            else:
                # Put the result from downstream function into cache, with a TTL
                # So next call with the same parameters will be handled by the cache.
                # JSON-encode it so the json.loads on the read path round-trips.
                r.setex(cache_key, ttl_seconds, json.dumps(result))

            # Return the result transparently
            return result

        return wrapper

    return decorator
@logger.instrument()
def cast_args_to_string_and_return_first_index(args):
    """
    This function reduces a value to the first space-delimited token of its string form.

    The value is converted with str(), any '<' or '>' characters at either end of
    that text are stripped (occurrences in the middle are left alone), and
    everything before the first space is returned.

    Args:
        args (any): The argument to be processed.

    Returns:
        str: The leading token. This is an empty string when the stripped text is
        empty or starts with a space.

    Example:
        >>> cast_args_to_string_and_return_first_index("hello <world>")
        'hello'
    """
    trimmed = str(args).strip('<>')
    # partition() yields the text before the first space, or the whole string
    # when there is no space at all.
    leading_token, _, _ = trimmed.partition(' ')
    return leading_token
|