|
|
|
|
|
import streamlit as st |
|
|
import requests |
|
|
import tempfile |
|
|
import os |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import Optional, Dict, List |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load variables from a local .env file into the process environment so the
# os.getenv() lookups that follow can pick them up.
load_dotenv()
|
|
|
|
|
|
|
|
# --- Backend endpoints and credentials (taken from the environment) ---
API_KEY = os.environ.get("FaceX_API_KEY")  # sent as a Bearer token
CHECK_RATE_LIMIT_URL = os.environ.get("CHECK_RATE_LIMIT_URL")
TASK_UPLOAD_URL = os.environ.get("TASK_UPLOAD_URL")

# --- Local folders scanned for videos ---
DEMO_VIDEOS_PATH = "./assets"
GENERATED_DEMO_VIDEOS_PATH = "./generated_demos"

# --- Limits ---
REQUEST_TIMEOUT = 3600  # passed as the `timeout` of the upload request
MAX_FILE_SIZE_MB = 10   # upper bound enforced by check_file_size()
MAX_TEXT_LENGTH = 300   # maximum number of characters accepted in the text box

# Display language -> names of the preset voices offered for it.
LANGUAGE_VOICES: Dict[str, List[str]] = {
    "Hindi": ["Kavya", "Priya", "Jyoti"],
    "Telugu": ["Nandini", "Rashmi", "Riya"],
    "English": ["Maria", "Ishita", "Aditi"],
    "Tamil": ["Jessica", "Jasmine", "Rashmi", "Amy", "Fatima"],
    "Kannada": ["Manisha", "Hema", "Uma"],
    "Bhojpuri": ["Anju", "Gayatri", "Radha"],
    "Bengali": ["Madhumita", "Shrabonti", "Subhashree"],
    "Maithili": ["Jaanki", "Shital", "Anuradha"],
    "Magahi": ["Amrita", "Anu", "Radhika"],
    "Gujarati": ["Hetvi", "Parul", "Janvi"],
    "Marathi": ["Vashnavi", "Varsha", "Savita"],
    "Malayalam": ["Nandini", "Uma", "Hema", "Manisha"],
    "Chattisgarhi": ["Kusum", "Mamata", "Kamla"],
}
|
|
|
|
|
def init_state():
    """Seed ``st.session_state`` with a default for every key the app uses.

    Keys that already exist are left untouched, so values survive Streamlit
    reruns; only missing keys receive their default.
    """
    # Built on every call so the mutable ``{}`` default is never shared.
    defaults = {
        "email_verified": False,
        "email": "",
        "rate_limit_info": {},
        "generated_path": None,
        "processing": False,
        "selected_demo_video": None,
        "reference_audio": None,
        "reference_audio_path": None,
    }
    for key, initial in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = initial
