load-balancer / LoadBalancer.py
ChandimaPrabath's picture
0.0.2 Alpha test
b8db82b
raw
history blame
7.17 kB
import os
from indexer import indexer
from threading import Event, Thread
import asyncio
import time
import logging
from utils import convert_to_gb
from api import InstancesAPI
CACHE_DIR = os.getenv("CACHE_DIR")
class LoadBalancer:
def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
self.version = "0.0.2 Alpha"
self.instances = []
self.instances_health = {}
self.polling_interval = polling_interval
self.max_retries = max_retries
self.initial_delay = initial_delay
self.stop_event = Event()
self.instances_api = InstancesAPI(self.instances)
self.CACHE_DIR = cache_dir
self.TOKEN = token
self.REPO = repo
self.MUSIC_STORE = {}
self.file_structure = None
self.category_files_map = {}
# Ensure CACHE_DIR exists
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
# Initialize file structure and start prefetching
self.update_file_structure()
# Start polling and file checking in separate threads
polling_thread = Thread(target=self.start_polling)
polling_thread.daemon = True
polling_thread.start()
# Start periodic tasks
asyncio.create_task(self.run_periodic_tasks())
def update_file_structure(self):
"""Update the file structure and the category-files map."""
self.file_structure = indexer() # Assume this re-fetches the file structure
self.category_files_map = {} # Reset the map
for directory in self.file_structure:
if directory['type'] == 'directory':
# Map category to its files
self.category_files_map[directory['path']] = [
file['path'] for file in directory['contents'] if file['type'] == 'file'
]
async def run_periodic_tasks(self):
"""Run indexer and prefetch functions every 5 minutes."""
while not self.stop_event.is_set():
self.update_file_structure() # Re-run indexer and update the map
await asyncio.sleep(300) # Sleep for 5 minutes
def get_reports(self):
reports = self.instances_api.fetch_reports()
temp_music_store = {}
for instance_url in self.instances[:]:
if instance_url in reports:
report = reports[instance_url]
logging.info(f"Report from {instance_url}: {report}")
self.process_report(instance_url, report, temp_music_store)
else:
logging.error(f"Failed to get report from {instance_url}. Removing instance.")
self.remove_instance(instance_url)
self.MUSIC_STORE = temp_music_store
def process_report(self, instance_url, report, temp_music_store):
music_store = report.get('music_store', {})
cache_size = report.get('cache_size')
logging.info(f"Processing report from {instance_url}")
# Update temporary music store
for title, path in music_store.items():
url = f"{instance_url}/api/get/music/{title.replace(' ', '%20')}"
temp_music_store[title] = url
logging.info("Music Store processed successfully.")
self.update_instances_health(instance=instance_url, cache_size=cache_size)
def start_polling(self):
logging.info("Starting polling.")
while not self.stop_event.is_set():
self.get_reports()
time.sleep(self.polling_interval)
logging.info("Polling stopped.")
def stop_polling(self):
logging.info("Stopping polling.")
self.stop_event.set()
def register_instance(self, instance_url):
if instance_url not in self.instances:
self.instances.append(instance_url)
logging.info(f"Registered instance {instance_url}")
else:
logging.info(f"Instance {instance_url} is already registered.")
def remove_instance(self, instance_url):
if instance_url in self.instances:
self.instances.remove(instance_url)
self.instances_health.pop(instance_url, None)
logging.info(f"Removed instance {instance_url}")
else:
logging.info(f"Instance {instance_url} not found for removal.")
def update_instances_health(self, instance, cache_size):
self.instances_health[instance] = {"used": cache_size["cache_size"], "total": "50 GB"}
logging.info(f"Updated instance {instance} with cache size {cache_size}")
def download_music_to_best_instance(self, file_name):
"""Downloads a music file to the first instance that has more free space on the self.instance_health list variable."""
best_instance = None
max_free_space = -1
for instance_url, space_info in self.instances_health.items():
total_space = convert_to_gb(space_info['total'])
used_space = convert_to_gb(space_info['used'])
free_space = total_space - used_space
if free_space > max_free_space:
max_free_space = free_space
best_instance = instance_url
if best_instance:
result = self.instances_api.download_music(best_instance, file_name)
music_id = result["music_id"]
status = result["status"]
progress_url = f'{best_instance}/api/get/progress/{music_id}'
response = {
"music_id": music_id,
"status": status,
"progress_url": progress_url
}
return response
else:
logging.error("No suitable instance found for downloading the music.")
return {"error": "No suitable instance found for downloading the music."}
def find_music_path(self, title):
"""Find the path of the music in the indexed data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_music_id(self, title):
"""Generate a unique music ID based on the title."""
return title.replace(" ", "_").lower()
def get_all_music(self):
"""Get all music files from the indexed file structure."""
music_files = []
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file':
music_files.append(sub_directory['path'])
return music_files
def get_all_categories(self):
"""Get a list of all category folders."""
return list(self.category_files_map.keys())
def get_files_from_category(self, category):
"""Get all files from a specified category."""
return self.category_files_map.get(category, [])