# Speaker diarization + per-speaker transcription for a single audio file.
# Pipeline: pyannote.audio (who spoke when) -> faster-whisper (what was said).

import os

import numpy as np
import torch
import torchaudio
import torchaudio.transforms as T
from faster_whisper import WhisperModel
from pyannote.audio import Pipeline

# --- Configuration ---
# IMPORTANT: You need to agree to the terms of use for pyannote/speaker-diarization-3.1
# on Hugging Face: https://huggingface.co/pyannote/speaker-diarization-3.1
# Make sure to set your HF_TOKEN as an environment variable.
# For example: export HF_TOKEN="your_token_here"
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None:
    print("WARNING: Hugging Face token not found. Diarization may fail.")
    print("Please set the HF_TOKEN environment variable with your token.")

AUDIO_FILE = "WhatsApp Audio 2026-01-24 at 12.43.45 PM.ogg"

# Choose a device (GPU if available, otherwise CPU)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# --- 1. Diarization Model ---
print("Loading diarization pipeline...")
# Use the full pipeline for speaker diarization.
# BUG FIX: the token was read from the environment but never handed to
# from_pretrained(); the gated pyannote model needs it, so the download
# failed even when HF_TOKEN was set correctly.
diarization_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN,
)
diarization_pipeline.to(torch.device(DEVICE))
print("Diarization pipeline loaded.")

# --- 2. Transcription Model ---
# Using faster-whisper for efficient transcription
# Model options: "tiny", "base", "small", "medium", "large-v2"
# "base" is a good starting point.
transcription_model_size = "base"
print(f"Loading transcription model '{transcription_model_size}'...")
transcription_model = WhisperModel(
    transcription_model_size, device=DEVICE, compute_type="int8"
)
print("Transcription model loaded.")
# --- 3. Processing ---
if not os.path.exists(AUDIO_FILE):
    print(f"āŒ Error: Audio file not found at '{AUDIO_FILE}'")
else:
    print(f"\nProcessing audio file: {AUDIO_FILE}")

    # Load full audio using torchaudio; waveform is (channels, samples) on CPU.
    waveform, sample_rate = torchaudio.load(AUDIO_FILE)

    # Ensure audio is mono (if stereo, take one channel).
    # NOTE(review): this keeps only channel 0; averaging channels would
    # preserve content from both — confirm which is intended.
    if waveform.shape[0] > 1:
        waveform = waveform[0, :].unsqueeze(0)

    print(f"Original audio loaded: {waveform.shape[1]/sample_rate:.2f} seconds at {sample_rate} Hz.")

    # Resample audio to 16kHz for pyannote (common practice)
    TARGET_SAMPLE_RATE = 16000
    if sample_rate != TARGET_SAMPLE_RATE:
        print(f"Resampling audio from {sample_rate} Hz to {TARGET_SAMPLE_RATE} Hz...")
        # BUG FIX: the resampler was previously moved to DEVICE while the
        # waveform stayed on the CPU, raising a device-mismatch RuntimeError
        # whenever CUDA was available. Resampling is cheap; keep both on CPU.
        resampler = T.Resample(orig_freq=sample_rate, new_freq=TARGET_SAMPLE_RATE)
        waveform = resampler(waveform)
        sample_rate = TARGET_SAMPLE_RATE
        print(f"Audio resampled: {waveform.shape[1]/sample_rate:.2f} seconds at {sample_rate} Hz.")

    # --- Diarization Step ---
    print("Diarizing speakers...")
    # pyannote accepts an in-memory mapping instead of a file path.
    diarization_input = {"waveform": waveform, "sample_rate": sample_rate}
    diarization = diarization_pipeline(diarization_input)
    print("Diarization complete.")

    # --- Transcription Step ---
    # Transcribe each diarized speaker turn independently so the output can be
    # labeled "[start - end] SPEAKER: text".
    print("Transcribing segments...")
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_sample = int(turn.start * sample_rate)
        end_sample = int(turn.end * sample_rate)

        # Skip degenerate turns that would produce an empty slice.
        if end_sample <= start_sample:
            continue

        segment_waveform = waveform[:, start_sample:end_sample]

        # Convert tensor to a float32 numpy array for faster-whisper.
        segment_np = segment_waveform.squeeze().cpu().numpy().astype(np.float32)

        # Transcribe the segment
        segments, _ = transcription_model.transcribe(segment_np, beam_size=5)

        # Combine transcribed parts for the current segment
        text = "".join(segment.text for segment in segments)

        # Print the result
        print(f"[{turn.start:.2f}s - {turn.end:.2f}s] {speaker}: {text.strip()}")

    print("\nāœ… Processing finished.")