ChandimaPrabath commited on
Commit
733f7a8
·
1 Parent(s): 5517352

unstable fix

Browse files
Files changed (1) hide show
  1. LoadBalancer.py +26 -32
LoadBalancer.py CHANGED
@@ -34,7 +34,7 @@ class LoadBalancer:
34
  if not os.path.exists(self.CACHE_DIR):
35
  os.makedirs(self.CACHE_DIR)
36
 
37
- # Initialize file structure and prefetching
38
  self.file_structure = indexer()
39
  self.start_prefetching()
40
 
@@ -43,14 +43,19 @@ class LoadBalancer:
43
  polling_thread.daemon = True
44
  polling_thread.start()
45
 
46
- # Start the periodic content check
47
- content_check_thread = Thread(target=self.check_for_updates)
48
- content_check_thread.daemon = True
49
- content_check_thread.start()
 
 
 
 
 
50
 
51
  def start_prefetching(self):
52
  """Start the metadata prefetching in the FastAPI event loop."""
53
- asyncio.create_task(self.prefetch_metadata())
54
 
55
  async def prefetch_metadata(self):
56
  """Prefetch metadata for all items in the file structure."""
@@ -84,32 +89,6 @@ class LoadBalancer:
84
  # Run all tasks concurrently
85
  await asyncio.gather(*tasks)
86
 
87
- def check_for_updates(self):
88
- """Periodically check for updates and trigger prefetching if new content is detected."""
89
- while not self.stop_event.is_set():
90
- time.sleep(120) # Wait for 2 minutes
91
- current_file_structure = indexer()
92
- if current_file_structure != self.previous_file_structure:
93
- self.start_prefetching()
94
- self.previous_file_structure = current_file_structure
95
- else:
96
- logging.info("No new content detected.")
97
-
98
- def register_instance(self, instance_url):
99
- if instance_url not in self.instances:
100
- self.instances.append(instance_url)
101
- logging.info(f"Registered instance {instance_url}")
102
- else:
103
- logging.info(f"Instance {instance_url} is already registered.")
104
-
105
- def remove_instance(self, instance_url):
106
- if instance_url in self.instances:
107
- self.instances.remove(instance_url)
108
- self.instances_health.pop(instance_url, None)
109
- logging.info(f"Removed instance {instance_url}")
110
- else:
111
- logging.info(f"Instance {instance_url} not found for removal.")
112
-
113
  def get_reports(self):
114
  reports = self.instances_api.fetch_reports()
115
  temp_film_store = {}
@@ -164,6 +143,21 @@ class LoadBalancer:
164
  logging.info("Stopping polling.")
165
  self.stop_event.set()
166
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
167
  def update_instances_health(self, instance, cache_size):
168
  self.instances_health[instance] = {"used":cache_size["cache_size"],
169
  "total": "50 GB"}
 
34
  if not os.path.exists(self.CACHE_DIR):
35
  os.makedirs(self.CACHE_DIR)
36
 
37
+ # Initialize file structure and start prefetching
38
  self.file_structure = indexer()
39
  self.start_prefetching()
40
 
 
43
  polling_thread.daemon = True
44
  polling_thread.start()
45
 
46
+ # Start periodic tasks
47
+ asyncio.create_task(self.run_periodic_tasks())
48
+
49
+ async def run_periodic_tasks(self):
50
+ """Run indexer and prefetch functions every 2 minutes."""
51
+ while not self.stop_event.is_set():
52
+ self.file_structure = indexer() # Re-run indexer
53
+ await self.start_prefetching() # Start prefetching
54
+ await asyncio.sleep(120) # Sleep for 2 minutes
55
 
56
  def start_prefetching(self):
57
  """Start the metadata prefetching in the FastAPI event loop."""
58
+ return asyncio.create_task(self.prefetch_metadata())
59
 
60
  async def prefetch_metadata(self):
61
  """Prefetch metadata for all items in the file structure."""
 
89
  # Run all tasks concurrently
90
  await asyncio.gather(*tasks)
91
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  def get_reports(self):
93
  reports = self.instances_api.fetch_reports()
94
  temp_film_store = {}
 
143
  logging.info("Stopping polling.")
144
  self.stop_event.set()
145
 
146
+ def register_instance(self, instance_url):
147
+ if instance_url not in self.instances:
148
+ self.instances.append(instance_url)
149
+ logging.info(f"Registered instance {instance_url}")
150
+ else:
151
+ logging.info(f"Instance {instance_url} is already registered.")
152
+
153
+ def remove_instance(self, instance_url):
154
+ if instance_url in self.instances:
155
+ self.instances.remove(instance_url)
156
+ self.instances_health.pop(instance_url, None)
157
+ logging.info(f"Removed instance {instance_url}")
158
+ else:
159
+ logging.info(f"Instance {instance_url} not found for removal.")
160
+
161
  def update_instances_health(self, instance, cache_size):
162
  self.instances_health[instance] = {"used":cache_size["cache_size"],
163
  "total": "50 GB"}