| | import glob
|
| | import queue
|
| | import sqlite3
|
| | import sys
|
| | import threading
|
| | import time
|
| | import uuid
|
| | from datetime import datetime
|
| | from pathlib import Path
|
| | from typing import Optional, Dict
|
| |
|
| | import numpy as np
|
| | import pyaudio
|
| | import streamlit as st
|
| | import torch
|
| | from transformers import WhisperForConditionalGeneration, WhisperProcessor
|
| |
|
| |
|
| |
|
| |
|
| | db_path = Path(__file__).parent.parent / "database" / "db_logsv2.db"
|
| |
|
| | db_sqlite = sqlite3.connect(db_path, check_same_thread=False)
|
| | cursor_db = db_sqlite.cursor()
|
| |
|
| |
|
| | class WhisperLiveTranscription:
|
| | def __init__(
|
| | self,
|
| | model_id: str = "openai/whisper-small",
|
| | language: str = "french",
|
| | blacklist: list = None,
|
| | ):
|
| | """
|
| | Instancie un objet WhisperLiveTranscription.
|
| |
|
| | Args:
|
| | model_id (str, optional): Le nom du modèle whisper à utiliser.
|
| | language (str, optional): La langue supposée du flux audio.
|
| | blacklist (list, optional): Une liste de mots ou expressions
|
| | à ne pas renvoyer dans la transcription.
|
| |
|
| | processor (WhisperProcessor): The WhisperProcessor object for pre-processing audio.
|
| | model (WhisperForConditionalGeneration): The WhisperForConditionalGeneration model for transcription.
|
| | is_running (bool): Whether the transcription is running or not.
|
| | language (str): The language of the transcription.
|
| | audio_queue (queue.Queue): The queue for storing audio chunks.
|
| | result_queue (queue.Queue): The queue for storing the transcription results.
|
| | transcription_queue (queue.Queue): The queue for retrieving the transcription results.
|
| | """
|
| |
|
| |
|
| | self.device = "cuda" if torch.cuda.is_available() else "cpu"
|
| | print(f"Using device: {self.device}")
|
| |
|
| |
|
| | self.processor = WhisperProcessor.from_pretrained(model_id)
|
| | self.model = WhisperForConditionalGeneration.from_pretrained(model_id).to(
|
| | self.device
|
| | )
|
| |
|
| |
|
| |
|
| | self.FORMAT = pyaudio.paFloat32
|
| |
|
| | self.CHANNELS = 1
|
| |
|
| |
|
| | self.RATE = 16000
|
| |
|
| | self.CHUNK = 1024
|
| |
|
| | self.RECORD_SECONDS = 3
|
| |
|
| | self.SILENCE_THRESHOLD = 0.001
|
| |
|
| |
|
| | self.audio_queue = (
|
| | queue.Queue()
|
| | )
|
| | self.result_queue = (
|
| | queue.Queue()
|
| | )
|
| |
|
| | self.transcription_queue = (
|
| | queue.Queue()
|
| | )
|
| |
|
| |
|
| | self.is_running = False
|
| |
|
| |
|
| | self.language = language
|
| |
|
| |
|
| | self.audio_buffer = []
|
| |
|
| |
|
| | self.last_process_time = time.time()
|
| |
|
| |
|
| | if blacklist is not None:
|
| | self.blacklist = blacklist
|
| | else:
|
| | self.blacklist = ["...", "Sous-titrage Société Radio-Canada", "Merci."]
|
| |
|
| | def start_recording(self):
|
| | """
|
| | Utilisée pour lancer l'enregistrement audio et la transcription.
|
| |
|
| | Cette méthode démarre un flux PyAudio pour capturer l'entrée audio depuis
|
| | le périphérique d'entrée par défaut. Les données audio sont ensuite
|
| | traitées dans un thread séparé pour vérifier si le niveau audio dépasse
|
| | le seuil de silence. Si le niveau audio est supérieur au seuil, les
|
| | données sont ajoutées dans un buffer. Ce buffer est ensuite traité
|
| | dans un thread séparé pour générer une transcription.
|
| |
|
| | La transcription est ensuite stockée dans une file d'attente pour être
|
| | récupérée. La méthode `get_transcription` peut ensuite être utilisée
|
| | pour récupérer la dernière transcription.
|
| | """
|
| |
|
| |
|
| | self.is_running = True
|
| |
|
| |
|
| | self.p = pyaudio.PyAudio()
|
| |
|
| | def audio_callback(
|
| | in_data: bytes,
|
| | frame_count: int,
|
| | time_info: Dict[str, float],
|
| | status: int,
|
| | ) -> tuple[bytes, int]:
|
| | """
|
| | Fonction callback pour le flux PyAudio entrant.
|
| |
|
| | (pyaudio demande tous les arguments même si ne les utilise pas)
|
| |
|
| | Argument utilisé :
|
| | in_data (bytes) : Les données audio reçues du périphérique d'entrée.
|
| |
|
| | Retour :
|
| | tuple[bytes, int] : Un tuple contenant les données audio originales et
|
| | un indicateur indiquant la continuité du flux.
|
| |
|
| | Cette fonction est appelée par PyAudio pour chaque chunk audio reçu.
|
| | Elle convertit les données audio de bytes en tableau NumPy
|
| | et les place dans la file d'attente pour traitement.
|
| | """
|
| | audio_data = np.frombuffer(in_data, dtype=np.float32)
|
| | self.audio_queue.put(audio_data)
|
| | return (in_data, pyaudio.paContinue)
|
| |
|
| |
|
| | self.stream = self.p.open(
|
| | format=self.FORMAT,
|
| | channels=self.CHANNELS,
|
| | rate=self.RATE,
|
| | input=True,
|
| | frames_per_buffer=self.CHUNK,
|
| | stream_callback=audio_callback,
|
| | )
|
| |
|
| | print("Recording started")
|
| | self.stream.start_stream()
|
| |
|
| | self.process_thread = threading.Thread(target=self._process_audio)
|
| | self.transcribe_thread = threading.Thread(target=self._transcribe_audio)
|
| | self.process_thread.start()
|
| | self.transcribe_thread.start()
|
| |
|
| | def _process_audio(self):
|
| | """
|
| | Traite et met en mémoire tampon les données audio de la file d'attente.
|
| |
|
| | Cette méthode récupère les morceaux audio de la file d'attente audio,
|
| | vérifie si le niveau audio dépasse le seuil de silence,
|
| | et si oui met les données audio en mémoire tampon.
|
| |
|
| | Cette mémoire est constituée d'une liste "audio_buffer".
|
| | """
|
| |
|
| |
|
| | while self.is_running:
|
| | try:
|
| | raw_chunk = self.audio_queue.get(timeout=1)
|
| | audio_chunk = raw_chunk
|
| |
|
| |
|
| |
|
| | if np.max(np.abs(audio_chunk)) > self.SILENCE_THRESHOLD:
|
| | self.audio_buffer.extend(audio_chunk)
|
| |
|
| |
|
| |
|
| |
|
| | current_time = time.time()
|
| | if current_time - self.last_process_time >= self.RECORD_SECONDS:
|
| | if self.audio_buffer:
|
| |
|
| |
|
| | audio_data = np.array(self.audio_buffer)
|
| |
|
| |
|
| | self.result_queue.put(audio_data)
|
| |
|
| |
|
| | self.audio_buffer = []
|
| |
|
| |
|
| | self.last_process_time = current_time
|
| | except queue.Empty:
|
| | continue
|
| |
|
| | def _transcribe_audio(self):
|
| | """
|
| | Méthode pour transcrire l'audio en texte.
|
| |
|
| | Cette méthode s'exécute dans une boucle continue tant que le programme tourne ou que la file d'attente
|
| | n'est pas vide. Elle traite les données audio pour les convertir en texte via le modèle Whisper.
|
| |
|
| | Le processus inclut:
|
| | - Récupération des données audio depuis la file d'attente
|
| | - Traitement et transcription via le modèle Whisper
|
| | - Sauvegarde de la transcription dans un fichier texte
|
| | - Ajout de la transcription à une file d'attente dédiée
|
| |
|
| | La transcription est filtrée via une liste noire (blacklist) avant d'être sauvegardée ou transmise.
|
| | """
|
| | while self.is_running or not self.result_queue.empty():
|
| | try:
|
| | audio_data = self.result_queue.get(timeout=1)
|
| |
|
| | input_features = self.processor(
|
| | audio_data, sampling_rate=self.RATE, return_tensors="pt"
|
| | ).input_features.to(self.device)
|
| |
|
| | with torch.no_grad():
|
| |
|
| |
|
| |
|
| | predicted_ids = self.model.generate(
|
| | input_features,
|
| | language=self.language,
|
| | task="transcribe",
|
| | max_length=448,
|
| | no_repeat_ngram_size=3,
|
| | )
|
| |
|
| | transcription = self.processor.batch_decode(
|
| | predicted_ids, skip_special_tokens=True
|
| | )[0]
|
| |
|
| | timestamp = datetime.now().strftime("%H:%M:%S")
|
| | print(f"[{timestamp}] {transcription}")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | files = sorted(glob.glob("transcription*.txt"))
|
| | filename = files[-1]
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | with open(filename, "a", encoding="utf-8") as f:
|
| | if not any(item == transcription for item in self.blacklist):
|
| | f.write(f"{transcription}\n")
|
| |
|
| |
|
| |
|
| | if not any(item == transcription for item in self.blacklist):
|
| | self.transcription_queue.put(
|
| | {"text": transcription, "timestamp": timestamp}
|
| | )
|
| |
|
| | except queue.Empty:
|
| | continue
|
| | except Exception as e:
|
| | print(f"Error during transcription: {e}")
|
| | continue
|
| |
|
| | def get_transcription(
|
| | self, block: bool = False, timeout: Optional[float] = None
|
| | ) -> Optional[Dict[str, str]]:
|
| | """
|
| | Méthode utilisée pour récupérer la transcription depuis la file d'attente.
|
| |
|
| | Paramètres :
|
| | - block (bool): Si True, on attend jusqu'à ce qu'une transcription soit disponible,
|
| | ou que le délai d'attente soit atteint.
|
| | - timeout (float or None): le délai d'attente maximum pour récupérer une transcription.
|
| | Ignoré si block est à False.
|
| |
|
| | Renvoie :
|
| | - dict or None: un dico contenant le texte de la transcription et le timestamp.
|
| | """
|
| | try:
|
| | if block:
|
| | return self.transcription_queue.get(block=True, timeout=timeout)
|
| | else:
|
| | return self.transcription_queue.get_nowait()
|
| | except queue.Empty:
|
| | return None
|
| |
|
| | def stop_recording(self):
|
| | """
|
| | Arrête l'enregistrement et le traitement des segments audio,
|
| | puis traite les segments audio restants dans la file d'attente.
|
| |
|
| | Si le thread de transcription est toujours actif après 7 secondes,
|
| | on l'arrête quand même et un message d'avertissement est affiché.
|
| | """
|
| |
|
| | print("\nEn cours d'arrêt... Traitement des derniers segments audios...")
|
| |
|
| |
|
| | if hasattr(self, "stream"):
|
| | self.stream.stop_stream()
|
| | self.stream.close()
|
| | if hasattr(self, "p"):
|
| | self.p.terminate()
|
| |
|
| | if self.audio_buffer:
|
| |
|
| |
|
| | final_audio = np.array(self.audio_buffer)
|
| | self.result_queue.put(final_audio)
|
| |
|
| |
|
| | self.is_running = False
|
| |
|
| |
|
| | while not self.audio_queue.empty():
|
| | chunk = self.audio_queue.get()
|
| | self.audio_buffer.extend(chunk)
|
| |
|
| | if self.audio_buffer:
|
| | final_audio = np.array(self.audio_buffer)
|
| | self.result_queue.put(final_audio)
|
| | input_features = self.processor(
|
| | final_audio, sampling_rate=self.RATE, return_tensors="pt"
|
| | ).input_features.to(self.device)
|
| | with torch.no_grad():
|
| | predicted_ids = self.model.generate(
|
| | input_features, language=self.language, task="transcribe"
|
| | )
|
| | final_transcription = self.processor.batch_decode(
|
| | predicted_ids, skip_special_tokens=True
|
| | )[0]
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | files = sorted(glob.glob("transcription*.txt"))
|
| | filename = files[-1]
|
| |
|
| |
|
| | with open(filename, "a", encoding="utf-8") as f:
|
| | if not any(item == final_transcription for item in self.blacklist):
|
| | f.write(f"{final_transcription}\n")
|
| |
|
| | if hasattr(self, "process_thread"):
|
| | self.process_thread.join()
|
| | if hasattr(self, "transcribe_thread"):
|
| | timeout = 7
|
| | self.transcribe_thread.join(timeout=timeout)
|
| | if self.transcribe_thread.is_alive():
|
| | print("Attenion, le dernier chunk audio n'a pas été transcrit")
|
| |
|
| | print("Enregistrement et transcription arrêtés")
|
| |
|