File size: 5,706 Bytes
4051511
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import logging
import os
import tempfile
from pathlib import Path
from typing import Callable, List, Optional, Tuple

import ffmpeg
from pydub import AudioSegment

# Configure the root logger at INFO level as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class AudioProcessor:
    """Handles audio extraction, conversion, and chunking.

    All methods are static; the class only groups the helpers and the
    constants that configure them.
    """

    # File extensions (lower-case, with leading dot) accepted by is_supported_file.
    SUPPORTED_FORMATS = {
        'audio': ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma'],
        'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm']
    }

    CHUNK_DURATION_MS = 30 * 60 * 1000  # 30 minutes in milliseconds
    OVERLAP_MS = 2000  # 2 second overlap between chunks

    @staticmethod
    def is_supported_file(file_path: str) -> bool:
        """Check if file format is supported.

        Args:
            file_path: Path (or bare file name) whose extension is inspected.

        Returns:
            True if the lower-cased extension appears in SUPPORTED_FORMATS.
        """
        ext = Path(file_path).suffix.lower()
        return any(
            ext in extensions
            for extensions in AudioProcessor.SUPPORTED_FORMATS.values()
        )

    @staticmethod
    def extract_audio(
        input_file: str,
        output_format: str = 'wav',
        progress_callback: Optional[Callable[[str], None]] = None,
    ) -> str:
        """
        Extract audio from video or convert audio to desired format

        The output is written to a new temporary file (16 kHz, mono) that the
        caller is responsible for deleting, e.g. via cleanup_temp_files.

        Args:
            input_file: Path to input file
            output_format: Desired output format (wav, mp3)
            progress_callback: Optional callback for progress updates

        Returns:
            Path to extracted/converted audio file

        Raises:
            RuntimeError: If ffmpeg fails to process the input file.
        """
        if progress_callback:
            progress_callback("Extracting audio from file...")

        # Reserve a unique path and close the handle right away so ffmpeg can
        # overwrite the file on every platform.
        with tempfile.NamedTemporaryFile(
            delete=False,
            suffix=f'.{output_format}'
        ) as tmp:
            output_file = tmp.name

        try:
            # Use ffmpeg to extract audio
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.output(
                stream,
                output_file,
                acodec='pcm_s16le' if output_format == 'wav' else 'libmp3lame',
                ar='16000',  # 16kHz sample rate (Whisper's preference)
                ac=1  # Mono channel
            )
            ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
        except ffmpeg.Error as e:
            # Do not leave the (empty or partial) output file behind.
            AudioProcessor.cleanup_temp_files(output_file)
            # stderr can be missing; never let the error path itself crash.
            detail = e.stderr.decode(errors='replace') if e.stderr else str(e)
            logger.error("FFmpeg error: %s", detail)
            raise RuntimeError(f"Failed to extract audio: {detail}") from e
        except Exception:
            # E.g. the ffmpeg executable is missing: clean up, then propagate.
            AudioProcessor.cleanup_temp_files(output_file)
            raise

        if progress_callback:
            progress_callback("Audio extraction complete")

        logger.info("Audio extracted to: %s", output_file)
        return output_file

    @staticmethod
    def get_audio_duration(file_path: str) -> float:
        """Get audio duration in seconds.

        Tries ffprobe first (audio stream duration, then container duration)
        and falls back to decoding the file with pydub if probing fails or
        reports no duration.

        Args:
            file_path: Path to the media file.

        Returns:
            Duration in seconds.
        """
        try:
            probe = ffmpeg.probe(file_path)
            # Prefer an audio stream: the first stream of a video file is
            # typically the video track and some streams carry no duration.
            for stream in probe.get('streams', []):
                if stream.get('codec_type') == 'audio' and 'duration' in stream:
                    return float(stream['duration'])
            return float(probe['format']['duration'])
        except Exception as e:  # deliberate best-effort: any probe failure triggers the fallback
            logger.warning("Failed to get duration: %s", e)
            # Fallback to pydub
            audio = AudioSegment.from_file(file_path)
            return len(audio) / 1000.0

    @staticmethod
    def compute_chunk_ranges(
        duration_ms: int,
        chunk_ms: Optional[int] = None,
        overlap_ms: Optional[int] = None,
    ) -> List[Tuple[int, int]]:
        """Plan the (start_ms, end_ms) windows used to split audio into chunks.

        Consecutive windows overlap by ``overlap_ms``. Planning stops as soon
        as a window reaches the end of the audio, so no window is ever fully
        contained in the previous one.

        Args:
            duration_ms: Total audio length in milliseconds.
            chunk_ms: Maximum window length; defaults to CHUNK_DURATION_MS.
            overlap_ms: Overlap between windows; defaults to OVERLAP_MS.

        Returns:
            List of (start_ms, end_ms) tuples; empty if duration_ms <= 0.

        Raises:
            ValueError: If chunk_ms is not positive or overlap_ms is not in
                the range [0, chunk_ms).
        """
        if chunk_ms is None:
            chunk_ms = AudioProcessor.CHUNK_DURATION_MS
        if overlap_ms is None:
            overlap_ms = AudioProcessor.OVERLAP_MS
        if chunk_ms <= 0 or not 0 <= overlap_ms < chunk_ms:
            # A non-positive step would loop forever.
            raise ValueError(
                f"Invalid chunking parameters: chunk_ms={chunk_ms}, overlap_ms={overlap_ms}"
            )

        step_ms = chunk_ms - overlap_ms
        ranges = []
        start_ms = 0
        while start_ms < duration_ms:
            end_ms = min(start_ms + chunk_ms, duration_ms)
            ranges.append((start_ms, end_ms))
            if end_ms >= duration_ms:
                # Everything is covered; a further window would only repeat
                # the tail of this one.
                break
            start_ms += step_ms
        return ranges

    @staticmethod
    def chunk_audio(
        file_path: str,
        progress_callback: Optional[Callable[[str], None]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Split audio into chunks for processing large files

        Chunk files are temporary WAV files that the caller is responsible
        for deleting. If the audio is no longer than CHUNK_DURATION_MS the
        original path is returned as the only "chunk" and no file is created.

        Args:
            file_path: Path to audio file
            progress_callback: Optional callback for progress updates

        Returns:
            List of tuples: [(chunk_file_path, start_time_offset), ...]
            where start_time_offset is in seconds.
        """
        if progress_callback:
            progress_callback("Loading audio file for chunking...")

        audio = AudioSegment.from_file(file_path)
        duration_ms = len(audio)

        # If audio is shorter than chunk duration, return as single chunk
        if duration_ms <= AudioProcessor.CHUNK_DURATION_MS:
            if progress_callback:
                progress_callback("File is small enough, no chunking needed")
            return [(file_path, 0.0)]

        ranges = AudioProcessor.compute_chunk_ranges(duration_ms)
        total_chunks = len(ranges)
        chunks = []

        try:
            for chunk_index, (start_ms, end_ms) in enumerate(ranges):
                if progress_callback:
                    progress_callback(f"Creating chunk {chunk_index + 1}/{total_chunks}...")

                # Reserve a temp path; close the handle before pydub writes to it.
                with tempfile.NamedTemporaryFile(
                    delete=False,
                    suffix='.wav',
                    prefix=f'chunk_{chunk_index}_'
                ) as tmp:
                    chunk_file = tmp.name

                # Record the chunk (offset in seconds) before exporting so a
                # failed export is still cleaned up below.
                chunks.append((chunk_file, start_ms / 1000.0))

                # export() hands back an open file object; close it explicitly.
                exported = audio[start_ms:end_ms].export(chunk_file, format='wav')
                exported.close()

                logger.info(
                    "Created chunk %d: %.2fs - %.2fs",
                    chunk_index, start_ms / 1000, end_ms / 1000,
                )
        except Exception:
            # Do not leak partially created chunk files on failure.
            AudioProcessor.cleanup_temp_files(*(path for path, _ in chunks))
            raise

        if progress_callback:
            progress_callback(f"Created {len(chunks)} chunks for processing")

        return chunks

    @staticmethod
    def cleanup_temp_files(*file_paths):
        """Clean up temporary files.

        Best-effort: falsy entries and already-missing files are skipped
        silently; other removal failures are logged as warnings, never raised.

        Args:
            *file_paths: Paths of files to delete.
        """
        for file_path in file_paths:
            if not file_path:
                continue
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone; nothing to do.
                continue
            except (OSError, TypeError, ValueError) as e:
                logger.warning("Failed to clean up %s: %s", file_path, e)
            else:
                logger.info("Cleaned up: %s", file_path)

    @staticmethod
    def get_file_size_mb(file_path: str) -> float:
        """Get file size in MB (1 MB = 1024 * 1024 bytes).

        Args:
            file_path: Path to an existing file.

        Returns:
            File size in megabytes.

        Raises:
            OSError: If the file does not exist or is inaccessible.
        """
        size_bytes = os.path.getsize(file_path)
        return size_bytes / (1024 * 1024)