Spaces:
Sleeping
Sleeping
Poker
fix: remove tv_embedded client (breaks non-embeddable videos), conditional impersonate, better error messages
b8a8778 | import os | |
| import uuid | |
| import subprocess | |
| import PIL.Image | |
# Compatibility shim for moviepy on newer Pillow versions: when PIL.Image does
# not expose ANTIALIAS, alias it to the LANCZOS resampling filter.
try:
    PIL.Image.ANTIALIAS
except AttributeError:
    PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS
| from flask import Flask, request, jsonify, send_from_directory, render_template | |
| from utils.audio_engine import ( | |
| generate_speech, create_mixed_audio, generate_preview, | |
| generate_ai_video, | |
| VOICES, VOICE_STYLES, MOOD_LABELS, AGE_PRESETS | |
| ) | |
| from utils.video_effects import concatenate_clips, apply_copyright_filters, slice_video | |
app = Flask(__name__, static_url_path='', static_folder='static')

# Configuration: both working directories are siblings of this module and are
# created on import if they do not exist yet.
UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
OUTPUT_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'outputs')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# ── Startup sweep: drop temp/partial files left behind in the outputs folder ──
for _f in os.listdir(OUTPUT_FOLDER):
    if not ('.temp' in _f or 'TEMP_MPY' in _f or _f.endswith('.part')):
        continue
    try:
        os.remove(os.path.join(OUTPUT_FOLDER, _f))
        print(f'[cleanup] Removed temp file: {_f}')
    except Exception:
        # Best effort only: a file that cannot be removed is left in place.
        pass
