|
|
from google.cloud import speech |
|
|
from google.cloud import storage |
|
|
import os |
|
|
import wave |
|
|
from pydub import AudioSegment |
|
|
|
|
|
def get_audio_properties(file_path):
    """Read the sample rate and channel count of a WAV file.

    Args:
        file_path: Path to a WAV file readable by the ``wave`` module.

    Returns:
        Tuple ``(sample_rate, channels)``.
    """
    with wave.open(file_path, "rb") as wav:
        return wav.getframerate(), wav.getnchannels()
|
|
|
|
|
def convert_to_mono(input_path, output_path):
    """Downmix a WAV file to a single audio channel.

    The recognition config used elsewhere in this module assumes
    single-channel audio, so multi-channel recordings are collapsed
    to mono before upload.

    Args:
        input_path: Source WAV path.
        output_path: Destination WAV path (may be the same as
            ``input_path`` to convert in place).
    """
    source = AudioSegment.from_wav(input_path)
    source.set_channels(1).export(output_path, format="wav")
|
|
|
|
|
def upload_to_gcs(bucket_name, local_file_path, gcs_file_name):
    """Upload a local file to Google Cloud Storage.

    Args:
        bucket_name: Name of the destination GCS bucket.
        local_file_path: Path of the file on local disk.
        gcs_file_name: Object name to create inside the bucket.

    Returns:
        The ``gs://`` URI of the uploaded object.
    """
    client = storage.Client()
    target = client.bucket(bucket_name).blob(gcs_file_name)
    target.upload_from_filename(local_file_path)
    return f"gs://{bucket_name}/{gcs_file_name}"
|
|
|
|
|
def transcribe_gcs(gcs_uri, local_audio_path="temp_audio.wav"):
    """Transcribe a WAV file that has already been uploaded to GCS.

    The Speech API requires the sample rate of the recording; because
    the remote object is a copy of a local file, the rate is read from
    that local copy rather than the remote one.

    Args:
        gcs_uri: ``gs://`` URI of the audio object to transcribe.
        local_audio_path: Path of the local copy of the same audio,
            used only to discover its sample rate. Defaults to
            ``"temp_audio.wav"``, the path the rest of this module
            writes, so existing callers are unaffected.

    Returns:
        The transcript as a single string, one recognition result
        per line.
    """
    sample_rate, _ = get_audio_properties(local_audio_path)

    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        # LINEAR16 matches uncompressed PCM WAV produced upstream.
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate,
        language_code="en-US",
    )

    # URI-based input requires the asynchronous (long-running) API.
    operation = client.long_running_recognize(config=config, audio=audio)
    response = operation.result(timeout=600)

    return "\n".join(
        result.alternatives[0].transcript for result in response.results
    )
|
|
|
|
|
def get_transcription():
    """Upload the local meeting audio to GCS and return its transcript.

    Expects ``temp_audio.wav`` to exist in the current working
    directory. The file is downmixed to mono in place — but only when
    it actually has more than one channel, so mono recordings skip a
    pointless decode/re-encode pass.

    Returns:
        The transcript text produced by ``transcribe_gcs``.
    """
    bucket_name = "meeting-audio-bucket"
    audio_file = "temp_audio.wav"

    # Only re-encode when needed; mono input is already valid for the
    # single-channel recognition config.
    _, channels = get_audio_properties(audio_file)
    if channels > 1:
        convert_to_mono(audio_file, audio_file)

    gcs_uri = upload_to_gcs(bucket_name, audio_file, "temp_audio.wav")
    return transcribe_gcs(gcs_uri)
|
|
|