Spaces:
Running
Running
| import os | |
| import numpy as np | |
| from tqdm import tqdm | |
| import sys | |
| import ctypes # To keep Windows awake | |
# Make project packages importable when this file is run directly as a script:
# the project root is taken to be the parent of the directory holding this file.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(_SCRIPT_DIR)
if PROJECT_ROOT not in sys.path:
    # Appended (not prepended), so entries already on sys.path keep priority.
    sys.path.append(PROJECT_ROOT)
| from utils.hear_extractor import HeARExtractor | |
# --- Configuration ---

# Root of the organized COUGHVID audio; recordings are read from its
# "sick" subfolder.
DATA_ROOT = r"c:\Users\ASUS\lung_ai_project\data\coughvid_public\organized"

# Folder that receives the embedding arrays (X_coughvid.npy / y_coughvid.npy)
# and the processed-paths tracker file.
OUTPUT_DIR = r"c:\Users\ASUS\lung_ai_project\data\hear_embeddings_coughvid"

# Write the arrays to disk after every this-many successfully embedded files.
CHECKPOINT_EVERY = 50

# Total number of 'sick' embeddings wanted on disk (already saved + newly
# extracted) -- not the number added per run. The original note says reaching
# it means extracting 1,000 more sick samples.
TARGET_SICK_COUNT = 2500
def _set_stay_awake(enabled):
    """Ask Windows to keep the system awake (or release that request).

    Returns True when the request was issued and False when it could not be,
    for example on platforms where ``ctypes.windll`` does not exist.
    """
    # ES_CONTINUOUS makes the setting stick until it is changed again;
    # ES_SYSTEM_REQUIRED is the flag that actually prevents automatic sleep.
    es_continuous = 0x80000000
    es_system_required = 0x00000001
    flags = (es_continuous | es_system_required) if enabled else es_continuous
    try:
        ctypes.windll.kernel32.SetThreadExecutionState(flags)
    except Exception:
        # Deliberately broad: this is a best-effort convenience and must never
        # abort an extraction run.
        return False
    return True


def _save_array_atomic(path, array):
    """Save *array* to *path* via a temporary file and an atomic rename."""
    tmp_path = path + ".tmp"
    # Passing an open file object makes np.save write exactly to tmp_path
    # instead of appending another ".npy" suffix to the file name.
    with open(tmp_path, "wb") as handle:
        np.save(handle, array)
    # The previous file stays intact until the new one is completely written.
    os.replace(tmp_path, path)


def _checkpoint(features_path, labels_path, features, labels, tracker, pending_paths):
    """Persist the arrays, then record the paths they newly cover.

    The tracker is written only after both arrays are on disk, so a path is
    never marked as processed while its embedding is still unsaved.
    """
    if not pending_paths:
        return
    _save_array_atomic(features_path, np.array(features))
    _save_array_atomic(labels_path, np.array(labels))
    tracker.writelines(path + "\n" for path in pending_paths)
    tracker.flush()
    pending_paths.clear()


def _extract_sick_embeddings():
    """Do the resumable extraction work for run_extraction()."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    features_path = os.path.join(OUTPUT_DIR, "X_coughvid.npy")
    labels_path = os.path.join(OUTPUT_DIR, "y_coughvid.npy")

    # Load existing results, if any, so the run resumes instead of restarting.
    features = []
    labels = []
    if os.path.exists(features_path) and os.path.exists(labels_path):
        print("Loading existing embeddings...")
        features = list(np.load(features_path))
        labels = list(np.load(labels_path))

    # Computed unconditionally so it is defined (as 0) on a first run too.
    current_sick_count = sum(1 for label in labels if label == 'sick')
    print(f"Current Sick Samples: {current_sick_count}")
    if current_sick_count >= TARGET_SICK_COUNT:
        print(f"Goal reached! You already have {current_sick_count} sick samples.")
        return

    # Paths embedded by earlier runs; used to avoid duplicates.
    tracker_path = os.path.join(OUTPUT_DIR, "processed_paths.txt")
    processed_paths = set()
    if os.path.exists(tracker_path):
        with open(tracker_path, 'r') as f:
            processed_paths = {line.strip() for line in f}

    # Collect only SICK files that have not been processed yet. Sorting makes
    # the selection deterministic; os.listdir() returns names in arbitrary order.
    audio_suffixes = ('.webm', '.ogg', '.wav')
    folder = os.path.join(DATA_ROOT, 'sick')
    all_sick_files = []
    if os.path.isdir(folder):
        for name in sorted(os.listdir(folder)):
            full_path = os.path.join(folder, name)
            if name.endswith(audio_suffixes) and full_path not in processed_paths:
                all_sick_files.append(full_path)

    remaining_to_goal = TARGET_SICK_COUNT - current_sick_count
    files_to_process = all_sick_files[:remaining_to_goal]
    print(f"Extraction Target: {len(files_to_process)} more sick samples.")
    if not files_to_process:
        print("No more unique sick files found to process.")
        return

    print("Initializing HeAR Extractor...")
    extractor = HeARExtractor()

    # Paths embedded since the last checkpoint; not yet written to the tracker.
    pending_paths = []
    with open(tracker_path, 'a') as tracker:
        try:
            for path in tqdm(files_to_process, desc="Extracting Sick"):
                emb = extractor.extract(path)
                if emb is None:
                    # No embedding produced: leave the file untracked so a
                    # later run tries it again.
                    continue
                features.append(emb)
                labels.append('sick')
                pending_paths.append(path)
                if len(pending_paths) >= CHECKPOINT_EVERY:
                    _checkpoint(features_path, labels_path, features, labels,
                                tracker, pending_paths)
        except KeyboardInterrupt:
            print("\nStopping and saving progress...")
            _checkpoint(features_path, labels_path, features, labels,
                        tracker, pending_paths)
            print("Progress saved.")
            return
        except Exception:
            # Keep what was extracted so far, then let the error surface.
            print("\nExtraction failed; saving progress before re-raising...")
            _checkpoint(features_path, labels_path, features, labels,
                        tracker, pending_paths)
            raise
        # Final save for whatever was extracted after the last checkpoint.
        _checkpoint(features_path, labels_path, features, labels,
                    tracker, pending_paths)

    print(f"Success! Now you have {sum(1 for label in labels if label == 'sick')} sick samples in total.")


def run_extraction():
    """Extract HeAR embeddings for 'sick' COUGHVID recordings, resumably.

    Reads audio files (.webm/.ogg/.wav) from ``DATA_ROOT/sick``, skips paths
    listed in ``OUTPUT_DIR/processed_paths.txt``, and embeds new files until
    ``TARGET_SICK_COUNT`` 'sick' samples are stored in ``X_coughvid.npy`` /
    ``y_coughvid.npy``. Progress is checkpointed every ``CHECKPOINT_EVERY``
    embedded files and also on Ctrl+C or an unexpected error (which is
    re-raised after saving).

    Windows is asked to stay awake for the duration of the call; the request
    is released on every exit path, including the early returns.
    """
    stay_awake = _set_stay_awake(True)
    if stay_awake:
        print(">>> Windows 'Stay Awake' mode enabled.")
    else:
        print(">>> Warning: Could not enable 'Stay Awake' mode.")

    try:
        _extract_sick_embeddings()
    finally:
        # Reset Windows sleep settings to normal.
        if stay_awake:
            _set_stay_awake(False)
if __name__ == "__main__":
    # Script entry point. The guard appeared twice, which ran the whole
    # extraction a second time; it must run exactly once.
    run_extraction()