rafaaa2105 committed on
Commit
901bdaa
·
verified ·
1 Parent(s): f3df7c4

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +72 -77
app.py CHANGED
@@ -1,86 +1,81 @@
1
- import gradio as gr
2
- from pyannote.audio import Pipeline
3
- import torch
4
  import os
5
  import zipfile
6
- import tempfile
7
  import shutil
8
- from pydub import AudioSegment
9
- import numpy as np
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
 
11
# Compat shim: re-create the np.NAN alias, which a dependency apparently
# still references (newer NumPy releases dropped it).
np.NAN = np.nan

# Hugging Face token for gated model access, supplied via the environment.
hf_token = os.getenv("HF_TOKEN")

# Build the speaker-diarization pipeline and move it to the best device.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=hf_token,
)
device = "cuda" if torch.cuda.is_available() else "cpu"
pipeline.to(torch.device(device))
19
 
20
def process_zip(zip_file):
    """Sort the audio files inside a ZIP archive into two per-speaker ZIPs.

    Each audio file is diarized and assigned to whichever of the two
    speakers talks for the longest total time in it.

    Args:
        zip_file: Uploaded file object whose ``.name`` is the path of a ZIP
            archive containing audio files (.wav/.mp3/.ogg/.flac).

    Returns:
        Tuple ``(speaker1_zip, speaker2_zip)`` of paths to ZIP archives of
        the files dominated by the first and second speaker respectively.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Step 1: Extract the zip file
        with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # Create directories for each speaker
        speaker1_dir = os.path.join(temp_dir, "speaker1")
        speaker2_dir = os.path.join(temp_dir, "speaker2")
        os.makedirs(speaker1_dir, exist_ok=True)
        os.makedirs(speaker2_dir, exist_ok=True)

        # Step 2: Analyze each audio file
        for filename in os.listdir(temp_dir):
            if not filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                continue
            file_path = os.path.join(temp_dir, filename)

            # Load audio file
            audio = AudioSegment.from_file(file_path)
            samples = np.array(audio.get_array_of_samples())

            # Convert to mono if stereo
            if audio.channels == 2:
                samples = samples.reshape((-1, 2)).mean(axis=1)

            # Normalize by the actual sample width; the original hard-coded
            # 32768.0, which is only correct for 16-bit audio.
            full_scale = float(1 << (8 * audio.sample_width - 1))
            waveform = torch.tensor(samples).float() / full_scale
            waveform = waveform.unsqueeze(0)  # add channel dimension

            # Perform diarization
            diarization = pipeline({"waveform": waveform, "sample_rate": audio.frame_rate})

            # Accumulate speaking time per speaker. pyannote labels speakers
            # "SPEAKER_00", "SPEAKER_01", ... so the numeric suffix is
            # 0-based; the original indexed a {1: 0, 2: 0} dict with that
            # suffix and crashed with KeyError on SPEAKER_00.
            speaker_times = {}
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                idx = int(speaker.split('_')[-1])
                speaker_times[idx] = speaker_times.get(idx, 0.0) + (turn.end - turn.start)

            # Move file to the dominant speaker's directory (ties go to
            # speaker 2, matching the original's strict '>' comparison).
            if speaker_times.get(0, 0.0) > speaker_times.get(1, 0.0):
                shutil.move(file_path, os.path.join(speaker1_dir, filename))
            else:
                shutil.move(file_path, os.path.join(speaker2_dir, filename))

        # Step 3: Build the result archives OUTSIDE the temporary directory.
        # The original returned paths inside temp_dir, which the context
        # manager deletes on exit, handing the caller dangling paths.
        out_dir = tempfile.mkdtemp()
        speaker1_zip = shutil.make_archive(os.path.join(out_dir, "speaker1"), 'zip', speaker1_dir)
        speaker2_zip = shutil.make_archive(os.path.join(out_dir, "speaker2"), 'zip', speaker2_dir)

        return speaker1_zip, speaker2_zip
74
 
75
# Gradio UI: one uploaded ZIP in, two per-speaker ZIPs out.
output_components = [
    gr.File(label="Speaker 1 Audio Files"),
    gr.File(label="Speaker 2 Audio Files"),
]
iface = gr.Interface(
    fn=process_zip,
    inputs=gr.File(label="Upload ZIP file containing audio files"),
    outputs=output_components,
    title="Speaker Diarization and Audio Sorting",
    description="Upload a ZIP file containing audio files. The system will analyze each file and sort them into two groups based on the dominant speaker."
)

iface.launch()
 
 
 
 
1
  import os
2
  import zipfile
 
3
  import shutil
4
+ import torch
5
+ import torchaudio
6
+ from pyannote.audio import Pipeline
7
+ from pyannote.core import Segment
8
+ import gradio as gr
9
+
10
# Load the pre-trained diarization model. The Hugging Face access token is
# read from the HF_TOKEN environment variable (as the previous revision did)
# instead of shipping a hard-coded 'YOUR_ACCESS_TOKEN' placeholder, which
# fails against the gated pyannote model.
HUGGINGFACE_ACCESS_TOKEN = os.getenv("HF_TOKEN")
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HUGGINGFACE_ACCESS_TOKEN)
15
+
16
def unzip_files(zip_fp, extract_to):
    """Extract every member of the ZIP archive at *zip_fp* into *extract_to*."""
    archive = zipfile.ZipFile(zip_fp, 'r')
    with archive:
        archive.extractall(extract_to)
20
+
21
def zip_files(input_dir, zip_fp):
    """Archive the whole *input_dir* tree into a ZIP file at *zip_fp*.

    Entries are stored with paths relative to *input_dir*.
    """
    with zipfile.ZipFile(zip_fp, 'w') as archive:
        for root, _subdirs, names in os.walk(input_dir):
            for name in names:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, input_dir))
28
+
29
# Function to classify and group files by speaker
def classify_and_group_speakers(zip_file):
    """Diarize each audio file in the uploaded ZIP and group files by speaker.

    Every audio file is assigned to whichever of the two speakers
    (SPEAKER_00 / SPEAKER_01) talks longest in it, then the two groups are
    re-zipped.

    Args:
        zip_file: Uploaded file object whose ``.name`` is the ZIP's path.

    Returns:
        Tuple ``('speaker1.zip', 'speaker2.zip')`` — paths of the two
        result archives, written into the current working directory.
    """
    # Step 1: Create temporary directories
    extract_dir = 'extract_temp'
    speaker1_dir = 'speaker1_temp'
    speaker2_dir = 'speaker2_temp'
    for directory in (extract_dir, speaker1_dir, speaker2_dir):
        os.makedirs(directory, exist_ok=True)

    try:
        # Step 2: Extract uploaded zip file
        unzip_files(zip_file.name, extract_dir)

        # Step 3: Analyze each audio file and determine the speaker
        for audio_file in os.listdir(extract_dir):
            audio_fp = os.path.join(extract_dir, audio_file)
            # Skip directories and non-audio entries (e.g. the __MACOSX
            # metadata folders common in user-made ZIPs) — feeding them to
            # torchaudio.load would raise and abort the whole batch.
            if not os.path.isfile(audio_fp):
                continue
            if not audio_file.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
                continue

            waveform, sample_rate = torchaudio.load(audio_fp)
            # NOTE(review): waveform keeps its original channel count here;
            # confirm the pipeline accepts stereo input.
            diarization = pipeline({"waveform": waveform, "sample_rate": sample_rate})

            # Total speaking time per label, in a single pass over the
            # tracks (the original iterated the diarization twice).
            durations = {'SPEAKER_00': 0.0, 'SPEAKER_01': 0.0}
            for segment, _, label in diarization.itertracks(yield_label=True):
                if label in durations:
                    durations[label] += segment.duration

            # Copy to speaker 1's group only on a strict majority, matching
            # the original '>' comparison (ties go to speaker 2).
            if durations['SPEAKER_00'] > durations['SPEAKER_01']:
                shutil.copy(audio_fp, speaker1_dir)
            else:
                shutil.copy(audio_fp, speaker2_dir)

        # Step 4: Zip the grouped files
        speaker1_zip = 'speaker1.zip'
        speaker2_zip = 'speaker2.zip'
        zip_files(speaker1_dir, speaker1_zip)
        zip_files(speaker2_dir, speaker2_zip)
    finally:
        # Step 5: Clean up temporary directories even if diarization failed
        # part-way, so a crashed run does not leave stale temp dirs behind.
        shutil.rmtree(extract_dir, ignore_errors=True)
        shutil.rmtree(speaker1_dir, ignore_errors=True)
        shutil.rmtree(speaker2_dir, ignore_errors=True)

    return speaker1_zip, speaker2_zip
 
 
72
 
73
# Gradio Interface
def gradio_interface(zip_file):
    """Gradio entry point: delegate straight to classify_and_group_speakers.

    Returns the ``(speaker1_zip, speaker2_zip)`` pair unchanged.
    """
    return classify_and_group_speakers(zip_file)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
 
78
# Build and launch the UI. The gr.inputs / gr.outputs namespaces are
# deprecated and removed in current Gradio releases, so instantiate the
# components directly from the top-level gr namespace.
gradio_inputs = gr.File(label="Upload ZIP of Audio Files", file_count="single")
gradio_outputs = [gr.File(label="Speaker 1 ZIP"), gr.File(label="Speaker 2 ZIP")]

gr.Interface(fn=gradio_interface, inputs=gradio_inputs, outputs=gradio_outputs, title="Speaker Diarization & Grouping").launch()