rafaaa2105 committed on
Commit
1710879
·
verified ·
1 Parent(s): e35c4d7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +76 -79
app.py CHANGED
@@ -1,90 +1,87 @@
1
- import os
2
  import zipfile
 
3
  import shutil
4
- import torch
5
- import torchaudio
6
  from pyannote.audio import Pipeline
7
- import gradio as gr
8
 
9
# Load the pre-trained speaker-diarization model.  Authentication uses the
# Hugging Face access token read from the HF_TOKEN environment variable;
# if the variable is unset this is None and the download of the gated
# pyannote model will fail at startup.
HUGGINGFACE_ACCESS_TOKEN = os.getenv("HF_TOKEN")

# Instantiated once at module import time; reused for every request.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HUGGINGFACE_ACCESS_TOKEN)
14
-
15
def unzip_files(zip_fp, extract_to):
    """Extract every member of the zip archive at *zip_fp* into *extract_to*."""
    archive = zipfile.ZipFile(zip_fp, 'r')
    with archive:
        archive.extractall(extract_to)
19
-
20
def zip_files(input_dir, zip_fp):
    """Archive the full contents of *input_dir* into a zip file at *zip_fp*.

    Entry names are stored relative to *input_dir* itself, so the archive
    does not contain the directory's own name as a prefix.
    """
    with zipfile.ZipFile(zip_fp, 'w') as archive:
        for root, _dirs, names in os.walk(input_dir):
            for name in names:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, input_dir))
27
-
28
def classify_and_group_speakers(zip_file):
    """Split the audio files in an uploaded zip into two zips by dominant speaker.

    Each extracted file is diarized; whichever of the first two speaker labels
    accounts for more speaking time decides which output archive receives the
    file.  Returns the paths of the two result archives.
    """
    extract_dir = 'extract_temp'
    speaker1_dir = 'speaker1_temp'
    speaker2_dir = 'speaker2_temp'
    for workdir in (extract_dir, speaker1_dir, speaker2_dir):
        os.makedirs(workdir, exist_ok=True)

    # Unpack the uploaded archive into the scratch directory.
    unzip_files(zip_file.name, extract_dir)

    for name in os.listdir(extract_dir):
        audio_path = os.path.join(extract_dir, name)

        # Decode the audio into a waveform tensor.
        waveform, sample_rate = torchaudio.load(audio_path)

        # Downmix multi-channel audio to mono.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0).unsqueeze(0)
        # Resample anything that is not already 16 kHz.
        if sample_rate != 16000:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
            waveform = resampler(waveform)
            sample_rate = 16000

        diarization = pipeline({"waveform": waveform, "sample_rate": sample_rate})

        # Accumulate total speech time for the first two speaker labels only.
        durations = {'SPEAKER_00': 0.0, 'SPEAKER_01': 0.0}
        for segment, _, label in diarization.itertracks(yield_label=True):
            if label in durations:
                durations[label] += segment.duration

        # Strictly-greater comparison: ties go to speaker 2, as before.
        if durations['SPEAKER_00'] > durations['SPEAKER_01']:
            shutil.copy(audio_path, speaker1_dir)
        else:
            shutil.copy(audio_path, speaker2_dir)

    # Package each speaker's files into its own archive.
    speaker1_zip = 'speaker1.zip'
    speaker2_zip = 'speaker2.zip'
    zip_files(speaker1_dir, speaker1_zip)
    zip_files(speaker2_dir, speaker2_zip)

    # Remove all scratch directories now that the archives exist.
    for workdir in (extract_dir, speaker1_dir, speaker2_dir):
        shutil.rmtree(workdir)

    return speaker1_zip, speaker2_zip
81
 
82
def gradio_interface(zip_file):
    """Gradio entry point: delegate to the classifier and return both zip paths."""
    return classify_and_group_speakers(zip_file)

# Build and launch the web UI.
gradio_inputs = gr.File(label="Upload ZIP of Audio Files", file_count="single")
gradio_outputs = [gr.File(label="Speaker 1 ZIP"), gr.File(label="Speaker 2 ZIP")]

demo = gr.Interface(
    fn=gradio_interface,
    inputs=gradio_inputs,
    outputs=gradio_outputs,
    title="Speaker Diarization & Grouping",
)
demo.launch()
 
 
 
 
 
