import base64
import contextlib
import io
import logging
import os
import tempfile
import threading
from urllib.parse import urlparse

import dash
import dash_bootstrap_components as dbc
import openai
import requests
from dash import dcc, html, Input, Output, State, callback_context
from dash.exceptions import PreventUpdate
from pydub import AudioSegment
from pytube import YouTube
|
|
| |
# Configure logging before anything can log: emitting a record through the
# root logger (``logging.error``) while it has no handlers implicitly calls
# ``basicConfig()`` with defaults, which would turn this call into a no-op and
# lose the timestamped format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Locate VideoFileClip across moviepy layouts: first the ``moviepy.editor``
# module, then the package root. ``from ... import`` raises ImportError both
# when the module is missing and when the name is absent, so a single except
# clause covers every failure. ``VideoFileClip`` is left as None when moviepy
# is unusable; callers must check for that.
try:
    from moviepy.editor import VideoFileClip
except ImportError:
    try:
        from moviepy import VideoFileClip
    except ImportError:
        logger.error("Failed to import VideoFileClip from moviepy. Please check the installation.")
        VideoFileClip = None
|
|
| |
# Dash application instance, themed with the stock Bootstrap stylesheet.
app = dash.Dash(
    __name__,
    external_stylesheets=[dbc.themes.BOOTSTRAP],
)

# The OpenAI credential is read from the environment only; a missing or empty
# value is logged and aborts start-up with ValueError.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if OPENAI_API_KEY is None or OPENAI_API_KEY == "":
    logger.error("OPENAI_API_KEY not found in environment variables")
    raise ValueError("OPENAI_API_KEY not set")

openai.api_key = OPENAI_API_KEY
|
|
def is_valid_url(url):
    """Return True when *url* parses with both a scheme and a network location.

    Args:
        url: Candidate URL. Values that are neither ``str`` nor ``bytes``
            (for example ``None`` or a number) are rejected.

    Returns:
        bool: True if ``urlparse`` yields a non-empty scheme and netloc,
        False otherwise, including when parsing raises ``ValueError``.
    """
    # urlparse() raises AttributeError (not ValueError) for inputs such as an
    # int, so reject unsupported types up front instead of letting that escape.
    if not isinstance(url, (str, bytes)):
        return False
    try:
        result = urlparse(url)
    except ValueError:
        logger.error(f"Invalid URL: {url}")
        return False
    return bool(result.scheme and result.netloc)
|
|
def download_media(url, timeout=30):
    """Download the media behind *url* into a temporary file and return its path.

    URLs whose host is ``youtu.be``, ``youtube.com`` or a subdomain of it are
    fetched with pytube as a progressive MP4. Every other URL is fetched with
    ``requests`` and streamed to disk in chunks.

    Args:
        url: Address of the media. NOTE(security): this value comes from the
            user and is fetched server-side as-is, so any address reachable
            from the server can be requested; restrict it if that matters.
        timeout: Timeout in seconds passed to ``requests.get`` for the plain
            HTTP download.

    Returns:
        str: Path of the downloaded temporary file. The caller owns the file
        and is responsible for deleting it.

    Raises:
        ValueError: If the YouTube video offers no progressive MP4 stream.
        Exception: Any error raised by pytube or requests (including
            ``requests.HTTPError`` for non-success status codes) is logged and
            re-raised; a partially written temporary file is removed first.
    """
    logger.info(f"Attempting to download media from URL: {url}")
    temp_path = None
    try:
        parsed = urlparse(url)
        if not parsed.netloc:
            # Scheme-less input such as "youtu.be/<id>" parses as a bare path;
            # re-parse it as a network location so the host can be inspected.
            parsed = urlparse(f"//{url}")
        host = (parsed.hostname or "").lower()

        # Match on the host rather than the whole URL so that an unrelated
        # address merely mentioning "youtube.com" is not sent to pytube.
        if host in ("youtube.com", "youtu.be") or host.endswith(".youtube.com"):
            stream = YouTube(url).streams.filter(progressive=True, file_extension='mp4').first()
            if stream is None:
                raise ValueError("No progressive MP4 stream is available for this video")
            # Reserve a unique name, then close the handle before pytube
            # writes to that path (an open handle blocks the write on Windows).
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
                temp_path = temp_file.name
            stream.download(
                output_path=os.path.dirname(temp_path),
                filename=os.path.basename(temp_path),
            )
            logger.info(f"YouTube video downloaded: {temp_path}")
            return temp_path

        with requests.get(url, stream=True, timeout=timeout) as response:
            # Fail on 4xx/5xx instead of saving an error page as "media".
            response.raise_for_status()
            content_type = response.headers.get('content-type', '')
            if 'video' in content_type:
                suffix = '.mp4'
            elif 'audio' in content_type:
                suffix = '.mp3'
            else:
                # Generic content types (e.g. application/octet-stream) carry
                # no hint, so fall back to the extension in the URL path.
                suffix = os.path.splitext(parsed.path)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                temp_path = temp_file.name
                # Stream to disk so large files are never held in memory whole.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        temp_file.write(chunk)
        logger.info(f"Media downloaded: {temp_path}")
        return temp_path
    except Exception as e:
        logger.error(f"Error downloading media: {str(e)}")
        if temp_path is not None:
            with contextlib.suppress(OSError):
                os.unlink(temp_path)
        raise
