Shageenderan Sapai commited on
Commit
6f79f8b
·
1 Parent(s): fc55440

Custom Cache cache

Browse files
app/__pycache__/assistants.cpython-312.pyc CHANGED
Binary files a/app/__pycache__/assistants.cpython-312.pyc and b/app/__pycache__/assistants.cpython-312.pyc differ
 
app/__pycache__/main.cpython-312.pyc CHANGED
Binary files a/app/__pycache__/main.cpython-312.pyc and b/app/__pycache__/main.cpython-312.pyc differ
 
app/__pycache__/user.cpython-312.pyc CHANGED
Binary files a/app/__pycache__/user.cpython-312.pyc and b/app/__pycache__/user.cpython-312.pyc differ
 
app/__pycache__/utils.cpython-312.pyc CHANGED
Binary files a/app/__pycache__/utils.cpython-312.pyc and b/app/__pycache__/utils.cpython-312.pyc differ
 
app/cache.py ADDED
@@ -0,0 +1,193 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import threading
3
+ import logging
4
+ import boto3
5
+ from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
6
+ import os
7
+
8
+ from dotenv import load_dotenv
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+ load_dotenv()
13
+
14
+ AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
15
+ AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
16
+ REGION = os.getenv('AWS_REGION')
17
+
18
+ def upload_file_to_s3(filename):
19
+ # user_id = users\to_upload\da43883b-7e12-425f-b498-ac8665dd23fe.pkl extract everything before the first dot and the last \
20
+ user_id = filename.split('\\')[-1].split('.')[0]
21
+ # user_id = filename.split('.')[0]
22
+ print(user_id)
23
+ function_name = upload_file_to_s3.__name__
24
+ logger.info(f"Uploading file {filename} to S3", extra={'user_id': user_id, 'endpoint': function_name})
25
+ bucket = 'core-ai-assets'
26
+ try:
27
+ if (AWS_ACCESS_KEY and AWS_SECRET_KEY):
28
+ session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION)
29
+ else:
30
+ session = boto3.session.Session()
31
+ s3_client = session.client('s3')
32
+ with open(filename, "rb") as f:
33
+ ## Upload to Production Folder
34
+ s3_client.upload_fileobj(f, bucket, f'staging/users/{user_id}.pkl')
35
+ logger.info(f"File {filename} uploaded successfully to S3", extra={'user_id': user_id, 'endpoint': function_name})
36
+
37
+ os.remove(filename)
38
+
39
+ # force_file_move(os.path.join('users', 'to_upload', filename), os.path.join('users', 'data', filename))
40
+ return True
41
+ except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e:
42
+ logger.error(f"S3 upload failed for {filename}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
43
+ return False
44
+
45
+ class CustomTTLCache:
46
+ def __init__(self, ttl=60, cleanup_interval=10):
47
+ """
48
+ ttl: Time to live (in seconds) for each item
49
+ cleanup_interval: Interval at which background thread checks for expired items
50
+ """
51
+ self.ttl = ttl
52
+ self.data = {} # key -> (value, expiration_time)
53
+ self.lock = threading.Lock()
54
+ self.cleanup_interval = cleanup_interval
55
+ self.running = True
56
+ self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True) # for periodic cleanup
57
+ self.cleanup_thread.start()
58
+
59
+ def __setitem__(self, key, value):
60
+ """
61
+ For item assignment i.e. cache[key] = value
62
+ """
63
+ self.set(key, value)
64
+
65
+ def set(self, key, value):
66
+ # set expiration time as current time + ttl
67
+ expire_at = time.time() + self.ttl
68
+ with self.lock:
69
+ self.data[key] = (value, expire_at)
70
+
71
+ def get(self, key, default=None):
72
+ """
73
+ Get a value from the cache. Returns default if key not found or expired.
74
+ Resets the TTL if the item is successfully accessed.
75
+ """
76
+ with self.lock:
77
+ entry = self.data.get(key)
78
+ if not entry:
79
+ return default
80
+ value, expire_at = entry
81
+ now = time.time()
82
+ if now > expire_at:
83
+ # item expired, evict and upload
84
+ self._evict_item(key, value)
85
+ return default
86
+ # refresh this data's TTL since it's accessed
87
+ self.data[key] = (value, now + self.ttl)
88
+ return value
89
+
90
+ def __contains__(self, key):
91
+ """
92
+ Chheck if a key is in the cache and not expired.
93
+ If not expired, reset the TTL.
94
+ """
95
+ with self.lock:
96
+ entry = self.data.get(key)
97
+ if not entry:
98
+ return False
99
+ value, expire_at = entry
100
+ now = time.time()
101
+ if now > expire_at:
102
+ self._evict_item(key, value)
103
+ return False
104
+ # refresh TTL
105
+ self.data[key] = (value, now + self.ttl)
106
+ return True
107
+
108
+ def __getitem__(self, key):
109
+ """
110
+ Retrieve an item i.e. user_cache[key].
111
+ Raises KeyError if not found or expired.
112
+ Resets the TTL if the item is successfully accessed.
113
+ """
114
+ with self.lock:
115
+ if key not in self.data:
116
+ raise KeyError(key)
117
+ value, expire_at = self.data[key]
118
+ now = time.time()
119
+ if now > expire_at:
120
+ self._evict_item(key, value)
121
+ raise KeyError(key)
122
+ self.data[key] = (value, now + self.ttl)
123
+ return value
124
+
125
+ def pop(self, key, default=None):
126
+ """
127
+ Remove an item from the cache and return its value.
128
+ Upload to S3 if it hasn't expired yet.
129
+ """
130
+ with self.lock:
131
+ entry = self.data.pop(key, None)
132
+ if entry:
133
+ value, expire_at = entry
134
+ # upload before removal
135
+ self._upload_value(key, value)
136
+ return value
137
+ return default
138
+
139
+ def close(self):
140
+ """
141
+ Stop the background cleanup thread.
142
+ Technically we should call this when done using the cache but I guess we are never done using it lol.
143
+ """
144
+ self.running = False
145
+ self.cleanup_thread.join()
146
+
147
+ def _cleanup_task(self):
148
+ """
149
+ Background task that periodically checks for expired items and removes them.
150
+ """
151
+ while self.running:
152
+ now = time.time()
153
+ keys_to_remove = []
154
+ with self.lock:
155
+ for k, (v, exp) in list(self.data.items()):
156
+ if now > exp:
157
+ keys_to_remove.append(k)
158
+
159
+ # Evict expired items outside the lock to handle uploading
160
+ for k in keys_to_remove:
161
+ with self.lock:
162
+ if k in self.data:
163
+ value, _ = self.data.pop(k)
164
+ self._evict_item(k, value)
165
+
166
+ time.sleep(self.cleanup_interval)
167
+
168
+ def _evict_item(self, key, value):
169
+ """
170
+ Called internally when an item expires. upload to S3 before removal.
171
+ """
172
+ self._upload_value(key, value)
173
+
174
+ def _upload_value(self, key, value):
175
+ """
176
+ Saves the user's data and uploads the resulting file to S3.
177
+ """
178
+ try:
179
+ filename = value.save_user()
180
+ logger.info(
181
+ f"FILENAME = {filename}",
182
+ extra={'user_id': key, 'endpoint': 'cache_eviction'}
183
+ )
184
+ upload_file_to_s3(filename)
185
+ logger.info(
186
+ f"User {key} saved to S3 during cache eviction",
187
+ extra={'user_id': key, 'endpoint': 'cache_eviction'}
188
+ )
189
+ except Exception as e:
190
+ logger.error(
191
+ f"Failed to save user {key} to S3 during cache eviction: {e}",
192
+ extra={'user_id': key, 'endpoint': 'cache_eviction'}
193
+ )
app/main.py CHANGED
@@ -587,20 +587,7 @@ def create_user(request: CreateUserItem, api_key: str = Security(get_api_key)):
587
  )