1
+ import gradio as gr
2
  import zipfile
3
+ import os
4
  import shutil
 
 
5
  from pyannote.audio import Pipeline
6
+ import torch
7
 
8
# Scratch directory used by process_audio_zip for extraction and grouping.
# Created here at import time; NOTE(review): process_audio_zip removes it on
# cleanup, so it only exists again on a later call if something recreates it.
TEMP_DIR = "temp_audio"
os.makedirs(TEMP_DIR, exist_ok=True)

# Initialize the pyannote.audio diarization pipeline once at startup,
# authenticating with the Hugging Face token from the HF_TOKEN env var
# (None if unset — loading the gated model would then fail).
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=os.getenv("HF_TOKEN")
)

# Run inference on the GPU when one is available.
if torch.cuda.is_available():
    pipeline.to(torch.device("cuda"))
22
def process_audio_zip(file_info):
    """Diarize the .wav files in an uploaded zip and group them by dominant speaker.

    Parameters:
        file_info: path (or file-like object) of the uploaded zip archive,
            as delivered by the Gradio File input.

    Returns:
        tuple[str, str]: paths of the two result archives,
            ("speaker1.zip", "speaker2.zip").
    """
    # Recreate the scratch directory: the cleanup at the end of this function
    # deletes it, which previously broke every call after the first.
    os.makedirs(TEMP_DIR, exist_ok=True)

    # Unzip the uploaded file
    with zipfile.ZipFile(file_info, 'r') as zip_ref:
        zip_ref.extractall(TEMP_DIR)

    speaker1_dir = os.path.join(TEMP_DIR, "speaker1")
    speaker2_dir = os.path.join(TEMP_DIR, "speaker2")
    os.makedirs(speaker1_dir, exist_ok=True)
    os.makedirs(speaker2_dir, exist_ok=True)

    # Process each top-level .wav file in the temporary directory
    # (the speaker1/speaker2 subdirectories are skipped by the suffix check).
    for filename in os.listdir(TEMP_DIR):
        if filename.endswith(".wav"):
            file_path = os.path.join(TEMP_DIR, filename)

            # Run the diarization pipeline
            diarization = pipeline(file_path)

            # BUG FIX: itertracks(yield_label=True) yields string labels such
            # as "SPEAKER_00"/"SPEAKER_01", not the ints 1/2 the original dict
            # was keyed by — every turn raised KeyError. Accumulate per-label
            # speech time for the first two speakers only.
            total_duration = {"SPEAKER_00": 0.0, "SPEAKER_01": 0.0}
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                if speaker in total_duration:
                    total_duration[speaker] += turn.duration

            # Move the file to whichever speaker talks longer
            # (ties go to speaker 1, matching the original >= comparison).
            if total_duration["SPEAKER_00"] >= total_duration["SPEAKER_01"]:
                shutil.move(file_path, os.path.join(speaker1_dir, filename))
            else:
                shutil.move(file_path, os.path.join(speaker2_dir, filename))

    # Zip the results
    speaker1_zip = "speaker1.zip"
    speaker2_zip = "speaker2.zip"

    def zipdir(path, ziph):
        # Store entries relative to the parent directory so the speaker
        # folder name is kept as a prefix inside the archive.
        for root, dirs, files in os.walk(path):
            for file in files:
                full = os.path.join(root, file)
                ziph.write(full, os.path.relpath(full, os.path.join(path, '..')))

    with zipfile.ZipFile(speaker1_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(speaker1_dir, zipf)

    with zipfile.ZipFile(speaker2_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(speaker2_dir, zipf)

    # Clean up the temporary directory
    shutil.rmtree(TEMP_DIR)

    return speaker1_zip, speaker2_zip
74
 
75
# Gradio interface.
# FIX: gr.inputs.File(type="file") uses the gr.inputs namespace that was
# deprecated in Gradio 3.x and removed in 4.x; use the top-level gr.File
# component, consistent with the outputs below.
iface = gr.Interface(
    fn=process_audio_zip,
    inputs=gr.File(label="Audio ZIP"),
    outputs=[
        gr.File(label="Speaker 1 Audio"),
        gr.File(label="Speaker 2 Audio")
    ],
    title="Speaker Diarization",
    description="Upload a ZIP file containing audio files, and this will return two ZIP files containing diarized audio for each speaker."
)

iface.launch()