import os import uuid import subprocess import PIL.Image # Monkeypatch PIL.Image.ANTIALIAS for compatibility with moviepy and newer Pillow versions if not hasattr(PIL.Image, 'ANTIALIAS'): PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS from flask import Flask, request, jsonify, send_from_directory, render_template from utils.audio_engine import ( generate_speech, create_mixed_audio, generate_preview, generate_ai_video, VOICES, VOICE_STYLES, MOOD_LABELS, AGE_PRESETS ) from utils.video_effects import concatenate_clips, apply_copyright_filters, slice_video app = Flask(__name__, static_folder='static', static_url_path='') # Configuration UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') OUTPUT_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'outputs') os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(OUTPUT_FOLDER, exist_ok=True) # ── Clean up any leftover temp/partial files on startup ── for _f in os.listdir(OUTPUT_FOLDER): if '.temp' in _f or 'TEMP_MPY' in _f or _f.endswith('.part'): try: os.remove(os.path.join(OUTPUT_FOLDER, _f)) print(f'[cleanup] Removed temp file: {_f}') except Exception: pass # Helper to execute shell commands (e.g. yt-dlp using local venv) def get_pip_binary(binary_name): # Returns path to binary in virtual env venv_bin = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'venv', 'bin', binary_name) if os.path.exists(venv_bin): return venv_bin return binary_name @app.route('/') def index(): return send_from_directory('static', 'index.html') @app.route('/api/voices', methods=['GET']) def get_voices(): """Returns voices + style + age metadata.""" return jsonify({ 'voices': VOICES, 'voice_styles': VOICE_STYLES, 'mood_labels': MOOD_LABELS, 'age_presets': AGE_PRESETS, }) @app.route('/api/preview-voice', methods=['POST']) def preview_voice(): """ Generate a short voice preview sample and stream it back as audio/mpeg. Body: { voice, lang, style, style_degree, rate, pitch } """ import tempfile from flask import send_file voice = request.form.get('voice', 'en-US-EmmaMultilingualNeural') lang = request.form.get('lang', 'en-US') style = request.form.get('style', '') style_degree = float(request.form.get('style_degree', 1.0)) rate = request.form.get('rate', '+0%') pitch = request.form.get('pitch', '+0Hz') # Write to a temp file tmp = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False, dir=UPLOAD_FOLDER) tmp.close() out_path = tmp.name ok = generate_preview(voice=voice, lang_prefix=lang, style=style, style_degree=style_degree, rate=rate, pitch=pitch, output_path=out_path) if not ok or not os.path.exists(out_path): return jsonify({'error': 'Preview generation failed'}), 500 return send_file(out_path, mimetype='audio/mpeg', as_attachment=False, download_name='preview.mp3') @app.route('/api/clear-library', methods=['POST']) def clear_library(): """Delete all exported files from the outputs folder.""" deleted, failed = [], [] for f in os.listdir(OUTPUT_FOLDER): if f.endswith('.mp4') or f.endswith('.mp3'): try: os.remove(os.path.join(OUTPUT_FOLDER, f)) deleted.append(f) except Exception as ex: failed.append({'file': f, 'error': str(ex)}) return jsonify({'deleted': len(deleted), 'failed': failed}) @app.route('/api/merge', methods=['POST']) def merge_videos(): """ Merge uploaded video clips and overlay generated/uploaded audio. Supports style, rate, pitch, style_degree for TTS voices. """ try: aspect_ratio = request.form.get('aspect_ratio', 'vertical') audio_source = request.form.get('audio_source', 'script') language = request.form.get('language', 'ur-PK') voice_id = request.form.get('voice', 'ur-PK-UzmaNeural') script_text = request.form.get('script_text', '') trim_audio = request.form.get('trim_audio', 'true') == 'true' # Voice style / mood params voice_style = request.form.get('style', '') voice_style_degree = float(request.form.get('style_degree', 1.0)) voice_rate = request.form.get('rate', '+0%') voice_pitch = request.form.get('pitch', '+0Hz') # Handle video uploads uploaded_videos = request.files.getlist('videos') if not uploaded_videos or len(uploaded_videos) == 0 or uploaded_videos[0].filename == '': return jsonify({'success': False, 'error': 'No video files uploaded.'}), 400 video_paths = [] job_id = str(uuid.uuid4()) # Save video uploads for idx, file in enumerate(uploaded_videos): filename = f"{job_id}_video_{idx}.mp4" filepath = os.path.join(UPLOAD_FOLDER, filename) file.save(filepath) video_paths.append(filepath) # 1. Merge videos together merged_temp_video = os.path.join(UPLOAD_FOLDER, f"{job_id}_merged_raw.mp4") duration = concatenate_clips(video_paths, aspect_ratio=aspect_ratio, output_path=merged_temp_video) # 2. Process / Generate audio final_audio_path = None speech_path = None bg_music_path = None if audio_source == 'script' and script_text.strip(): # Generate speech audio speech_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_speech.mp3") generate_speech(script_text, voice_id, speech_path, rate=voice_rate, pitch=voice_pitch, style=voice_style, style_degree=voice_style_degree) elif audio_source == 'upload': uploaded_audio = request.files.get('audio_file') if uploaded_audio and uploaded_audio.filename != '': speech_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_user_audio.mp3") uploaded_audio.save(speech_path) # Handle optional background music upload bg_music_file = request.files.get('bg_music_file') if bg_music_file and bg_music_file.filename != '': bg_music_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_bg_music.mp3") bg_music_file.save(bg_music_path) # Mix audio tracks if we have speech or background music if speech_path or bg_music_path: mixed_audio = os.path.join(UPLOAD_FOLDER, f"{job_id}_mixed.mp3") ok = create_mixed_audio( voiceover_path=speech_path, bg_music_path=bg_music_path, target_duration=duration, output_path=mixed_audio ) if ok and os.path.exists(mixed_audio): final_audio_path = mixed_audio # 3. Combine merged video and final audio output_filename = f"merged_{job_id[:8]}.mp4" final_output_path = os.path.join(OUTPUT_FOLDER, output_filename) if final_audio_path and os.path.exists(final_audio_path): # Combine via FFmpeg for speed and precision cmd = [ "ffmpeg", "-y", "-i", merged_temp_video, "-i", final_audio_path, "-map", "0:v", "-map", "1:a", "-c:v", "copy", "-c:a", "aac", "-shortest" if trim_audio else "", final_output_path ] # Remove empty arguments from command cmd = [c for c in cmd if c != ""] subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) else: # No audio overlay, rename raw merged video os.rename(merged_temp_video, final_output_path) # Cleanup temp upload files for p in video_paths + [merged_temp_video, speech_path, bg_music_path, final_audio_path]: if p and os.path.exists(p): try: os.remove(p) except Exception: pass return jsonify({ 'success': True, 'message': 'Videos merged successfully!', 'filename': output_filename, 'duration': f"{duration:.2f}s" }) except Exception as e: import traceback traceback.print_exc() return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/clip', methods=['POST']) def clip_youtube_video(): """ Downloads a YouTube video, cuts it into clips, and applies anti-copyright filters. """ try: url = request.form.get('url') if not url: return jsonify({'success': False, 'error': 'YouTube URL is required.'}), 400 mode = request.form.get('mode', 'auto') # 'auto' or 'timestamps' interval = int(request.form.get('interval', 8)) timestamps_str = request.form.get('timestamps', '') # Filter options filters = { 'aspect_ratio': request.form.get('aspect_ratio', 'original'), 'mirror': request.form.get('mirror', 'true') == 'true', 'zoom': request.form.get('zoom', 'true') == 'true', 'speed': float(request.form.get('speed', 1.04)), 'pitch_shift': float(request.form.get('pitch_shift', 0.8)) } job_id = str(uuid.uuid4()) raw_download_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_raw.mp4") # 1. Download YouTube Video using yt-dlp ytdlp_bin = get_pip_binary('yt-dlp') print(f"Downloading video from {url}...") # Check if curl-cffi is available for --impersonate support try: import curl_cffi has_curl_cffi = True except ImportError: has_curl_cffi = False download_cmd = [ ytdlp_bin, # Quality: max 720p "-f", "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=720]+bestaudio/best[height<=720]/best", "--merge-output-format", "mp4", # Player client: ios/android work for most videos without embedding restrictions # tv_embedded is intentionally excluded — it fails for non-embeddable videos "--extractor-args", "youtube:player_client=ios,android,mweb,web", # Geo-bypass "--geo-bypass", # SSL resilience "--no-check-certificates", "--socket-timeout", "60", # Retry logic "--retries", "10", "--fragment-retries", "10", "--retry-sleep", "exp=1:30", # Output "-o", raw_download_path, url ] # Only add --impersonate if curl-cffi is installed if has_curl_cffi: download_cmd.insert(3, "chrome") download_cmd.insert(3, "--impersonate") result = subprocess.run(download_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=480) stderr_text = result.stderr.decode('utf-8', errors='ignore') if result.returncode != 0 or not os.path.exists(raw_download_path) or os.path.getsize(raw_download_path) == 0: # Check impersonation error FIRST before generic "unavailable" if 'impersonate' in stderr_text.lower() or ('curl' in stderr_text.lower() and 'cffi' in stderr_text.lower()): err = 'Browser impersonation library missing. Try again (it will retry without impersonation).' elif 'SSL' in stderr_text or 'EOF' in stderr_text: err = 'YouTube SSL error. Try a different video or try again in a few seconds.' elif 'Private' in stderr_text or 'members-only' in stderr_text: err = 'This video is private or members-only.' elif 'Sign in' in stderr_text: err = 'YouTube requires sign-in for this video. Try a fully public video.' elif 'removed' in stderr_text or 'unavailable' in stderr_text or 'not available' in stderr_text: err = 'This video is unavailable or has been removed from YouTube.' elif 'bot' in stderr_text.lower() or 'detected' in stderr_text.lower(): err = 'YouTube detected bot activity. Try a different video.' else: # Show raw error so we can diagnose err = f'yt-dlp error: {stderr_text[-800:]}' if stderr_text else 'Unknown download error' return jsonify({'success': False, 'error': f'Download failed: {err}'}), 500 # 2. Slice downloaded video temp_clips_dir = os.path.join(UPLOAD_FOLDER, f"{job_id}_slices") os.makedirs(temp_clips_dir, exist_ok=True) custom_ranges = [] if mode == 'timestamps' and timestamps_str: # Parse timestamps "10-20, 30-45" parts = timestamps_str.split(',') for part in parts: subparts = part.strip().split('-') if len(subparts) == 2: try: start_t = float(subparts[0].strip()) end_t = float(subparts[1].strip()) custom_ranges.append([start_t, end_t]) except ValueError: pass sliced_files = slice_video( raw_download_path, temp_clips_dir, mode=mode, intervals=interval, custom_ranges=custom_ranges ) if not sliced_files: return jsonify({'success': False, 'error': 'No clips generated during slicing.'}), 500 # 3. Apply safety filters to each clip and save to outputs processed_files = [] for idx, file_path in enumerate(sliced_files, 1): out_filename = f"clip_{job_id[:8]}_{idx}.mp4" out_path = os.path.join(OUTPUT_FOLDER, out_filename) apply_copyright_filters(file_path, out_path, filters) if os.path.exists(out_path): processed_files.append(out_filename) # Cleanup raw downloaded file & sliced folder if os.path.exists(raw_download_path): os.remove(raw_download_path) import shutil if os.path.exists(temp_clips_dir): shutil.rmtree(temp_clips_dir) return jsonify({ 'success': True, 'message': f'YouTube video processed into {len(processed_files)} clips!', 'filenames': processed_files }) except Exception as e: import traceback traceback.print_exc() return jsonify({'success': False, 'error': str(e)}), 500 def get_video_duration(path): try: cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path ] res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if res.returncode == 0: duration = float(res.stdout.strip()) return f"{duration:.1f}s" except Exception: pass return "unknown" @app.route('/api/outputs', methods=['GET']) def get_outputs(): """Lists all real (non-temp) output files, newest first.""" SKIP_PATTERNS = ('.temp', 'TEMP_MPY', '.part', '.tmp') files = [] for f in os.listdir(OUTPUT_FOLDER): # Only include clean .mp4 files — skip any temp/partial artifacts if not f.endswith('.mp4'): continue if any(pat in f for pat in SKIP_PATTERNS): # Also delete them from disk so they don't pile up try: os.remove(os.path.join(OUTPUT_FOLDER, f)) except Exception: pass continue path = os.path.join(OUTPUT_FOLDER, f) size_mb = os.path.getsize(path) / (1024 * 1024) files.append({ 'filename': f, 'size': f"{size_mb:.2f} MB", 'duration': get_video_duration(path) }) # Sort newest first files.sort( key=lambda x: os.path.getmtime(os.path.join(OUTPUT_FOLDER, x['filename'])), reverse=True ) return jsonify(files) @app.route('/api/generate-video', methods=['POST']) def generate_video(): """ End-to-end AI script-to-video generation route. """ try: script_text = request.form.get('script_text', '') theme = request.form.get('theme', 'auto') aspect_ratio = request.form.get('aspect_ratio', 'vertical') voice_id = request.form.get('voice', 'ur-PK-UzmaNeural') voice_rate = request.form.get('rate', '+0%') voice_pitch = request.form.get('pitch', '+0Hz') trim_audio = request.form.get('trim_audio', 'true') == 'true' if not script_text.strip(): return jsonify({'success': False, 'error': 'Script text is required.'}), 400 bg_music_file = request.files.get('bg_music_file') job_id = str(uuid.uuid4()) output_filename = f"ai_video_{job_id[:8]}.mp4" final_output_path = os.path.join(OUTPUT_FOLDER, output_filename) res = generate_ai_video( script_text=script_text, theme=theme, aspect_ratio=aspect_ratio, voice_id=voice_id, rate=voice_rate, pitch=voice_pitch, bg_music_file=bg_music_file, trim_audio=trim_audio, output_path=final_output_path ) if res.get('success'): return jsonify({ 'success': True, 'message': 'AI Video generated successfully!', 'filename': output_filename, 'duration': f"{res.get('duration'):.2f}s", 'slides': res.get('sentences_count') }) else: return jsonify({'success': False, 'error': 'Video generation failed.'}), 500 except Exception as e: import traceback traceback.print_exc() return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/outputs/', methods=['GET']) def download_file(filename): """Serves file from outputs folder.""" return send_from_directory(OUTPUT_FOLDER, filename) if __name__ == '__main__': import os port = int(os.environ.get('PORT', 5005)) host = os.environ.get('HOST', '127.0.0.1') debug = os.environ.get('FLASK_ENV', 'production') == 'development' app.run(debug=debug, host=host, port=port)