import functools
import json
import os

from dotenv import load_dotenv
from upstash_redis import Redis

from src.libs.logger import logger
from src.libs.helper_functions import chunk_data, create_uuid_from_string

load_dotenv()

url = os.getenv("UPSTASH_REDIS_REST_URL")
token = os.getenv("UPSTASH_REDIS_REST_TOKEN")

# Module-level client shared by every decorated function.
REDIS = Redis(url=url, token=token)
REDIS_PIPELINE = REDIS.pipeline()


def _callable_to_key_part(func):
    """Return the name used to identify a callable inside a cache key."""
    # Callables such as functools.partial objects have no __name__; fall back
    # to the type name instead of raising AttributeError.
    return getattr(func, "__name__", None) or type(func).__name__


def _value_to_key_part(value):
    """Return the cache-key fragment for one non-callable positional argument."""
    text = str(value)
    # Only a "<pkg.Class object at 0x...>"-shaped string is reduced to its
    # first token, so the per-process memory address stays out of the key.
    # Everything else (and every real string) is used in full: truncating
    # "select * from a" to "select" would make different calls share a key.
    if not isinstance(value, str) and text.startswith("<") and text.endswith(">"):
        return cast_args_to_string_and_return_first_index(value)
    return text


def _join_cached_chunks(raw_chunks):
    """Decode the JSON chunks read back from a Redis list into one flat list."""
    data = []
    for raw in raw_chunks:
        piece = json.loads(raw)
        # NOTE(review): assumes chunk_data() yields list slices of the payload;
        # a non-list chunk is kept as a single element -- confirm against
        # src.libs.helper_functions.chunk_data.
        if isinstance(piece, list):
            data.extend(piece)
        else:
            data.append(piece)
    return data


@logger.instrument()
def args_to_key(*args, **kwargs):
    """
    This function generates a unique key based on the provided arguments and keyword arguments.

    The key is built from, in order: the names of callable positional
    arguments, the string form of the remaining positional arguments, and
    every keyword argument as a ``name``/``value`` pair sorted by name. Keyword
    names are included so that ``f(a=1, b=2)`` and ``f(b=1, a=2)`` do not map
    to the same key, and sorting makes the key independent of call order.

    Args:
        *args: A variable number of positional arguments.
        **kwargs: A variable number of keyword arguments.

    Returns:
        str: A key string generated from the provided arguments and keyword arguments.

    Example:
        >>> args_to_key("hello", "world", a=1, b=2)
        'hello_world_a_1_b_2'
    """
    params = []

    # TODO: turn these into debugging logs
    logger.debug(f"args: {args}")
    logger.debug(f"kwargs: {kwargs}")

    # Names of callable arguments come first
    params.extend(_callable_to_key_part(arg) for arg in args if callable(arg))

    # Then the string representations of the non-callable positional arguments
    params.extend(_value_to_key_part(arg) for arg in args if not callable(arg))

    # Then the keyword arguments, name and value, in a stable order
    for name in sorted(kwargs):
        params.append(str(name))
        params.append(str(kwargs[name]))

    # Join the elements in the params list using the '_' character as a separator
    return "_".join(params)


@logger.instrument()
def REDIS_CACHED(ttl: int = 3600, chunk: bool = False):
    """
    This decorator caches the result of a function call in Redis.

    The cache key is derived from the wrapped function's name and its call
    arguments (see ``args_to_key``) and then passed through
    ``create_uuid_from_string``.

    Args:
        ttl (int): The time-to-live (in seconds) for the cached result. Defaults to 3600 seconds (1 hour).
            A ``cache_ttl`` or ``ttl`` keyword argument given at call time
            overrides it for that call; those keywords are still forwarded to
            the wrapped function and are part of the cache key.
        chunk (bool): Whether to chunk the result of the original function call. Defaults to False.
            When True, ``result.data`` is split with ``chunk_data(..., 100)``
            and stored as a Redis list with one JSON document per chunk.

    Returns:
        A wrapper function that caches the result of the original function call.
        On a cache miss the wrapped function's own return value is returned.
        On a cache hit the JSON-decoded cached value is returned; in chunk mode
        that is the reassembled ``data`` payload, not the original result object.

    Raises:
        TypeError: If the value to cache is not JSON serializable.

    Example:
        >>> @REDIS_CACHED(ttl=60)
        ... def example_function(arg1, arg2):
        ...     return arg1 + arg2
        ...
        >>> example_function(3, 4)  # computed, then cached
        7
        >>> example_function(3, 4)  # served from the cache
        7
    """
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            r = REDIS
            cache_key = args_to_key(func, *args, **kwargs)
            logger.debug(f"Cache key: {cache_key}")  # TODO: turn these into debugging logs
            cache_key = str(create_uuid_from_string(cache_key))
            logger.debug(f"Cache key: {cache_key}")  # TODO: turn these into debugging logs

            # Test if a matching cache key exists. Chunked entries are stored
            # as a Redis list, so they must be read with LRANGE: GET on a list
            # key fails with a WRONGTYPE error.
            if chunk:
                cached_chunks = r.lrange(cache_key, 0, -1)
                if cached_chunks:
                    # Found in cache, return it
                    return _join_cached_chunks(cached_chunks)
            else:
                cached = r.get(cache_key)
                if cached:
                    # Found in cache, return it
                    return json.loads(cached)

            # Otherwise, pass everything to the downstream function
            result = func(*args, **kwargs)

            # Set cache time-to-live duration
            # Use the default TTL if not provided as an argument
            cache_ttl = kwargs.get('cache_ttl')
            ttl_seconds = cache_ttl or kwargs.get('ttl', ttl)

            if chunk:
                # One RPUSH per chunk keeps each request small
                for piece in chunk_data(result.data, 100):
                    r.rpush(cache_key, json.dumps(piece))
                # The TTL is in seconds, so EXPIRE (not PEXPIRE, which takes
                # milliseconds) is applied once the whole list is written.
                r.expire(cache_key, ttl_seconds)
            else:
                # Put the result from downstream function into cache, with a TTL
                # So next call with the same parameters will be handled by the cache.
                # It is stored as JSON because the read path decodes with json.loads.
                r.setex(cache_key, ttl_seconds, json.dumps(result))

            # Return the result transparently
            return result

        return wrapper

    return decorator


@logger.instrument()
def cast_args_to_string_and_return_first_index(args):
    """
    This function takes an argument, casts it to a string, strips leading and
    trailing '<' and '>' characters, splits it by spaces, and returns the
    first element.

    Args:
        args (any): The argument to be processed.

    Returns:
        str: The first space-separated token of the processed string. This is
        an empty string when the processed string is empty or starts with a
        space; ``str.split(' ')`` never returns an empty list, so the ``None``
        fallback below is kept only as a guard.

    Example:
        >>> cast_args_to_string_and_return_first_index("hello ")
        'hello'
    """
    args_str = str(args).strip('<>').split(' ')
    return args_str[0] if args_str else None