|
|
|
|
|
def validate_email(email: str) -> bool:
    """Return True when *email* has the shape ``local@domain.tld``.

    This is a format check only (letters, digits and ``._%+-`` before the
    ``@``; a dotted domain ending in at least two letters). It does not
    verify that the address exists.

    Args:
        email: The address to check. Non-string values are rejected.

    Returns:
        True if the whole string matches the pattern, False otherwise.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which let "user@example.com\n" through.
    return re.fullmatch(pattern, email) is not None
|
|
|
|
|
def _on_text_change():
    """Record whether the current text exceeds ``MAX_TEXT_LENGTH``.

    Reads the ``"text_input"`` entry of the session state (a missing or
    empty entry counts as an empty string) and stores two values back:
    ``"text_exceeded"`` (bool) and ``"text_exceeded_count"`` (the length).

    NOTE(review): the text area in this file is created with
    ``key="text_input_area"`` and no ``on_change`` hook is visible, so this
    callback appears to be unused / reading a different key - confirm.
    """
    current_text = st.session_state.get("text_input", "") or ""
    length = len(current_text)
    st.session_state["text_exceeded"] = length > MAX_TEXT_LENGTH
    st.session_state["text_exceeded_count"] = length
|
|
|
|
|
def check_rate_limit(email: str) -> Optional[Dict]:
    """Ask the backend how many generations *email* has used.

    POSTs ``{"email": email}`` to ``CHECK_RATE_LIMIT_URL`` with the API key
    as a Bearer token. Every failure is reported through ``st.error`` and
    turned into ``None``; the function itself does not raise for network or
    response-format problems.

    Args:
        email: Address to look up.

    Returns:
        The decoded JSON object on HTTP 200, otherwise ``None``.
    """
    try:
        response = requests.post(
            CHECK_RATE_LIMIT_URL,
            json={"email": email},
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        st.error(f"Failed to connect to server: {str(e)}")
        return None

    # Decode once, tolerating non-JSON bodies (e.g. an HTML error page from a
    # proxy). Previously such a body was misreported as a connection failure.
    try:
        payload = response.json()
    except ValueError:
        payload = None

    if response.status_code == 200:
        if isinstance(payload, dict):
            return payload
        st.error("Error checking rate limit: invalid response from server")
        return None

    if isinstance(payload, dict):
        detail = payload.get('detail', 'Unknown error')
    else:
        detail = f"server returned HTTP {response.status_code}"
    st.error(f"Error checking rate limit: {detail}")
    return None
|
|
|
|
|
def check_audio_duration(audio_file) -> tuple[bool, str, float]:
    """Check that a WAV file lasts between 5 and 300 seconds.

    The audio is parsed straight from *audio_file*; nothing is written to
    disk. (The previous implementation copied the upload to a temp file that
    was never removed on the success or error paths.)

    Args:
        audio_file: A seekable binary file-like object holding WAV data.

    Returns:
        ``(is_valid, message, duration_seconds)``. If the file cannot be
        parsed the result is ``(False, "Error checking audio duration: ...",
        0.0)``.
    """
    import wave

    try:
        audio_file.seek(0)
        # wave.open accepts file-like objects and leaves them open on close.
        with wave.open(audio_file, 'rb') as wav:
            frames = wav.getnframes()
            rate = wav.getframerate()
        # Rewind so the caller can read the upload again from the start.
        audio_file.seek(0)
        duration = frames / float(rate)
    except Exception as e:
        return False, f"Error checking audio duration: {str(e)}", 0.0

    if duration < 5:
        return False, f"Audio too short ({duration:.1f}s). Minimum 5 seconds required.", duration
    if duration > 300:
        return False, f"Audio too long ({duration:.1f}s). Maximum 300 seconds allowed.", duration
    return True, f"Audio duration: {duration:.1f} seconds", duration
|
|
|
|
|
def check_file_size(file_obj) -> tuple[bool, str]:
    """Tell whether *file_obj* fits within ``MAX_FILE_SIZE_MB``.

    The size is measured by seeking to the end of the stream; the stream is
    rewound to offset 0 afterwards.

    Returns:
        ``(fits, message)``. Any exception raised while measuring is reported
        as ``(False, "Error checking file size: ...")``.
    """
    bytes_per_mb = 1024 * 1024
    try:
        file_obj.seek(0, os.SEEK_END)
        size_bytes = file_obj.tell()
        file_obj.seek(0)

        size_mb = size_bytes / bytes_per_mb
        if size_mb <= MAX_FILE_SIZE_MB:
            return True, f"File size: {size_mb:.2f} MB"
        return False, f"File size ({size_mb:.2f} MB) exceeds maximum of {MAX_FILE_SIZE_MB} MB"
    except Exception as e:
        return False, f"Error checking file size: {str(e)}"
|
|
|
|
|
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


def _list_videos(directory: str) -> List[Dict[str, str]]:
    """Return ``{"name", "path"}`` entries for the video files in *directory*, sorted by name."""
    if not os.path.isdir(directory):
        return []
    entries = []
    # Sorted so the order (and index-based widget keys built from it) is
    # stable; os.listdir() returns entries in arbitrary order.
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        # Case-insensitive match so "clip.MP4" is not silently skipped;
        # isfile() drops directories that merely look like videos.
        if name.lower().endswith(_VIDEO_EXTENSIONS) and os.path.isfile(path):
            entries.append({"name": name, "path": path})
    return entries


def get_demo_videos(directory: Optional[str] = None) -> List[Dict[str, str]]:
    """List the selectable demo videos.

    Args:
        directory: Folder to scan; defaults to ``DEMO_VIDEOS_PATH``.

    Returns:
        One ``{"name": file name, "path": joined path}`` dict per ``.mp4`` /
        ``.avi`` / ``.mov`` file, sorted by file name. Empty if the folder
        does not exist.
    """
    return _list_videos(DEMO_VIDEOS_PATH if directory is None else directory)


def get_generated_demo_videos(directory: Optional[str] = None) -> List[Dict[str, str]]:
    """List the pre-generated showcase videos.

    Args:
        directory: Folder to scan; defaults to ``GENERATED_DEMO_VIDEOS_PATH``.

    Returns:
        One ``{"name": file name, "path": joined path}`` dict per ``.mp4`` /
        ``.avi`` / ``.mov`` file, sorted by file name. Empty if the folder
        does not exist.
    """
    return _list_videos(GENERATED_DEMO_VIDEOS_PATH if directory is None else directory)
|
|
|
|
|
def stream_post_upload(video_source, filename: str, text: str, voice_name: str, language: str, email: str, reference_audio_path: Optional[str] = None) -> tuple[str, bool]:
    """Upload a generation task and stream the resulting video to a temp file.

    Sends a multipart/form-data POST to ``TASK_UPLOAD_URL`` and writes the
    streamed response body to a new ``.mp4`` temp file.

    Args:
        video_source: Either a path string (demo video) or an open binary
            file-like object (uploaded video).
        filename: File name reported for the video part.
        text: Text to be spoken.
        voice_name: Preset voice name; falsy values are sent as "".
        language: Display name of the language; mapped to a short code and
            falling back to "hi" when unknown.
        email: Address associated with the request.
        reference_audio_path: Optional path of a WAV file to send as the
            ``voice_cloning`` part. Ignored if the path does not exist.

    Returns:
        ``(output_path, email_sent)``. ``email_sent`` is False only when the
        response carries the header ``X-Email-Sent: false``.

    Raises:
        TimeoutError: The request timed out.
        ConnectionError: The server could not be reached.
        RuntimeError: Any other failure, including a non-2xx response.
    """
    import contextlib

    lang_dict = {
        "Hindi": "hi",
        "Telugu": "te",
        "English": "en",
        "Tamil": "ta",
        "Kannada": "kn",
        "Bhojpuri": "bho",
        "Bengali": "bn",
        "Maithili": "mai",
        "Magahi": "mag",
        "Gujarati": "gu",
        "Marathi": "mr",
        "Malayalam": "ml",
        "Chattisgarhi": "hne",
    }
    headers = {"Authorization": f"Bearer {API_KEY}"}
    data = {
        "text": text,
        "email": email,
        "language": lang_dict.get(language, "hi"),
        "voice_name": voice_name if voice_name else "",
    }

    tmp_out_path = None
    completed = False
    try:
        # ExitStack closes every file opened here even if a later open() or
        # the request itself fails. Caller-owned uploads are not closed.
        with contextlib.ExitStack() as stack:
            files = {}
            if isinstance(video_source, str):
                video_file = stack.enter_context(open(video_source, "rb"))
                files["video"] = (filename, video_file, "video/mp4")
            else:
                video_source.seek(0)
                files["video"] = (filename, video_source, "video/mp4")

            if reference_audio_path and os.path.exists(reference_audio_path):
                audio_file = stack.enter_context(open(reference_audio_path, "rb"))
                files["voice_cloning"] = ("reference.wav", audio_file, "audio/wav")

            resp = requests.post(
                TASK_UPLOAD_URL,
                headers=headers,
                data=data,
                files=files,
                stream=True,
                timeout=REQUEST_TIMEOUT,
            )

        # stream=True keeps the connection open until the response is closed.
        with contextlib.closing(resp):
            if not resp.ok:
                try:
                    err = resp.json()
                except ValueError:
                    err = resp.text
                raise RuntimeError(f"Server returned {resp.status_code}: {err}")

            email_sent = resp.headers.get('X-Email-Sent', 'true') != 'false'

            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as out_f:
                tmp_out_path = out_f.name
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        out_f.write(chunk)

        completed = True
        return tmp_out_path, email_sent

    except requests.exceptions.Timeout as e:
        raise TimeoutError("Server timeout. Please try again later.") from e
    except requests.exceptions.ConnectionError as e:
        raise ConnectionError("Unable to connect to server.") from e
    except Exception as e:
        raise RuntimeError(f"Upload failed: {str(e)}") from e
    finally:
        # Do not leave a partially written download behind on failure.
        if not completed and tmp_out_path is not None:
            with contextlib.suppress(OSError):
                os.unlink(tmp_out_path)
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Page setup
# ---------------------------------------------------------------------------
st.set_page_config(page_title="ORI-FaceX", layout="wide")
init_state()

st.title("FaceX")
st.markdown("Transform your videos with AI-powered lip-sync and expressive voices!")

# ---------------------------------------------------------------------------
# Sidebar: instructions, usage counters and contact details
# ---------------------------------------------------------------------------
with st.sidebar:
    st.header("ℹ️ How It Works")
    st.markdown("""
    1. **Enter your email** to verify eligibility
    2. **Select a language** and voice
    3. **Choose** demo video or upload your own
    4. **Enter text** in your chosen language (max 300 characters)
       - Use [this tool](https://www.easyhindityping.com/english-to-hindi-translation) for easy typing in Indian languages
    5. **Generate** and download your video
    """)

    st.markdown("---")

    # Usage counters are only meaningful once an email has been verified.
    if st.session_state.email_verified:
        st.success(f"📧 **Email:** {st.session_state.email}")
        info = st.session_state.rate_limit_info
        current = info.get('current_count', 0)
        remaining = info.get('remaining', 5)

        st.metric("Videos Used", f"{current}/5")
        st.metric("Remaining", remaining)

        # st.progress rejects floats outside 0.0-1.0, so clamp in case the
        # backend ever reports a count above the limit of 5.
        progress = min(max(current / 5, 0.0), 1.0)
        st.progress(progress)

        if remaining == 0:
            st.error("⚠️ Limit reached!")

    st.markdown("---")
    st.warning("⚠️ **Use responsibly!** Do not create misleading or harmful content.")

    st.markdown("---")
    st.markdown("Reach out to us at ai-team@oriserve.com")

# ---------------------------------------------------------------------------
# Showcase of pre-generated example videos, laid out three per row
# ---------------------------------------------------------------------------
st.markdown("---")
st.subheader("🎥 See What's Possible")
st.markdown("Check out these examples of AI-generated videos using TalkMorph:")

generated_demos = get_generated_demo_videos()

if generated_demos:
    demo_cols = st.columns(3)

    for idx, demo in enumerate(generated_demos):
        with demo_cols[idx % 3]:
            with st.container():
                st.video(demo["path"], start_time=0)
                # "my_clip.mp4" -> "My Clip"
                display_name = os.path.splitext(demo["name"])[0].replace("_", " ").title()
                st.caption(f"**{display_name}**")

    if len(generated_demos) > 3:
        st.markdown("*More examples available - scroll down to see all*")
else:
    st.info(f"📁 No demo videos found. Add videos to `{GENERATED_DEMO_VIDEOS_PATH}` to showcase examples.")

st.markdown("---")
|
|
|
|
|
|
|
|
def _discard_reference_audio() -> None:
    """Delete the temp copy of the reference audio (if any) and clear its session entries."""
    path = st.session_state.reference_audio_path
    if path:
        try:
            os.unlink(path)
        except OSError:
            pass  # best-effort cleanup: the file may already be gone
    st.session_state.reference_audio = None
    st.session_state.reference_audio_path = None


def _store_reference_audio(audio) -> None:
    """Validate *audio*, keep a temp-file copy for the upload request and show a preview."""
    is_valid, duration_msg, _ = check_audio_duration(audio)
    if not is_valid:
        st.error(f"❌ {duration_msg}")
        _discard_reference_audio()
        return

    st.success(f"✅ {duration_msg}")

    # Replace the copy made on a previous rerun instead of piling up temp files.
    _discard_reference_audio()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
        audio.seek(0)
        tmp_audio.write(audio.read())
    audio.seek(0)

    st.session_state.reference_audio = audio
    st.session_state.reference_audio_path = tmp_audio.name

    st.audio(audio, format="audio/wav")


def _render_generated_video(video_slot, download_slot, path: str, heading: str, button_key: str) -> None:
    """Fill the two placeholders with the video player and a download button."""
    with video_slot.container():
        st.success(heading)
        _, middle, _ = st.columns([1, 2, 1])
        with middle:
            st.video(path, start_time=0)

    with download_slot.container():
        _, middle, _ = st.columns([1, 2, 1])
        with middle:
            with open(path, "rb") as f:
                st.download_button(
                    "⬇️ Download Video",
                    data=f,
                    file_name="generated.mp4",
                    mime="video/mp4",
                    use_container_width=True,
                    # Explicit key: the button can be rendered twice in one
                    # run (previous result, then the new one).
                    key=button_key,
                )


if not st.session_state.email_verified:
    # ------------------------------------------------------------------
    # Step 1: email verification
    # ------------------------------------------------------------------
    st.subheader("📧 Email Verification")
    st.info("Enter your email to check eligibility. Each email can generate up to **5 videos**.")

    col1, col2 = st.columns([3, 1])

    with col1:
        email_input = st.text_input(
            "Email Address",
            placeholder="your.email@example.com",
            key="email_input",
            disabled=st.session_state.processing,
        )

    with col2:
        # Two empty writes push the button down to line up with the input.
        st.write("")
        st.write("")
        verify_btn = st.button("Verify Email", type="primary", disabled=st.session_state.processing)

    if verify_btn:
        # Pasted addresses often carry stray surrounding whitespace.
        email_value = (email_input or "").strip()
        if not email_value:
            st.error("Please enter your email address")
        elif not validate_email(email_value):
            st.error("❌ Invalid email format. Please enter a valid email.")
        else:
            with st.spinner("Checking your email..."):
                rate_limit_info = check_rate_limit(email_value)

            if rate_limit_info:
                if rate_limit_info.get('can_proceed'):
                    st.session_state.email = email_value
                    st.session_state.email_verified = True
                    st.session_state.rate_limit_info = rate_limit_info
                    st.success(f"✅ Email verified! You have {rate_limit_info.get('remaining', 0)} videos remaining.")
                    st.rerun()
                else:
                    st.error(f"❌ {rate_limit_info.get('message', 'Limit reached for this email.')}")
                    st.info("💡 Try with a different email address to continue.")

else:
    # ------------------------------------------------------------------
    # Step 2: verified - show the generation form
    # ------------------------------------------------------------------
    col1, col2 = st.columns([4, 1])
    with col1:
        st.text_input(
            "📧 Email Address (Verified)",
            value=st.session_state.email,
            disabled=True,
            key="email_locked",
        )
    with col2:
        st.write("")
        st.write("")
        if st.button("Change Email", disabled=st.session_state.processing):
            st.session_state.email_verified = False
            st.session_state.email = ""
            st.session_state.rate_limit_info = {}
            st.rerun()

    st.markdown("---")
    st.subheader("🎬 Generate Your Video")

    # --- Language ---
    language = st.selectbox(
        "🌐 Select Language",
        options=list(LANGUAGE_VOICES.keys()),
        disabled=st.session_state.processing,
    )

    # --- Voice: preset or cloned from reference audio ---
    st.markdown("---")
    voice_mode = st.radio(
        "🎙️ Voice Selection Mode",
        options=["Default Voice", "Clone Voice"],
        horizontal=True,
        disabled=st.session_state.processing,
        help="Choose a pre-configured voice or clone a custom voice from audio",
    )

    voice_name = None
    reference_audio = None

    if voice_mode == "Default Voice":
        available_voices = LANGUAGE_VOICES.get(language, [])
        voice_name = st.selectbox(
            "Select Voice",
            options=available_voices,
            disabled=st.session_state.processing,
        )
        # Drop reference audio left over from "Clone Voice" mode, including
        # its temp file.
        _discard_reference_audio()
    else:
        st.info("📢 **Voice Cloning:** Provide a reference audio (5-300 seconds) to clone the voice")

        audio_source = st.radio(
            "Reference Audio Source",
            options=["Upload Audio File", "Record Audio"],
            horizontal=True,
            disabled=st.session_state.processing,
        )

        if audio_source == "Upload Audio File":
            reference_audio = st.file_uploader(
                "Upload Reference Audio (.wav format, 5-300 seconds)",
                type=["wav"],
                disabled=st.session_state.processing,
                key="audio_uploader",
            )
        else:
            st.warning("🎤 **Note:** Recorded audio must be between 5-300 seconds")
            reference_audio = st.audio_input(
                "Record Reference Audio (speak for 5-300 seconds)",
                disabled=st.session_state.processing,
                key="audio_recorder",
            )

        if reference_audio:
            _store_reference_audio(reference_audio)
        else:
            # The user removed the audio: forget the earlier copy so a stale
            # file is not sent on submit.
            _discard_reference_audio()

    # --- Video source ---
    st.markdown("---")
    video_source_option = st.radio(
        "📹 Video Source",
        options=["Upload Video", "Use Demo Video"],
        horizontal=True,
        disabled=st.session_state.processing,
    )

    video_file = None
    demo_video_path = None

    if video_source_option == "Upload Video":
        video_file = st.file_uploader(
            f"Upload video (mp4) - Max {MAX_FILE_SIZE_MB} MB",
            type=["mp4"],
            disabled=st.session_state.processing,
        )

        if video_file:
            is_valid, size_msg = check_file_size(video_file)
            if is_valid:
                st.info(size_msg)
            else:
                st.error(size_msg)
    else:
        demo_videos = get_demo_videos()

        if not demo_videos:
            st.warning(f"⚠️ No demo videos found in `{DEMO_VIDEOS_PATH}` folder.")
        else:
            st.markdown("**Select a demo video:**")

            cols = st.columns(3)
            for idx, demo in enumerate(demo_videos):
                with cols[idx % 3]:
                    with st.expander(f"📹 {demo['name']}", expanded=False):
                        st.video(demo["path"], start_time=0)
                        if st.button(
                            "✅ Use this video",
                            key=f"demo_{idx}",
                            disabled=st.session_state.processing,
                            use_container_width=True,
                        ):
                            st.session_state.selected_demo_video = demo["path"]
                            st.success("Selected!")

            if st.session_state.selected_demo_video:
                demo_video_path = st.session_state.selected_demo_video
                st.success(f"✅ Using demo video: **{os.path.basename(demo_video_path)}**")

    # --- Text ---
    st.markdown("---")
    st.markdown(f"**📝 Enter Text in {language}**")
    st.info("💡 **Need help typing in Indian languages?** Use [this typing tool](https://www.easyhindityping.com/english-to-hindi-translation) to easily convert your text to the desired script.")

    # Length seen on the previous rerun; used below to guess at a paste.
    if "prev_text_length" not in st.session_state:
        st.session_state.prev_text_length = 0

    text_input = st.text_area(
        f"Enter your text (Max {MAX_TEXT_LENGTH} characters)",
        value="",
        height=150,
        disabled=st.session_state.processing,
        key="text_input_area",
        help="You can paste any length of text. We'll validate it when you submit.",
        label_visibility="collapsed",
    )

    char_count = len(text_input)

    # A jump of more than 50 characters between reruns is treated as a paste.
    if st.session_state.prev_text_length > 0:
        char_diff = char_count - st.session_state.prev_text_length
        if char_diff > 50:
            st.info(f"📋 Pasted text detected ({char_diff} characters added)")

    st.session_state.prev_text_length = char_count

    if char_count > MAX_TEXT_LENGTH:
        excess = char_count - MAX_TEXT_LENGTH
        st.error(f"❌ **Text exceeds limit by {excess} characters!** {char_count}/{MAX_TEXT_LENGTH} - Please trim before submitting")
    elif char_count > MAX_TEXT_LENGTH - 20:
        st.warning(f"⚠️ **Approaching limit:** {char_count}/{MAX_TEXT_LENGTH} characters")
    elif char_count > 0:
        st.info(f"✍️ **Characters:** {char_count}/{MAX_TEXT_LENGTH}")
    else:
        st.info(f"📝 **Characters:** 0/{MAX_TEXT_LENGTH}")

    # --- Submit ---
    st.markdown("---")
    submit_btn = st.button(
        "🚀 Generate Video",
        type="primary",
        disabled=st.session_state.processing,
    )

    # Placeholders are created up front so their page position is fixed.
    status_placeholder = st.empty()
    progress_placeholder = st.empty()
    video_placeholder = st.empty()
    download_placeholder = st.empty()

    # Re-show the last generated video on reruns (e.g. after a widget change).
    if st.session_state.generated_path and os.path.exists(st.session_state.generated_path):
        _render_generated_video(
            video_placeholder,
            download_placeholder,
            st.session_state.generated_path,
            "✅ Generated video available below:",
            "download_previous",
        )

    if submit_btn:
        if video_source_option == "Upload Video" and not video_file:
            st.error("❌ Please upload a video first.")
        elif video_source_option == "Use Demo Video" and not demo_video_path:
            st.error("❌ Please select a demo video first.")
        elif voice_mode == "Clone Voice" and not st.session_state.reference_audio_path:
            st.error("❌ Please provide a reference audio for voice cloning (5-300 seconds).")
        elif not text_input.strip():
            st.error("❌ Please enter text.")
        elif len(text_input) > MAX_TEXT_LENGTH:
            excess = len(text_input) - MAX_TEXT_LENGTH
            st.error(f"❌ **Text exceeds limit by {excess} characters!** Current: {len(text_input)}/{MAX_TEXT_LENGTH}")
            st.warning(f"💡 **Tip:** Please trim your text to exactly {MAX_TEXT_LENGTH} characters or less before submitting.")
        else:
            if video_source_option == "Upload Video":
                is_valid, size_msg = check_file_size(video_file)
                if not is_valid:
                    st.error(size_msg)
                    st.stop()

            st.session_state.processing = True
            status_placeholder.info("🎬 Processing your video...\nYou will receive an email with the video link...")
            progress_placeholder.progress(0)

            try:
                if video_source_option == "Upload Video":
                    video_source = video_file
                    filename = video_file.name
                else:
                    video_source = demo_video_path
                    filename = os.path.basename(demo_video_path)

                generated_path, email_sent = stream_post_upload(
                    video_source=video_source,
                    filename=filename,
                    text=text_input,
                    voice_name=voice_name,
                    language=language,
                    email=st.session_state.email,
                    reference_audio_path=st.session_state.reference_audio_path,
                )

                progress_placeholder.progress(100)

                if generated_path and os.path.exists(generated_path):
                    previous_path = st.session_state.generated_path
                    st.session_state.generated_path = generated_path
                    status_placeholder.success("✅ Video generated successfully!")

                    if email_sent:
                        st.info(f"📧 Download link sent to {st.session_state.email}")
                    else:
                        st.warning("⚠️ Video generated but email notification failed. Download below.")

                    # Refresh the usage counters; keep the old ones if the
                    # lookup fails.
                    new_info = check_rate_limit(st.session_state.email)
                    if new_info:
                        st.session_state.rate_limit_info = new_info

                    # Swap the previous result (if shown) for the new one.
                    video_placeholder.empty()
                    download_placeholder.empty()
                    _render_generated_video(
                        video_placeholder,
                        download_placeholder,
                        generated_path,
                        "✅ Generated video:",
                        "download_new",
                    )

                    # The superseded temp file is no longer referenced.
                    if previous_path and previous_path != generated_path:
                        try:
                            os.unlink(previous_path)
                        except OSError:
                            pass  # best-effort cleanup

                    if st.session_state.rate_limit_info.get('remaining', 0) == 0:
                        st.error("⚠️ You've reached your limit of 5 videos. Please use a different email to continue.")
                else:
                    status_placeholder.error("❌ No video was generated.")

            except TimeoutError as e:
                status_placeholder.error("⏱️ Server Timeout")
                st.error(f"Server is under heavy load. Please try again later.\n\n{str(e)}")

            except ConnectionError as e:
                status_placeholder.error("❌ Connection Error")
                st.error(str(e))

            except RuntimeError as e:
                status_placeholder.error("❌ Request Failed")
                st.error(str(e))

            except Exception as e:
                status_placeholder.error("❌ Unexpected Error")
                st.error(f"An unexpected error occurred: {str(e)}")

            finally:
                st.session_state.processing = False
                progress_placeholder.empty()
                # The temp copy of the reference audio is single-use.
                _discard_reference_audio()