| |
| import os |
|
|
| |
| from omegaconf import OmegaConf |
| from nemo.collections.asr.models.msdd_models import NeuralDiarizer |
|
|
| |
| from src.audio.utils import Formatter |
| from src.audio.metrics import SilenceStats |
| from src.audio.error import DialogueDetecting |
| from src.audio.alignment import ForcedAligner |
| from src.audio.effect import DemucsVocalSeparator |
| from src.audio.preprocessing import SpeechEnhancement |
| from src.audio.io import SpeakerTimestampReader, TranscriptWriter |
| from src.audio.analysis import WordSpeakerMapper, SentenceSpeakerMapper, Audio |
| from src.audio.processing import AudioProcessor, Transcriber, PunctuationRestorer |
| from src.text.utils import Annotator |
| from src.text.llm import LLMOrchestrator, LLMResultHandler |
| from src.utils.utils import Cleaner, Watcher |
| from src.db.manager import Database |
|
|
|
|
| async def main(audio_file_path: str): |
| """ |
| Process an audio file to perform diarization, transcription, punctuation restoration, |
| and speaker role classification. |
| |
| Parameters |
| ---------- |
| audio_file_path : str |
| The path to the input audio file to be processed. |
| |
| Returns |
| ------- |
| None |
| """ |
| |
| config_nemo = "config/nemo/diar_infer_telephonic.yaml" |
| manifest_path = ".temp/manifest.json" |
| temp_dir = ".temp" |
| rttm_file_path = os.path.join(temp_dir, "pred_rttms", "mono_file.rttm") |
| transcript_output_path = ".temp/output.txt" |
| srt_output_path = ".temp/output.srt" |
| config_path = "config/config.yaml" |
| prompt_path = "config/prompt.yaml" |
| db_path = ".db/Callytics.sqlite" |
| db_topic_fetch_path = "src/db/sql/TopicFetch.sql" |
| db_topic_insert_path = "src/db/sql/TopicInsert.sql" |
| db_audio_properties_insert_path = "src/db/sql/AudioPropertiesInsert.sql" |
| db_utterance_insert_path = "src/db/sql/UtteranceInsert.sql" |
|
|
| |
| config = OmegaConf.load(config_path) |
| device = config.runtime.device |
| compute_type = config.runtime.compute_type |
| os.environ["PYTORCH_CUDA_ALLOC_CONF"] = config.runtime.cuda_alloc_conf |
|
|
| |
| dialogue_detector = DialogueDetecting(delete_original=True) |
| enhancer = SpeechEnhancement(config_path=config_path, output_dir=temp_dir) |
| separator = DemucsVocalSeparator() |
| processor = AudioProcessor(audio_path=audio_file_path, temp_dir=temp_dir) |
| transcriber = Transcriber(device=device, compute_type=compute_type) |
| aligner = ForcedAligner(device=device) |
| llm_handler = LLMOrchestrator(config_path=config_path, prompt_config_path=prompt_path, model_id="openai") |
| llm_result_handler = LLMResultHandler() |
| cleaner = Cleaner() |
| formatter = Formatter() |
| db = Database(db_path) |
| audio_feature_extractor = Audio(audio_file_path) |
|
|
| |
| has_dialogue = dialogue_detector.process(audio_file_path) |
| if not has_dialogue: |
| return |
|
|
| |
| audio_path = enhancer.enhance_audio( |
| input_path=audio_file_path, |
| output_path=os.path.join(temp_dir, "enhanced.wav"), |
| noise_threshold=0.0001, |
| verbose=True |
| ) |
|
|
| |
| vocal_path = separator.separate_vocals(audio_file=audio_path, output_dir=temp_dir) |
|
|
| |
| transcript, info = transcriber.transcribe(audio_path=vocal_path) |
| detected_language = info["language"] |
|
|
| |
| word_timestamps = aligner.align( |
| audio_path=vocal_path, |
| transcript=transcript, |
| language=detected_language |
| ) |
|
|
| |
| processor.audio_path = vocal_path |
| mono_audio_path = processor.convert_to_mono() |
| processor.audio_path = mono_audio_path |
| processor.create_manifest(manifest_path) |
| cfg = OmegaConf.load(config_nemo) |
| cfg.diarizer.manifest_filepath = manifest_path |
| cfg.diarizer.out_dir = temp_dir |
| msdd_model = NeuralDiarizer(cfg=cfg) |
| msdd_model.diarize() |
|
|
| |
| |
| speaker_reader = SpeakerTimestampReader(rttm_path=rttm_file_path) |
| speaker_ts = speaker_reader.read_speaker_timestamps() |
|
|
| |
| word_speaker_mapper = WordSpeakerMapper(word_timestamps, speaker_ts) |
| wsm = word_speaker_mapper.get_words_speaker_mapping() |
|
|
| |
| punct_restorer = PunctuationRestorer(language=detected_language) |
| wsm = punct_restorer.restore_punctuation(wsm) |
| word_speaker_mapper.word_speaker_mapping = wsm |
| word_speaker_mapper.realign_with_punctuation() |
| wsm = word_speaker_mapper.word_speaker_mapping |
|
|
| |
| sentence_mapper = SentenceSpeakerMapper() |
| ssm = sentence_mapper.get_sentences_speaker_mapping(wsm) |
|
|
| |
| writer = TranscriptWriter() |
| writer.write_transcript(ssm, transcript_output_path) |
| writer.write_srt(ssm, srt_output_path) |
|
|
| |
| speaker_roles = await llm_handler.generate("Classification", ssm) |
|
|
| |
| ssm = llm_result_handler.validate_and_fallback(speaker_roles, ssm) |
| llm_result_handler.log_result(ssm, speaker_roles) |
|
|
| |
| ssm_with_indices = formatter.add_indices_to_ssm(ssm) |
| annotator = Annotator(ssm_with_indices) |
| sentiment_results = await llm_handler.generate("SentimentAnalysis", user_input=ssm) |
| annotator.add_sentiment(sentiment_results) |
|
|
| |
| profane_results = await llm_handler.generate("ProfanityWordDetection", user_input=ssm) |
| annotator.add_profanity(profane_results) |
|
|
| |
| summary_result = await llm_handler.generate("Summary", user_input=ssm) |
| annotator.add_summary(summary_result) |
|
|
| |
| conflict_result = await llm_handler.generate("ConflictDetection", user_input=ssm) |
| annotator.add_conflict(conflict_result) |
|
|
| |
| topics = db.fetch(db_topic_fetch_path) |
| topic_result = await llm_handler.generate( |
| "TopicDetection", |
| user_input=ssm, |
| system_input=topics |
| ) |
| annotator.add_topic(topic_result) |
|
|
| |
| props = audio_feature_extractor.properties() |
|
|
| ( |
| name, |
| file_extension, |
| absolute_file_path, |
| sample_rate, |
| min_frequency, |
| max_frequency, |
| audio_bit_depth, |
| num_channels, |
| audio_duration, |
| rms_loudness, |
| final_features |
| ) = props |
|
|
| rms_loudness_db = final_features["RMSLoudness"] |
| zero_crossing_rate_db = final_features["ZeroCrossingRate"] |
| spectral_centroid_db = final_features["SpectralCentroid"] |
| eq_20_250_db = final_features["EQ_20_250_Hz"] |
| eq_250_2000_db = final_features["EQ_250_2000_Hz"] |
| eq_2000_6000_db = final_features["EQ_2000_6000_Hz"] |
| eq_6000_20000_db = final_features["EQ_6000_20000_Hz"] |
| mfcc_values = [final_features[f"MFCC_{i}"] for i in range(1, 14)] |
|
|
| final_output = annotator.finalize() |
|
|
| |
| stats = SilenceStats.from_segments(final_output['ssm']) |
| t_std = stats.threshold_std(factor=0.99) |
| final_output["silence"] = t_std |
|
|
| print("Final_Output:", final_output) |
|
|
| |
| |
| summary = final_output.get("summary", "") |
| conflict_flag = 1 if final_output.get("conflict", False) else 0 |
| silence_value = final_output.get("silence", 0.0) |
| detected_topic = final_output.get("topic", "Unknown") |
|
|
| topic_id = db.get_or_insert_topic_id(detected_topic, topics, db_topic_insert_path) |
|
|
| params = ( |
| name, |
| topic_id, |
| file_extension, |
| absolute_file_path, |
| sample_rate, |
| min_frequency, |
| max_frequency, |
| audio_bit_depth, |
| num_channels, |
| audio_duration, |
| rms_loudness_db, |
| zero_crossing_rate_db, |
| spectral_centroid_db, |
| eq_20_250_db, |
| eq_250_2000_db, |
| eq_2000_6000_db, |
| eq_6000_20000_db, |
| *mfcc_values, |
| summary, |
| conflict_flag, |
| silence_value |
| ) |
|
|
| last_id = db.insert(db_audio_properties_insert_path, params) |
| print(f"Audio properties inserted successfully into the File table with ID: {last_id}") |
|
|
| |
| utterances = final_output["ssm"] |
|
|
| for utterance in utterances: |
| file_id = last_id |
| speaker = utterance["speaker"] |
| sequence = utterance["index"] |
| start_time = utterance["start_time"] / 1000.0 |
| end_time = utterance["end_time"] / 1000.0 |
| content = utterance["text"] |
| sentiment = utterance["sentiment"] |
| profane = 1 if utterance["profane"] else 0 |
|
|
| utterance_params = ( |
| file_id, |
| speaker, |
| sequence, |
| start_time, |
| end_time, |
| content, |
| sentiment, |
| profane |
| ) |
|
|
| db.insert(db_utterance_insert_path, utterance_params) |
|
|
| print("Utterances inserted successfully into the Utterance table.") |
|
|
| |
| cleaner.cleanup(temp_dir, audio_file_path) |
|
|
|
|
| async def process(path: str): |
| """ |
| Asynchronous callback function that is triggered when a new audio file is detected. |
| |
| Parameters |
| ---------- |
| path : str |
| The path to the newly created audio file. |
| |
| Returns |
| ------- |
| None |
| """ |
| print(f"Processing new audio file: {path}") |
| await main(path) |
|
|
|
|
| if __name__ == "__main__": |
| directory_to_watch = ".data/input" |
| Watcher.start_watcher(directory_to_watch, process) |
|
|