"""Streamlit front end for the FaceX lip-sync video generation service.

Flow implemented below: verify an email against the rate-limit endpoint,
pick a language and a voice (preset, or cloned from reference audio), choose
a video (upload or bundled demo), enter text, then POST everything to the
backend and save the streamed response as the generated video.
"""
import contextlib
import os
import re
import tempfile
import wave
from pathlib import Path
from typing import Optional, Dict, List

import requests
import streamlit as st
from dotenv import load_dotenv

load_dotenv()

# Configuration
API_KEY = os.getenv("FaceX_API_KEY")
CHECK_RATE_LIMIT_URL = os.getenv("CHECK_RATE_LIMIT_URL")
TASK_UPLOAD_URL = os.getenv("TASK_UPLOAD_URL")
DEMO_VIDEOS_PATH = "./assets"  # Change this path as needed
GENERATED_DEMO_VIDEOS_PATH = "./generated_demos"  # Path for generated demo videos showcase
REQUEST_TIMEOUT = 3600
MAX_FILE_SIZE_MB = 10
MAX_TEXT_LENGTH = 300
MAX_VIDEOS_PER_EMAIL = 5  # quota shown in the UI; the backend decides the real limit
MIN_AUDIO_SECONDS = 5
MAX_AUDIO_SECONDS = 300
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')

LANGUAGE_VOICES: Dict[str, List[str]] = {
    "Hindi": ["Kavya", "Priya", "Jyoti"],
    "Telugu": ["Nandini", "Rashmi", "Riya"],
    "English": ["Maria", "Ishita", "Aditi"],
    "Tamil": ["Jessica", "Jasmine", "Rashmi", "Amy", "Fatima"],
    "Kannada": ["Manisha", "Hema", "Uma"],
    "Bhojpuri": ["Anju", "Gayatri", "Radha"],
    "Bengali": ["Madhumita", "Shrabonti", "Subhashree"],
    "Maithili": ["Jaanki", "Shital", "Anuradha"],
    "Magahi": ["Amrita", "Anu", "Radhika"],
    "Gujarati": ["Hetvi", "Parul", "Janvi"],
    "Marathi": ["Vashnavi", "Varsha", "Savita"],
    "Malayalam": ["Nandini", "Uma", "Hema", "Manisha"],
    "Chattisgarhi": ["Kusum", "Mamata", "Kamla"],
}

# UI language name -> language code sent to the backend in the "language" field.
LANGUAGE_CODES: Dict[str, str] = {
    "Hindi": "hi",
    "Telugu": "te",
    "English": "en",
    "Tamil": "ta",
    "Kannada": "kn",
    "Bhojpuri": "bho",
    "Bengali": "bn",
    "Maithili": "mai",
    "Magahi": "mag",
    "Gujarati": "gu",
    "Marathi": "mr",
    "Malayalam": "ml",
    "Chattisgarhi": "hne",
}

_EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

# Kept at column 0 so the markdown list does not depend on dedenting.
_HOW_IT_WORKS_MD = """
1. **Enter your email** to verify eligibility
2. **Select a language** and voice
3. **Choose** demo video or upload your own
4. **Enter text** in your chosen language (max 300 characters)
   - Use [this tool](https://www.easyhindityping.com/english-to-hindi-translation) for easy typing in Indian languages
5. **Generate** and download your video
"""


