# FaceX / src/streamlit_app.py
# Rishu Ranjan
# update
# a318f14
import streamlit as st
import requests
import tempfile
import os
import re
from pathlib import Path
from typing import Optional, Dict, List
from dotenv import load_dotenv
load_dotenv()
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Backend credentials and endpoints, read from environment variables.
API_KEY = os.getenv("FaceX_API_KEY")
CHECK_RATE_LIMIT_URL = os.getenv("CHECK_RATE_LIMIT_URL")
TASK_UPLOAD_URL = os.getenv("TASK_UPLOAD_URL")

# Folders that are scanned for video files.
DEMO_VIDEOS_PATH = "./assets"  # Change this path as needed
GENERATED_DEMO_VIDEOS_PATH = "./generated_demos"  # Path for generated demo videos showcase

# Limits used by the request and validation code below.
REQUEST_TIMEOUT = 3600
MAX_FILE_SIZE_MB = 10
MAX_TEXT_LENGTH = 300

# Voice names offered for each selectable language.
LANGUAGE_VOICES: Dict[str, List[str]] = {
    "Hindi": ["Kavya", "Priya", "Jyoti"],
    "Telugu": ["Nandini", "Rashmi", "Riya"],
    "English": ["Maria", "Ishita", "Aditi"],
    "Tamil": ["Jessica", "Jasmine", "Rashmi", "Amy", "Fatima"],
    "Kannada": ["Manisha", "Hema", "Uma"],
    "Bhojpuri": ["Anju", "Gayatri", "Radha"],
    "Bengali": ["Madhumita", "Shrabonti", "Subhashree"],
    "Maithili": ["Jaanki", "Shital", "Anuradha"],
    "Magahi": ["Amrita", "Anu", "Radhika"],
    "Gujarati": ["Hetvi", "Parul", "Janvi"],
    "Marathi": ["Vashnavi", "Varsha", "Savita"],
    "Malayalam": ["Nandini", "Uma", "Hema", "Manisha"],
    "Chattisgarhi": ["Kusum", "Mamata", "Kamla"],
}
def init_state():
    """Seed ``st.session_state`` with a default for every key this script uses.

    Keys that are already present keep their current value, so calling this
    on each rerun does not reset anything.
    """
    # Built inside the function so the mutable default ({}) is a fresh object.
    defaults = {
        "email_verified": False,
        "email": "",
        "rate_limit_info": {},
        "generated_path": None,
        "processing": False,
        "selected_demo_video": None,
        "reference_audio": None,
        "reference_audio_path": None,
    }
    for state_key, initial_value in defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = initial_value
def validate_email(email: str) -> bool:
    """Return True when *email* has the shape ``local@domain.tld``.

    The whole string must match: ``re.fullmatch`` is used instead of
    ``re.match`` with a ``$`` anchor, because ``$`` also matches just before
    a trailing newline and would let ``"user@example.com\\n"`` through.
    Non-string input is rejected rather than raising ``TypeError``.
    """
    if not isinstance(email, str):
        return False
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return re.fullmatch(pattern, email) is not None
def _on_text_change():
    """
    Recompute the length flags for the text stored under ``text_input``.

    Writes two session entries: ``text_exceeded`` (whether the text is longer
    than MAX_TEXT_LENGTH) and ``text_exceeded_count`` (the text length).
    A missing or falsy value is treated as an empty string.

    NOTE(review): intended as a text-area change callback, but the text area
    visible in this file uses key "text_input_area" and no on_change wiring
    is visible here - confirm which key this should read.
    """
    current_length = len(st.session_state.get("text_input", "") or "")
    st.session_state["text_exceeded"] = current_length > MAX_TEXT_LENGTH
    st.session_state["text_exceeded_count"] = current_length
def check_rate_limit(email: str) -> Optional[Dict]:
    """Ask the backend for the rate-limit status of *email*.

    POSTs ``{"email": email}`` to CHECK_RATE_LIMIT_URL with the bearer API key.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``. Every failure
        is reported to the user through ``st.error`` instead of being raised.
    """
    try:
        response = requests.post(
            CHECK_RATE_LIMIT_URL,
            json={"email": email},
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=10
        )
    except requests.exceptions.RequestException as e:
        # Transport-level problems only (DNS, refused connection, timeout, bad URL).
        st.error(f"Failed to connect to server: {str(e)}")
        return None

    if response.status_code == 200:
        try:
            return response.json()
        except ValueError:
            # The server answered, so this is not a connection failure.
            st.error("Error checking rate limit: server returned an invalid response")
            return None

    # Error responses are not guaranteed to carry a JSON object (e.g. an HTML
    # error page from a proxy), so fall back to a generic message.
    detail = "Unknown error"
    try:
        payload = response.json()
    except ValueError:
        payload = None
    if isinstance(payload, dict):
        detail = payload.get('detail', detail)
    st.error(f"Error checking rate limit: {detail}")
    return None
