|
|
import os |
|
|
from typing import List, Dict |
|
|
|
|
|
from moviepy.editor import VideoFileClip |
|
|
from openai import OpenAI |
|
|
|
|
|
|
|
|
class AudioExtractor:
    """Extract a video's audio track and transcribe it with OpenAI Whisper.

    An OpenAI client is created only when an API key is supplied. Without
    one, ``transcribe`` prints a notice and returns an empty list.
    """

    def __init__(self, openai_api_key: str = None, **kwargs):
        """Store the API key and create the OpenAI client when a key is given.

        Args:
            openai_api_key: OpenAI API key. When falsy, no client is created.
            **kwargs: Accepted for call-site compatibility; not used here.
        """
        self.openai_api_key = openai_api_key
        self.client = OpenAI(api_key=openai_api_key) if openai_api_key else None

    @staticmethod
    def _default_audio_path(video_path: str) -> str:
        """Return ``video_path`` with its file extension replaced by ``.mp3``."""
        # splitext only looks at the final path component, so a dot in a
        # directory name (e.g. "clips.v2/ad") does not truncate the path.
        return os.path.splitext(video_path)[0] + '.mp3'

    @staticmethod
    def _segment_to_dict(segment) -> Dict:
        """Convert one segment (dict or attribute-style object) to a plain dict."""
        if isinstance(segment, dict):
            read = segment.get
        else:
            def read(name, default):
                return getattr(segment, name, default)

        return {
            "start": read('start', 0),
            "end": read('end', 0),
            # A None text must not abort the whole transcription.
            "text": (read('text', '') or '').strip(),
        }

    def extract_audio(self, video_path: str, output_path: str = None) -> str:
        """
        Extract audio track from video.

        Args:
            video_path: Path of the video file to read.
            output_path: Destination of the audio file. Defaults to
                ``video_path`` with its extension replaced by ``.mp3``.

        Returns:
            Path to extracted MP3 file (better for Whisper API)

        Raises:
            ValueError: If the video has no audio track.
        """
        if output_path is None:
            output_path = self._default_audio_path(video_path)

        video = VideoFileClip(video_path)
        try:
            if video.audio is None:
                raise ValueError(f"No audio track found in video: {video_path}")
            video.audio.write_audiofile(output_path, codec='mp3', verbose=False, logger=None)
        finally:
            # Always release the clip, even when writing fails.
            video.close()

        return output_path

    def transcribe(self, audio_path: str) -> List[Dict]:
        """
        Transcribe audio with timestamps using OpenAI Whisper API.

        Args:
            audio_path: Path of the audio file to upload.

        Returns:
            List of segments: [
                {"start": 0.0, "end": 3.2, "text": "Tired of everyday exhaustion?"},
                {"start": 3.2, "end": 7.1, "text": "Meet the new SuperVit..."},
                ...
            ]
            When the response carries only plain text, a single segment with
            ``start`` and ``end`` of 0.0 is returned. An empty list is returned
            when no client is configured or when any error occurs.
        """
        if not self.client:
            print("OpenAI API key not configured")
            return []

        # Deliberate best-effort boundary: any failure (missing file, API
        # error, unexpected response shape) is reported and yields [].
        try:
            with open(audio_path, "rb") as audio_file:
                response = self.client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="verbose_json",
                    timestamp_granularities=["segment"],
                )

            raw_segments = getattr(response, 'segments', None)
            if raw_segments:
                return [self._segment_to_dict(segment) for segment in raw_segments]

            # Fallback: no segment timing available, keep the full text.
            full_text = getattr(response, 'text', None)
            if full_text:
                return [{"start": 0.0, "end": 0.0, "text": full_text.strip()}]

            return []

        except Exception as e:
            print(f"Transcription error: {e}")
            return []
|
|
|