|
|
def extract_audio(file_path):
    """Write the audio track of the video at *file_path* to a temporary WAV file.

    Args:
        file_path: Path of the video file to read.

    Returns:
        str: Path of the temporary ``.wav`` file. The caller owns the file and
        is responsible for deleting it.

    Raises:
        ImportError: If ``VideoFileClip`` could not be imported at start-up.
        ValueError: If the video has no audio track.
        Exception: Any error raised while opening the clip or writing the
            audio is logged and re-raised; the temporary WAV is removed first.
    """
    logger.info(f"Extracting audio from video: {file_path}")
    audio_path = None
    try:
        if VideoFileClip is None:
            raise ImportError("VideoFileClip is not available. Cannot extract audio.")
        video = VideoFileClip(file_path)
        audio = None
        try:
            audio = video.audio
            if audio is None:
                raise ValueError("The video does not contain an audio track.")
            # Only reserve the name here; closing the handle before the audio
            # is written avoids leaking a descriptor and an open-file conflict.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as audio_file:
                audio_path = audio_file.name
            audio.write_audiofile(audio_path)
        finally:
            # Release the clip's resources even when writing fails.
            video.close()
            if audio is not None:
                audio.close()
        logger.info(f"Audio extracted: {audio_path}")
        return audio_path
    except Exception as e:
        logger.error(f"Error extracting audio: {str(e)}")
        if audio_path is not None:
            with contextlib.suppress(OSError):
                os.unlink(audio_path)
        raise