def check_audio_duration(audio_file) -> tuple[bool, str, float]:
    """Check if audio file duration is between 5 and 300 seconds.

    Args:
        audio_file: Seekable binary file-like object holding WAV data.

    Returns:
        ``(is_valid, message, duration_seconds)``. When the data cannot be
        parsed, ``is_valid`` is False, the message describes the error and
        the duration is 0. This function never raises.

    The WAV header is read straight from *audio_file*; no temporary copy is
    written, so nothing is left behind on disk whatever the outcome.
    """
    import wave  # local import: the module does not import wave at top level

    try:
        audio_file.seek(0)
        # wave.open accepts a file-like object and does not close an object
        # it did not open itself, so the caller's handle stays usable.
        with wave.open(audio_file, 'rb') as wav_reader:
            frames = wav_reader.getnframes()
            rate = wav_reader.getframerate()
        audio_file.seek(0)  # Reset file pointer for the caller
        if rate <= 0:
            return False, "Error checking audio duration: invalid sample rate", 0.0
        duration = frames / float(rate)
    except Exception as e:
        # Deliberately broad: this is a validation boundary for user uploads
        # and any parsing failure must become a message, not a crash.
        return False, f"Error checking audio duration: {str(e)}", 0.0

    if duration < 5:
        return False, f"Audio too short ({duration:.1f}s). Minimum 5 seconds required.", duration
    if duration > 300:
        return False, f"Audio too long ({duration:.1f}s). Maximum 300 seconds allowed.", duration
    return True, f"Audio duration: {duration:.1f} seconds", duration
def check_file_size(file_obj, max_size_mb: Optional[float] = None) -> tuple[bool, str]:
    """Check if uploaded file is within size limits.

    Args:
        file_obj: Seekable binary file-like object. Its position is reset to
            the start after measuring.
        max_size_mb: Upper bound in MiB. Defaults to the module-level
            MAX_FILE_SIZE_MB when omitted.

    Returns:
        ``(is_valid, message)``. Any error while measuring is reported as
        ``(False, "Error checking file size: ...")`` rather than raised.
    """
    try:
        limit_mb = MAX_FILE_SIZE_MB if max_size_mb is None else max_size_mb
        # Seeking to the end and reading the offset measures the size without
        # loading the content into memory.
        file_obj.seek(0, os.SEEK_END)
        file_size = file_obj.tell()
        file_obj.seek(0)
        file_size_mb = file_size / (1024 * 1024)
        if file_size_mb > limit_mb:
            return False, f"File size ({file_size_mb:.2f} MB) exceeds maximum of {limit_mb} MB"
        return True, f"File size: {file_size_mb:.2f} MB"
    except Exception as e:
        return False, f"Error checking file size: {str(e)}"
# File extensions (compared case-insensitively) that count as videos.
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


def _list_videos(directory: str) -> List[Dict[str, str]]:
    """Return ``{"name", "path"}`` entries for the video files in *directory*, sorted by name."""
    if not os.path.isdir(directory):
        return []
    videos = []
    # Sorted so positional widget keys derived from the index stay stable
    # across reruns; os.listdir gives no ordering guarantee.
    for entry in sorted(os.listdir(directory)):
        full_path = os.path.join(directory, entry)
        if entry.lower().endswith(_VIDEO_EXTENSIONS) and os.path.isfile(full_path):
            videos.append({"name": entry, "path": full_path})
    return videos


def get_demo_videos(directory: Optional[str] = None) -> List[Dict[str, str]]:
    """Get list of demo videos from demo folder.

    Args:
        directory: Folder to scan; defaults to DEMO_VIDEOS_PATH.

    Returns:
        One dict per video file with its ``name`` and ``path``, sorted by
        name. An empty list when the folder does not exist.
    """
    return _list_videos(DEMO_VIDEOS_PATH if directory is None else directory)


