#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#
"""Process-wide shared state and the thread-safe accessors that guard it.

Every mutable counter or flag defined here is read and written through a
helper that holds the matching lock, so callers on different threads never
touch the raw globals directly.
"""

import threading

# --- Generation flags (guarded by generation_state_lock) -------------------
generation_state_lock = threading.Lock()
is_currently_generating = False
stop_generation_requested = False

# --- Temporary files / cleanup ---------------------------------------------
# Not accessed by any helper in this module; presumably the registry is
# guarded by temporary_files_lock by its users elsewhere -- TODO confirm.
temporary_files_registry = {}
temporary_files_lock = threading.Lock()
memory_enforcement_lock = threading.Lock()
background_cleanup_thread = None
background_cleanup_stop_event = threading.Event()
background_cleanup_trigger_event = threading.Event()

# --- Text-to-speech manager holder -----------------------------------------
text_to_speech_manager = None

# --- Audio conversion bookkeeping (counters guarded by the queue lock) -----
# The semaphore is not used within this module.
audio_conversion_semaphore = threading.Semaphore(1)
audio_conversion_queue_lock = threading.Lock()
audio_conversion_active_count = 0
audio_conversion_waiting_count = 0

# --- Accelerator logging (not used within this module) ---------------------
accelerator_log_lock = threading.Lock()
accelerator_log_thread = None
accelerator_log_stop_event = threading.Event()

# --- Model usage counter ---------------------------------------------------
# The condition shares model_usage_lock, so holding the lock is sufficient
# to wait on or notify the condition.
model_usage_lock = threading.Lock()
model_usage_count = 0
model_usage_condition = threading.Condition(model_usage_lock)

# --- Generation protection -------------------------------------------------
# Re-entrant lock plus a counter of acquisitions made through
# acquire_generation_protection(). The counter is only meaningful while the
# lock is held.
generation_protection_lock = threading.RLock()
generation_protection_count = 0


def set_text_to_speech_manager(manager_instance):
    """Store *manager_instance* as the module-wide text-to-speech manager."""
    global text_to_speech_manager
    text_to_speech_manager = manager_instance


def get_text_to_speech_manager():
    """Return the stored text-to-speech manager, or None if never set."""
    return text_to_speech_manager


def check_if_generation_is_currently_active():
    """Return the current value of the "generating" flag."""
    with generation_state_lock:
        return is_currently_generating


def set_generation_active(is_active):
    """Set the "generating" flag to *is_active*."""
    global is_currently_generating
    with generation_state_lock:
        is_currently_generating = is_active


def set_stop_generation_requested(requested):
    """Set the "stop requested" flag to *requested*."""
    global stop_generation_requested
    with generation_state_lock:
        stop_generation_requested = requested


def get_stop_generation_requested():
    """Return the current value of the "stop requested" flag."""
    with generation_state_lock:
        return stop_generation_requested


def increment_audio_conversion_active():
    """Add one to the active-conversion counter and return the new value."""
    global audio_conversion_active_count
    with audio_conversion_queue_lock:
        audio_conversion_active_count += 1
        return audio_conversion_active_count


def decrement_audio_conversion_active():
    """Subtract one from the active-conversion counter, never below zero.

    Returns the new value.
    """
    global audio_conversion_active_count
    with audio_conversion_queue_lock:
        audio_conversion_active_count = max(0, audio_conversion_active_count - 1)
        return audio_conversion_active_count


def get_audio_conversion_active_count():
    """Return the active-conversion counter."""
    with audio_conversion_queue_lock:
        return audio_conversion_active_count


def increment_audio_conversion_waiting():
    """Add one to the waiting-conversion counter and return the new value."""
    global audio_conversion_waiting_count
    with audio_conversion_queue_lock:
        audio_conversion_waiting_count += 1
        return audio_conversion_waiting_count


def decrement_audio_conversion_waiting():
    """Subtract one from the waiting-conversion counter, never below zero.

    Returns the new value.
    """
    global audio_conversion_waiting_count
    with audio_conversion_queue_lock:
        audio_conversion_waiting_count = max(0, audio_conversion_waiting_count - 1)
        return audio_conversion_waiting_count


