pycaps / src /ui /step1_upload.py
Franco Zanardi
fix: google stt was not available
82e6c79
import streamlit as st
import os
import tempfile
import shutil
from pathlib import Path
import subprocess
import json
from file_manager import get_path, get_session_dir
import pycaps.video.render.audio_utils as audio_utils
from pycaps import WhisperAudioTranscriber, GoogleAudioTranscriber
from utils import go_to_step, acquire_lock_slot, handle_unexpected_exception
from config import MAX_VIDEO_SIZE, MAX_VIDEO_DURATION, MAX_CONCURRENT_JOBS, SUPPORTED_LANGUAGES
def get_video_duration(video_path: str) -> float:
"""Gets video duration in seconds using ffprobe."""
try:
cmd = [
"ffprobe",
"-v", "quiet",
"-print_format", "json",
"-show_format",
str(video_path),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
data = json.loads(result.stdout)
return float(data["format"]["duration"])
except (subprocess.CalledProcessError, FileNotFoundError, KeyError, json.JSONDecodeError) as e:
st.error(f"Could not analyze video file to get duration. Error: {e}")
return -1
def setup_google_credentials():
if "GOOGLE_JSON_CREDENTIALS" not in os.environ:
return False
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", encoding="utf-8", dir=get_session_dir()) as temp_file:
temp_file.write(os.environ["GOOGLE_JSON_CREDENTIALS"])
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
return True
def get_transcriber_instance(language_key: str):
"""
Dynamically selects the best available transcriber.
Prefers Google STT if available, otherwise falls back to Whisper.
"""
google_lang_code, whisper_lang_code = SUPPORTED_LANGUAGES[language_key]
try:
was_set = setup_google_credentials()
if not was_set:
raise Exception("Unable to setup google credentials")
transcriber = GoogleAudioTranscriber(language=google_lang_code)
transcriber._get_client()
st.session_state.transcriber_used = "Google Speech-to-Text V1"
return transcriber
except Exception as e:
import traceback
traceback.print_exc()
st.warning("Google Speech-to-Text not available, falling back to Whisper. Processing may be slower.")
st.session_state.transcriber_used = "Whisper (base model)"
return WhisperAudioTranscriber(model_size="base", language=whisper_lang_code)
def render_step1():
st.header("Upload Your Video")
if st.session_state.active_jobs >= MAX_CONCURRENT_JOBS:
st.warning("🚧 All our processing slots are currently busy. Please check back in a few minutes.")
st.info("Tip: You can also duplicate this space to get your own private and free, full-speed version instantly!")
st.progress(1.0)
if st.button("Refresh Status"):
st.rerun()
return
st.warning(
"""
**Heads-up on Transcription Quality:**
To keep this online demo fast, it uses a basic real-time transcription model. The accuracy might be lower than you'd expect.
For the highest quality and powerful AI transcription, please use the main `pycaps` tool, which leverages **Whisper**. You can check it out on [GitHub](https://github.com/francozanardi/pycaps).
"""
)
st.info(
"""
**Note on Performance:**
This is a free, shared demo running on community hardware. If you experience slowdowns or queues, it's because others are using it too!
For a private, full-speed experience, you can **duplicate this Space for free** on your own Hugging Face account in just one click.
"""
)
if 'audio_being_analyzed' not in st.session_state:
st.session_state['audio_being_analyzed'] = False
st.info(f"For this demo, please upload a video shorter than **{MAX_VIDEO_DURATION} seconds**.")
col1, col2 = st.columns([2, 1])
with col1:
uploaded_file = st.file_uploader(
f"Select a video file (max {MAX_VIDEO_SIZE // (1024*1024)}MB)",
type=["mp4", "mov"],
key=f"uploader_{st.session_state.session_id}"
)
with col2:
selected_language_key = st.selectbox(
"Select Audio Language",
options=list(SUPPORTED_LANGUAGES.keys()),
key="language_selector"
)
if not uploaded_file:
return
if uploaded_file.size > MAX_VIDEO_SIZE:
st.error(f"File is too large ({uploaded_file.size / (1024*1024):.1f}MB). Max is {MAX_VIDEO_SIZE // (1024*1024)}MB.")
return
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp_file:
tmp_file.write(uploaded_file.getvalue())
temp_video_path = tmp_file.name
duration = get_video_duration(temp_video_path)
if duration < 0:
os.remove(temp_video_path)
return
if duration > MAX_VIDEO_DURATION:
st.error(f"Video is too long ({duration:.1f}s). Max duration for the demo is {MAX_VIDEO_DURATION} seconds.")
os.remove(temp_video_path)
return
# Si todo está bien, mostramos el botón
if st.button("Start Transcription", type="primary", disabled=st.session_state.audio_being_analyzed):
lock_file = acquire_lock_slot()
if not lock_file:
st.error("Sorry, all slots were taken just now. Please try again.")
os.remove(temp_video_path)
st.rerun()
st.session_state.lock_file_path = lock_file
st.session_state.temp_video_path = temp_video_path
st.session_state.selected_language = selected_language_key
st.session_state.audio_being_analyzed = True
st.rerun()
if st.session_state.audio_being_analyzed:
try:
video_path = Path(st.session_state.temp_video_path)
language_key = st.session_state.selected_language
transcriber = get_transcriber_instance(language_key)
with st.spinner(f"Transcribing audio with {st.session_state.transcriber_used}... 🎧"):
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
audio_path = tmp_audio.name
audio_utils.extract_audio_for_whisper(str(video_path), audio_path)
document = transcriber.transcribe(audio_path)
st.session_state.transcribed_doc = document.to_dict()
persisted_path = get_path("input.mp4")
shutil.copy(video_path, persisted_path)
st.session_state.video_path = persisted_path
os.remove(video_path)
os.remove(audio_path)
del st.session_state.temp_video_path
del st.session_state.selected_language
st.session_state.audio_being_analyzed = False
go_to_step(2)
st.rerun()
except Exception as e:
if "temp_video_path" in st.session_state and os.path.exists(st.session_state.temp_video_path):
os.remove(st.session_state.temp_video_path)
handle_unexpected_exception(e)