def get_generated_demo_videos(directory: Optional[str] = None) -> List[Dict[str, str]]:
    """Get list of generated demo videos for showcase.

    Args:
        directory: Folder to scan; defaults to GENERATED_DEMO_VIDEOS_PATH.

    Returns:
        One dict per video file with its ``name`` and ``path``, sorted by
        name. An empty list when the folder does not exist.
    """
    return _list_videos(GENERATED_DEMO_VIDEOS_PATH if directory is None else directory)
def stream_post_upload(
    video_source,
    filename: str,
    text: str,
    voice_name: str,
    language: str,
    email: str,
    reference_audio_path: Optional[str] = None,
) -> tuple[str, bool]:
    """
    Stream multipart/form-data POST to backend and save the returned video.

    Args:
        video_source: file object (uploaded video) or path string (demo video).
        filename: name sent for the video part.
        text: text sent in the ``text`` form field.
        voice_name: preset voice name; a falsy value is sent as "" (used
            when voice cloning supplies the voice instead).
        language: display name of the language; mapped to a language code,
            falling back to "hi" for unknown names.
        email: address sent in the ``email`` form field.
        reference_audio_path: path to cloned voice audio file (if using
            voice cloning); ignored when the path does not exist.

    Returns:
        ``(output_path, email_sent)`` - the path of a temporary ``.mp4`` file
        holding the response body, and False only when the response carries
        the header ``X-Email-Sent: false``.

    Raises:
        TimeoutError: the request timed out.
        ConnectionError: the server could not be reached.
        RuntimeError: any other failure, including a non-2xx response.
    """
    import contextlib  # local import: not imported at module level

    lang_dict = {
        "Hindi": "hi",
        "Telugu": "te",
        "English": "en",
        "Tamil": "ta",
        "Kannada": "kn",
        "Bhojpuri": "bho",
        "Bengali": "bn",
        "Maithili": "mai",
        "Magahi": "mag",
        "Gujarati": "gu",
        "Marathi": "mr",
        "Malayalam": "ml",
        "Chattisgarhi": "hne"
    }
    try:
        headers = {"Authorization": f"Bearer {API_KEY}"}
        data = {
            "text": text,
            "email": email,
            "language": lang_dict.get(language, "hi"),
            # Add voice_name (can be None if using voice cloning)
            "voice_name": voice_name if voice_name else "",
        }

        # ExitStack closes every handle opened here even when a later open()
        # or the request itself fails. A caller-supplied file object is not
        # registered, so it is left open for the caller.
        with contextlib.ExitStack() as opened_files:
            files = {}
            if isinstance(video_source, str):
                # Demo video - open file
                video_handle = opened_files.enter_context(open(video_source, "rb"))
            else:
                # Uploaded file
                video_source.seek(0)
                video_handle = video_source
            files["video"] = (filename, video_handle, "video/mp4")

            # Add voice_cloning file if provided
            if reference_audio_path and os.path.exists(reference_audio_path):
                audio_handle = opened_files.enter_context(open(reference_audio_path, "rb"))
                files["voice_cloning"] = ("reference.wav", audio_handle, "audio/wav")

            resp = requests.post(
                TASK_UPLOAD_URL,
                headers=headers,
                data=data,
                files=files,
                stream=True,
                timeout=REQUEST_TIMEOUT
            )

        # Closing the streamed response releases its connection.
        with resp:
            if not resp.ok:
                try:
                    err = resp.json()
                except ValueError:
                    err = resp.text
                raise RuntimeError(f"Server returned {resp.status_code}: {err}")

            # Check if email was sent
            email_sent = resp.headers.get('X-Email-Sent', 'true') != 'false'

            # Save streamed response
            tmp_out = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            tmp_out_path = tmp_out.name
            try:
                with tmp_out:
                    for chunk in resp.iter_content(chunk_size=8192):
                        if chunk:
                            tmp_out.write(chunk)
            except Exception:
                # Do not leave a truncated video behind if the stream breaks.
                with contextlib.suppress(OSError):
                    os.unlink(tmp_out_path)
                raise
        return tmp_out_path, email_sent
    except requests.exceptions.Timeout as e:
        raise TimeoutError("Server timeout. Please try again later.") from e
    except requests.exceptions.ConnectionError as e:
        raise ConnectionError("Unable to connect to server.") from e
    except Exception as e:
        raise RuntimeError(f"Upload failed: {str(e)}") from e