def init_state():
    """Initialize session state variables that are not set yet."""
    # Built per call so the mutable default ({}) is never shared between sessions.
    defaults = {
        "email_verified": False,
        "email": "",
        "rate_limit_info": {},
        "generated_path": None,
        "processing": False,
        "selected_demo_video": None,
        "reference_audio": None,
        "reference_audio_path": None,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value


def validate_email(email: str) -> bool:
    """Return True when *email* matches the basic ``local@domain.tld`` pattern."""
    # fullmatch (rather than match + "$") also rejects a trailing newline.
    return _EMAIL_PATTERN.fullmatch(email) is not None


def _on_text_change():
    """
    Called whenever the text area value changes (typing or paste).
    Sets session flags used by the UI to show warnings.
    """
    # NOTE(review): this reads the "text_input" key, while the text area below
    # uses key "text_input_area", and nothing in this file registers this
    # callback -- confirm whether it is still needed.
    text = st.session_state.get("text_input", "") or ""
    st.session_state["text_exceeded"] = len(text) > MAX_TEXT_LENGTH
    st.session_state["text_exceeded_count"] = len(text)


def check_rate_limit(email: str) -> Optional[Dict]:
    """Ask the rate-limit endpoint whether *email* may generate more videos.

    Returns:
        The decoded JSON object on HTTP 200, otherwise ``None`` (an error is
        shown in the UI in that case).
    """
    try:
        response = requests.post(
            CHECK_RATE_LIMIT_URL,
            json={"email": email},
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=10,
        )
    except requests.RequestException as exc:
        st.error(f"Failed to connect to server: {str(exc)}")
        return None

    # Decode separately so a non-JSON error page is not reported as a
    # connection failure.
    try:
        payload = response.json()
    except ValueError:
        payload = None

    if response.status_code == 200 and isinstance(payload, dict):
        return payload

    detail = payload.get('detail', 'Unknown error') if isinstance(payload, dict) else 'Unknown error'
    st.error(f"Error checking rate limit: {detail}")
    return None


def check_audio_duration(audio_file) -> tuple[bool, str, float]:
    """Check that a WAV file-like object lasts between 5 and 300 seconds.

    Args:
        audio_file: seekable binary file-like object containing WAV data.

    Returns:
        ``(is_valid, message, duration_seconds)``; the duration is ``0.0``
        when the audio could not be read.
    """
    try:
        audio_file.seek(0)
        # wave.open accepts a file-like object, so no temporary copy is
        # needed; closing the reader does not close a caller-supplied file.
        with contextlib.closing(wave.open(audio_file, "rb")) as wav:
            frames = wav.getnframes()
            rate = wav.getframerate()
        audio_file.seek(0)  # Reset file pointer for later readers
    except Exception as exc:  # validation boundary: report instead of crashing the page
        return False, f"Error checking audio duration: {str(exc)}", 0.0

    if rate <= 0:
        return False, "Error checking audio duration: invalid sample rate", 0.0

    duration = frames / float(rate)
    if duration < MIN_AUDIO_SECONDS:
        return (
            False,
            f"Audio too short ({duration:.1f}s). Minimum {MIN_AUDIO_SECONDS} seconds required.",
            duration,
        )
    if duration > MAX_AUDIO_SECONDS:
        return (
            False,
            f"Audio too long ({duration:.1f}s). Maximum {MAX_AUDIO_SECONDS} seconds allowed.",
            duration,
        )
    return True, f"Audio duration: {duration:.1f} seconds", duration


def check_file_size(file_obj) -> tuple[bool, str]:
    """Check that a seekable file-like object is at most ``MAX_FILE_SIZE_MB``.

    Returns:
        ``(is_valid, message)``; the stream is rewound to the start.
    """
    try:
        file_obj.seek(0, os.SEEK_END)
        size_bytes = file_obj.tell()
        file_obj.seek(0)
    except (AttributeError, OSError, ValueError) as exc:
        return False, f"Error checking file size: {str(exc)}"

    size_mb = size_bytes / (1024 * 1024)
    if size_mb > MAX_FILE_SIZE_MB:
        return False, f"File size ({size_mb:.2f} MB) exceeds maximum of {MAX_FILE_SIZE_MB} MB"
    return True, f"File size: {size_mb:.2f} MB"


def _list_videos(folder: str) -> List[Dict[str, str]]:
    """Return ``{"name", "path"}`` entries for video files in *folder*, sorted by name."""
    if not os.path.isdir(folder):
        return []
    return [
        {"name": name, "path": os.path.join(folder, name)}
        for name in sorted(os.listdir(folder))
        if name.lower().endswith(VIDEO_EXTENSIONS)
    ]


def get_demo_videos() -> List[Dict[str, str]]:
    """Get list of demo videos from demo folder"""
    # Sorted so the index-based button keys stay stable between reruns.
    return _list_videos(DEMO_VIDEOS_PATH)


def get_generated_demo_videos() -> List[Dict[str, str]]:
    """Get list of generated demo videos for showcase"""
    return _list_videos(GENERATED_DEMO_VIDEOS_PATH)


def _save_response_stream(resp) -> str:
    """Write the streamed response body to a new temp ``.mp4`` and return its path."""
    out_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    try:
        with out_file:
            for chunk in resp.iter_content(chunk_size=8192):
                if chunk:
                    out_file.write(chunk)
    except Exception:
        # Do not leave a truncated video behind when the download fails.
        with contextlib.suppress(OSError):
            os.unlink(out_file.name)
        raise
    return out_file.name


def stream_post_upload(video_source, filename: str, text: str, voice_name: str,
                       language: str, email: str,
                       reference_audio_path: Optional[str] = None) -> tuple[str, bool]:
    """
    Stream multipart/form-data POST to backend and save the returned video.

    Args:
        video_source: file object (uploaded video) or path string (demo video).
        filename: file name reported to the backend for the video part.
        text: text to be spoken.
        voice_name: preset voice name; sent as "" when falsy (voice cloning).
        language: UI language name, mapped through ``LANGUAGE_CODES``
            (unknown names fall back to "hi").
        email: requester's email.
        reference_audio_path: path to cloned voice audio file (if using voice
            cloning); ignored when the path does not exist.

    Returns:
        ``(path_to_saved_video, email_sent)`` where ``email_sent`` is False
        only when the ``X-Email-Sent`` response header equals ``"false"``.

    Raises:
        TimeoutError: the request timed out.
        ConnectionError: the server could not be reached.
        RuntimeError: any other failure, including a non-2xx response.
    """
    headers = {"Authorization": f"Bearer {API_KEY}"}
    data = {
        "text": text,
        "email": email,
        "language": LANGUAGE_CODES.get(language, "hi"),
        # voice_name can be None when voice cloning is used
        "voice_name": voice_name if voice_name else "",
    }

    try:
        # ExitStack closes every file opened here, even if a later open()
        # or the request itself raises.
        with contextlib.ExitStack() as stack:
            if isinstance(video_source, str):
                video_handle = stack.enter_context(open(video_source, "rb"))
            else:
                video_source.seek(0)
                video_handle = video_source  # owned by the caller, not closed here
            files = {"video": (filename, video_handle, "video/mp4")}

            if reference_audio_path and os.path.exists(reference_audio_path):
                audio_handle = stack.enter_context(open(reference_audio_path, "rb"))
                files["voice_cloning"] = ("reference.wav", audio_handle, "audio/wav")

            resp = requests.post(
                TASK_UPLOAD_URL,
                headers=headers,
                data=data,
                files=files,
                stream=True,
                timeout=REQUEST_TIMEOUT,
            )

        with contextlib.closing(resp):
            if not resp.ok:
                try:
                    err = resp.json()
                except ValueError:
                    err = resp.text
                raise RuntimeError(f"Server returned {resp.status_code}: {err}")

            email_sent = resp.headers.get('X-Email-Sent', 'true') != 'false'
            saved_path = _save_response_stream(resp)

        return saved_path, email_sent
    except requests.exceptions.Timeout as exc:
        raise TimeoutError("Server timeout. Please try again later.") from exc
    except requests.exceptions.ConnectionError as exc:
        raise ConnectionError("Unable to connect to server.") from exc
    except Exception as exc:
        raise RuntimeError(f"Upload failed: {str(exc)}") from exc


def _discard_reference_audio() -> None:
    """Delete the stored reference-audio temp file (if any) and clear its session entries."""
    stored_path = st.session_state.get("reference_audio_path")
    if stored_path:
        with contextlib.suppress(OSError):
            os.unlink(stored_path)
    st.session_state.reference_audio = None
    st.session_state.reference_audio_path = None


def _store_reference_audio(audio) -> None:
    """Copy *audio* to a temp ``.wav`` file and remember it in session state."""
    _discard_reference_audio()
    audio.seek(0)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
        tmp_audio.write(audio.read())
    audio.seek(0)
    st.session_state.reference_audio = audio
    st.session_state.reference_audio_path = tmp_audio.name


def _process_reference_audio(audio) -> None:
    """Validate an uploaded/recorded clip and store it, or report why it was rejected."""
    is_valid, duration_msg, _ = check_audio_duration(audio)
    if is_valid:
        st.success(f"✅ {duration_msg}")
        _store_reference_audio(audio)
        st.audio(audio, format="audio/wav")
    else:
        st.error(f"❌ {duration_msg}")
        _discard_reference_audio()


def _show_generated_video(video_slot, download_slot, path: str, heading: str,
                          button_key: str) -> None:
    """Render the video player and a centred download button into the given placeholders."""
    with video_slot.container():
        st.success(heading)
        # Use columns to constrain video width
        _, middle, _ = st.columns([1, 2, 1])
        with middle:
            st.video(path, start_time=0)

    with download_slot.container():
        _, middle, _ = st.columns([1, 2, 1])
        with middle:
            with open(path, "rb") as video_fh:
                # A distinct key per call site: this helper can run twice in
                # one script run with otherwise identical button arguments.
                st.download_button(
                    "⬇️ Download Video",
                    data=video_fh,
                    file_name="generated.mp4",
                    mime="video/mp4",
                    use_container_width=True,
                    key=button_key,
                )


def _render_sidebar() -> None:
    """Sidebar: usage instructions, quota display and contact info."""
    with st.sidebar:
        st.header("ℹ️ How It Works")
        st.markdown(_HOW_IT_WORKS_MD)
        st.markdown("---")

        # Show rate limit info if email verified
        if st.session_state.email_verified:
            st.success(f"📧 **Email:** {st.session_state.email}")
            info = st.session_state.rate_limit_info
            current = info.get('current_count', 0)
            remaining = info.get('remaining', MAX_VIDEOS_PER_EMAIL)

            st.metric("Videos Used", f"{current}/{MAX_VIDEOS_PER_EMAIL}")
            st.metric("Remaining", remaining)

            # st.progress rejects floats outside [0.0, 1.0], so clamp.
            st.progress(min(max(current / MAX_VIDEOS_PER_EMAIL, 0.0), 1.0))

            if remaining == 0:
                st.error("⚠️ Limit reached!")
            st.markdown("---")

        st.warning("⚠️ **Use responsibly!** Do not create misleading or harmful content.")
        st.markdown("---")
        st.markdown("Reach out to us at ai-team@oriserve.com")


def _render_showcase() -> None:
    """Grid of pre-generated example videos."""
    st.markdown("---")
    st.subheader("🎥 See What's Possible")
    st.markdown("Check out these examples of AI-generated videos using TalkMorph:")

    generated_demos = get_generated_demo_videos()
    if not generated_demos:
        st.info(f"📁 No demo videos found. Add videos to `{GENERATED_DEMO_VIDEOS_PATH}` to showcase examples.")
        return

    demo_cols = st.columns(3)
    for idx, demo in enumerate(generated_demos):
        with demo_cols[idx % 3]:
            with st.container():
                st.video(demo["path"], start_time=0)
                # Remove file extension and format name nicely
                display_name = os.path.splitext(demo["name"])[0].replace("_", " ").title()
                st.caption(f"**{display_name}**")

    if len(generated_demos) > 3:
        st.markdown("*More examples available - scroll down to see all*")


def _render_email_verification() -> None:
    """Email form; on success stores the email and quota info and reruns the script."""
    st.subheader("📧 Email Verification")
    st.info(
        "Enter your email to check eligibility. "
        f"Each email can generate up to **{MAX_VIDEOS_PER_EMAIL} videos**."
    )

    input_col, button_col = st.columns([3, 1])
    with input_col:
        email_input = st.text_input(
            "Email Address",
            placeholder="your.email@example.com",
            key="email_input",
            disabled=st.session_state.processing,
        )
    with button_col:
        st.write("")  # Spacing
        st.write("")  # Spacing
        verify_btn = st.button("Verify Email", type="primary", disabled=st.session_state.processing)

    if not verify_btn:
        return

    email = (email_input or "").strip()
    if not email:
        st.error("Please enter your email address")
        return
    if not validate_email(email):
        st.error("❌ Invalid email format. Please enter a valid email.")
        return

    with st.spinner("Checking your email..."):
        rate_limit_info = check_rate_limit(email)

    if not rate_limit_info:
        return  # check_rate_limit already displayed the error

    if rate_limit_info.get('can_proceed'):
        st.session_state.email = email
        st.session_state.email_verified = True
        st.session_state.rate_limit_info = rate_limit_info
        remaining = rate_limit_info.get('remaining', MAX_VIDEOS_PER_EMAIL)
        st.success(f"✅ Email verified! You have {remaining} videos remaining.")
        st.rerun()
    else:
        st.error(f"❌ {rate_limit_info.get('message', 'This email cannot generate more videos.')}")
        st.info("💡 Try with a different email address to continue.")


def _render_voice_selection(language: str):
    """Voice widgets. Returns ``(voice_mode, voice_name)``; voice_name is None when cloning."""
    st.markdown("---")
    voice_mode = st.radio(
        "🎙️ Voice Selection Mode",
        options=["Default Voice", "Clone Voice"],
        horizontal=True,
        disabled=st.session_state.processing,
        help="Choose a pre-configured voice or clone a custom voice from audio",
    )

    if voice_mode == "Default Voice":
        voice_name = st.selectbox(
            "Select Voice",
            options=LANGUAGE_VOICES.get(language, []),
            disabled=st.session_state.processing,
        )
        # A preset voice is in use: drop any previously stored reference clip.
        _discard_reference_audio()
        return voice_mode, voice_name

    # Clone Voice
    st.info("📢 **Voice Cloning:** Provide a reference audio (5-300 seconds) to clone the voice")
    audio_source = st.radio(
        "Reference Audio Source",
        options=["Upload Audio File", "Record Audio"],
        horizontal=True,
        disabled=st.session_state.processing,
    )

    if audio_source == "Upload Audio File":
        reference_audio = st.file_uploader(
            "Upload Reference Audio (.wav format, 5-300 seconds)",
            type=["wav"],
            disabled=st.session_state.processing,
            key="audio_uploader",
        )
    else:  # Record Audio
        st.warning("🎤 **Note:** Recorded audio must be between 5-300 seconds")
        reference_audio = st.audio_input(
            "Record Reference Audio (speak for 5-300 seconds)",
            disabled=st.session_state.processing,
            key="audio_recorder",
        )

    if reference_audio:
        _process_reference_audio(reference_audio)
    else:
        # Nothing is provided right now, so a clip stored by an earlier run
        # must not be submitted silently.
        _discard_reference_audio()

    return voice_mode, None


def _render_video_selection():
    """Video widgets. Returns ``(video_source_option, video_file, demo_video_path)``."""
    st.markdown("---")
    video_source_option = st.radio(
        "📹 Video Source",
        options=["Upload Video", "Use Demo Video"],
        horizontal=True,
        disabled=st.session_state.processing,
    )

    video_file = None
    demo_video_path = None

    if video_source_option == "Upload Video":
        video_file = st.file_uploader(
            f"Upload video (mp4) - Max {MAX_FILE_SIZE_MB} MB",
            type=["mp4"],
            disabled=st.session_state.processing,
        )
        if video_file:
            is_valid, size_msg = check_file_size(video_file)
            if is_valid:
                st.info(size_msg)
            else:
                st.error(size_msg)
        return video_source_option, video_file, demo_video_path

    # Use Demo Video
    demo_videos = get_demo_videos()
    if not demo_videos:
        st.warning(f"⚠️ No demo videos found in `{DEMO_VIDEOS_PATH}` folder.")
        return video_source_option, video_file, demo_video_path

    st.markdown("**Select a demo video:**")
    cols = st.columns(3)
    for idx, demo in enumerate(demo_videos):
        with cols[idx % 3]:
            with st.expander(f"📹 {demo['name']}", expanded=False):
                st.video(demo["path"], start_time=0)
                if st.button(
                    "✅ Use this video",
                    key=f"demo_{idx}",
                    disabled=st.session_state.processing,
                    use_container_width=True,
                ):
                    st.session_state.selected_demo_video = demo["path"]
                    st.success("Selected!")

    if st.session_state.selected_demo_video:
        demo_video_path = st.session_state.selected_demo_video
        st.success(f"✅ Using demo video: **{os.path.basename(demo_video_path)}**")

    return video_source_option, video_file, demo_video_path


def _render_text_input(language: str) -> str:
    """Text area with a live character counter. Returns the entered text."""
    st.markdown("---")
    st.markdown(f"**📝 Enter Text in {language}**")
    st.info("💡 **Need help typing in Indian languages?** Use [this typing tool](https://www.easyhindityping.com/english-to-hindi-translation) to easily convert your text to the desired script.")

    if "prev_text_length" not in st.session_state:
        st.session_state.prev_text_length = 0

    text_input = st.text_area(
        f"Enter your text (Max {MAX_TEXT_LENGTH} characters)",
        value="",
        height=150,
        disabled=st.session_state.processing,
        key="text_input_area",
        help="You can paste any length of text. We'll validate it when you submit.",
        label_visibility="collapsed",
    )
    char_count = len(text_input)

    # Heuristic: a jump of more than 50 characters between runs is treated as a paste.
    if st.session_state.prev_text_length > 0:
        char_diff = char_count - st.session_state.prev_text_length
        if char_diff > 50:
            st.info(f"📋 Pasted text detected ({char_diff} characters added)")
    st.session_state.prev_text_length = char_count

    if char_count > MAX_TEXT_LENGTH:
        excess = char_count - MAX_TEXT_LENGTH
        st.error(f"❌ **Text exceeds limit by {excess} characters!** {char_count}/{MAX_TEXT_LENGTH} - Please trim before submitting")
    elif char_count > MAX_TEXT_LENGTH - 20:
        st.warning(f"⚠️ **Approaching limit:** {char_count}/{MAX_TEXT_LENGTH} characters")
    elif char_count > 0:
        st.info(f"✏️ **Characters:** {char_count}/{MAX_TEXT_LENGTH}")
    else:
        st.info(f"📝 **Characters:** 0/{MAX_TEXT_LENGTH}")

    return text_input


def _validate_submission(video_source_option, video_file, demo_video_path,
                         voice_mode, text_input) -> bool:
    """Show the first validation error (if any); return True when the form may be submitted."""
    if video_source_option == "Upload Video" and not video_file:
        st.error("❌ Please upload a video first.")
        return False
    if video_source_option == "Use Demo Video" and not demo_video_path:
        st.error("❌ Please select a demo video first.")
        return False
    if voice_mode == "Clone Voice" and not st.session_state.reference_audio_path:
        st.error("❌ Please provide a reference audio for voice cloning (5-300 seconds).")
        return False
    if not text_input.strip():
        st.error("❌ Please enter text.")
        return False
    if len(text_input) > MAX_TEXT_LENGTH:
        excess = len(text_input) - MAX_TEXT_LENGTH
        st.error(f"❌ **Text exceeds limit by {excess} characters!** Current: {len(text_input)}/{MAX_TEXT_LENGTH}")
        st.warning(f"💡 **Tip:** Please trim your text to exactly {MAX_TEXT_LENGTH} characters or less before submitting.")
        return False
    if video_source_option == "Upload Video":
        is_valid, size_msg = check_file_size(video_file)
        if not is_valid:
            st.error(size_msg)
            return False
    return True


def _run_generation(video_source, filename, text_input, voice_name, language,
                    status_placeholder, progress_placeholder,
                    video_placeholder, download_placeholder) -> None:
    """Upload the request, then display the generated video or the failure reason."""
    # NOTE: the flag is set and cleared within this single script run, so
    # widgets already rendered in this run are not redrawn as disabled.
    st.session_state.processing = True
    status_placeholder.info("🎬 Processing your video...\nYou will receive an email with the video link...")
    progress_placeholder.progress(0)

    try:
        generated_path, email_sent = stream_post_upload(
            video_source=video_source,
            filename=filename,
            text=text_input,
            voice_name=voice_name,
            language=language,
            email=st.session_state.email,
            reference_audio_path=st.session_state.reference_audio_path,
        )
        progress_placeholder.progress(100)

        if generated_path and os.path.exists(generated_path):
            previous_path = st.session_state.generated_path
            st.session_state.generated_path = generated_path
            status_placeholder.success("✅ Video generated successfully!")

            if email_sent:
                st.info(f"📧 Download link sent to {st.session_state.email}")
            else:
                st.warning("⚠️ Video generated but email notification failed. Download below.")

            # Refresh the quota shown in the sidebar on the next run
            new_info = check_rate_limit(st.session_state.email)
            if new_info:
                st.session_state.rate_limit_info = new_info

            video_placeholder.empty()
            download_placeholder.empty()
            _show_generated_video(
                video_placeholder, download_placeholder, generated_path,
                "✅ Generated video:", button_key="download_new",
            )

            # The superseded temp video is no longer referenced anywhere.
            if previous_path and previous_path != generated_path:
                with contextlib.suppress(OSError):
                    os.unlink(previous_path)

            if st.session_state.rate_limit_info.get('remaining', 0) == 0:
                st.error(f"⚠️ You've reached your limit of {MAX_VIDEOS_PER_EMAIL} videos. Please use a different email to continue.")
        else:
            status_placeholder.error("❌ No video was generated.")
    except TimeoutError as e:
        status_placeholder.error("⏱️ Server Timeout")
        st.error(f"Server is under heavy load. Please try again later.\n\n{str(e)}")
    except ConnectionError as e:
        status_placeholder.error("❌ Connection Error")
        st.error(str(e))
    except RuntimeError as e:
        status_placeholder.error("❌ Request Failed")
        st.error(str(e))
    except Exception as e:
        status_placeholder.error("❌ Unexpected Error")
        st.error(f"An unexpected error occurred: {str(e)}")
    finally:
        st.session_state.processing = False
        progress_placeholder.empty()
        # Runs after every attempt (success or failure): the temp copy is
        # recreated from the audio widget on the next script run if needed.
        _discard_reference_audio()


def _render_generation_section() -> None:
    """Main form shown after email verification."""
    # Show locked email with change option
    email_col, change_col = st.columns([4, 1])
    with email_col:
        st.text_input(
            "📧 Email Address (Verified)",
            value=st.session_state.email,
            disabled=True,
            key="email_locked",
        )
    with change_col:
        st.write("")  # Spacing
        st.write("")  # Spacing
        if st.button("Change Email", disabled=st.session_state.processing):
            st.session_state.email_verified = False
            st.session_state.email = ""
            st.session_state.rate_limit_info = {}
            st.rerun()

    st.markdown("---")
    st.subheader("🎬 Generate Your Video")

    language = st.selectbox(
        "🌍 Select Language",
        options=list(LANGUAGE_VOICES.keys()),
        disabled=st.session_state.processing,
    )

    voice_mode, voice_name = _render_voice_selection(language)
    video_source_option, video_file, demo_video_path = _render_video_selection()
    text_input = _render_text_input(language)

    st.markdown("---")
    submit_btn = st.button(
        "🚀 Generate Video",
        type="primary",
        disabled=st.session_state.processing,
    )

    # Status area
    status_placeholder = st.empty()
    progress_placeholder = st.empty()

    # Result area
    video_placeholder = st.empty()
    download_placeholder = st.empty()

    # Show last generated video if available
    last_path = st.session_state.generated_path
    if last_path and os.path.exists(last_path):
        _show_generated_video(
            video_placeholder, download_placeholder, last_path,
            "✅ Generated video available below:", button_key="download_previous",
        )

    if not submit_btn:
        return
    if not _validate_submission(video_source_option, video_file, demo_video_path,
                                voice_mode, text_input):
        return

    if video_source_option == "Upload Video":
        video_source = video_file
        filename = video_file.name
    else:
        video_source = demo_video_path
        filename = os.path.basename(demo_video_path)

    _run_generation(
        video_source, filename, text_input, voice_name, language,
        status_placeholder, progress_placeholder,
        video_placeholder, download_placeholder,
    )


# Page config
st.set_page_config(page_title="ORI-FaceX", layout="wide")
init_state()

st.title("FaceX")
st.markdown("Transform your videos with AI-powered lip-sync and expressive voices!")

_render_sidebar()
_render_showcase()

# Main content
st.markdown("---")

if not st.session_state.email_verified:
    _render_email_verification()
else:
    # Video generation is only available after email verification
    _render_generation_section()