Spaces:
Paused
Paused
| import streamlit as st | |
| import os | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import subprocess | |
| import json | |
| from file_manager import get_path, get_session_dir | |
| import pycaps.video.render.audio_utils as audio_utils | |
| from pycaps import WhisperAudioTranscriber, GoogleAudioTranscriber | |
| from utils import go_to_step, acquire_lock_slot, handle_unexpected_exception | |
| from config import MAX_VIDEO_SIZE, MAX_VIDEO_DURATION, MAX_CONCURRENT_JOBS, SUPPORTED_LANGUAGES | |
def get_video_duration(video_path: str) -> float:
    """Return the duration of a video in seconds, as reported by ffprobe.

    Runs ``ffprobe -show_format`` with JSON output and reads
    ``format.duration`` from the result.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        The duration in seconds, or ``-1`` when ffprobe is missing, exits
        with an error, or its output has no usable duration. In the failure
        case an error message is also shown through ``st.error``.
    """
    cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        str(video_path),
    ]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
        data = json.loads(result.stdout)
        return float(data["format"]["duration"])
    except (
        subprocess.CalledProcessError,
        FileNotFoundError,
        KeyError,
        # TypeError: the decoded JSON does not have the expected shape.
        TypeError,
        # ValueError: invalid JSON (json.JSONDecodeError is a subclass) or a
        # duration that is not numeric.
        ValueError,
    ) as e:
        st.error(f"Could not analyze video file to get duration. Error: {e}")
        return -1.0
def setup_google_credentials():
    """Expose credentials from the environment as a credentials file.

    When ``GOOGLE_JSON_CREDENTIALS`` is set, its value is written to a new
    ``.json`` file inside the session directory and
    ``GOOGLE_APPLICATION_CREDENTIALS`` is pointed at that file. The file is
    created with ``delete=False``, so it is not removed by this function.

    Returns:
        True when the credentials file was written, False when
        ``GOOGLE_JSON_CREDENTIALS`` is not set.
    """
    credentials_json = os.environ.get("GOOGLE_JSON_CREDENTIALS")
    if credentials_json is None:
        return False

    credentials_file = tempfile.NamedTemporaryFile(
        mode="w",
        delete=False,
        suffix=".json",
        encoding="utf-8",
        dir=get_session_dir(),
    )
    with credentials_file:
        credentials_file.write(credentials_json)

    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_file.name
    return True
def get_transcriber_instance(language_key: str):
    """Pick the transcriber to use for the given language.

    Google Speech-to-Text is tried first; if its credentials cannot be set up
    or anything in its initialisation raises, a Whisper "base" transcriber is
    returned instead. The name of the chosen backend is stored in
    ``st.session_state.transcriber_used``.

    Args:
        language_key: Key into ``SUPPORTED_LANGUAGES``; each entry unpacks
            into a (Google language code, Whisper language code) pair.

    Returns:
        A ``GoogleAudioTranscriber`` or a ``WhisperAudioTranscriber``.
    """
    google_code, whisper_code = SUPPORTED_LANGUAGES[language_key]

    try:
        if not setup_google_credentials():
            raise Exception("Unable to setup google credentials")
        google_transcriber = GoogleAudioTranscriber(language=google_code)
        # Presumably forces client creation so credential problems surface
        # here rather than during transcription -- confirm against pycaps.
        google_transcriber._get_client()
        st.session_state.transcriber_used = "Google Speech-to-Text V1"
        return google_transcriber
    except Exception:
        import traceback

        # Keep the reason for the fallback visible in the server log.
        traceback.print_exc()
        st.warning("Google Speech-to-Text not available, falling back to Whisper. Processing may be slower.")
        st.session_state.transcriber_used = "Whisper (base model)"
        return WhisperAudioTranscriber(model_size="base", language=whisper_code)
def _remove_file_if_present(path) -> None:
    """Delete *path* if it is set and still exists on disk."""
    if path and os.path.exists(path):
        os.remove(path)


def _transcribe_pending_video() -> None:
    """Transcribe the video recorded in session state, then move to step 2.

    Reads ``temp_video_path`` and ``selected_language`` from
    ``st.session_state``, extracts the audio track to a temporary ``.wav``,
    transcribes it, stores the result in ``st.session_state.transcribed_doc``
    and copies the video to ``get_path("input.mp4")``. On any exception the
    temporary video is removed and the error is passed to
    ``handle_unexpected_exception``.
    """
    audio_path = None
    try:
        video_path = Path(st.session_state.temp_video_path)
        language_key = st.session_state.selected_language
        transcriber = get_transcriber_instance(language_key)
        with st.spinner(f"Transcribing audio with {st.session_state.transcriber_used}... 🎧"):
            # Only the file name is needed; the extractor writes the content.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
                audio_path = tmp_audio.name
            audio_utils.extract_audio_for_whisper(str(video_path), audio_path)
            document = transcriber.transcribe(audio_path)
            st.session_state.transcribed_doc = document.to_dict()
            persisted_path = get_path("input.mp4")
            shutil.copy(video_path, persisted_path)
            st.session_state.video_path = persisted_path
            os.remove(video_path)
        del st.session_state.temp_video_path
        del st.session_state.selected_language
        st.session_state.audio_being_analyzed = False
        go_to_step(2)
        st.rerun()
    except Exception as e:
        if "temp_video_path" in st.session_state and os.path.exists(st.session_state.temp_video_path):
            os.remove(st.session_state.temp_video_path)
        handle_unexpected_exception(e)
    finally:
        # The extracted audio is scratch data: remove it on success and on
        # failure alike so a failed transcription does not leave it behind.
        _remove_file_if_present(audio_path)


