import logging
import os
import threading
import time

import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
from dotenv import load_dotenv

logger = logging.getLogger(__name__)

load_dotenv()

AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
REGION = os.getenv('AWS_REGION')


def upload_file_to_s3(filename):
    """Upload a local user-data file to S3 and delete the local copy on success.

    The user id is taken from the filename: basename up to the first dot
    (e.g. ``"123.pkl"`` -> ``"123"``).

    Parameters:
        filename: Path to the local file to upload.

    Returns:
        True if the upload succeeded (the local file is then removed),
        False on any handled failure (missing file, credential problems,
        or an S3 API error).
    """
    # Derive user_id from the basename so directory components never leak in.
    user_id = os.path.basename(filename).split('.')[0]
    function_name = upload_file_to_s3.__name__
    logger.info(f"Uploading file {filename} to S3",
                extra={'user_id': user_id, 'endpoint': function_name})
    bucket = 'core-ai-assets'
    try:
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            # Explicit credentials loaded from the environment / .env file.
            session = boto3.session.Session(
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY,
                region_name=REGION,
            )
        else:
            # Fall back to the default boto3 credential chain
            # (IAM role, shared config, environment, ...).
            session = boto3.session.Session()
        s3_client = session.client('s3')
        with open(filename, "rb") as f:
            # NOTE: uploads go to the dev/ prefix (not production).
            s3_client.upload_fileobj(f, bucket, f'dev/users/{user_id}.pkl')
        logger.info(f"File {filename} uploaded successfully to S3",
                    extra={'user_id': user_id, 'endpoint': function_name})
        # Only delete the local copy once the upload has succeeded.
        os.remove(filename)
        return True
    # ClientError added: boto3 raises it for S3 API failures (access denied,
    # missing bucket, throttling, ...); it was imported but never caught.
    except (FileNotFoundError, ClientError,
            NoCredentialsError, PartialCredentialsError) as e:
        logger.error(f"S3 upload failed for {filename}: {e}",
                     extra={'user_id': user_id, 'endpoint': function_name})
        return False


class CustomTTLCache:
    """Thread-safe TTL cache that persists evicted values to S3.

    Each stored value is expected to expose a ``save_user()`` method that
    writes it to a local file; when an entry expires or is explicitly
    removed, that file is uploaded via :func:`upload_file_to_s3`.

    Any successful access (``get``, ``cache[key]``, ``key in cache``)
    refreshes the entry's TTL. A daemon thread periodically sweeps
    expired entries.
    """

    def __init__(self, ttl=60, cleanup_interval=10):
        """
        ttl: Time to live (in seconds) for each item
        cleanup_interval: Interval at which background thread checks for expired items
        """
        self.ttl = ttl
        self.data = {}  # key -> (value, expiration_time)
        self.lock = threading.Lock()
        self.cleanup_interval = cleanup_interval
        self.running = True
        # Event lets close() wake the sweeper immediately instead of
        # waiting out a full sleep(cleanup_interval).
        self._stop_event = threading.Event()
        # Daemon thread for periodic cleanup.
        self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
        self.cleanup_thread.start()

    def __setitem__(self, key, value):
        """For item assignment i.e. cache[key] = value."""
        self.set(key, value)

    def set(self, key, value):
        """Store *value* under *key* with a fresh TTL."""
        expire_at = time.time() + self.ttl
        with self.lock:
            self.data[key] = (value, expire_at)

    def get(self, key, default=None):
        """
        Get a value from the cache. Returns default if key not found or expired.
        Resets the TTL if the item is successfully accessed.
        """
        with self.lock:
            entry = self.data.get(key)
            if not entry:
                return default
            value, expire_at = entry
            now = time.time()
            if now <= expire_at:
                # Refresh this entry's TTL since it was accessed.
                self.data[key] = (value, now + self.ttl)
                return value
            # FIX: actually remove the expired entry; previously it stayed
            # in self.data and was re-uploaded on every subsequent access.
            del self.data[key]
        # FIX: upload outside the lock so S3 I/O doesn't block other callers.
        self._evict_item(key, value)
        return default

    def __contains__(self, key):
        """
        Check if a key is in the cache and not expired.
        If not expired, reset the TTL.
        """
        with self.lock:
            entry = self.data.get(key)
            if not entry:
                return False
            value, expire_at = entry
            now = time.time()
            if now <= expire_at:
                # Refresh TTL on successful membership test.
                self.data[key] = (value, now + self.ttl)
                return True
            # FIX: drop the expired entry instead of leaving it behind.
            del self.data[key]
        # Upload outside the lock.
        self._evict_item(key, value)
        return False

    def __getitem__(self, key):
        """
        Retrieve an item i.e. user_cache[key]. Raises KeyError if not found
        or expired. Resets the TTL if the item is successfully accessed.
        """
        with self.lock:
            if key not in self.data:
                raise KeyError(key)
            value, expire_at = self.data[key]
            now = time.time()
            if now <= expire_at:
                self.data[key] = (value, now + self.ttl)
                return value
            # FIX: remove the expired entry before reporting the miss.
            del self.data[key]
        # Upload outside the lock, then signal the miss to the caller.
        self._evict_item(key, value)
        raise KeyError(key)

    def reset_cache(self):
        """Remove all items, uploading each one to S3 first."""
        # Swap the dict out under the lock, then upload without holding it.
        with self.lock:
            evicted, self.data = self.data, {}
        for key, (value, _expire_at) in evicted.items():
            self._evict_item(key, value)

    def pop(self, key, default=None):
        """
        Remove an item from the cache and return its value.
        The value is uploaded to S3 before being returned.
        """
        with self.lock:
            entry = self.data.pop(key, None)
        if entry is None:
            return default
        value, _expire_at = entry
        # Upload outside the lock so a slow S3 call can't stall the cache.
        self._upload_value(key, value)
        return value

    def close(self):
        """
        Stop the background cleanup thread. Technically we should call this
        when done using the cache but I guess we are never done using it lol.
        """
        self.running = False
        # Wake the sweeper immediately rather than waiting out its sleep.
        self._stop_event.set()
        self.cleanup_thread.join()

    def _cleanup_task(self):
        """
        Background task that periodically checks for expired items and
        removes them, uploading each one to S3.
        """
        while self.running:
            now = time.time()
            expired = []
            with self.lock:
                # Collect and remove expired entries in a single lock hold.
                for k, (v, exp) in list(self.data.items()):
                    if now > exp:
                        expired.append((k, v))
                for k, _v in expired:
                    del self.data[k]
            # FIX: the upload now truly happens outside the lock (the old
            # code's comment claimed this but re-acquired the lock around
            # the upload, blocking all cache operations during S3 I/O).
            for k, v in expired:
                self._evict_item(k, v)
            # Interruptible sleep: returns early when close() sets the event.
            self._stop_event.wait(self.cleanup_interval)

    def _evict_item(self, key, value):
        """
        Called internally when an item expires: upload to S3 before removal.
        """
        self._upload_value(key, value)

    def _upload_value(self, key, value):
        """
        Saves the user's data (via value.save_user()) and uploads the
        resulting file to S3. Failures are logged, never raised, so a bad
        upload cannot crash the cache or its sweeper thread.
        """
        try:
            filename = value.save_user()
            logger.info(
                f"FILENAME = {filename}",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )
            upload_file_to_s3(filename)
            logger.info(
                f"User {key} saved to S3 during cache eviction",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )
        except Exception as e:
            logger.error(
                f"Failed to save user {key} to S3 during cache eviction: {e}",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )