Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from transformers import WhisperForConditionalGeneration, WhisperProcessor | |
| from transformers import pipeline | |
| import librosa | |
| import torch | |
| from spleeter.separator import Separator | |
| from pydub import AudioSegment | |
| from IPython.display import Audio | |
| import os | |
| import accelerate | |
| #import pyaudio | |
| import numpy as np | |
| # Create PyAudio object | |
| #p = pyaudio.PyAudio() | |
# Buffer size: used as PyAudio's frames_per_buffer and as the playback slice length.
CHUNK_SIZE = 1024
# Sample rate (Hz) used when opening the PyAudio streams.
SAMPLING_RATE = 16000
# Module-level buffer of separated-vocal bytes: written by audio_preprocess,
# read and drained by py_audio.
vocals_data = b""
| # isolate the vocal stem of the input audio file | |
def audio_preprocess(input_file):  # , in_data, frame_count, time_info, status):
    """Isolate the vocal stem of an audio file with Spleeter.

    The file is decoded to a float waveform before separation. Reading the
    encoded file (e.g. an MP3) as raw bytes and reinterpreting them as int16
    samples, as was done previously, feeds noise to the separator.

    Args:
        input_file: Path of the audio file to process (any format
            ``librosa.load`` can decode).

    Returns:
        A 3-tuple ``(vocals_data, flag, processed_audio)``:

        * ``vocals_data`` -- the vocal stem as interleaved little-endian
          16-bit PCM bytes (also stored in the module-level ``vocals_data``).
        * ``flag`` -- ``0``, the value of ``pyaudio.paContinue``. The literal
          is used because the ``pyaudio`` import is commented out in this
          file, so the attribute lookup raised ``NameError``.
        * ``processed_audio`` -- an ``io.BytesIO`` holding the same samples as
          a WAV file, rewound to position 0 so it can be handed directly to
          ``librosa.load`` or ``AudioSegment.from_file``.
    """
    global vocals_data

    # Function-scope stdlib imports keep this block self-contained.
    import io
    import wave

    # Spleeter's usage example loads audio at 44.1 kHz before calling separate().
    sample_rate = 44100
    decoded, _ = librosa.load(input_file, sr=sample_rate, mono=False)

    # librosa yields (n,) for mono or (channels, n); Spleeter wants
    # (n_samples, n_channels) with two channels.
    waveform = np.ascontiguousarray(np.atleast_2d(decoded).T)
    if waveform.shape[1] == 1:
        waveform = np.repeat(waveform, 2, axis=1)
    elif waveform.shape[1] > 2:
        waveform = np.ascontiguousarray(waveform[:, :2])

    # Perform vocal separation on the decoded waveform.
    stems = Separator('spleeter:2stems').separate(waveform)
    vocals = np.asarray(stems['vocals'])
    if vocals.ndim == 1:
        vocals = vocals[:, np.newaxis]

    # The stem is float audio in [-1, 1]; scale before casting, otherwise the
    # int16 conversion truncates nearly every sample to 0 (silence).
    pcm = (np.clip(vocals, -1.0, 1.0) * 32767.0).astype('<i2')
    # C-order flattening of (n_samples, n_channels) interleaves the channels.
    vocals_data = pcm.flatten().tobytes()

    # Wrap the PCM in a WAV container so downstream decoders know its layout.
    processed_audio = io.BytesIO()
    with wave.open(processed_audio, 'wb') as wav_out:
        wav_out.setnchannels(pcm.shape[1])
        wav_out.setsampwidth(2)
        wav_out.setframerate(sample_rate)
        wav_out.writeframes(vocals_data)
    processed_audio.seek(0)

    pa_continue = 0  # == pyaudio.paContinue
    return vocals_data, pa_continue, processed_audio
| # record from the microphone, play back the separated vocals, then crop the result | |
def py_audio(start_time=60000, end_time=110000):
    """Stream audio through PyAudio, play back the vocals, and crop the result.

    Opens a callback-driven input/output stream, plays the bytes that appear
    in the module-level ``vocals_data`` buffer in ``CHUNK_SIZE`` slices, and
    finally crops everything that was played to ``[start_time, end_time)``.

    Args:
        start_time: Crop start in milliseconds (default 60000, i.e. 60 s).
        end_time: Crop end in milliseconds (default 110000, i.e. 110 s).

    Returns:
        A ``pydub.AudioSegment`` containing the cropped vocals.

    NOTE(review): this function uses ``pyaudio``, the PyAudio instance ``p``
    and the stream callback ``process_audio``; none of them is defined in the
    visible part of this file (the ``pyaudio`` import and ``p`` are commented
    out). Until they are restored the first ``p.open`` raises ``NameError``.
    """
    # The loop rebinds the shared buffer; without this declaration the name
    # would be local and the first read would raise UnboundLocalError.
    global vocals_data

    played = bytearray()  # everything sent to the speaker, kept for cropping

    # Open stream for recording
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=SAMPLING_RATE,
                    input=True, output=True,
                    frames_per_buffer=CHUNK_SIZE, stream_callback=process_audio)
    playback_stream = None
    try:
        stream.start_stream()
        # Create stream for playback
        playback_stream = p.open(format=pyaudio.paInt16, channels=1,
                                 rate=SAMPLING_RATE, output=True)
        # Play processed data in real time
        while stream.is_active():
            if len(vocals_data) >= CHUNK_SIZE:
                chunk = vocals_data[:CHUNK_SIZE]
                vocals_data = vocals_data[CHUNK_SIZE:]
                playback_stream.write(chunk)
                played += chunk
    finally:
        # Release the audio devices even if streaming failed part-way.
        stream.stop_stream()
        stream.close()
        if playback_stream is not None:
            playback_stream.stop_stream()
            playback_stream.close()
        p.terminate()

    # Include the tail that was shorter than one chunk and never played.
    separated_audio = bytes(played) + bytes(vocals_data)

    # The buffer is raw PCM in the format the streams above were opened with
    # (16-bit, mono, SAMPLING_RATE), so build the segment from raw data;
    # AudioSegment.from_file expects a path or file object instead.
    sample_width = 2
    usable = len(separated_audio) - len(separated_audio) % sample_width
    audio = AudioSegment(data=separated_audio[:usable], sample_width=sample_width,
                         frame_rate=SAMPLING_RATE, channels=1)

    # Crop the audio (pydub slices are in milliseconds)
    cropped_audio = audio[start_time:end_time]
    return cropped_audio