|
|
def transcribe_audio(file_path):
    """Transcribe the audio file at *file_path* with the ``whisper-1`` model.

    The file is opened in binary mode and handed to ``openai.Audio.transcribe``.

    Returns:
        The ``"text"`` entry of the transcription response.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    logger.info(f"Transcribing audio: {file_path}")
    try:
        with open(file_path, "rb") as media_handle:
            response = openai.Audio.transcribe("whisper-1", media_handle)
        logger.info("Transcription completed successfully")
        text = response["text"]
        return text
    except Exception as e:
        logger.error(f"Error during transcription: {str(e)}")
        raise
|
|
def process_media(contents, filename, url):
    """Turn an uploaded file or a media URL into a transcript.

    An upload takes precedence over the URL. The media is stored in a
    temporary file; files with a known video extension have their audio
    extracted first, everything else is transcribed directly. All temporary
    files are removed before returning, whether or not processing succeeded.

    Args:
        contents: Upload payload of the form ``"<header>,<base64 data>"``, or
            a falsy value when nothing was uploaded.
        filename: Original name of the upload; only its extension is used.
            May be None.
        url: Media URL used when *contents* is falsy.

    Returns:
        The transcript returned by ``transcribe_audio``.

    Raises:
        ValueError: If neither *contents* nor *url* is given, or the upload
            payload has no ``,`` separator.
        Exception: Any download, extraction or transcription error is logged
            and re-raised.
    """
    logger.info("Starting media processing")
    temp_file_path = None
    audio_file_path = None
    try:
        if contents:
            # Split on the first comma only: everything after it is payload.
            _header, separator, content_string = contents.partition(',')
            if not separator:
                raise ValueError("Malformed upload payload")
            decoded = base64.b64decode(content_string)
            # filename can be None; fall back to no extension instead of crashing.
            suffix = os.path.splitext(filename or "")[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                temp_file_path = temp_file.name
                temp_file.write(decoded)
            logger.info(f"File uploaded: {temp_file_path}")
        elif url:
            temp_file_path = download_media(url)
        else:
            logger.error("No input provided")
            raise ValueError("No input provided")

        if temp_file_path.lower().endswith(('.mp4', '.avi', '.mov', '.flv', '.wmv')):
            logger.info("Video file detected, extracting audio")
            audio_file_path = extract_audio(temp_file_path)
            return transcribe_audio(audio_file_path)

        logger.info("Audio file detected, transcribing directly")
        return transcribe_audio(temp_file_path)
    except Exception as e:
        logger.error(f"Error in process_media: {str(e)}")
        raise
    finally:
        # Clean up on success and on failure alike; a file that is already
        # gone must not mask the real outcome.
        for path in (audio_file_path, temp_file_path):
            if path is not None:
                with contextlib.suppress(OSError):
                    os.unlink(path)
|
|
# Inline CSS for the dashed drag-and-drop upload area.
_UPLOAD_BOX_STYLE = {
    'width': '100%',
    'height': '60px',
    'lineHeight': '60px',
    'borderWidth': '1px',
    'borderStyle': 'dashed',
    'borderRadius': '5px',
    'textAlign': 'center',
    'margin': '10px',
}

# Page layout: a heading above a single card. The card holds, top to bottom,
# the upload area, the uploaded-file label, the URL field, the transcribe
# button, the spinner-wrapped result area, a progress line, the initially
# hidden download button, the download target and the initially disabled
# interval timer.
app.layout = dbc.Container([
    html.H1("Audio/Video Transcription App", className="text-center my-4"),
    dbc.Card([
        dbc.CardBody([
            dcc.Upload(
                id='upload-media',
                children=html.Div([
                    'Drag and Drop or ',
                    html.A('Select Audio/Video File'),
                ]),
                style=_UPLOAD_BOX_STYLE,
                multiple=False,
            ),
            html.Div(id='file-info', className="mt-2"),
            dbc.Input(
                id="media-url",
                type="text",
                placeholder="Enter audio/video URL or YouTube link",
                className="my-3",
            ),
            dbc.Button(
                "Transcribe",
                id="transcribe-button",
                color="primary",
                className="w-100 mb-3",
            ),
            dbc.Spinner(html.Div(id="transcription-output", className="mt-3")),
            html.Div(id="progress-indicator", className="text-center mt-3"),
            dbc.Button(
                "Download Transcript",
                id="download-button",
                color="secondary",
                className="w-100 mt-3",
                style={'display': 'none'},
            ),
            dcc.Download(id="download-transcript"),
            dcc.Interval(
                id='progress-interval',
                interval=500,
                n_intervals=0,
                disabled=True,
            ),
        ]),
    ]),
])
|
|
@app.callback(
    Output("file-info", "children"),
    Input("upload-media", "filename"),
    Input("upload-media", "last_modified"),
)
def update_file_info(filename, last_modified):
    """Return the label shown under the upload area.

    Yields ``"File uploaded: <filename>"`` once a file name is available and
    an empty string otherwise. ``last_modified`` is registered as a trigger
    only; its value is not used.
    """
    return "" if filename is None else f"File uploaded: {filename}"
|
|
@app.callback(
    Output("transcription-output", "children"),
    Output("download-button", "style"),
    Output("progress-indicator", "children"),
    Output("progress-interval", "disabled"),
    Input("transcribe-button", "n_clicks"),
    Input("progress-interval", "n_intervals"),
    State("upload-media", "contents"),
    State("upload-media", "filename"),
    State("media-url", "value"),
    prevent_initial_call=True
)
def update_transcription(n_clicks, n_intervals, contents, filename, url):
    """Transcribe the selected media and render the result.

    The work runs synchronously inside the callback. A result produced on a
    fire-and-forget worker thread cannot be handed to a later request without
    shared job state, which this app does not keep, so a threaded version
    would never display its result. The interval timer is therefore never
    enabled here.

    Args:
        n_clicks: Click count of the transcribe button (trigger only).
        n_intervals: Tick count of the interval timer (trigger only).
        contents: Upload payload from the upload component, if any.
        filename: Name of the uploaded file, if any.
        url: Value of the URL field, if any.

    Returns:
        tuple: ``(result children, download button style, progress text,
        interval disabled)``. On success the children are a card whose body
        holds a heading followed by the transcript in a ``Pre`` element, and
        the download button is shown. On failure the children are an
        ``"An error occurred: ..."`` message and the button stays hidden.

    Raises:
        PreventUpdate: If the button was clicked with neither an upload nor a
            URL present.
    """
    if callback_context.triggered_id != "transcribe-button":
        # A stray timer tick: leave the page untouched and keep the timer off.
        return dash.no_update, dash.no_update, dash.no_update, True

    if not contents and not url:
        raise PreventUpdate

    try:
        transcript = process_media(contents, filename, url)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        return f"An error occurred: {str(e)}", {'display': 'none'}, "", True

    if not transcript:
        # Nothing to show or download; say so instead of rendering an empty card.
        logger.error("Transcription returned no text")
        return "No speech was detected in the media.", {'display': 'none'}, "", True

    logger.info("Transcription successful")
    # Keep this structure (Card > CardBody > [H5, Pre]) in sync with the
    # download callback, which reads the transcript back out of it.
    result_card = dbc.Card([
        dbc.CardBody([
            html.H5("Transcription Result"),
            # camelCase style keys, matching the rest of the layout.
            html.Pre(transcript, style={"whiteSpace": "pre-wrap", "wordWrap": "break-word"})
        ])
    ])
    return result_card, {'display': 'block'}, "", True
|
|
@app.callback(
    Output("download-transcript", "data"),
    Input("download-button", "n_clicks"),
    State("transcription-output", "children"),
    prevent_initial_call=True
)
def download_transcript(n_clicks, transcription_output):
    """Offer the displayed transcript as ``transcript.txt``.

    Args:
        n_clicks: Click count of the download button.
        transcription_output: Serialized children of the result area. The
            transcript is expected at card -> first child -> second child.

    Returns:
        dict: ``content`` (the transcript text) and ``filename`` for the
        download component.

    Raises:
        PreventUpdate: If the button has not been clicked, the result area is
            empty, or it holds something other than the result card (for
            example a status or error message).
    """
    if not n_clicks or not transcription_output:
        raise PreventUpdate

    try:
        card_body = transcription_output['props']['children'][0]
        transcript = card_body['props']['children'][1]['props']['children']
    except (KeyError, IndexError, TypeError):
        # The result area does not contain the expected card structure.
        raise PreventUpdate from None

    if not isinstance(transcript, str):
        raise PreventUpdate
    return dict(content=transcript, filename="transcript.txt")
|
|
if __name__ == '__main__':
    logger.info("Starting the Dash application...")
    # Debug mode enables the interactive debugger, which allows arbitrary code
    # execution. The server binds to every interface, so debug must be an
    # explicit opt-in (DASH_DEBUG=1/true/yes) rather than the default.
    debug_enabled = os.environ.get("DASH_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860)
    logger.info("Dash application has finished running.")