# Page config
st.set_page_config(page_title="ORI-FaceX", layout="wide")
init_state()
st.title("FaceX")
st.markdown("Transform your videos with AI-powered lip-sync and expressive voices!")
# Sidebar with info and rate limit display
with st.sidebar:
    st.header("ℹ️ How It Works")
    st.markdown("""
1. **Enter your email** to verify eligibility
2. **Select a language** and voice
3. **Choose** demo video or upload your own
4. **Enter text** in your chosen language (max 300 characters)
- Use [this tool](https://www.easyhindityping.com/english-to-hindi-translation) for easy typing in Indian languages
5. **Generate** and download your video
""")
    st.markdown("---")
    # Show rate limit info if email verified
    if st.session_state.email_verified:
        st.success(f"πŸ“§ **Email:** {st.session_state.email}")
        info = st.session_state.rate_limit_info
        current = info.get('current_count', 0)
        remaining = info.get('remaining', 5)
        st.metric("Videos Used", f"{current}/5")
        st.metric("Remaining", remaining)
        # Progress bar. st.progress rejects floats outside 0.0-1.0, so the
        # ratio is clamped: a count above 5 reported by the backend would
        # otherwise raise and break the whole page.
        progress = min(max(current / 5, 0.0), 1.0)
        st.progress(progress)
        if remaining == 0:
            st.error("⚠️ Limit reached!")
        st.markdown("---")
    st.warning("⚠️ **Use responsibly!** Do not create misleading or harmful content.")
    st.markdown("---")
    st.markdown("Reach out to us at ai-team@oriserve.com")
# Generated Demo Videos Showcase Section
st.markdown("---")
st.subheader("πŸŽ₯ See What's Possible")
st.markdown("Check out these examples of AI-generated videos using TalkMorph:")
generated_demos = get_generated_demo_videos()
if not generated_demos:
    st.info(f"πŸ“ No demo videos found. Add videos to `{GENERATED_DEMO_VIDEOS_PATH}` to showcase examples.")
else:
    # Three-column gallery; videos are dealt out to the columns round-robin.
    demo_cols = st.columns(3)
    for position, demo in enumerate(generated_demos):
        target_column = demo_cols[position % 3]
        with target_column, st.container():
            st.video(demo["path"], start_time=0)
            # Caption: file name without extension, underscores as spaces, title-cased.
            stem, _extension = os.path.splitext(demo["name"])
            display_name = stem.replace("_", " ").title()
            st.caption(f"**{display_name}**")
    if len(generated_demos) > 3:
        st.markdown("*More examples available - scroll down to see all*")
# Main content
st.markdown("---")
# Email Verification Section
# Shown until an email has been verified. On a successful check the email
# and the rate-limit payload are stored in session state and the script is
# rerun so the other branch renders.
if not st.session_state.email_verified:
    st.subheader("πŸ“§ Email Verification")
    st.info("Enter your email to check eligibility. Each email can generate up to **5 videos**.")
    col1, col2 = st.columns([3, 1])
    with col1:
        email_input = st.text_input(
            "Email Address",
            placeholder="your.email@example.com",
            key="email_input",
            disabled=st.session_state.processing
        )
    with col2:
        st.write("") # Spacing
        st.write("") # Spacing
        verify_btn = st.button("Verify Email", type="primary", disabled=st.session_state.processing)
    if verify_btn:
        # Local checks first (non-empty, format); the backend is only
        # contacted for an address that passes validate_email.
        if not email_input:
            st.error("Please enter your email address")
        elif not validate_email(email_input):
            st.error("❌ Invalid email format. Please enter a valid email.")
        else:
            with st.spinner("Checking your email..."):
                rate_limit_info = check_rate_limit(email_input)
            # check_rate_limit returns None on failure and reports the error itself.
            if rate_limit_info:
                # The payload is indexed with 'can_proceed', 'remaining' and
                # 'message'; a response missing one of them raises KeyError here.
                if rate_limit_info['can_proceed']:
                    st.session_state.email = email_input
                    st.session_state.email_verified = True
                    st.session_state.rate_limit_info = rate_limit_info
                    st.success(f"βœ… Email verified! You have {rate_limit_info['remaining']} videos remaining.")
                    st.rerun()
                else:
                    st.error(f"❌ {rate_limit_info['message']}")
                    st.info("πŸ’‘ Try with a different email address to continue.")