# Helper to locate command-line tools (e.g. yt-dlp) installed in the local venv.
def get_pip_binary(binary_name):
    """Resolve an executable name, preferring this project's virtualenv.

    Returns ``<directory of this file>/venv/bin/<binary_name>`` when that path
    exists; otherwise returns ``binary_name`` unchanged.
    """
    project_root = os.path.dirname(os.path.abspath(__file__))
    candidate = os.path.join(project_root, 'venv', 'bin', binary_name)
    return candidate if os.path.exists(candidate) else binary_name
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def index():
    """Serve the front-end entry page, ``index.html`` from the ``static`` directory."""
    directory, page = 'static', 'index.html'
    return send_from_directory(directory, page)
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def get_voices():
    """Return the voice list plus style, mood-label and age-preset metadata as JSON."""
    payload = dict(
        voices=VOICES,
        voice_styles=VOICE_STYLES,
        mood_labels=MOOD_LABELS,
        age_presets=AGE_PRESETS,
    )
    return jsonify(payload)
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def preview_voice():
    """Generate a short voice preview sample and return it as ``audio/mpeg``.

    Form fields (all optional, defaults in the code below): ``voice``,
    ``lang``, ``style``, ``style_degree``, ``rate``, ``pitch``.

    Returns:
        The MP3 data on success. A JSON ``{'error': ...}`` body with status
        400 when ``style_degree`` is not a number, or with status 500 when
        ``generate_preview`` reports failure or produces no file.
    """
    import io
    import tempfile
    from flask import send_file

    voice = request.form.get('voice', 'en-US-EmmaMultilingualNeural')
    lang = request.form.get('lang', 'en-US')
    style = request.form.get('style', '')
    rate = request.form.get('rate', '+0%')
    pitch = request.form.get('pitch', '+0Hz')
    try:
        style_degree = float(request.form.get('style_degree', 1.0))
    except (TypeError, ValueError):
        # Malformed client input is a client error, not a server crash.
        return jsonify({'error': 'style_degree must be a number'}), 400

    # Reserve a unique file name in the uploads folder; the handle is closed
    # right away so generate_preview can write to the path itself.
    with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False, dir=UPLOAD_FOLDER) as tmp:
        out_path = tmp.name

    try:
        ok = generate_preview(voice=voice, lang_prefix=lang,
                              style=style, style_degree=style_degree,
                              rate=rate, pitch=pitch,
                              output_path=out_path)
        if not ok or not os.path.exists(out_path):
            return jsonify({'error': 'Preview generation failed'}), 500
        # Load the sample into memory so the temp file can be deleted before
        # the response is sent; otherwise every preview would leave a file
        # behind in the uploads folder.
        with open(out_path, 'rb') as fh:
            audio = io.BytesIO(fh.read())
    finally:
        try:
            os.remove(out_path)
        except OSError:
            pass

    return send_file(audio, mimetype='audio/mpeg',
                     as_attachment=False,
                     download_name='preview.mp3')
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def clear_library():
    """Remove every ``.mp4`` / ``.mp3`` file from the outputs folder.

    Responds with JSON holding the number of removed files (``deleted``) and a
    list of ``{'file', 'error'}`` entries for files that could not be removed
    (``failed``).
    """
    removed = []
    failures = []
    media_suffixes = ('.mp4', '.mp3')
    for name in os.listdir(OUTPUT_FOLDER):
        if not name.endswith(media_suffixes):
            continue
        try:
            os.remove(os.path.join(OUTPUT_FOLDER, name))
        except Exception as ex:
            failures.append({'file': name, 'error': str(ex)})
        else:
            removed.append(name)
    return jsonify({'deleted': len(removed), 'failed': failures})
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def merge_videos():
    """Merge uploaded video clips and optionally overlay generated/uploaded audio.

    Form fields: ``aspect_ratio``, ``audio_source`` (``'script'`` or
    ``'upload'``), ``voice``, ``script_text``, ``trim_audio``, and the TTS
    parameters ``style``, ``style_degree``, ``rate``, ``pitch``.
    Files: ``videos`` (one or more), optional ``audio_file`` and
    ``bg_music_file``.

    Returns:
        JSON with ``success``, ``message``, ``filename`` and ``duration`` on
        success; ``{'success': False, 'error': ...}`` with status 400 for
        missing videos or a non-numeric ``style_degree``, or status 500 for
        any processing failure.
    """
    import shutil
    import traceback

    # Every intermediate file is recorded here so the ``finally`` clause can
    # remove it on success, on early return and on failure alike.
    temp_paths = []
    try:
        aspect_ratio = request.form.get('aspect_ratio', 'vertical')
        audio_source = request.form.get('audio_source', 'script')
        voice_id = request.form.get('voice', 'ur-PK-UzmaNeural')
        script_text = request.form.get('script_text', '')
        trim_audio = request.form.get('trim_audio', 'true') == 'true'

        # Voice style / mood params
        voice_style = request.form.get('style', '')
        voice_rate = request.form.get('rate', '+0%')
        voice_pitch = request.form.get('pitch', '+0Hz')
        try:
            voice_style_degree = float(request.form.get('style_degree', 1.0))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'error': 'style_degree must be a number.'}), 400

        # Ignore empty file slots (a form can submit a part with no filename).
        uploaded_videos = [f for f in request.files.getlist('videos') if f.filename]
        if not uploaded_videos:
            return jsonify({'success': False, 'error': 'No video files uploaded.'}), 400

        job_id = str(uuid.uuid4())

        # Save video uploads
        video_paths = []
        for idx, file in enumerate(uploaded_videos):
            filepath = os.path.join(UPLOAD_FOLDER, f"{job_id}_video_{idx}.mp4")
            temp_paths.append(filepath)
            file.save(filepath)
            video_paths.append(filepath)

        # 1. Merge videos together
        merged_temp_video = os.path.join(UPLOAD_FOLDER, f"{job_id}_merged_raw.mp4")
        temp_paths.append(merged_temp_video)
        duration = concatenate_clips(video_paths, aspect_ratio=aspect_ratio,
                                     output_path=merged_temp_video)

        # 2. Process / generate audio
        final_audio_path = None
        speech_path = None
        bg_music_path = None

        if audio_source == 'script' and script_text.strip():
            speech_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_speech.mp3")
            temp_paths.append(speech_path)
            generate_speech(script_text, voice_id, speech_path,
                            rate=voice_rate, pitch=voice_pitch,
                            style=voice_style, style_degree=voice_style_degree)
        elif audio_source == 'upload':
            uploaded_audio = request.files.get('audio_file')
            if uploaded_audio and uploaded_audio.filename != '':
                speech_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_user_audio.mp3")
                temp_paths.append(speech_path)
                uploaded_audio.save(speech_path)

        # Handle optional background music upload
        bg_music_file = request.files.get('bg_music_file')
        if bg_music_file and bg_music_file.filename != '':
            bg_music_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_bg_music.mp3")
            temp_paths.append(bg_music_path)
            bg_music_file.save(bg_music_path)

        # Mix audio tracks if we have speech or background music
        if speech_path or bg_music_path:
            mixed_audio = os.path.join(UPLOAD_FOLDER, f"{job_id}_mixed.mp3")
            temp_paths.append(mixed_audio)
            ok = create_mixed_audio(
                voiceover_path=speech_path,
                bg_music_path=bg_music_path,
                target_duration=duration,
                output_path=mixed_audio
            )
            if ok and os.path.exists(mixed_audio):
                final_audio_path = mixed_audio

        # 3. Combine merged video and final audio
        output_filename = f"merged_{job_id[:8]}.mp4"
        final_output_path = os.path.join(OUTPUT_FOLDER, output_filename)

        if final_audio_path:
            # Mux via ffmpeg: the video stream is copied, the audio is
            # re-encoded to AAC.
            cmd = [
                "ffmpeg", "-y", "-i", merged_temp_video, "-i", final_audio_path,
                "-map", "0:v", "-map", "1:a", "-c:v", "copy", "-c:a", "aac",
            ]
            if trim_audio:
                cmd.append("-shortest")
            cmd.append(final_output_path)
            try:
                subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
            except subprocess.CalledProcessError as ffmpeg_err:
                # Do not leave a truncated file in the library, and report
                # ffmpeg's own diagnostics rather than just the exit status.
                if os.path.exists(final_output_path):
                    try:
                        os.remove(final_output_path)
                    except OSError:
                        pass
                detail = (ffmpeg_err.stderr or b'').decode('utf-8', errors='ignore')[-800:]
                raise RuntimeError(
                    f'ffmpeg failed to combine video and audio: {detail}'
                ) from ffmpeg_err
        else:
            # No audio overlay: the raw merged video is the final output.
            # shutil.move also works when uploads/ and outputs/ are on
            # different filesystems, where os.rename would fail.
            shutil.move(merged_temp_video, final_output_path)

        return jsonify({
            'success': True,
            'message': 'Videos merged successfully!',
            'filename': output_filename,
            'duration': f"{duration:.2f}s"
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Cleanup temp upload files (best effort).
        for p in temp_paths:
            if os.path.exists(p):
                try:
                    os.remove(p)
                except OSError:
                    pass
def _parse_timestamp_ranges(timestamps_str):
    """Parse ``"10-20, 30-45"`` into ``[[10.0, 20.0], [30.0, 45.0]]``; malformed or empty ranges are skipped."""
    ranges = []
    for part in timestamps_str.split(','):
        bounds = part.strip().split('-')
        if len(bounds) != 2:
            continue
        try:
            start_t = float(bounds[0].strip())
            end_t = float(bounds[1].strip())
        except ValueError:
            continue
        # A range that does not move forward cannot produce a clip.
        if end_t > start_t:
            ranges.append([start_t, end_t])
    return ranges


def _build_ytdlp_command(ytdlp_bin, url, output_path, impersonate):
    """Build the yt-dlp argument list; ``impersonate`` adds ``--impersonate chrome``."""
    cmd = [
        ytdlp_bin,
        # Quality: max 720p
        "-f", "bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/bestvideo[height<=720]+bestaudio/best[height<=720]/best",
    ]
    if impersonate:
        cmd += ["--impersonate", "chrome"]
    cmd += [
        "--merge-output-format", "mp4",
        # Player client: ios/android work for most videos without embedding restrictions
        # tv_embedded is intentionally excluded — it fails for non-embeddable videos
        "--extractor-args", "youtube:player_client=ios,android,mweb,web",
        # Geo-bypass
        "--geo-bypass",
        # SSL resilience.
        # NOTE(review): this turns off TLS certificate verification for the
        # download — confirm that trade-off is intended.
        "--no-check-certificates",
        "--socket-timeout", "60",
        # Retry logic
        "--retries", "10",
        "--fragment-retries", "10",
        "--retry-sleep", "exp=1:30",
        # Output
        "-o", output_path,
        # "--" ends option parsing, so a user-supplied URL that starts with
        # "-" can never be interpreted as a yt-dlp option.
        "--", url,
    ]
    return cmd


def _is_impersonation_error(stderr_text):
    """Return True when yt-dlp's stderr points at a missing impersonation backend."""
    lowered = stderr_text.lower()
    return 'impersonate' in lowered or ('curl' in lowered and 'cffi' in lowered)


def _describe_download_error(stderr_text):
    """Map yt-dlp's stderr to a short user-facing explanation."""
    lowered = stderr_text.lower()
    # Check impersonation error FIRST before generic "unavailable"
    if _is_impersonation_error(stderr_text):
        return 'Browser impersonation library missing. Try again (it will retry without impersonation).'
    if 'SSL' in stderr_text or 'EOF' in stderr_text:
        return 'YouTube SSL error. Try a different video or try again in a few seconds.'
    if 'Private' in stderr_text or 'members-only' in stderr_text:
        return 'This video is private or members-only.'
    if 'Sign in' in stderr_text:
        return 'YouTube requires sign-in for this video. Try a fully public video.'
    if 'removed' in stderr_text or 'unavailable' in stderr_text or 'not available' in stderr_text:
        return 'This video is unavailable or has been removed from YouTube.'
    if 'bot' in lowered or 'detected' in lowered:
        return 'YouTube detected bot activity. Try a different video.'
    # Show raw error so we can diagnose
    return f'yt-dlp error: {stderr_text[-800:]}' if stderr_text else 'Unknown download error'


# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def clip_youtube_video():
    """Download a video with yt-dlp, cut it into clips and apply the anti-copyright filters.

    Form fields: ``url`` (required), ``mode`` (``'auto'`` or ``'timestamps'``),
    ``interval``, ``timestamps`` (e.g. ``"10-20, 30-45"``), and the filter
    options ``aspect_ratio``, ``mirror``, ``zoom``, ``speed``, ``pitch_shift``.

    Returns:
        JSON with ``success``, ``message`` and ``filenames`` on success;
        ``{'success': False, 'error': ...}`` with status 400 for a missing URL
        or non-numeric options, or status 500 when the download, slicing or
        filtering fails.
    """
    import shutil
    import traceback

    job_id = None
    try:
        url = (request.form.get('url') or '').strip()
        if not url:
            return jsonify({'success': False, 'error': 'YouTube URL is required.'}), 400

        mode = request.form.get('mode', 'auto')  # 'auto' or 'timestamps'
        timestamps_str = request.form.get('timestamps', '')
        try:
            interval = int(request.form.get('interval', 8))
            speed = float(request.form.get('speed', 1.04))
            pitch_shift = float(request.form.get('pitch_shift', 0.8))
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'interval, speed and pitch_shift must be numbers.'
            }), 400

        # Filter options
        filters = {
            'aspect_ratio': request.form.get('aspect_ratio', 'original'),
            'mirror': request.form.get('mirror', 'true') == 'true',
            'zoom': request.form.get('zoom', 'true') == 'true',
            'speed': speed,
            'pitch_shift': pitch_shift
        }

        job_id = str(uuid.uuid4())
        raw_download_path = os.path.join(UPLOAD_FOLDER, f"{job_id}_raw.mp4")

        # 1. Download the video using yt-dlp
        ytdlp_bin = get_pip_binary('yt-dlp')
        print(f"Downloading video from {url}...")

        # Check if curl-cffi is available for --impersonate support
        try:
            import curl_cffi  # noqa: F401
            has_curl_cffi = True
        except ImportError:
            has_curl_cffi = False

        # First try with impersonation when the library is importable here;
        # if yt-dlp itself rejects impersonation, retry once without it.
        attempts = [True, False] if has_curl_cffi else [False]
        stderr_text = ''
        downloaded = False
        for use_impersonation in attempts:
            download_cmd = _build_ytdlp_command(
                ytdlp_bin, url, raw_download_path, impersonate=use_impersonation
            )
            try:
                result = subprocess.run(download_cmd, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE, timeout=480)
            except subprocess.TimeoutExpired:
                # Report the timeout without echoing the whole command line.
                return jsonify({
                    'success': False,
                    'error': 'Download failed: yt-dlp timed out after 480 seconds.'
                }), 500
            stderr_text = result.stderr.decode('utf-8', errors='ignore')
            downloaded = (
                result.returncode == 0
                and os.path.exists(raw_download_path)
                and os.path.getsize(raw_download_path) > 0
            )
            if downloaded or not (use_impersonation and _is_impersonation_error(stderr_text)):
                break

        if not downloaded:
            err = _describe_download_error(stderr_text)
            return jsonify({'success': False, 'error': f'Download failed: {err}'}), 500

        # 2. Slice downloaded video
        temp_clips_dir = os.path.join(UPLOAD_FOLDER, f"{job_id}_slices")
        os.makedirs(temp_clips_dir, exist_ok=True)

        custom_ranges = []
        if mode == 'timestamps' and timestamps_str:
            custom_ranges = _parse_timestamp_ranges(timestamps_str)

        sliced_files = slice_video(
            raw_download_path,
            temp_clips_dir,
            mode=mode,
            intervals=interval,
            custom_ranges=custom_ranges
        )
        if not sliced_files:
            return jsonify({'success': False, 'error': 'No clips generated during slicing.'}), 500

        # 3. Apply safety filters to each clip and save to outputs
        processed_files = []
        for idx, file_path in enumerate(sliced_files, 1):
            out_filename = f"clip_{job_id[:8]}_{idx}.mp4"
            out_path = os.path.join(OUTPUT_FOLDER, out_filename)
            apply_copyright_filters(file_path, out_path, filters)
            if os.path.exists(out_path):
                processed_files.append(out_filename)

        return jsonify({
            'success': True,
            'message': f'YouTube video processed into {len(processed_files)} clips!',
            'filenames': processed_files
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': str(e)}), 500
    finally:
        # Cleanup the raw download (including any partial/fragment files that
        # share its "<job_id>_raw" prefix) and the slices folder, on every
        # exit path. Best effort: cleanup problems must not mask the response.
        if job_id is not None:
            prefix = f"{job_id}_raw"
            try:
                leftovers = [n for n in os.listdir(UPLOAD_FOLDER) if n.startswith(prefix)]
            except OSError:
                leftovers = []
            for leftover in leftovers:
                try:
                    os.remove(os.path.join(UPLOAD_FOLDER, leftover))
                except OSError:
                    pass
            shutil.rmtree(os.path.join(UPLOAD_FOLDER, f"{job_id}_slices"), ignore_errors=True)
def get_video_duration(path):
    """Return the duration ffprobe reports for ``path``, formatted like ``"12.3s"``.

    Returns the string ``"unknown"`` when ffprobe cannot be started, does not
    finish within the timeout, exits with a non-zero status, or prints
    something that is not a number.
    """
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", path
    ]
    try:
        # The timeout keeps one unreadable file from stalling the caller.
        res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             text=True, timeout=30)
        if res.returncode == 0:
            return f"{float(res.stdout.strip()):.1f}s"
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: ffprobe missing / not executable; ValueError: unparsable
        # output or invalid argument; SubprocessError: timeout.
        pass
    return "unknown"
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def get_outputs():
    """List the real (non-temp) ``.mp4`` output files as JSON, newest first.

    Each entry has ``filename``, ``size`` (``"<n.nn> MB"``) and ``duration``
    (from ``get_video_duration``). ``.mp4`` files whose name contains a
    temp/partial marker are deleted instead of being listed.
    """
    skip_patterns = ('.temp', 'TEMP_MPY', '.part', '.tmp')
    entries = []
    for name in os.listdir(OUTPUT_FOLDER):
        # Only include clean .mp4 files — skip any temp/partial artifacts
        if not name.endswith('.mp4'):
            continue
        path = os.path.join(OUTPUT_FOLDER, name)
        if any(pat in name for pat in skip_patterns):
            # Also delete them from disk so they don't pile up.
            # NOTE(review): this could remove a temp file that a job running
            # at the same time is still writing — confirm that is acceptable.
            try:
                os.remove(path)
            except OSError:
                pass
            continue
        try:
            # One stat call supplies both size and mtime.
            info = os.stat(path)
        except OSError:
            # The file disappeared between listdir() and stat() (for example
            # it was deleted by another request); leave it out of the list.
            continue
        size_mb = info.st_size / (1024 * 1024)
        entries.append((info.st_mtime, {
            'filename': name,
            'size': f"{size_mb:.2f} MB",
            'duration': get_video_duration(path)
        }))

    # Sort newest first, using the modification time captured above.
    entries.sort(key=lambda entry: entry[0], reverse=True)
    return jsonify([item for _, item in entries])
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def generate_video():
    """End-to-end AI script-to-video generation route.

    Form fields: ``script_text`` (required), ``theme``, ``aspect_ratio``,
    ``voice``, ``rate``, ``pitch``, ``trim_audio``. Optional file:
    ``bg_music_file``.

    Returns:
        JSON with ``success``, ``message``, ``filename``, ``duration`` and
        ``slides`` on success; ``{'success': False, 'error': ...}`` with
        status 400 when the script is empty, or status 500 when generation
        fails or raises.
    """
    import traceback

    try:
        script_text = request.form.get('script_text', '')
        theme = request.form.get('theme', 'auto')
        aspect_ratio = request.form.get('aspect_ratio', 'vertical')
        voice_id = request.form.get('voice', 'ur-PK-UzmaNeural')
        voice_rate = request.form.get('rate', '+0%')
        voice_pitch = request.form.get('pitch', '+0Hz')
        trim_audio = request.form.get('trim_audio', 'true') == 'true'

        if not script_text.strip():
            return jsonify({'success': False, 'error': 'Script text is required.'}), 400

        bg_music_file = request.files.get('bg_music_file')

        job_id = str(uuid.uuid4())
        output_filename = f"ai_video_{job_id[:8]}.mp4"
        final_output_path = os.path.join(OUTPUT_FOLDER, output_filename)

        res = generate_ai_video(
            script_text=script_text,
            theme=theme,
            aspect_ratio=aspect_ratio,
            voice_id=voice_id,
            rate=voice_rate,
            pitch=voice_pitch,
            bg_music_file=bg_music_file,
            trim_audio=trim_audio,
            output_path=final_output_path
        )

        # A falsy result (e.g. None) is treated the same as success=False.
        if not res or not res.get('success'):
            return jsonify({'success': False, 'error': 'Video generation failed.'}), 500

        # A missing or non-numeric duration must not turn a successful render
        # into a 500 response.
        try:
            duration_label = f"{res.get('duration'):.2f}s"
        except (TypeError, ValueError):
            duration_label = "unknown"

        return jsonify({
            'success': True,
            'message': 'AI Video generated successfully!',
            'filename': output_filename,
            'duration': duration_label,
            'slides': res.get('sentences_count')
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({'success': False, 'error': str(e)}), 500
# NOTE(review): no route decorator is visible for this view in the reviewed
# source — confirm it is registered on ``app``.
def download_file(filename):
    """Send ``filename`` from the outputs folder to the client."""
    response = send_from_directory(OUTPUT_FOLDER, filename)
    return response
if __name__ == '__main__':
    # Runtime settings come from the environment: PORT (default 5005) and
    # HOST (default 127.0.0.1); debug mode is on only when
    # FLASK_ENV is "development".
    env = os.environ
    app.run(
        debug=env.get('FLASK_ENV', 'production') == 'development',
        host=env.get('HOST', '127.0.0.1'),
        port=int(env.get('PORT', 5005)),
    )