| # ASR transcription | |
def asr_model(processed_audio):
    """Transcribe sung Cantonese audio to text with a fine-tuned Whisper model.

    Whisper's feature extractor keeps at most 30 seconds of each input, so a
    longer clip is split into consecutive 30-second windows that are
    transcribed as one batch and joined. Clips of 30 seconds or less produce
    the same single-window result as before.

    Args:
        processed_audio: Audio source accepted by ``librosa.load`` (a file
            path or a file-like object).

    Returns:
        The transcription as a single string.
    """
    # load audio file, resampled to the 16 kHz rate the model is fed with
    y, sr = librosa.load(processed_audio, sr=16000)

    # ASR model
    MODEL_NAME = "RexChan/ISOM5240-whisper-small-zhhk_1"
    processor = WhisperProcessor.from_pretrained(MODEL_NAME)
    model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, low_cpu_mem_usage=True)
    model.config.forced_decoder_ids = None
    model.config.suppress_tokens = []
    model.config.use_cache = False

    # Split into 30 s windows so nothing past the first window is dropped.
    window = 30 * int(sr)
    segments = [y[start:start + window] for start in range(0, len(y), window)]
    if not segments:
        # Empty audio: keep the original single-input call shape.
        segments = [y]

    processed_in = processor(segments, sampling_rate=sr, return_tensors="pt")
    gout = model.generate(
        input_features=processed_in.input_features,
        output_scores=True, return_dict_in_generate=True
    )
    pieces = processor.batch_decode(gout.sequences, skip_special_tokens=True)
    transcription = "".join(pieces)

    # print result
    print(f"Song lyrics = {transcription}")
    return transcription
| # sentiment analysis | |
def senti_model(transcription):
    """Classify the sentiment of the lyrics and build a one-line summary.

    Args:
        transcription: Lyrics text to classify.

    Returns:
        A sentence naming the top predicted label and its score as a
        percentage; the same sentence is also printed.
    """
    classifier = pipeline(
        "text-classification",
        model="lxyuan/distilbert-base-multilingual-cased-sentiments-student",
    )
    final_result = classifier(transcription)
    display = f"Sentiment Analysis shows that this song is {final_result[0]['label']}. Confident level of this analysis is {final_result[0]['score']*100:.1f}%."
    print(display)
    return display
| # main | |
def main(input_file):
    """Run separation, transcription and sentiment analysis, then show results.

    Args:
        input_file: Audio file to analyse; passed unchanged to
            ``audio_preprocess``.
    """
    # audio_preprocess returns (vocals_data, flag, processed_audio); only the
    # last element is analysed. Previously the tuple was bound to an unused
    # name and an undefined ``processed_audio`` was passed on (NameError).
    _, _, processed_audio = audio_preprocess(input_file)
    transcription = asr_model(processed_audio)
    final_result = senti_model(transcription)
    st.write(final_result)

    # asr_model may have read a file-like object to its end; rewind it.
    if hasattr(processed_audio, "seek"):
        processed_audio.seek(0)

    # Render the player directly. A button nested inside this button-triggered
    # function never takes effect: clicking it reruns the script, the outer
    # "Run Analysis" button is then False, and main() is not reached.
    # NOTE(review): assumes processed_audio is encoded audio (path, bytes or
    # file-like WAV) that st.audio can play -- confirm against audio_preprocess.
    st.audio(processed_audio, format="audio/wav", start_time=0)
if __name__ == '__main__':
    # Streamlit setup
    st.set_page_config(page_title="Sentiment Analysis on Your Cantonese Song",)
    st.header("Cantonese Song Sentiment Analyzer")

    # os.path.isfile() returns a bool, so keep the path itself and use None
    # only when the bundled sample song is missing.
    default_song = "test1.mp3"
    input_file = default_song if os.path.isfile(default_song) else None
    # Alternative input, currently disabled:
    # input_file = st.file_uploader("upload a song in mp3 format", type="mp3")

    if input_file is not None:
        st.write("File uploaded successfully!")
        st.write(input_file)
    else:
        st.write("No file uploaded.")

    button_click = st.button("Run Analysis", type="primary")
    if button_click:
        if input_file is None:
            # Nothing to analyse; avoid passing None into the pipeline.
            st.warning("No file uploaded.")
        else:
            main(input_file=input_file)