File size: 2,913 Bytes
1710879
f418fac
1710879
0a1b45f
901bdaa
1710879
901bdaa
1710879
 
 
 
 
901bdaa
 
1710879
 
 
 
 
 
 
 
 
 
 
 
 
 
901bdaa
 
 
1710879
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
901bdaa
b0b070a
1710879
 
 
52d4965
1710879
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import gradio as gr
import zipfile
import os
import shutil
from pyannote.audio import Pipeline
import torch

# Set up the directory for processing
# Scratch directory where uploaded archives are extracted and sorted
# into per-speaker subfolders before re-zipping.
TEMP_DIR = "temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)

# Initialize the pyannote.audio pipeline
# NOTE(review): this model is gated — loading requires a valid Hugging
# Face access token in the HF_TOKEN environment variable; with HF_TOKEN
# unset, from_pretrained is expected to fail at startup.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=os.getenv("HF_TOKEN")
)

# Move pipeline to GPU if available
if torch.cuda.is_available():
    pipeline.to(torch.device("cuda"))

def process_audio_zip(file_info):
    """Sort a ZIP of audio files into two ZIPs by dominant speaker.

    Extracts the uploaded archive into ``TEMP_DIR``, runs speaker
    diarization on every ``.wav`` file, decides which speaker talks the
    most in each file, and moves the file into a per-speaker folder.
    The two folders are then zipped and returned.

    Args:
        file_info: Filesystem path to the uploaded ZIP archive (the
            gradio ``File`` input with ``type="filepath"`` supplies a
            path string).

    Returns:
        Tuple ``(speaker1_zip_path, speaker2_zip_path)``.
    """
    # Recreate the scratch dir: the cleanup at the end of this function
    # removes it, and the module-level makedirs only ran once at import.
    os.makedirs(TEMP_DIR, exist_ok=True)

    # Unzip the uploaded file
    with zipfile.ZipFile(file_info, 'r') as zip_ref:
        zip_ref.extractall(TEMP_DIR)

    speaker1_dir = os.path.join(TEMP_DIR, "speaker1")
    speaker2_dir = os.path.join(TEMP_DIR, "speaker2")
    os.makedirs(speaker1_dir, exist_ok=True)
    os.makedirs(speaker2_dir, exist_ok=True)

    # Process each audio file sitting at the top level of the archive.
    # NOTE(review): files nested in subdirectories inside the ZIP are
    # not visited — confirm whether uploads are always flat.
    for filename in os.listdir(TEMP_DIR):
        # Case-insensitive match so ".WAV" uploads are not silently skipped.
        if not filename.lower().endswith(".wav"):
            continue
        file_path = os.path.join(TEMP_DIR, filename)

        # Run the diarization pipeline
        diarization = pipeline(file_path)

        # Accumulate speech time per speaker label.  pyannote yields
        # STRING labels ("SPEAKER_00", "SPEAKER_01", ...), not the
        # integers 1/2 the original code assumed — a fixed {1: 0, 2: 0}
        # dict raised KeyError on the very first track.
        total_duration = {}
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            total_duration[speaker] = total_duration.get(speaker, 0.0) + turn.duration

        # Route the file: speaker1 when the first label (lowest-sorting,
        # e.g. "SPEAKER_00") has at least as much speech as any other,
        # otherwise speaker2.  Ties and silent files go to speaker1,
        # matching the original ">=" preference.
        if total_duration:
            dominant = max(total_duration, key=total_duration.get)
            first_label = min(total_duration)
            target_dir = speaker1_dir if total_duration[first_label] >= total_duration[dominant] else speaker2_dir
        else:
            target_dir = speaker1_dir  # no speech detected; default bucket
        shutil.move(file_path, os.path.join(target_dir, filename))

    # Zip the results
    speaker1_zip = "speaker1.zip"
    speaker2_zip = "speaker2.zip"

    def zipdir(path, ziph):
        # Archive every file under `path`, keeping the "speakerN/..."
        # prefix in the archive names (paths relative to TEMP_DIR).
        for root, dirs, files in os.walk(path):
            for file in files:
                full = os.path.join(root, file)
                ziph.write(full, os.path.relpath(full, os.path.join(path, '..')))

    with zipfile.ZipFile(speaker1_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(speaker1_dir, zipf)

    with zipfile.ZipFile(speaker2_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(speaker2_dir, zipf)

    # Clean up the temporary directory
    shutil.rmtree(TEMP_DIR)

    return speaker1_zip, speaker2_zip

# Wire up the Gradio UI: one ZIP upload in, two per-speaker ZIPs out.
zip_input = gr.File(type="filepath")
zip_outputs = [
    gr.File(label="Speaker 1 Audio"),
    gr.File(label="Speaker 2 Audio"),
]

iface = gr.Interface(
    fn=process_audio_zip,
    inputs=zip_input,
    outputs=zip_outputs,
    title="Speaker Diarization",
    description="Upload a ZIP file containing audio files, and this will return two ZIP files containing diarized audio for each speaker.",
)

# Start the web server (blocks until the app is stopped).
iface.launch()