| | """ |
| | ASR Audio Analysis Pipeline |
| | |
| | Complete pipeline: Diarization + Whisper Transcription + Professional Audio Analysis |
| | """ |
| |
|
| | import os |
| | import sys |
| | from pathlib import Path |
| |
|
| | from stereo_diarizer import StereoCallDiarizer |
| | from whisper_transcriber import WhisperTranscriber |
| | from audio_analyzer import AudioAnalyzer |
| |
|
| |
|
class ASRPipeline:
    """End-to-end ASR audio analysis pipeline.

    For every audio file found directly inside ``input_folder`` the pipeline
    runs three stages in order: diarization (``StereoCallDiarizer``),
    transcription (``WhisperTranscriber``) and audio analysis
    (``AudioAnalyzer``). The artefacts of each file are written to
    ``output_folder/<file stem>``.

    Running counters for the batch are kept in ``self.stats``.
    """

    # File extensions (lower-case, leading dot) that count as audio input.
    AUDIO_EXTENSIONS = frozenset(
        {'.wav', '.mp3', '.m4a', '.flac', '.ogg', '.opus'}
    )

    def __init__(self,
                 input_folder: str,
                 output_folder: str,
                 whisper_model: str,
                 min_silence_len: int = 500,
                 silence_thresh: int = -40,
                 device: str = "cpu",
                 verbose: bool = True):
        """Store the configuration and create the audio analyzer.

        Args:
            input_folder: Folder scanned (non-recursively) for audio files.
            output_folder: Root folder for per-file result directories.
            whisper_model: Forwarded unchanged to ``WhisperTranscriber``.
            min_silence_len: Forwarded to ``StereoCallDiarizer``
                (presumably milliseconds -- confirm against the diarizer).
            silence_thresh: Forwarded to ``StereoCallDiarizer``
                (presumably dBFS -- confirm against the diarizer).
            device: Forwarded to ``WhisperTranscriber``.
            verbose: Enables per-file progress output and is forwarded to
                the diarizer, transcriber and analyzer.
        """
        self.input_folder = Path(input_folder)
        self.output_folder = Path(output_folder)
        self.whisper_model = whisper_model
        self.min_silence_len = min_silence_len
        self.silence_thresh = silence_thresh
        self.device = device
        self.verbose = verbose

        self.stats = {
            'total_files': 0,
            'processed': 0,
            'failed': 0,
            'stereo': 0,
            'mono': 0,
            'failed_files': [],
            'total_duration': 0.0
        }

        self.analyzer = AudioAnalyzer(verbose=self.verbose)
        # Created lazily by _init_transcriber() on the first transcription.
        self.transcriber = None

    def _init_transcriber(self):
        """Create the Whisper transcriber on first use and reuse it afterwards."""
        if self.transcriber is None:
            self.transcriber = WhisperTranscriber(
                self.whisper_model, self.device, self.verbose
            )

    def get_audio_files(self):
        """Return the audio files directly inside ``input_folder``, sorted.

        Only regular files whose suffix (case-insensitive) is in
        ``AUDIO_EXTENSIONS`` are returned; sub-folders are not searched.

        Returns:
            A sorted list of ``Path`` objects. The list is empty when the
            input folder does not exist or is not a directory, so callers do
            not have to guard against ``FileNotFoundError``.
        """
        if not self.input_folder.is_dir():
            return []
        return sorted(
            entry for entry in self.input_folder.iterdir()
            if entry.is_file() and entry.suffix.lower() in self.AUDIO_EXTENSIONS
        )

    def process_single(self, audio_file: Path) -> bool:
        """Run diarization, transcription and analysis for one audio file.

        Results are written to ``output_folder/<audio_file.stem>``. The
        ``stereo``/``mono``/``total_duration`` statistics are updated only
        when every stage succeeded, so they describe processed files only.

        Args:
            audio_file: Path of the audio file to process.

        Returns:
            True on success, False if any stage raised an exception. The
            failure reason is always printed; the traceback is printed only
            in verbose mode.
        """
        if self.verbose:
            print(f"\n{'='*60}")
            print(f"PROCESSING: {audio_file.name}")
            print(f"{'='*60}")

        try:
            # NOTE(review): files sharing a stem (e.g. a.wav and a.mp3) map
            # to the same output directory and may overwrite each other.
            output_dir = self.output_folder / audio_file.stem
            # Inside the try so one un-creatable directory fails only this
            # file instead of aborting the whole batch.
            output_dir.mkdir(parents=True, exist_ok=True)
            out_path = str(output_dir)

            # ---- Stage 1: diarization ----
            if self.verbose:
                print("\n[1/3] DIARIZATION")

            diarizer = StereoCallDiarizer(
                str(audio_file), self.min_silence_len,
                self.silence_thresh, self.verbose
            )
            diarizer.load_audio()
            is_stereo = diarizer.is_stereo

            left, right = diarizer.detect_speech_segments()
            diarizer.create_timeline(left, right)

            segments = diarizer.export_segments(out_path)
            diarizer.export_full_speakers(out_path)
            diarizer.export_transcript_txt(out_path)
            diarizer.export_transcript_json(out_path)

            # Reported below in seconds; assumes len(diarizer.audio) is in
            # milliseconds -- TODO confirm against StereoCallDiarizer.
            duration = len(diarizer.audio) / 1000

            # ---- Stage 2: transcription ----
            if self.verbose:
                print("\n[2/3] TRANSCRIPTION")

            self._init_transcriber()
            transcribed = self.transcriber.transcribe_segments(
                segments, diarizer.timeline
            )
            self.transcriber.export_transcription(transcribed, out_path)

            # ---- Stage 3: audio analysis ----
            if self.verbose:
                print("\n[3/3] AUDIO ANALYSIS")

            analysis = self.analyzer.analyze_call(
                segments, diarizer.timeline,
                audio_file.stem, diarizer.is_stereo
            )
            self.analyzer.export_analysis(analysis, out_path)

            # Commit statistics only now that every stage has succeeded, so
            # failed files never inflate the summary.
            self.stats['stereo' if is_stereo else 'mono'] += 1
            self.stats['total_duration'] += duration

            if self.verbose:
                print(f"\nSUCCESS: {audio_file.name}")
                print(f"Type: {'STEREO' if diarizer.is_stereo else 'MONO'}")
                print(f"Duration: {duration:.1f}s | Quality: {analysis.overall_quality_score}/100")

            return True

        except Exception as e:
            # Per-file boundary: report the reason even in quiet mode so a
            # failure is never silent, then let the batch continue.
            print(f"\nFAILED: {audio_file.name}")
            print(f"Error: {e}")
            if self.verbose:
                import traceback
                traceback.print_exc()
            return False

    def run(self):
        """Process every audio file in the input folder and print a summary.

        Updates ``self.stats`` (``total_files``, ``processed``, ``failed``,
        ``failed_files``) as files are handled. Returns ``None``; when no
        audio file is found a message is printed and nothing is processed.
        """
        print("\n" + "="*60)
        print("ASR AUDIO ANALYSIS PIPELINE")
        print("="*60)

        files = self.get_audio_files()
        self.stats['total_files'] = len(files)

        if not files:
            print(f"\nNo audio files in {self.input_folder}")
            return

        print(f"\nFound {len(files)} file(s)")
        print(f"Input: {self.input_folder}")
        print(f"Output: {self.output_folder}")

        for i, f in enumerate(files, 1):
            print(f"\n[{i}/{len(files)}]")
            if self.process_single(f):
                self.stats['processed'] += 1
            else:
                self.stats['failed'] += 1
                self.stats['failed_files'].append(f.name)

        print("\n" + "="*60)
        print("COMPLETE")
        print("="*60)
        print(f"Processed: {self.stats['processed']}/{self.stats['total_files']}")
        print(f"Stereo: {self.stats['stereo']} | Mono: {self.stats['mono']}")
        print(f"Total duration: {self.stats['total_duration']:.1f}s")

        if self.stats['failed_files']:
            print(f"\nFailed: {', '.join(self.stats['failed_files'])}")

        print(f"\nResults: {self.output_folder}")
        print("\nRun 'python api_server.py' and open http://localhost:5001")
| |
|
| |
|
def main():
    """Run the pipeline with the hard-coded local configuration.

    Exits with status 1 when the input folder is missing or is not a
    directory; otherwise makes sure the output folder exists, builds an
    ``ASRPipeline`` on the CPU in verbose mode and runs it.
    """
    input_folder = "/home/ramal/Downloads/Archive"
    output_folder = "output"
    whisper_model = "/home/ramal/Desktop/end-to-end/whisper-small-az/checkpoint-157959"

    # isdir (not exists): a regular file at this path would pass an existence
    # check and then crash when the pipeline tries to list it.
    if not os.path.isdir(input_folder):
        print(f"Error: {input_folder} not found or is not a directory")
        sys.exit(1)

    os.makedirs(output_folder, exist_ok=True)

    pipeline = ASRPipeline(
        input_folder=input_folder,
        output_folder=output_folder,
        whisper_model=whisper_model,
        device="cpu",
        verbose=True
    )

    pipeline.run()
| |
|
| |
|
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
| |
|