# Video Generation Section (only after email verification)
# NOTE(review): indentation was lost in this copy of the file; the generation
# UI that follows presumably belongs to this else branch - confirm.
else:
    # Show locked email with change option
    col1, col2 = st.columns([4, 1])
    with col1:
        st.text_input(
            "πŸ“§ Email Address (Verified)",
            value=st.session_state.email,
            disabled=True,
            key="email_locked"
        )
    with col2:
        st.write("") # Spacing
        st.write("") # Spacing
        # "Change Email" clears the verification state and reruns, which
        # brings back the verification form above.
        if st.button("Change Email", disabled=st.session_state.processing):
            st.session_state.email_verified = False
            st.session_state.email = ""
            st.session_state.rate_limit_info = {}
            st.rerun()
st.markdown("---")
st.subheader("🎬 Generate Your Video")
# Language selection
language = st.selectbox(
    "🌍 Select Language",
    options=list(LANGUAGE_VOICES.keys()),
    disabled=st.session_state.processing
)
# Voice Mode Selection
st.markdown("---")
voice_mode = st.radio(
    "πŸŽ™οΈ Voice Selection Mode",
    options=["Default Voice", "Clone Voice"],
    horizontal=True,
    disabled=st.session_state.processing,
    help="Choose a pre-configured voice or clone a custom voice from audio"
)
voice_name = None
reference_audio = None


def _clear_reference_audio():
    """Delete the stored reference-audio temp file (if any) and reset both session entries."""
    stale_path = st.session_state.reference_audio_path
    if stale_path:
        try:
            os.unlink(stale_path)
        except OSError:
            pass  # best effort: the file may already be gone
    st.session_state.reference_audio = None
    st.session_state.reference_audio_path = None


def _store_reference_audio(audio):
    """Validate *audio*; keep a temp copy in session state when valid, otherwise clear the state."""
    is_valid, duration_msg, _duration = check_audio_duration(audio)
    if not is_valid:
        st.error(f"❌ {duration_msg}")
        _clear_reference_audio()
        return
    st.success(f"βœ… {duration_msg}")
    # Replace the copy made on an earlier rerun so temp files do not pile up.
    _clear_reference_audio()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
        audio.seek(0)
        tmp_audio.write(audio.read())
    st.session_state.reference_audio = audio
    st.session_state.reference_audio_path = tmp_audio.name
    # Show audio player
    st.audio(audio, format="audio/wav")


if voice_mode == "Default Voice":
    # Get voices for selected language
    available_voices = LANGUAGE_VOICES.get(language, [])
    voice_name = st.selectbox(
        "Select Voice",
        options=available_voices,
        disabled=st.session_state.processing
    )
    # A preset voice needs no reference clip; drop (and delete) any clip
    # left over from clone mode.
    _clear_reference_audio()
else:  # Clone Voice
    st.info("πŸ“’ **Voice Cloning:** Provide a reference audio (5-300 seconds) to clone the voice")
    audio_source = st.radio(
        "Reference Audio Source",
        options=["Upload Audio File", "Record Audio"],
        horizontal=True,
        disabled=st.session_state.processing
    )
    if audio_source == "Upload Audio File":
        reference_audio = st.file_uploader(
            "Upload Reference Audio (.wav format, 5-300 seconds)",
            type=["wav"],
            disabled=st.session_state.processing,
            key="audio_uploader"
        )
    else:  # Record Audio
        st.warning("🎀 **Note:** Recorded audio must be between 5-300 seconds")
        reference_audio = st.audio_input(
            "Record Reference Audio (speak for 5-300 seconds)",
            disabled=st.session_state.processing,
            key="audio_recorder"
        )
    if reference_audio:
        _store_reference_audio(reference_audio)
    else:
        # Session state mirrors the widget: once the upload/recording is
        # removed, a previously stored clip must not be submitted silently.
        _clear_reference_audio()