588
 
589
  user = User(request.user_id, user_info, client, GENERAL_ASSISTANT)
590
- save = user.save_user()
591
-
592
 
593
- if save:
594
- print_log("INFO",f"Created pickle file for user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
595
- logger.info(f"Created pickle file for user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
596
- else:
597
- print_log("ERROR",f"Failed to create (user.save_user()) pickle file", extra={"user_id": request.user_id, "endpoint": "/create_user"})
598
- logger.error(f"Failed to create (user.save_user()) pickle file", extra={"user_id": request.user_id, "endpoint": "/create_user"})
599
- raise HTTPException(
600
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
601
- detail="Failed to create user pickle file"
602
- )
603
-
604
  # create memento folder for user
605
  folder_path = os.path.join("mementos", "to_upload", request.user_id)
606
 
@@ -612,7 +599,6 @@ def create_user(request: CreateUserItem, api_key: str = Security(get_api_key)):
612
 
613
  # upload user pickle file to s3 bucket
614
  try:
615
- # value.save_user()
616
  add_to_cache(user)
617
  pop_cache(request.user_id)
618
  upload = True
@@ -622,7 +608,7 @@ def create_user(request: CreateUserItem, api_key: str = Security(get_api_key)):
622
  if upload == True:
623
  print_log("INFO",f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
624
  logger.info(f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
625
- return {"message": f"[OK] User created: {user.user_id}"}
626
  else:
627
  print_log("ERROR",f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"})
628
  logger.error(f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"})
 
587
  )
588
 
589
  user = User(request.user_id, user_info, client, GENERAL_ASSISTANT)
 
 
590
 
 
 
 
 
 
 
 
 
 
 
 
591
  # create memento folder for user
592
  folder_path = os.path.join("mementos", "to_upload", request.user_id)
593
 
 
599
 
600
  # upload user pickle file to s3 bucket
601
  try:
 
602
  add_to_cache(user)
603
  pop_cache(request.user_id)
604
  upload = True
 
608
  if upload == True:
609
  print_log("INFO",f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
610
  logger.info(f"Successfully created user", extra={"user_id": request.user_id, "endpoint": "/create_user"})
611
+ return {"message": {"info": f"[OK] User created: {user}", "messages": user.get_messages()}}
612
  else:
613
  print_log("ERROR",f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"})
614
  logger.error(f"Failed to upload user pickle to S3", extra={"user_id": request.user_id, "endpoint": "/create_user"})
app/utils.py CHANGED
@@ -23,6 +23,8 @@ import threading
23
  import time
24
  import uuid
25
 
 
 
26
  load_dotenv()
27
 
28
  # Environment Variables for API Keys
@@ -37,29 +39,8 @@ REGION = os.getenv('AWS_REGION')
37
 
38
  logger = logging.getLogger(__name__)
39
 
40
- class AutoSaveTTLCache(TTLCache):
41
- def __init__(self, *args, **kwargs):
42
- super().__init__(*args, **kwargs)
43
- self.lock = threading.Lock()
44
-
45
- def pop(self, key, *args):
46
- with self.lock:
47
- if key in self:
48
- value = self[key]
49
- # Save to S3 before removing
50
- filename = f'{key}.pkl'
51
- try:
52
- value.save_user()
53
- upload_file_to_s3(filename)
54
- logger.info(f"User {key} saved to S3 during cache eviction",
55
- extra={'user_id': key, 'endpoint': 'cache_eviction'})
56
- except Exception as e:
57
- logger.error(f"Failed to save user {key} to S3 during cache eviction: {e}",
58
- extra={'user_id': key, 'endpoint': 'cache_eviction'})
59
- return super().pop(key, *args)
60
-
61
  # Replace the simple TTLCache with our custom implementation
62
- user_cache = AutoSaveTTLCache(maxsize=100, ttl=1200) # 20 minutes TTL
63
 
64
  def force_file_move(source, destination):
65
  function_name = force_file_move.__name__
@@ -79,45 +60,41 @@ def force_file_move(source, destination):
79
  def get_user(user_id):
80
  function_name = get_user.__name__
81
  logger.info(f"Fetching user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
82
-
83
- try:
84
- if user_id in user_cache:
85
- logger.info(f"User {user_id} found in cache", extra={'user_id': user_id, 'endpoint': function_name})
86
- return user_cache[user_id]
87
- except KeyError:
88
- # TTLCache automatically removes expired entries
89
- pass
90
-
91
- client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
92
- user_file = os.path.join('users', 'data', f'{user_id}.pkl')
93
- # if os.path.exists(user_file):
94
- # with open(user_file, 'rb') as f:
95
- # user = pickle.load(f)
96
- # user.client = client
97
- # user.conversations.client = client
98
- # with cache_lock:
99
- # user_cache[user_id] = user
100
- # return user
101
- logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra={'user_id': user_id, 'endpoint': function_name})
102
- download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets')
103
- logger.info(f"Download success: {download}", extra={'user_id': user_id, 'endpoint': function_name})
104
- if (download):
105
- with open(user_file, 'rb') as f:
106
- user = pickle.load(f)
107
- user.client = client
108
- user.conversations.client = client
109
- user_cache[user_id] = user # No need for lock here
110
- os.remove(user_file)
111
- logger.info(f"User {user_id} loaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
112
- return user
113
  else:
114
- logger.error(f"User {user_id} pickle does not exist in S3", extra={'user_id': user_id, 'endpoint': function_name})
115
- # check if user_info exists
116
- user_info = get_user_info(user_id)
117
- if (user_info):
118
- # user has done onboarding but pickle file not created
119
- raise ReferenceError(f"User {user_id} pickle still being created")
120
- raise LookupError(f"User [{user_id}] has not onboarded yet")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
121
 
122
  def get_life_score(user_id):
123
  function_name = get_life_score.__name__
@@ -1194,30 +1171,6 @@ def get_growth_guide_session(user_id, session_id):
1194
  return None
1195
 
1196
 
1197
- def upload_file_to_s3(filename):
1198
- user_id = filename.split('.')[0]
1199
- function_name = upload_file_to_s3.__name__
1200
- logger.info(f"Uploading file {filename} to S3", extra={'user_id': user_id, 'endpoint': function_name})
1201
- bucket = 'core-ai-assets'
1202
- try:
1203
- if (AWS_ACCESS_KEY and AWS_SECRET_KEY):
1204
- session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION)
1205
- else:
1206
- session = boto3.session.Session()
1207
- s3_client = session.client('s3')
1208
- with open(os.path.join('users', 'to_upload', filename), "rb") as f:
1209
- ## Upload to Production Folder
1210
- s3_client.upload_fileobj(f, bucket, f'staging/users/{filename}')
1211
- logger.info(f"File {filename} uploaded successfully to S3", extra={'user_id': user_id, 'endpoint': function_name})
1212
-
1213
- os.remove(os.path.join('users', 'to_upload', filename))
1214
-
1215
- # force_file_move(os.path.join('users', 'to_upload', filename), os.path.join('users', 'data', filename))
1216
- return True
1217
- except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e:
1218
- logger.error(f"S3 upload failed for {filename}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
1219
- return False
1220
-
1221
  def download_file_from_s3(filename, bucket):
1222
  user_id = filename.split('.')[0]
1223
  function_name = download_file_from_s3.__name__
 
23
  import time
24
  import uuid
25
 
26
+ from app.cache import CustomTTLCache, upload_file_to_s3
27
+
28
  load_dotenv()
29
 
30
  # Environment Variables for API Keys
 
39
 
40
  logger = logging.getLogger(__name__)
41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
42
  # Replace the simple TTLCache with our custom implementation
43
+ user_cache = CustomTTLCache(ttl=120, cleanup_interval=30) # 2 minutes TTL
44
 
45
  def force_file_move(source, destination):
46
  function_name = force_file_move.__name__
 
60
  def get_user(user_id):
61
  function_name = get_user.__name__
62
  logger.info(f"Fetching user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
63
+ logger.info(f"[CACHE]: {user_cache}", extra={'user_id': user_id, 'endpoint': function_name})
64
+ if user_id in user_cache:
65
+ logger.info(f"User {user_id} found in cache", extra={'user_id': user_id, 'endpoint': function_name})
66
+ return user_cache[user_id]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  else:
68
+ client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
69
+ user_file = os.path.join('users', 'data', f'{user_id}.pkl')
70
+ # if os.path.exists(user_file):
71
+ # with open(user_file, 'rb') as f:
72
+ # user = pickle.load(f)
73
+ # user.client = client
74
+ # user.conversations.client = client
75
+ # with cache_lock:
76
+ # user_cache[user_id] = user
77
+ # return user
78
+ logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra={'user_id': user_id, 'endpoint': function_name})
79
+ download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets')
80
+ logger.info(f"Download success: {download}", extra={'user_id': user_id, 'endpoint': function_name})
81
+ if (download):
82
+ with open(user_file, 'rb') as f:
83
+ user = pickle.load(f)
84
+ user.client = client
85
+ user.conversations.client = client
86
+ user_cache[user_id] = user # No need for lock here
87
+ os.remove(user_file)
88
+ logger.info(f"User {user_id} loaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
89
+ return user
90
+ else:
91
+ logger.error(f"User {user_id} pickle does not exist in S3", extra={'user_id': user_id, 'endpoint': function_name})
92
+ # check if user_info exists
93
+ user_info = get_user_info(user_id)
94
+ if (user_info):
95
+ # user has done onboarding but pickle file not created
96
+ raise ReferenceError(f"User {user_id} pickle still being created")
97
+ raise LookupError(f"User [{user_id}] has not onboarded yet")
98
 
99
  def get_life_score(user_id):
100
  function_name = get_life_score.__name__
 
1171
  return None
1172
 
1173
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1174
  def download_file_from_s3(filename, bucket):
1175
  user_id = filename.split('.')[0]
1176
  function_name = download_file_from_s3.__name__