| """ |
| Output Formatting Module for Multilingual Audio Intelligence System |
| |
| This module consolidates processed data from speaker diarization, speech recognition, |
| and neural machine translation into various structured formats for different use cases. |
| Designed for maximum flexibility and user-friendly output presentation. |
| |
| Key Features: |
| - JSON format for programmatic access and API integration |
| - SRT subtitle format for video/media players with speaker labels |
| - Human-readable text format with rich metadata |
| - Interactive timeline format for web visualization |
| - CSV export for data analysis and spreadsheet applications |
| - Rich metadata preservation throughout all formats |
| - Error handling and graceful degradation |
| |
| Output Formats: JSON, SRT, Plain Text, CSV, Timeline |
| Dependencies: json, csv, dataclasses |
| """ |
|
|
| import json |
| import csv |
| import io |
| import logging |
| from typing import List, Dict, Optional, Union, Any |
| from dataclasses import dataclass, asdict |
| from datetime import timedelta |
| import textwrap |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class ProcessedSegment: |
| """ |
| Unified data structure for a processed audio segment with all metadata. |
| |
| Attributes: |
| start_time (float): Segment start time in seconds |
| end_time (float): Segment end time in seconds |
| speaker_id (str): Speaker identifier |
| original_text (str): Transcribed text in original language |
| original_language (str): Detected original language code |
| translated_text (str): English translation |
| confidence_diarization (float): Speaker diarization confidence |
| confidence_transcription (float): Speech recognition confidence |
| confidence_translation (float): Translation confidence |
| word_timestamps (List[Dict]): Word-level timing information |
| model_info (Dict): Information about models used |
| """ |
| start_time: float |
| end_time: float |
| speaker_id: str |
| original_text: str |
| original_language: str |
| translated_text: str |
| confidence_diarization: float = 1.0 |
| confidence_transcription: float = 1.0 |
| confidence_translation: float = 1.0 |
| word_timestamps: Optional[List[Dict]] = None |
| model_info: Optional[Dict] = None |
| |
| @property |
| def duration(self) -> float: |
| """Duration of the segment in seconds.""" |
| return self.end_time - self.start_time |
| |
| def to_dict(self) -> dict: |
| """Convert to dictionary for JSON serialization.""" |
| return asdict(self) |
|
|
|
|
| class OutputFormatter: |
| """ |
| Advanced output formatting for multilingual audio intelligence results. |
| |
| Converts processed audio data into multiple user-friendly formats with |
| comprehensive metadata and beautiful presentation. |
| """ |
| |
| def __init__(self, audio_filename: str = "audio_file"): |
| """ |
| Initialize the Output Formatter. |
| |
| Args: |
| audio_filename (str): Name of the original audio file for references |
| """ |
| self.audio_filename = audio_filename |
| self.creation_timestamp = None |
| self.processing_stats = {} |
| |
| def format_all_outputs(self, |
| segments: List[ProcessedSegment], |
| audio_metadata: Optional[Dict] = None, |
| processing_stats: Optional[Dict] = None) -> Dict[str, str]: |
| """ |
| Generate all output formats in one call. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed audio segments |
| audio_metadata (Dict, optional): Original audio file metadata |
| processing_stats (Dict, optional): Processing time and performance stats |
| |
| Returns: |
| Dict[str, str]: Dictionary with all formatted outputs |
| """ |
| self.processing_stats = processing_stats or {} |
| |
| return { |
| 'json': self.to_json(segments, audio_metadata), |
| 'srt_original': self.to_srt(segments, use_translation=False), |
| 'srt_translated': self.to_srt(segments, use_translation=True), |
| 'text': self.to_text(segments, audio_metadata), |
| 'csv': self.to_csv(segments), |
| 'timeline': self.to_timeline_json(segments), |
| 'summary': self.generate_summary(segments, audio_metadata) |
| } |
| |
| def to_json(self, |
| segments: List[ProcessedSegment], |
| audio_metadata: Optional[Dict] = None) -> str: |
| """ |
| Convert segments to comprehensive JSON format. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| audio_metadata (Dict, optional): Audio file metadata |
| |
| Returns: |
| str: JSON formatted string |
| """ |
| |
| stats = self._generate_statistics(segments) |
| |
| |
| output = { |
| "metadata": { |
| "audio_filename": self.audio_filename, |
| "processing_timestamp": self._get_timestamp(), |
| "total_segments": len(segments), |
| "total_speakers": len(set(seg.speaker_id for seg in segments)), |
| "languages_detected": list(set(seg.original_language for seg in segments)), |
| "total_audio_duration": stats['total_duration'], |
| "total_speech_duration": stats['total_speech_duration'], |
| "speech_ratio": stats['speech_ratio'], |
| "audio_metadata": audio_metadata, |
| "processing_stats": self.processing_stats |
| }, |
| "statistics": stats, |
| "segments": [seg.to_dict() for seg in segments], |
| "speakers": self._generate_speaker_stats(segments), |
| "languages": self._generate_language_stats(segments) |
| } |
| |
| return json.dumps(output, indent=2, ensure_ascii=False) |
| |
| def to_srt(self, |
| segments: List[ProcessedSegment], |
| use_translation: bool = False, |
| include_speaker_labels: bool = True) -> str: |
| """ |
| Convert segments to SRT subtitle format. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| use_translation (bool): Use translated text instead of original |
| include_speaker_labels (bool): Include speaker names in subtitles |
| |
| Returns: |
| str: SRT formatted string |
| """ |
| srt_lines = [] |
| |
| for i, segment in enumerate(segments, 1): |
| |
| start_time = self._seconds_to_srt_time(segment.start_time) |
| end_time = self._seconds_to_srt_time(segment.end_time) |
| |
| |
| text = segment.translated_text if use_translation else segment.original_text |
| |
| |
| if include_speaker_labels: |
| speaker_name = self._format_speaker_name(segment.speaker_id) |
| text = f"<v {speaker_name}>{text}" |
| |
| |
| if not use_translation and segment.original_language != 'en': |
| text = f"[{segment.original_language.upper()}] {text}" |
| |
| |
| srt_entry = [ |
| str(i), |
| f"{start_time} --> {end_time}", |
| text, |
| "" |
| ] |
| |
| srt_lines.extend(srt_entry) |
| |
| return "\n".join(srt_lines) |
| |
| def to_text(self, |
| segments: List[ProcessedSegment], |
| audio_metadata: Optional[Dict] = None, |
| include_word_timestamps: bool = False) -> str: |
| """ |
| Convert segments to human-readable text format. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| audio_metadata (Dict, optional): Audio file metadata |
| include_word_timestamps (bool): Include detailed word timing |
| |
| Returns: |
| str: Formatted text string |
| """ |
| lines = [] |
| |
| |
| lines.append("=" * 80) |
| lines.append("MULTILINGUAL AUDIO INTELLIGENCE ANALYSIS") |
| lines.append("=" * 80) |
| lines.append("") |
| |
| |
| lines.append(f"Audio File: {self.audio_filename}") |
| lines.append(f"Analysis Date: {self._get_timestamp()}") |
| |
| if audio_metadata: |
| lines.append(f"Duration: {self._format_duration(audio_metadata.get('duration_seconds', 0))}") |
| lines.append(f"Sample Rate: {audio_metadata.get('sample_rate', 'Unknown')} Hz") |
| lines.append(f"Channels: {audio_metadata.get('channels', 'Unknown')}") |
| |
| lines.append("") |
| |
| |
| stats = self._generate_statistics(segments) |
| lines.append("ANALYSIS SUMMARY") |
| lines.append("-" * 40) |
| lines.append(f"Total Speakers: {len(set(seg.speaker_id for seg in segments))}") |
| lines.append(f"Languages Detected: {', '.join(set(seg.original_language for seg in segments))}") |
| lines.append(f"Total Segments: {len(segments)}") |
| lines.append(f"Speech Duration: {self._format_duration(stats['total_speech_duration'])}") |
| lines.append(f"Speech Ratio: {stats['speech_ratio']:.1%}") |
| |
| if self.processing_stats: |
| lines.append(f"Processing Time: {self.processing_stats.get('total_time', 'Unknown')}") |
| |
| lines.append("") |
| |
| |
| speaker_stats = self._generate_speaker_stats(segments) |
| lines.append("SPEAKER BREAKDOWN") |
| lines.append("-" * 40) |
| |
| for speaker_id, stats in speaker_stats.items(): |
| speaker_name = self._format_speaker_name(speaker_id) |
| lines.append(f"{speaker_name}:") |
| lines.append(f" Speaking Time: {self._format_duration(stats['total_speaking_time'])}") |
| lines.append(f" Number of Turns: {stats['number_of_turns']}") |
| lines.append(f" Average Turn: {self._format_duration(stats['average_turn_duration'])}") |
| lines.append(f" Longest Turn: {self._format_duration(stats['longest_turn'])}") |
| if stats['languages']: |
| lines.append(f" Languages: {', '.join(stats['languages'])}") |
| |
| lines.append("") |
| |
| |
| lines.append("FULL TRANSCRIPT") |
| lines.append("=" * 80) |
| lines.append("") |
| |
| for i, segment in enumerate(segments, 1): |
| |
| timestamp = f"[{self._format_duration(segment.start_time)} - {self._format_duration(segment.end_time)}]" |
| speaker_name = self._format_speaker_name(segment.speaker_id) |
| |
| lines.append(f"#{i:3d} {timestamp} {speaker_name}") |
| |
| |
| if segment.original_language != 'en': |
| lines.append(f" Original ({segment.original_language}): {segment.original_text}") |
| lines.append(f" Translation: {segment.translated_text}") |
| else: |
| lines.append(f" Text: {segment.original_text}") |
| |
| |
| lines.append(f" Confidence: D:{segment.confidence_diarization:.2f} " |
| f"T:{segment.confidence_transcription:.2f} " |
| f"TR:{segment.confidence_translation:.2f}") |
| |
| |
| if include_word_timestamps and segment.word_timestamps: |
| lines.append(" Word Timing:") |
| word_lines = [] |
| for word_info in segment.word_timestamps[:10]: |
| word_time = f"{word_info['start']:.1f}s" |
| word_lines.append(f"'{word_info['word']}'@{word_time}") |
| |
| lines.append(f" {', '.join(word_lines)}") |
| if len(segment.word_timestamps) > 10: |
| lines.append(f" ... and {len(segment.word_timestamps) - 10} more words") |
| |
| lines.append("") |
| |
| |
| lines.append("=" * 80) |
| lines.append("Generated by Multilingual Audio Intelligence System") |
| lines.append("=" * 80) |
| |
| return "\n".join(lines) |
| |
| def to_csv(self, segments: List[ProcessedSegment]) -> str: |
| """ |
| Convert segments to CSV format for data analysis. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| |
| Returns: |
| str: CSV formatted string |
| """ |
| output = io.StringIO() |
| |
| fieldnames = [ |
| 'segment_id', 'start_time', 'end_time', 'duration', |
| 'speaker_id', 'original_language', 'original_text', |
| 'translated_text', 'confidence_diarization', |
| 'confidence_transcription', 'confidence_translation', |
| 'word_count_original', 'word_count_translated' |
| ] |
| |
| writer = csv.DictWriter(output, fieldnames=fieldnames) |
| writer.writeheader() |
| |
| for i, segment in enumerate(segments, 1): |
| row = { |
| 'segment_id': i, |
| 'start_time': segment.start_time, |
| 'end_time': segment.end_time, |
| 'duration': segment.duration, |
| 'speaker_id': segment.speaker_id, |
| 'original_language': segment.original_language, |
| 'original_text': segment.original_text, |
| 'translated_text': segment.translated_text, |
| 'confidence_diarization': segment.confidence_diarization, |
| 'confidence_transcription': segment.confidence_transcription, |
| 'confidence_translation': segment.confidence_translation, |
| 'word_count_original': len(segment.original_text.split()), |
| 'word_count_translated': len(segment.translated_text.split()) |
| } |
| writer.writerow(row) |
| |
| return output.getvalue() |
| |
| def to_timeline_json(self, segments: List[ProcessedSegment]) -> str: |
| """ |
| Convert segments to timeline JSON format for interactive visualization. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| |
| Returns: |
| str: Timeline JSON formatted string |
| """ |
| |
| timeline_data = { |
| "title": { |
| "text": { |
| "headline": f"Audio Analysis: {self.audio_filename}", |
| "text": f"Interactive timeline of speaker segments and transcription" |
| } |
| }, |
| "events": [] |
| } |
| |
| for i, segment in enumerate(segments): |
| event = { |
| "start_date": { |
| "second": int(segment.start_time) |
| }, |
| "end_date": { |
| "second": int(segment.end_time) |
| }, |
| "text": { |
| "headline": f"{self._format_speaker_name(segment.speaker_id)} ({segment.original_language})", |
| "text": f"<p><strong>Original:</strong> {segment.original_text}</p>" |
| f"<p><strong>Translation:</strong> {segment.translated_text}</p>" |
| f"<p><em>Duration: {segment.duration:.1f}s, " |
| f"Confidence: {segment.confidence_transcription:.2f}</em></p>" |
| }, |
| "group": segment.speaker_id, |
| "media": { |
| "caption": f"Segment {i+1}: {self._format_duration(segment.start_time)} - {self._format_duration(segment.end_time)}" |
| } |
| } |
| |
| timeline_data["events"].append(event) |
| |
| return json.dumps(timeline_data, indent=2, ensure_ascii=False) |
| |
| def generate_summary(self, |
| segments: List[ProcessedSegment], |
| audio_metadata: Optional[Dict] = None) -> str: |
| """ |
| Generate a concise summary of the analysis. |
| |
| Args: |
| segments (List[ProcessedSegment]): Processed segments |
| audio_metadata (Dict, optional): Audio file metadata |
| |
| Returns: |
| str: Summary text |
| """ |
| if not segments: |
| return "No speech segments were detected in the audio file." |
| |
| stats = self._generate_statistics(segments) |
| speaker_stats = self._generate_speaker_stats(segments) |
| |
| summary_lines = [] |
| |
| |
| summary_lines.append(f"ANALYSIS SUMMARY FOR {self.audio_filename}") |
| summary_lines.append("=" * 50) |
| summary_lines.append("") |
| |
| |
| summary_lines.append(f"• {len(set(seg.speaker_id for seg in segments))} speakers detected") |
| summary_lines.append(f"• {len(segments)} speech segments identified") |
| summary_lines.append(f"• {len(set(seg.original_language for seg in segments))} languages detected: " |
| f"{', '.join(set(seg.original_language for seg in segments))}") |
| summary_lines.append(f"• {stats['speech_ratio']:.1%} of audio contains speech") |
| summary_lines.append("") |
| |
| |
| summary_lines.append("SPEAKER BREAKDOWN:") |
| for speaker_id, stats in speaker_stats.items(): |
| speaker_name = self._format_speaker_name(speaker_id) |
| percentage = (stats['total_speaking_time'] / sum(s['total_speaking_time'] for s in speaker_stats.values())) * 100 |
| summary_lines.append(f"• {speaker_name}: {self._format_duration(stats['total_speaking_time'])} " |
| f"({percentage:.1f}%) across {stats['number_of_turns']} turns") |
| |
| summary_lines.append("") |
| |
| |
| languages = set(seg.original_language for seg in segments) |
| if len(languages) > 1: |
| summary_lines.append("LANGUAGE BREAKDOWN:") |
| lang_stats = self._generate_language_stats(segments) |
| for lang, stats in lang_stats.items(): |
| percentage = (stats['speaking_time'] / sum(s['speaking_time'] for s in lang_stats.values())) * 100 |
| summary_lines.append(f"• {lang.upper()}: {self._format_duration(stats['speaking_time'])} " |
| f"({percentage:.1f}%) in {stats['segment_count']} segments") |
| summary_lines.append("") |
| |
| |
| summary_lines.append("KEY INSIGHTS:") |
| |
| |
| most_active = max(speaker_stats.items(), key=lambda x: x[1]['total_speaking_time']) |
| summary_lines.append(f"• Most active speaker: {self._format_speaker_name(most_active[0])}") |
| |
| |
| longest_segment = max(segments, key=lambda s: s.duration) |
| summary_lines.append(f"• Longest speaking turn: {self._format_duration(longest_segment.duration)} " |
| f"by {self._format_speaker_name(longest_segment.speaker_id)}") |
| |
| |
| avg_confidence = sum(seg.confidence_transcription for seg in segments) / len(segments) |
| summary_lines.append(f"• Average transcription confidence: {avg_confidence:.2f}") |
| |
| if len(languages) > 1: |
| |
| code_switches = 0 |
| for i in range(1, len(segments)): |
| if segments[i-1].speaker_id == segments[i].speaker_id and segments[i-1].original_language != segments[i].original_language: |
| code_switches += 1 |
| if code_switches > 0: |
| summary_lines.append(f"• {code_switches} potential code-switching instances detected") |
| |
| return "\n".join(summary_lines) |
| |
| def _generate_statistics(self, segments: List[ProcessedSegment]) -> Dict[str, Any]: |
| """Generate comprehensive statistics from segments.""" |
| if not segments: |
| return {} |
| |
| total_speech_duration = sum(seg.duration for seg in segments) |
| total_duration = max(seg.end_time for seg in segments) if segments else 0 |
| |
| return { |
| 'total_duration': total_duration, |
| 'total_speech_duration': total_speech_duration, |
| 'speech_ratio': total_speech_duration / total_duration if total_duration > 0 else 0, |
| 'average_segment_duration': total_speech_duration / len(segments), |
| 'longest_segment': max(seg.duration for seg in segments), |
| 'shortest_segment': min(seg.duration for seg in segments), |
| 'average_confidence_diarization': sum(seg.confidence_diarization for seg in segments) / len(segments), |
| 'average_confidence_transcription': sum(seg.confidence_transcription for seg in segments) / len(segments), |
| 'average_confidence_translation': sum(seg.confidence_translation for seg in segments) / len(segments), |
| 'total_words_original': sum(len(seg.original_text.split()) for seg in segments), |
| 'total_words_translated': sum(len(seg.translated_text.split()) for seg in segments) |
| } |
| |
| def _generate_speaker_stats(self, segments: List[ProcessedSegment]) -> Dict[str, Dict]: |
| """Generate per-speaker statistics.""" |
| speaker_stats = {} |
| |
| for segment in segments: |
| speaker_id = segment.speaker_id |
| |
| if speaker_id not in speaker_stats: |
| speaker_stats[speaker_id] = { |
| 'total_speaking_time': 0.0, |
| 'number_of_turns': 0, |
| 'longest_turn': 0.0, |
| 'shortest_turn': float('inf'), |
| 'languages': set() |
| } |
| |
| stats = speaker_stats[speaker_id] |
| stats['total_speaking_time'] += segment.duration |
| stats['number_of_turns'] += 1 |
| stats['longest_turn'] = max(stats['longest_turn'], segment.duration) |
| stats['shortest_turn'] = min(stats['shortest_turn'], segment.duration) |
| stats['languages'].add(segment.original_language) |
| |
| |
| for speaker_id, stats in speaker_stats.items(): |
| if stats['number_of_turns'] > 0: |
| stats['average_turn_duration'] = stats['total_speaking_time'] / stats['number_of_turns'] |
| else: |
| stats['average_turn_duration'] = 0.0 |
| |
| if stats['shortest_turn'] == float('inf'): |
| stats['shortest_turn'] = 0.0 |
| |
| stats['languages'] = list(stats['languages']) |
| |
| return speaker_stats |
| |
| def _generate_language_stats(self, segments: List[ProcessedSegment]) -> Dict[str, Dict]: |
| """Generate per-language statistics.""" |
| language_stats = {} |
| |
| for segment in segments: |
| lang = segment.original_language |
| |
| if lang not in language_stats: |
| language_stats[lang] = { |
| 'speaking_time': 0.0, |
| 'segment_count': 0, |
| 'speakers': set() |
| } |
| |
| stats = language_stats[lang] |
| stats['speaking_time'] += segment.duration |
| stats['segment_count'] += 1 |
| stats['speakers'].add(segment.speaker_id) |
| |
| |
| for lang, stats in language_stats.items(): |
| stats['speakers'] = list(stats['speakers']) |
| |
| return language_stats |
| |
| def _seconds_to_srt_time(self, seconds: float) -> str: |
| """Convert seconds to SRT timestamp format (HH:MM:SS,mmm).""" |
| td = timedelta(seconds=seconds) |
| hours, remainder = divmod(td.total_seconds(), 3600) |
| minutes, seconds = divmod(remainder, 60) |
| milliseconds = int((seconds % 1) * 1000) |
| |
| return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d},{milliseconds:03d}" |
| |
| def _format_duration(self, seconds: float) -> str: |
| """Format duration in human-readable format.""" |
| if seconds < 60: |
| return f"{seconds:.1f}s" |
| elif seconds < 3600: |
| minutes = int(seconds // 60) |
| secs = seconds % 60 |
| return f"{minutes}m {secs:.1f}s" |
| else: |
| hours = int(seconds // 3600) |
| minutes = int((seconds % 3600) // 60) |
| secs = seconds % 60 |
| return f"{hours}h {minutes}m {secs:.1f}s" |
| |
| def _format_speaker_name(self, speaker_id: str) -> str: |
| """Format speaker ID into a readable name.""" |
| if speaker_id.startswith("SPEAKER_"): |
| number = speaker_id.replace("SPEAKER_", "") |
| return f"Speaker {number}" |
| return speaker_id.replace("_", " ").title() |
| |
| def _get_timestamp(self) -> str: |
| """Get current timestamp in ISO format.""" |
| from datetime import datetime |
| return datetime.now().isoformat() |
|
|
|
|
| |
| def create_processed_segment(start_time: float, |
| end_time: float, |
| speaker_id: str, |
| original_text: str, |
| original_language: str, |
| translated_text: str, |
| **kwargs) -> ProcessedSegment: |
| """ |
| Convenience function to create a ProcessedSegment. |
| |
| Args: |
| start_time (float): Segment start time |
| end_time (float): Segment end time |
| speaker_id (str): Speaker identifier |
| original_text (str): Original transcribed text |
| original_language (str): Original language code |
| translated_text (str): Translated text |
| **kwargs: Additional optional parameters |
| |
| Returns: |
| ProcessedSegment: Created segment object |
| """ |
| return ProcessedSegment( |
| start_time=start_time, |
| end_time=end_time, |
| speaker_id=speaker_id, |
| original_text=original_text, |
| original_language=original_language, |
| translated_text=translated_text, |
| **kwargs |
| ) |
|
|
|
|
| def format_pipeline_output(diarization_segments, |
| transcription_segments, |
| translation_results, |
| audio_filename: str = "audio_file", |
| audio_metadata: Optional[Dict] = None) -> Dict[str, str]: |
| """ |
| Convenience function to format complete pipeline output. |
| |
| Args: |
| diarization_segments: Speaker diarization results |
| transcription_segments: Speech recognition results |
| translation_results: Translation results |
| audio_filename (str): Original audio filename |
| audio_metadata (Dict, optional): Audio file metadata |
| |
| Returns: |
| Dict[str, str]: All formatted outputs |
| """ |
| |
| processed_segments = [] |
| |
| |
| for i, (diar_seg, trans_seg, trans_result) in enumerate( |
| zip(diarization_segments, transcription_segments, translation_results) |
| ): |
| segment = ProcessedSegment( |
| start_time=diar_seg.start_time, |
| end_time=diar_seg.end_time, |
| speaker_id=diar_seg.speaker_id, |
| original_text=trans_seg.text, |
| original_language=trans_seg.language, |
| translated_text=trans_result.translated_text, |
| confidence_diarization=diar_seg.confidence, |
| confidence_transcription=trans_seg.confidence, |
| confidence_translation=trans_result.confidence, |
| word_timestamps=trans_seg.word_timestamps |
| ) |
| processed_segments.append(segment) |
| |
| |
| formatter = OutputFormatter(audio_filename) |
| return formatter.format_all_outputs(processed_segments, audio_metadata) |
|
|
|
|
| |
| if __name__ == "__main__": |
| import argparse |
| |
| def main(): |
| """Command line interface for testing output formatting.""" |
| parser = argparse.ArgumentParser(description="Audio Analysis Output Formatter") |
| parser.add_argument("--demo", action="store_true", |
| help="Run with demo data") |
| parser.add_argument("--format", choices=["json", "srt", "text", "csv", "timeline", "all"], |
| default="all", help="Output format to generate") |
| parser.add_argument("--output-file", "-o", |
| help="Save output to file instead of printing") |
| |
| args = parser.parse_args() |
| |
| if args.demo: |
| |
| demo_segments = [ |
| ProcessedSegment( |
| start_time=0.0, end_time=3.5, |
| speaker_id="SPEAKER_00", |
| original_text="Hello, how are you today?", |
| original_language="en", |
| translated_text="Hello, how are you today?", |
| confidence_diarization=0.95, |
| confidence_transcription=0.92, |
| confidence_translation=1.0, |
| word_timestamps=[ |
| {"word": "Hello", "start": 0.0, "end": 0.5, "confidence": 0.99}, |
| {"word": "how", "start": 1.0, "end": 1.2, "confidence": 0.98}, |
| {"word": "are", "start": 1.3, "end": 1.5, "confidence": 0.97}, |
| {"word": "you", "start": 1.6, "end": 1.9, "confidence": 0.98}, |
| {"word": "today", "start": 2.5, "end": 3.2, "confidence": 0.96} |
| ] |
| ), |
| ProcessedSegment( |
| start_time=4.0, end_time=7.8, |
| speaker_id="SPEAKER_01", |
| original_text="Bonjour, comment allez-vous?", |
| original_language="fr", |
| translated_text="Hello, how are you?", |
| confidence_diarization=0.87, |
| confidence_transcription=0.89, |
| confidence_translation=0.94 |
| ), |
| ProcessedSegment( |
| start_time=8.5, end_time=12.1, |
| speaker_id="SPEAKER_00", |
| original_text="I'm doing well, thank you. What about you?", |
| original_language="en", |
| translated_text="I'm doing well, thank you. What about you?", |
| confidence_diarization=0.93, |
| confidence_transcription=0.95, |
| confidence_translation=1.0 |
| ), |
| ProcessedSegment( |
| start_time=13.0, end_time=16.2, |
| speaker_id="SPEAKER_01", |
| original_text="Ça va très bien, merci beaucoup!", |
| original_language="fr", |
| translated_text="I'm doing very well, thank you very much!", |
| confidence_diarization=0.91, |
| confidence_transcription=0.88, |
| confidence_translation=0.92 |
| ) |
| ] |
| |
| demo_metadata = { |
| "duration_seconds": 16.2, |
| "sample_rate": 16000, |
| "channels": 1 |
| } |
| |
| |
| formatter = OutputFormatter("demo_conversation.wav") |
| |
| if args.format == "all": |
| outputs = formatter.format_all_outputs(demo_segments, demo_metadata) |
| |
| if args.output_file: |
| |
| base_name = args.output_file.rsplit('.', 1)[0] |
| for format_type, content in outputs.items(): |
| filename = f"{base_name}.{format_type}" |
| with open(filename, 'w', encoding='utf-8') as f: |
| f.write(content) |
| print(f"Saved {format_type} output to {filename}") |
| else: |
| |
| for format_type, content in outputs.items(): |
| print(f"\n{'='*20} {format_type.upper()} {'='*20}") |
| print(content) |
| else: |
| |
| if args.format == "json": |
| output = formatter.to_json(demo_segments, demo_metadata) |
| elif args.format == "srt": |
| output = formatter.to_srt(demo_segments, use_translation=False) |
| elif args.format == "text": |
| output = formatter.to_text(demo_segments, demo_metadata) |
| elif args.format == "csv": |
| output = formatter.to_csv(demo_segments) |
| elif args.format == "timeline": |
| output = formatter.to_timeline_json(demo_segments) |
| |
| if args.output_file: |
| with open(args.output_file, 'w', encoding='utf-8') as f: |
| f.write(output) |
| print(f"Output saved to {args.output_file}") |
| else: |
| print(output) |
| |
| else: |
| print("Please use --demo flag to run with demo data, or integrate with your audio processing pipeline.") |
| |
| main() |