"""
Speaker Diarization Module
Pyannote-audio ile konuşmacı ayrımı (kim ne zaman konuşuyor).
"""
import os
from typing import List, Tuple, Optional
# PyTorch 2.6+ compatibility: Disable weights_only restriction for pyannote models
os.environ["TORCH_FORCE_NO_WEIGHTS_ONLY_LOAD"] = "1"
import torch
# Check for GPU availability
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"🔧 Diarization device: {DEVICE}")
def get_diarization_pipeline(hf_token: Optional[str] = None):
    """
    Load the pyannote speaker diarization pipeline.

    Args:
        hf_token: Hugging Face access token (pyannote models are gated and
            require authentication).

    Returns:
        A ready-to-use diarization pipeline, or None if loading failed.
    """
    try:
        from pyannote.audio import Pipeline

        # Fall back to the HF_TOKEN environment variable when no explicit
        # token was passed in.
        token = hf_token if hf_token else os.environ.get("HF_TOKEN")
        if not token:
            print("⚠️ HF_TOKEN bulunamadı. pyannote modeli yüklenemeyebilir.")

        diar_pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            token=token,
        )
        # Run inference on GPU when one is available.
        diar_pipeline.to(DEVICE)
        print("✅ Diarization pipeline yüklendi!")
        return diar_pipeline
    except Exception as e:
        print(f"❌ Diarization pipeline yüklenemedi: {e}")
        return None
def diarize_audio(audio_path: str, pipeline, num_speakers: Optional[int] = None) -> List[Tuple[float, float, str]]:
    """
    Perform speaker diarization on an audio file.

    Args:
        audio_path: Path to the audio file.
        pipeline: Pyannote diarization pipeline (may be None if loading failed).
        num_speakers: Expected upper bound on the number of speakers
            (None lets pyannote auto-detect).

    Returns:
        List of (start_time, end_time, speaker_label) tuples, or an empty
        list when the pipeline is missing or diarization fails.
    """
    if pipeline is None:
        return []
    try:
        # Run diarization - let pyannote auto-detect if num_speakers not specified
        print(f"🔍 Diarization parametreleri: num_speakers={num_speakers}")
        if num_speakers:
            # Constrain the speaker range. min_speakers must never exceed
            # max_speakers; the previous hard-coded min_speakers=2 made the
            # call invalid for num_speakers=1, so cap it at num_speakers.
            result = pipeline(
                audio_path,
                min_speakers=min(2, num_speakers),
                max_speakers=num_speakers,
            )
        else:
            # Auto-detect number of speakers
            result = pipeline(audio_path)

        # Newer pyannote versions return a DiarizeOutput wrapper exposing the
        # Annotation via `speaker_diarization`; older versions return the
        # Annotation directly.
        if hasattr(result, 'speaker_diarization'):
            diarization = result.speaker_diarization
            print(f"🔍 Using speaker_diarization attribute")
        else:
            diarization = result

        # Flatten the Annotation into (start, end, speaker) tuples.
        segments = []
        unique_speakers = set()
        for segment, track, speaker in diarization.itertracks(yield_label=True):
            segments.append((segment.start, segment.end, speaker))
            unique_speakers.add(speaker)

        print(f"✅ Diarization tamamlandı: {len(segments)} segment, {len(unique_speakers)} konuşmacı")
        print(f"🔍 Bulunan konuşmacılar: {unique_speakers}")
        return segments
    except Exception as e:
        print(f"❌ Diarization hatası: {e}")
        return []
def format_speaker_label(speaker: str) -> str:
    """
    Map pyannote's raw labels (SPEAKER_00 .. SPEAKER_03) to user-friendly
    names ("Kişi 1" .. "Kişi 4"); any other label is returned unchanged.
    """
    friendly = {f"SPEAKER_{i:02d}": f"Kişi {i + 1}" for i in range(4)}
    return friendly.get(speaker, speaker)
def format_timestamp(seconds: float) -> str:
    """
    Render a duration in seconds as "HH:MM:SS" (for durations of an hour
    or more) or "MM:SS" otherwise, with zero-padded fields.
    """
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"