def get_audio_conversion_waiting_count():
    """Return the waiting-conversion counter."""
    with audio_conversion_queue_lock:
        return audio_conversion_waiting_count


def is_audio_conversion_queue_busy():
    """Return True when at least one conversion is counted as active."""
    with audio_conversion_queue_lock:
        return audio_conversion_active_count > 0


def increment_model_usage():
    """Add one to the model usage counter and return the new value."""
    global model_usage_count
    with model_usage_lock:
        model_usage_count += 1
        return model_usage_count


def decrement_model_usage():
    """Subtract one from the model usage counter, never below zero.

    Wakes every thread blocked in wait_for_model_usage_zero() so it can
    re-check the counter. Returns the new value.
    """
    global model_usage_count
    with model_usage_lock:
        model_usage_count = max(0, model_usage_count - 1)
        current_count = model_usage_count
        # Must be called with the lock held; the condition wraps this lock.
        model_usage_condition.notify_all()
        return current_count


def get_model_usage_count():
    """Return the model usage counter."""
    with model_usage_lock:
        return model_usage_count


def is_model_in_use():
    """Return True when the model usage counter is above zero."""
    with model_usage_lock:
        return model_usage_count > 0


def wait_for_model_usage_zero(timeout_seconds=None):
    """Block until the model usage counter is zero.

    Args:
        timeout_seconds: Maximum time to wait in seconds; None waits
            indefinitely.

    Returns:
        True if the counter is zero, False if the timeout expired first.
    """
    with model_usage_lock:
        # wait_for() evaluates the predicate before sleeping, so an already
        # idle model returns True immediately.
        return model_usage_condition.wait_for(
            lambda: model_usage_count == 0,
            timeout=timeout_seconds,
        )


def acquire_generation_protection():
    """Take the generation protection lock and count the acquisition.

    Blocks while another thread holds the lock; re-entrant for the thread
    that already holds it. Returns the updated protection count.
    """
    global generation_protection_count
    generation_protection_lock.acquire()
    generation_protection_count += 1
    return generation_protection_count


def release_generation_protection():
    """Undo one acquire_generation_protection() made by the calling thread.

    Raises:
        RuntimeError: If the calling thread does not hold the lock. The
            protection count is left untouched when another thread is the
            holder.
    """
    global generation_protection_count
    # Ownership probe: a non-blocking acquire on an RLock succeeds only when
    # the lock is free or already held by this thread. Failing means another
    # thread owns it, so the counter must not be modified from here.
    if not generation_protection_lock.acquire(blocking=False):
        raise RuntimeError(
            "cannot release generation protection held by another thread"
        )
    try:
        generation_protection_count = max(0, generation_protection_count - 1)
    finally:
        # Undo the probe acquisition.
        generation_protection_lock.release()
    # The real release; raises RuntimeError if this thread never held the
    # lock before the probe.
    generation_protection_lock.release()


def is_generation_protected():
    """Return True when generation protection is in effect.

    If the lock cannot be taken without blocking (another thread holds it),
    protection is reported as active.
    """
    if not generation_protection_lock.acquire(blocking=False):
        return True
    try:
        return generation_protection_count > 0
    finally:
        generation_protection_lock.release()


def try_acquire_generation_protection_for_cleanup(timeout_seconds=0.1):
    """Try to take the protection lock on behalf of cleanup.

    Args:
        timeout_seconds: Maximum time to wait for the lock in seconds; None
            waits indefinitely.

    Returns:
        True if the lock was obtained and the protection count is zero. The
        lock is then still held and must be released with
        release_generation_protection_for_cleanup(). False if the lock could
        not be obtained in time, or if it was obtained while the protection
        count is above zero (in which case it is released again).
    """
    # RLock.acquire() expects a number; -1 means an unbounded wait.
    timeout = -1 if timeout_seconds is None else timeout_seconds
    if not generation_protection_lock.acquire(blocking=True, timeout=timeout):
        return False
    if generation_protection_count > 0:
        # Re-entered while this thread holds protection: back out.
        generation_protection_lock.release()
        return False
    return True


def release_generation_protection_for_cleanup():
    """Release the lock taken by try_acquire_generation_protection_for_cleanup()."""
    generation_protection_lock.release()