# Video source selection
st.markdown("---")
video_source_option = st.radio(
    "πŸ“Ή Video Source",
    options=["Upload Video", "Use Demo Video"],
    horizontal=True,
    disabled=st.session_state.processing
)
video_file = None
demo_video_path = None
if video_source_option != "Upload Video":  # Use Demo Video
    demo_videos = get_demo_videos()
    if demo_videos:
        st.markdown("**Select a demo video:**")
        # Compact three-column grid, one collapsed expander per demo video.
        cols = st.columns(3)
        for idx, demo in enumerate(demo_videos):
            grid_cell = cols[idx % 3]
            with grid_cell, st.expander(f"πŸ“Ή {demo['name']}", expanded=False):
                st.video(demo["path"], start_time=0)
                picked = st.button(
                    "βœ… Use this video",
                    key=f"demo_{idx}",
                    disabled=st.session_state.processing,
                    use_container_width=True
                )
                if picked:
                    # Kept in session state so the choice survives reruns.
                    st.session_state.selected_demo_video = demo["path"]
                    st.success("Selected!")
        if st.session_state.selected_demo_video:
            demo_video_path = st.session_state.selected_demo_video
            st.success(f"βœ… Using demo video: **{os.path.basename(demo_video_path)}**")
    else:
        st.warning(f"⚠️ No demo videos found in `{DEMO_VIDEOS_PATH}` folder.")
else:
    video_file = st.file_uploader(
        f"Upload video (mp4) - Max {MAX_FILE_SIZE_MB} MB",
        type=["mp4"],
        disabled=st.session_state.processing
    )
    if video_file:
        # Size feedback is shown immediately; the limit is enforced again at submission.
        is_valid, size_msg = check_file_size(video_file)
        report = st.info if is_valid else st.error
        report(size_msg)
# Text input with character counter
st.markdown("---")
st.markdown(f"**πŸ“ Enter Text in {language}**")
st.info("πŸ’‘ **Need help typing in Indian languages?** Use [this typing tool](https://www.easyhindityping.com/english-to-hindi-translation) to easily convert your text to the desired script.")
# Length seen on the previous run, used for the paste heuristic below.
if "prev_text_length" not in st.session_state:
    st.session_state.prev_text_length = 0
text_input = st.text_area(
    f"Enter your text (Max {MAX_TEXT_LENGTH} characters)",
    value="",
    height=150,
    disabled=st.session_state.processing,
    key="text_input_area",
    help="You can paste any length of text. We'll validate it when you submit.",
    label_visibility="collapsed"
)
# Character count
char_count = len(text_input)
# Paste heuristic: the text grew by more than 50 characters since the last
# run, and it was not empty before.
previous_length = st.session_state.prev_text_length
if previous_length > 0:
    char_diff = char_count - previous_length
    if char_diff > 50:
        st.info(f"πŸ“‹ Pasted text detected ({char_diff} characters added)")
st.session_state.prev_text_length = char_count
# Counter message: error over the limit, warning within 20 of it, info otherwise.
excess = char_count - MAX_TEXT_LENGTH
if excess > 0:
    st.error(f"❌ **Text exceeds limit by {excess} characters!** {char_count}/{MAX_TEXT_LENGTH} - Please trim before submitting")
elif excess > -20:
    st.warning(f"⚠️ **Approaching limit:** {char_count}/{MAX_TEXT_LENGTH} characters")
elif char_count > 0:
    st.info(f"✍️ **Characters:** {char_count}/{MAX_TEXT_LENGTH}")
else:
    st.info(f"πŸ“ **Characters:** 0/{MAX_TEXT_LENGTH}")
# Submit button
st.markdown("---")
submit_btn = st.button(
    "πŸš€ Generate Video",
    type="primary",
    disabled=st.session_state.processing
)
# Placeholders are created in display order: status, progress, video, download.
# Status area
status_placeholder = st.empty()
progress_placeholder = st.empty()
# Result area
video_placeholder = st.empty()
download_placeholder = st.empty()
# Show last generated video if available
last_video = st.session_state.generated_path
if last_video and os.path.exists(last_video):
    with video_placeholder.container():
        st.success("βœ… Generated video available below:")
        # The middle of three columns constrains the video width.
        _left, center, _right = st.columns([1, 2, 1])
        with center:
            st.video(last_video, start_time=0)
    with download_placeholder.container():
        # Same layout to centre the download button.
        _left, center, _right = st.columns([1, 2, 1])
        with center, open(last_video, "rb") as f:
            st.download_button(
                "⬇️ Download Video",
                data=f,
                file_name="generated.mp4",
                mime="video/mp4",
                use_container_width=True
            )