def render_step1():
    """Render the upload step: pick a video and language, then transcribe.

    Flow of a single script run:

    1. If ``st.session_state.active_jobs`` has reached
       ``MAX_CONCURRENT_JOBS``, show a "busy" notice and stop.
    2. Show the informational notices, the file uploader and the language
       selector.
    3. Validate the upload against ``MAX_VIDEO_SIZE`` and
       ``MAX_VIDEO_DURATION`` (the duration check needs a temporary copy on
       disk).
    4. When "Start Transcription" is clicked, acquire a lock slot, record the
       temporary copy and the language in session state and rerun.
    5. When ``audio_being_analyzed`` is set, run the transcription and move
       on to step 2.

    The temporary copy made in step 3 is deleted unless it is handed over to
    session state in step 4.
    """
    st.header("Upload Your Video")

    if st.session_state.active_jobs >= MAX_CONCURRENT_JOBS:
        st.warning("🚧 All our processing slots are currently busy. Please check back in a few minutes.")
        st.info("Tip: You can also duplicate this space to get your own private and free, full-speed version instantly!")
        st.progress(1.0)
        if st.button("Refresh Status"):
            st.rerun()
        return

    st.warning(
        """
        **Heads-up on Transcription Quality:**
        To keep this online demo fast, it uses a basic real-time transcription model. The accuracy might be lower than you'd expect.
        For the highest quality and powerful AI transcription, please use the main `pycaps` tool, which leverages **Whisper**. You can check it out on [GitHub](https://github.com/francozanardi/pycaps).
        """
    )
    st.info(
        """
        **Note on Performance:**
        This is a free, shared demo running on community hardware. If you experience slowdowns or queues, it's because others are using it too!
        For a private, full-speed experience, you can **duplicate this Space for free** on your own Hugging Face account in just one click.
        """
    )

    if 'audio_being_analyzed' not in st.session_state:
        st.session_state['audio_being_analyzed'] = False

    st.info(f"For this demo, please upload a video shorter than **{MAX_VIDEO_DURATION} seconds**.")

    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader(
            f"Select a video file (max {MAX_VIDEO_SIZE // (1024*1024)}MB)",
            type=["mp4", "mov"],
            key=f"uploader_{st.session_state.session_id}"
        )
    with col2:
        selected_language_key = st.selectbox(
            "Select Audio Language",
            options=list(SUPPORTED_LANGUAGES.keys()),
            key="language_selector"
        )

    if not uploaded_file:
        return

    if uploaded_file.size > MAX_VIDEO_SIZE:
        st.error(f"File is too large ({uploaded_file.size / (1024*1024):.1f}MB). Max is {MAX_VIDEO_SIZE // (1024*1024)}MB.")
        return

    # ffprobe needs a real file, so the upload is copied to disk for this run.
    with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        temp_video_path = tmp_file.name

    duration = get_video_duration(temp_video_path)
    if duration < 0:
        os.remove(temp_video_path)
        return
    if duration > MAX_VIDEO_DURATION:
        st.error(f"Video is too long ({duration:.1f}s). Max duration for the demo is {MAX_VIDEO_DURATION} seconds.")
        os.remove(temp_video_path)
        return

    # Everything checks out, so show the button.
    if st.button("Start Transcription", type="primary", disabled=st.session_state.audio_being_analyzed):
        lock_file = acquire_lock_slot()
        if not lock_file:
            st.error("Sorry, all slots were taken just now. Please try again.")
            os.remove(temp_video_path)
            # st.rerun() ends this script run; nothing below executes.
            st.rerun()
        # Ownership of the temporary copy moves to session state here.
        st.session_state.lock_file_path = lock_file
        st.session_state.temp_video_path = temp_video_path
        st.session_state.selected_language = selected_language_key
        st.session_state.audio_being_analyzed = True
        st.rerun()
    else:
        # This run's copy was not handed to session state (no click, or an
        # analysis is already running on an earlier copy), so discard it
        # instead of leaving one more file behind on every rerun.
        _remove_file_if_present(temp_video_path)

    if st.session_state.audio_being_analyzed:
        _transcribe_pending_video()