from pathlib import Path
import os
import shutil
import string
import secrets
import hashlib
import random
import time
import re
def get_files_count(directory_path):
    """Return the number of entries (files and subdirectories) in directory_path."""
    return len(os.listdir(directory_path))
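# Usage sketch (hypothetical path): note this counts every directory entry,
# including subdirectories, not just regular files.
#   >>> get_files_count("/tmp/outputs")  # e.g. 12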
def generate_random_string(length=10):
    """Return a cryptographically secure random string of ASCII letters."""
    characters = string.ascii_letters
    random_string = ''.join(secrets.choice(characters) for _ in range(length))
    return random_string
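# Usage sketch: each call draws fresh OS entropy via secrets, so outputs
# differ between calls.
#   >>> generate_random_string(8)  # e.g. 'qXbWnRsT' (illustrative output)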
def generate_random_string_from_input(input_string, length=16):
    """Deterministically derive a pseudo-random string from input_string."""
    # Hash the input string to get a consistent value
    hash_object = hashlib.sha256(input_string.encode())
    hashed_string = hash_object.hexdigest()
    # Use the hash to seed the (non-cryptographic) random number generator
    random.seed(hashed_string)
    # Generate a random string based on the seed
    characters = string.ascii_letters + string.digits
    random_string = ''.join(random.choice(characters) for _ in range(length))
    return random_string
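# Usage sketch: the same input always maps to the same string, which makes
# this suitable for stable identifiers (e.g. per-user cache keys) but not
# for secrets, since random is not cryptographically secure.
#   >>> generate_random_string_from_input("user@example.com")  # stable across runs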
def is_mostly_black(frame, black_threshold=20, percentage_threshold=0.9, sample_rate=10):
    """
    Fast black-frame detection using pixel sampling.
    Args:
        frame: OpenCV BGR frame (NumPy array)
        black_threshold: grayscale value below which a pixel is considered black
        percentage_threshold: fraction of black pixels to consider frame mostly black
        sample_rate: sample every N-th pixel in both dimensions (higher = faster)
    Returns:
        True if mostly black, False otherwise
    """
    import cv2
    import numpy as np
    if frame is None or frame.size == 0:
        return True
    # Convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Sample every sample_rate-th pixel in both dimensions
    sampled = gray[::sample_rate, ::sample_rate]
    black_count = np.sum(sampled < black_threshold)
    total_count = sampled.size
    return (black_count / total_count) >= percentage_threshold
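# Usage sketch, assuming cv2 and numpy are installed: an all-zero frame is
# trivially black; a mid-gray frame is not.
#   >>> import numpy as np
#   >>> is_mostly_black(np.zeros((480, 640, 3), dtype=np.uint8))       # True
#   >>> is_mostly_black(np.full((480, 640, 3), 128, dtype=np.uint8))  # False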
def only_alpha(text: str) -> str:
    # Keep only alphabetic characters (make lowercase to ignore case)
    return re.sub(r'[^a-zA-Z]', '', text).lower()
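# Usage sketch: useful for punctuation- and case-insensitive text comparison.
#   >>> only_alpha("Hello, World! 42")  # 'helloworld'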
def manage_gpu(size_gb: float = 0, gpu_index: int = 0, action: str = "check"):
    """
    Manage GPU memory:
    - check → just prints memory + process table
    - clear_cache → clears the PyTorch CUDA cache
    - kill → kills all GPU compute processes, then clears the cache
    Returns True if the GPU ends up with more than size_gb GB free.
    """
    try:
        import pynvml
        import signal
        import gc
        pynvml.nvmlInit()
        handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        free_gb = info.free / 1024**3
        total_gb = info.total / 1024**3
        print(f"\nGPU {gpu_index}: Free {free_gb:.2f} GB / Total {total_gb:.2f} GB")
        # Show processes
        processes = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
        print("\nActive GPU Processes:")
        print(f"{'PID':<8} {'Process Name':<40} {'Used (GB)':<10}")
        print("-" * 60)
        for p in processes:
            used_gb = p.usedGpuMemory / 1024**3
            proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore")
            print(f"{p.pid:<8} {proc_name:<40} {used_gb:.2f}")
        if action == "clear_cache":
            try:
                import torch
                gc.collect()
                gc.collect()
                torch.cuda.empty_cache()
                torch.cuda.reset_peak_memory_stats()
                torch.cuda.synchronize()
                time.sleep(1)
                print("\n🧹 Cleared PyTorch CUDA cache")
            except ImportError:
                print("\n⚠️ PyTorch not installed, cannot clear cache.")
        elif action == "kill":
            for p in processes:
                proc_name = pynvml.nvmlSystemGetProcessName(p.pid).decode(errors="ignore")
                try:
                    os.kill(p.pid, signal.SIGKILL)
                    print(f"❌ Killed {p.pid} ({proc_name})")
                except Exception as e:
                    print(f"⚠️ Could not kill {p.pid}: {e}")
            manage_gpu(action="clear_cache")
            gc.collect()
            gc.collect()
        # Re-query so the returned check reflects memory freed by the action
        info = pynvml.nvmlDeviceGetMemoryInfo(handle)
        free_gb = info.free / 1024**3
        return free_gb > size_gb
    except Exception:
        # NVML unavailable (e.g. no NVIDIA driver) or a query failed
        return False
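# Usage sketch (assumes an NVIDIA GPU and the pynvml package): check whether
# more than 8 GB are free after clearing the PyTorch CUDA cache. Beware that
# action="kill" sends SIGKILL to every compute process on the device.
#   >>> manage_gpu(size_gb=8, action="clear_cache")  # True if > 8 GB free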
def is_gpu_available(verbose=True):
    import torch
    if not torch.cuda.is_available():
        if verbose:
            print("CUDA not available.")
        return False
    try:
        # Try a tiny allocation to check that the GPU is free and usable
        torch.empty(1, device="cuda")
        if verbose:
            print(f"CUDA available. Using device: {torch.cuda.get_device_name(0)}")
        return True
    except RuntimeError as e:
        if "CUDA-capable device(s) is/are busy or unavailable" in str(e) or \
           "CUDA error" in str(e):
            if verbose:
                print("CUDA detected but busy/unavailable. Falling back to CPU.")
            return False
        raise  # re-raise if it's some other unexpected error
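# Usage sketch: torch.cuda.is_available() only reports that the driver sees a
# device; the tiny allocation above also catches GPUs that are visible but
# busy (e.g. held by another process in exclusive compute mode).
#   >>> is_gpu_available(verbose=False)  # True or False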
def get_device(is_vision=False):
    """Return "cuda" or "cpu"; USE_CPU_IF_POSSIBLE forces CPU for non-vision work."""
    if not is_vision and os.getenv("USE_CPU_IF_POSSIBLE", None):
        return "cpu"
    return "cuda" if is_gpu_available() else "cpu"