# Handle submission
# Validates the form, uploads the job through stream_post_upload, then shows
# the resulting video and refreshes the rate-limit info. Every failure is
# reported in the page; nothing propagates out of this block.
if submit_btn:
    # Validation
    if video_source_option == "Upload Video" and not video_file:
        st.error("❌ Please upload a video first.")
    elif video_source_option == "Use Demo Video" and not demo_video_path:
        st.error("❌ Please select a demo video first.")
    elif voice_mode == "Clone Voice" and not st.session_state.reference_audio_path:
        st.error("❌ Please provide a reference audio for voice cloning (5-300 seconds).")
    elif not text_input.strip():
        st.error("❌ Please enter text.")
    elif len(text_input) > MAX_TEXT_LENGTH:
        excess = len(text_input) - MAX_TEXT_LENGTH
        st.error(f"❌ **Text exceeds limit by {excess} characters!** Current: {len(text_input)}/{MAX_TEXT_LENGTH}")
        st.warning(f"πŸ’‘ **Tip:** Please trim your text to exactly {MAX_TEXT_LENGTH} characters or less before submitting.")
    else:
        # Check file size for uploaded videos
        if video_source_option == "Upload Video":
            is_valid, size_msg = check_file_size(video_file)
            if not is_valid:
                st.error(size_msg)
                st.stop()
        # Start processing
        st.session_state.processing = True
        status_placeholder.info("""🎬 Processing your video...
You will receive an email with the video link...""")
        progress_placeholder.progress(0)
        try:
            # Determine video source
            if video_source_option == "Upload Video":
                video_source = video_file
                filename = video_file.name
            else:
                video_source = demo_video_path
                filename = os.path.basename(demo_video_path)
            # Upload and process
            generated_path, email_sent = stream_post_upload(
                video_source=video_source,
                filename=filename,
                text=text_input,
                voice_name=voice_name,
                language=language,
                email=st.session_state.email,
                reference_audio_path=st.session_state.reference_audio_path
            )
            progress_placeholder.progress(100)
            if generated_path and os.path.exists(generated_path):
                # The new result supersedes the previous one; delete the old
                # temp file so generated videos do not accumulate on disk.
                previous_path = st.session_state.generated_path
                st.session_state.generated_path = generated_path
                if previous_path and previous_path != generated_path:
                    try:
                        os.unlink(previous_path)
                    except OSError:
                        pass  # best effort
                status_placeholder.success("βœ… Video generated successfully!")
                # Show email status
                if email_sent:
                    st.info(f"πŸ“§ Download link sent to {st.session_state.email}")
                else:
                    st.warning("⚠️ Video generated but email notification failed. Download below.")
                # Update rate limit info
                new_info = check_rate_limit(st.session_state.email)
                if new_info:
                    st.session_state.rate_limit_info = new_info
                # Display video
                video_placeholder.empty()
                download_placeholder.empty()
                with video_placeholder.container():
                    st.success("βœ… Generated video:")
                    # Use columns to constrain video width
                    col1, col2, col3 = st.columns([1, 2, 1])
                    with col2:
                        st.video(generated_path, start_time=0)
                with download_placeholder.container():
                    # Center the download button
                    col1, col2, col3 = st.columns([1, 2, 1])
                    with col2:
                        with open(generated_path, "rb") as f:
                            st.download_button(
                                "⬇️ Download Video",
                                data=f,
                                file_name="generated.mp4",
                                mime="video/mp4",
                                use_container_width=True
                            )
                # Check if limit reached
                if st.session_state.rate_limit_info.get('remaining', 0) == 0:
                    st.error("⚠️ You've reached your limit of 5 videos. Please use a different email to continue.")
            else:
                status_placeholder.error("❌ No video was generated.")
        except TimeoutError as e:
            status_placeholder.error("⏱️ Server Timeout")
            st.error(f"Server is under heavy load. Please try again later.\n\n{str(e)}")
        except ConnectionError as e:
            status_placeholder.error("❌ Connection Error")
            st.error(str(e))
        except RuntimeError as e:
            status_placeholder.error("❌ Request Failed")
            st.error(str(e))
        except Exception as e:
            status_placeholder.error("❌ Unexpected Error")
            st.error(f"An unexpected error occurred: {str(e)}")
        finally:
            st.session_state.processing = False
            progress_placeholder.empty()
            # Remove the temporary reference audio once the attempt is over.
            # This runs after failures as well as after a successful generation.
            audio_tmp_path = st.session_state.reference_audio_path
            if audio_tmp_path and os.path.exists(audio_tmp_path):
                try:
                    os.unlink(audio_tmp_path)
                except OSError:
                    pass  # deletion failed: leave the session entries as they are
                else:
                    st.session_state.reference_audio_path = None
                    st.session_state.reference_audio = None