# fastapi-v2 / app/cache.py
# Author: Shageenderan Sapai
# [FEATURE] Error Handling (commit f5fccf5)
import time
import threading
import logging
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
import os
import re
from dotenv import load_dotenv
from app.exceptions import DBError
logger = logging.getLogger(__name__)
load_dotenv()
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
REGION = os.getenv('AWS_REGION')
def upload_file_to_s3(filename):
    """
    Upload a local user-data file to the `core-ai-assets` S3 bucket under
    staging/users/<user_id>.pkl, then delete the local copy.

    The user id is taken from the file's basename, up to the first '.'.
    Explicit AWS credentials from the environment are used when both key
    variables are set; otherwise boto3's default credential chain applies.

    Parameters:
        filename: Path to the local file to upload.

    Returns:
        True on success.

    Raises:
        DBError: If the file is missing, credentials are unavailable, or
            the S3 request itself fails (ClientError).
    """
    # Extract user_id from the filename (basename up to the first '.').
    user_id = os.path.basename(filename).split('.')[0]
    function_name = upload_file_to_s3.__name__
    # Fixed: the original f-strings logged the literal "(unknown)" instead
    # of interpolating the filename.
    logger.info(f"Uploading file {filename} to S3", extra={'user_id': user_id, 'endpoint': function_name})
    bucket = 'core-ai-assets'
    try:
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            session = boto3.session.Session(
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY,
                region_name=REGION,
            )
        else:
            # No explicit keys configured: let boto3 resolve credentials
            # (environment, shared config, instance role, ...).
            session = boto3.session.Session()
        s3_client = session.client('s3')
        with open(filename, "rb") as f:
            ## Upload to Production Folder
            s3_client.upload_fileobj(f, bucket, f'staging/users/{user_id}.pkl')
        logger.info(f"File {filename} uploaded successfully to S3", extra={'user_id': user_id, 'endpoint': function_name})
        # Remove the local copy only after a successful upload.
        os.remove(filename)
        return True
    # ClientError was imported but never caught: without it, S3-side
    # failures (403, missing bucket, ...) escaped the DBError contract.
    except (FileNotFoundError, ClientError, NoCredentialsError, PartialCredentialsError) as e:
        logger.error(f"S3 upload failed for {filename}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id, "S3Error", f"Failed to upload file {filename} to S3", e)
class CustomTTLCache:
    """
    Dict-like in-memory cache whose entries expire `ttl` seconds after
    their last access. Expired entries are evicted — i.e. persisted via the
    value's `save_user()` and uploaded to S3 — either lazily on access or
    by a background cleanup thread.

    Thread-safe: all dict mutation happens under `self.lock`; uploads are
    performed OUTSIDE the lock so a slow S3 call never blocks the cache.
    """

    def __init__(self, ttl=60, cleanup_interval=10):
        """
        ttl: Time to live (in seconds) for each item.
        cleanup_interval: Interval at which the background thread checks
            for expired items.
        """
        self.ttl = ttl
        self.data = {}  # key -> (value, expiration_time)
        self.lock = threading.Lock()
        self.cleanup_interval = cleanup_interval
        self.running = True
        # Event lets close() interrupt the worker's wait immediately
        # instead of blocking for up to a full cleanup_interval.
        self._stop_event = threading.Event()
        self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)  # for periodic cleanup
        self.cleanup_thread.start()

    def __setitem__(self, key, value):
        """
        For item assignment i.e. cache[key] = value
        """
        self.set(key, value)

    def set(self, key, value):
        """Store `value` under `key`, expiring `ttl` seconds from now."""
        expire_at = time.time() + self.ttl
        with self.lock:
            self.data[key] = (value, expire_at)

    def get(self, key, default=None):
        """
        Get a value from the cache. Returns default if key not found or expired.
        Resets the TTL if the item is successfully accessed.
        An expired entry is removed and evicted (uploaded) before `default`
        is returned.
        """
        with self.lock:
            entry = self.data.get(key)
            if not entry:
                return default
            value, expire_at = entry
            now = time.time()
            if now <= expire_at:
                # Fresh hit: refresh this entry's TTL.
                self.data[key] = (value, now + self.ttl)
                return value
            # Expired: remove it now so it is not re-evicted (and
            # re-uploaded) on every subsequent access — the original left
            # the entry in place, triggering duplicate uploads.
            del self.data[key]
        # Upload outside the lock so S3 latency doesn't stall the cache.
        self._evict_item(key, value)
        return default

    def __contains__(self, key):
        """
        Check if a key is in the cache and not expired.
        If not expired, reset the TTL; if expired, remove and evict it.
        """
        with self.lock:
            entry = self.data.get(key)
            if not entry:
                return False
            value, expire_at = entry
            now = time.time()
            if now <= expire_at:
                # refresh TTL
                self.data[key] = (value, now + self.ttl)
                return True
            # Expired: remove before evicting (see get()).
            del self.data[key]
        self._evict_item(key, value)
        return False

    def __getitem__(self, key):
        """
        Retrieve an item i.e. user_cache[key].
        Raises KeyError if not found or expired.
        Resets the TTL if the item is successfully accessed.
        """
        with self.lock:
            if key not in self.data:
                raise KeyError(key)
            value, expire_at = self.data[key]
            now = time.time()
            if now <= expire_at:
                self.data[key] = (value, now + self.ttl)
                return value
            # Expired: remove before evicting (see get()).
            del self.data[key]
        self._evict_item(key, value)
        raise KeyError(key)

    def reset_cache(self):
        """
        Reset the cache by evicting (uploading) and removing all items.
        """
        # Snapshot and clear under the lock; upload outside it.
        with self.lock:
            snapshot = list(self.data.items())
            self.data = {}
        for key, (value, _) in snapshot:
            self._evict_item(key, value)

    def pop(self, key, default=None):
        """
        Remove an item from the cache and return its value (or `default`
        if absent). Uploads to S3 only if the item hasn't expired yet —
        the original uploaded unconditionally despite this documented
        contract.
        """
        with self.lock:
            entry = self.data.pop(key, None)
        if entry is None:
            return default
        value, expire_at = entry
        if time.time() <= expire_at:
            # upload before removal (outside the lock)
            self._upload_value(key, value)
        return value

    def close(self):
        """
        Stop the background cleanup thread.
        The stop event wakes the worker immediately, so this no longer
        blocks for up to a full cleanup_interval.
        """
        self.running = False
        self._stop_event.set()
        self.cleanup_thread.join()

    def _cleanup_task(self):
        """
        Background task that periodically removes expired items and evicts
        (uploads) them.
        """
        while self.running:
            now = time.time()
            # Collect and remove expired entries under the lock...
            with self.lock:
                expired = [(k, v) for k, (v, exp) in self.data.items() if now > exp]
                for k, _ in expired:
                    del self.data[k]
            # ...then upload outside the lock, as the original comment
            # promised (it actually held the lock around each upload).
            for k, v in expired:
                self._evict_item(k, v)
            # Interruptible sleep: returns early when close() is called.
            self._stop_event.wait(self.cleanup_interval)

    def _evict_item(self, key, value):
        """
        Called internally when an item expires. Upload to S3 before removal.
        """
        self._upload_value(key, value)

    def _upload_value(self, key, value):
        """
        Saves the user's data and uploads the resulting file to S3.
        Failures are logged, never raised — eviction is best-effort.
        """
        try:
            filename = value.save_user()
            # Fixed: original logged the literal "(unknown)" instead of
            # interpolating the filename.
            logger.info(
                f"FILENAME = {filename}",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )
            upload_file_to_s3(filename)
            logger.info(
                f"User {key} saved to S3 during cache eviction",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )
        except Exception as e:
            logger.error(
                f"Failed to save user {key} to S3 during cache eviction: {e}",
                extra={'user_id': key, 'endpoint': 'cache_eviction'}
            )