# NOTE(review): the three lines below ("Spaces: / Sleeping / Sleeping") were a
# Hugging Face Spaces status banner captured by a web scrape, not source code;
# they are preserved here as a comment so the module parses.
| import gradio as gr | |
| import moviepy.editor as mp | |
| from moviepy.video.fx.all import crop # For zoom effect if needed for crop | |
| from PIL import Image, ImageDraw, ImageFont, ImageFilter, ImageOps | |
| import numpy as np | |
| import os | |
| import gdown | |
| import requests | |
| from urllib.parse import urlparse # Used in user's download_from_google_drive | |
| import tempfile | |
| import re | |
| import time | |
| import traceback | |
| INSTA_ONEAPI_KEY=os.environ.get('INSTAONEAPIKEY') | |
| # --- Helper Functions (Prioritizing from your pasted-text.txt) --- | |
def download_from_google_drive(url):
    """Download file from Google Drive URL with improved parsing (from user's file).

    Non-Google-Drive URLs are treated as direct download links. Returns the
    path to a downloaded ``.mp4`` temp file, or None on any failure. The
    caller owns (and must delete) the returned temp file.
    """
    try:
        if not url or not isinstance(url, str):
            raise ValueError("Invalid URL provided")
        print(f"Processing Google Drive URL: {url}")
        url = url.strip()
        if 'drive.google.com' in url:
            # Try several known Drive URL shapes to pull out the file ID.
            patterns = [
                r'/file/d/([a-zA-Z0-9-_]+)',
                r'id=([a-zA-Z0-9-_]+)',
                r'/d/([a-zA-Z0-9-_]+)',
                r'file/d/([a-zA-Z0-9-_]+)/view',
                r'open\?id=([a-zA-Z0-9-_]+)'
            ]
            file_id = None
            for pattern in patterns:
                match = re.search(pattern, url)
                if match:
                    file_id = match.group(1)
                    break
            if not file_id:
                # Last-resort fallback: naive string splitting for the same shapes.
                if '/file/d/' in url:
                    try:
                        file_id = url.split('/file/d/')[1].split('/')[0]
                    except IndexError: pass
                elif 'id=' in url:
                    try:
                        file_id = url.split('id=')[1].split('&')[0]
                    except IndexError: pass
            if not file_id:
                raise ValueError("Could not extract file ID from Google Drive URL")
            print(f"Extracted file ID: {file_id}")
            download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
            # Using NamedTemporaryFile for safer temp file creation
            temp_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            output_path = temp_file.name
            temp_file.close()  # Close it so gdown can write to it
            try:
                # Attempt 1: gdown against the direct-download endpoint.
                gdown.download(download_url, output_path, quiet=False)
                if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                    print(f"Successfully downloaded from Google Drive to {output_path}")
                    return output_path
                else:
                    # Try alternative download method
                    print("Initial gdown failed or file empty, trying alternative.")
                    if os.path.exists(output_path): os.remove(output_path)  # Clean up before retry
                    # Attempt 2: gdown with fuzzy=True on the share-link form,
                    # which lets gdown resolve the real download URL itself.
                    gdown.download(f"https://drive.google.com/file/d/{file_id}/view?usp=sharing",
                                   output_path, quiet=False, fuzzy=True)
                    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                        print(f"Successfully downloaded (alternative) from Google Drive to {output_path}")
                        return output_path
                    else:
                        raise Exception("Alternative gdown download also failed or resulted in empty file.")
            except Exception as e:
                print(f"gdown failed for {url}: {e}")
                # Fallback to direct requests
                try:
                    # Attempt 3: plain streamed HTTP GET of the uc?export endpoint.
                    print(f"Attempting direct requests fallback for {download_url}")
                    response = requests.get(download_url, stream=True, timeout=60)
                    response.raise_for_status()
                    with open(output_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                        print(f"Successfully downloaded (direct requests) to {output_path}")
                        return output_path
                    else:
                        raise Exception("Direct requests download failed or resulted in empty file.")
                except Exception as e2:
                    print(f"Direct download also failed for {url}: {e2}")
                    if os.path.exists(output_path): os.remove(output_path)
                    raise e2  # Re-raise the error from direct download
        else:  # Assumed direct URL if not Google Drive
            print(f"Processing as direct URL: {url}")
            temp_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
            output_path = temp_file.name
            temp_file.close()
            response = requests.get(url, stream=True, timeout=60)
            response.raise_for_status()
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"Successfully downloaded (direct URL) to {output_path}")
                return output_path
            else:
                if os.path.exists(output_path): os.remove(output_path)
                raise Exception("Direct URL download failed or resulted in empty file.")
    except Exception as e:
        # Outer catch-all turns every failure mode into a None return.
        print(f"Error downloading file from URL '{url}': {e}")
        traceback.print_exc()
        # Clean up temp file if it exists and error occurred
        if 'output_path' in locals() and os.path.exists(output_path):
            try: os.remove(output_path)
            except: pass
        return None
def _guess_audio_suffix(url):
    """Pick a file extension for the audio at *url*.

    Order of preference: the URL path's own extension, then the server's
    Content-Type from a HEAD request, then '.mp3' as the default.
    """
    path = urlparse(url).path
    ext = os.path.splitext(path)[1]
    if ext.lower() in ('.mp3', '.wav', '.m4a', '.aac'):
        return ext
    # Check Content-Type header if no obvious extension.
    try:
        head_resp = requests.head(url, timeout=10, allow_redirects=True)
        content_type = head_resp.headers.get('content-type', '').lower()
        if 'audio/mpeg' in content_type:
            return '.mp3'
        if 'audio/wav' in content_type:
            return '.wav'
        if 'audio/aac' in content_type:
            return '.aac'
        if 'audio/mp4' in content_type:
            return '.m4a'  # Often AAC in an MP4 container
    except Exception:
        pass  # HEAD failures fall through to the default
    return '.mp3'

def download_audio_file(url):
    """Download audio file from URL (from user's file).

    Streams the response into a NamedTemporaryFile whose suffix is guessed
    from the URL/Content-Type. Returns the local path, or None on any
    failure (blank URL, network error, empty download). The caller owns
    (and must delete) the returned temp file.
    """
    output_path = None  # sentinel instead of checking locals() in the handler
    try:
        if not url or not url.strip():
            return None
        url = url.strip()
        print(f"Downloading audio from: {url}")
        suffix = _guess_audio_suffix(url)
        temp_audio_file = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        output_path = temp_audio_file.name
        temp_audio_file.close()  # close so we can reopen for writing below
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            print(f"Successfully downloaded audio to {output_path}")
            return output_path
        # Empty download: remove the temp file and report failure.
        if os.path.exists(output_path):
            os.remove(output_path)
        print(f"Failed to download audio or file is empty: {url}")
        return None
    except Exception as e:
        print(f"Error downloading audio from '{url}': {e}")
        traceback.print_exc()
        # Best-effort cleanup of a partially written temp file; only an
        # OSError is plausible here, so don't swallow anything broader.
        if output_path and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass
        return None
def hex_to_rgb(hex_color):
    """Convert a hex color string ('#RGB' or '#RRGGBB') to an (r, g, b) tuple.

    Unrecognized formats print a warning and fall back to white.
    """
    digits = hex_color.lstrip('#')
    # Expand shorthand "#RGB" into the full "RRGGBB" form.
    if len(digits) == 3:
        digits = ''.join(c + c for c in digits)
    if len(digits) == 6:
        r = int(digits[0:2], 16)
        g = int(digits[2:4], 16)
        b = int(digits[4:6], 16)
        return (r, g, b)
    print(f"Warning: Invalid hex color format '{digits}'. Defaulting to white.")
    return (255, 255, 255)  # Default to white
def calculate_font_size(text, max_width, max_height, custom_size=None):
    """Pick a font size for *text* on a max_width x max_height canvas.

    A positive *custom_size* wins outright. Otherwise longer text maps to a
    larger width divisor (i.e. a smaller size), clamped to at least 24 and
    at most one eighth of the canvas height.
    """
    if custom_size and custom_size > 0:
        return int(custom_size)  # explicit override, coerced to int
    if not text:
        return 24  # sensible default for empty text
    # Threshold table: (char-count upper bound, width divisor).
    length = len(text)
    for limit, divisor in ((50, 15), (100, 20), (200, 25)):
        if length < limit:
            base = max_width // divisor
            break
    else:
        base = max_width // 30  # 200+ characters
    return int(max(24, min(base, max_height // 8)))
def detect_rtl_text(text):
    """Return True if *text* contains any character from an RTL script.

    Covers Hebrew, Arabic (incl. Persian letters), Syriac, Arabic Supplement,
    Thaana, N'Ko and Arabic Extended-A via their Unicode block ranges.

    The previous implementation also checked a hard-coded set of Persian and
    Arabic letters; every one of them lies inside U+0600-U+06FF (and the set
    even contained an unmatchable two-character entry), so the set was
    redundant and has been dropped.
    """
    if not text:
        return False
    rtl_ranges = (
        ('\u0590', '\u05FF'),  # Hebrew
        ('\u0600', '\u06FF'),  # Arabic (incl. Persian letters)
        ('\u0700', '\u074F'),  # Syriac
        ('\u0750', '\u077F'),  # Arabic Supplement
        ('\u0780', '\u07BF'),  # Thaana
        ('\u07C0', '\u07FF'),  # N'Ko
        ('\u08A0', '\u08FF'),  # Arabic Extended-A
    )
    return any(lo <= ch <= hi for ch in text for lo, hi in rtl_ranges)
def create_glow_effect(img, glow_radius=5, glow_color=(255, 255, 255, 100)):
    """Composite *img* (RGBA PIL image) over a blurred, tinted halo of itself.

    The halo is built by painting *glow_color* wherever *img* is opaque,
    then blurring it; *img* is drawn back on top so the glow sits behind
    the original pixels.
    """
    # Paint the glow color through the source image's alpha channel.
    halo = Image.new('RGBA', img.size, (0, 0, 0, 0))
    tinted = Image.new('RGBA', img.size, glow_color)
    halo.paste(tinted, mask=img.getchannel('A'))
    # Soften the halo with several small blur passes...
    passes = max(1, glow_radius // 2)
    for _ in range(passes):
        halo = halo.filter(ImageFilter.GaussianBlur(radius=2))
    # ...plus one final wider blur for larger radii.
    if glow_radius > 1:
        halo = halo.filter(ImageFilter.GaussianBlur(radius=glow_radius / 2.0))
    # alpha_composite takes background first, foreground second.
    return Image.alpha_composite(halo, img)
def insta_oneapi(url, api_key):
    """Downloads video from Instagram using one-api.ir.

    Extracts the post shortcode from *url*, asks the one-api.ir Instagram
    endpoint for the post's media list, then streams the chosen media URL
    into a temp ``.mp4`` file. Returns that file's path, or None on any
    failure (bad URL, API error, timeout, empty download). The caller owns
    the returned temp file.
    """
    # Primary extraction: shortcode segment after /p/, /reel/, /reels/ or /tv/.
    shortcode_match = re.search(r"(?:instagram\.com|instagr\.am)\/(?:p|reel|reels|tv)\/([a-zA-Z0-9_-]+)", url)
    if shortcode_match:
        shortcode = shortcode_match.group(1)
    else:
        # Fallback: take the last path segment unless it looks like a feed page.
        parts = url.strip("/").split("/")
        shortcode = parts[-1] if parts and parts[-1] not in ["", "feed"] else ""  # handle cases like "username/feed/"
        if not shortcode and len(parts) > 1 and parts[-2] in ["p", "reel", "reels", "tv"]:  # a bit more robust for just ID
            shortcode = parts[-1]
    print(f"Extracted shortcode: '{shortcode}' from URL: {url}")
    if not shortcode:
        print("Error: Could not extract shortcode from Instagram URL.")
        return None
    api_url = f"https://api.one-api.ir/instagram/v1/post/?shortcode={shortcode}"
    headers = {"one-api-token": api_key, "Content-Type": "application/json"}
    print(f"Requesting Instagram post info from: {api_url}")
    try:
        response = requests.get(api_url, headers=headers, timeout=20)
        print(f"Instagram API Response Status: {response.status_code}")
        response.raise_for_status()
        result = response.json()
        # one-api.ir reports its own status code inside the JSON body.
        if result.get("status") != 200 or "result" not in result:
            print(f"API Error: Status {result.get('status')} - {result.get('message', 'Unknown API error')}")
            print(f"Full API response: {result}")
            return None
        media_items = result.get("result", {}).get("media", [])
        if not media_items:
            print("Error: 'media' not found in API response or is empty.")
            print(f"Full API response: {result}")
            return None
        # Prefer the first non-image media entry that carries a URL;
        # otherwise fall back to the first entry's URL, whatever its type.
        video_download_url = None
        for item in media_items:
            if item.get("type") != "image" and item.get("url"):
                video_download_url = item["url"]
                break
        if not video_download_url and media_items and media_items[0].get("url"):
            video_download_url = media_items[0]["url"]  # Fallback to first item's URL
        if not video_download_url:
            print("Error: No suitable media URL found in API response.")
            print(f"Full API response: {result}")
            return None
        print(f"Video download URL obtained: {video_download_url}")
        print(f"Waiting for 2 seconds before downloading video...")  # Reduced wait time
        time.sleep(2)
        response_video = requests.get(video_download_url, stream=True, timeout=60)
        response_video.raise_for_status()
        temp_vid_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        output_filename = temp_vid_file.name
        temp_vid_file.close()
        with open(output_filename, 'wb') as file:
            for chunk in response_video.iter_content(chunk_size=8192*4):
                if chunk: file.write(chunk)
        if os.path.exists(output_filename) and os.path.getsize(output_filename) > 0:
            print(f"Downloaded Instagram video successfully to {output_filename}")
            return output_filename
        else:
            if os.path.exists(output_filename): os.remove(output_filename)
            print(f"Failed to download Instagram video or file is empty: {video_download_url}")
            return None
    except requests.exceptions.Timeout:
        print(f"Timeout error communicating with Instagram API or downloading video for URL: {url}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"RequestException with Instagram API or download for {url}: {e}")
        return None
    except KeyError as e:
        print(f"Error: Could not find expected key '{e}' in API response for {url}.")
        if 'result' in locals(): print(f"Full API response: {result}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred in insta_oneapi for {url}: {e}")
        traceback.print_exc()
        return None
def create_advanced_text_clip(text, font_size, video_size, position_key, duration, start_time=1,
                              font_path="Dima Shekasteh.ttf", text_color_hex="#FFFFFF",
                              effect_type="fade_in", apply_glow_effect=False, custom_position=None,
                              line_spacing_ratio=0.3, text_align="center", glow_color_hex=None, glow_radius_ratio=0.1):
    """Create advanced animated text clip using user's create_glow_effect if enabled.

    Renders *text* (with a drop shadow, and optionally a glow) onto a
    transparent canvas the full size of the video, then wraps that canvas in
    a moviepy ImageClip with the requested fade/slide effect.

    Args:
        text: Text to draw; newline characters split it into multiple lines.
        font_size: Point size (coerced to int).
        video_size: (width, height) of the target video in pixels.
        position_key: 'center'/'bottom'/'top'/'right'/'left' for automatic placement.
        duration: Clip duration in seconds.
        start_time: Timeline start of the clip.
        font_path: Preferred TTF font; system fallbacks are tried, then PIL's default.
        text_color_hex: Text color as '#RRGGBB'.
        effect_type: 'fade_in', 'fade_out', 'fade_in_out' or 'slide_up'.
        apply_glow_effect: If True, run the canvas through create_glow_effect().
        custom_position: Optional (x, y) overriding position_key.
        line_spacing_ratio: Extra inter-line spacing as a fraction of font_size.
        text_align: 'center'/'left'/'right'; RTL text forces right alignment.
        glow_color_hex: Glow color; defaults to the text color when None.
        glow_radius_ratio: Glow radius as a fraction of font_size.

    Returns:
        A moviepy ImageClip (full-frame transparent canvas with the text drawn on it).
    """
    width, height = video_size
    # Ensure font_size is an integer
    font_size = int(font_size)
    img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype(font_path, font_size)
    except IOError:
        print(f"Warning: Font '{font_path}' not found. Trying system fallbacks.")
        fallback_fonts = ["Arial.ttf", "DejaVuSans.ttf", "tahoma.ttf", "B Nazanin.ttf", "XB Niloofar.ttf", "/System/Library/Fonts/Supplemental/GeezaPro.ttf", "/usr/share/fonts/truetype/noto/NotoNaskhArabic-Regular.ttf"]
        font_loaded = False
        for f_path in fallback_fonts:
            try:
                font = ImageFont.truetype(f_path, font_size)
                font_loaded = True; print(f"Using fallback font: {f_path}"); break
            except IOError: continue
        if not font_loaded: print("Warning: No suitable fallback font found. Using default PIL font."); font = ImageFont.load_default()
    lines = text.split('\n')
    is_rtl = detect_rtl_text(text)
    # Determine effective alignment for RTL text
    effective_align = text_align
    if is_rtl:
        if text_align == "center": effective_align = "right"  # Common preference for centered RTL
        elif text_align == "left": effective_align = "right"  # Force right for RTL if left is chosen
    text_color_rgb_pil = hex_to_rgb(text_color_hex)
    text_color_rgba_pil = text_color_rgb_pil + (255,)  # Full opacity for text
    # Measure each line; "raqm" requests proper Arabic shaping when Pillow has it.
    line_heights = []
    line_widths = []
    for line in lines:
        try:
            bbox = draw.textbbox((0,0), line, font=font, direction="rtl" if is_rtl else None, features=["raqm"] if is_rtl else None)
            line_w = bbox[2] - bbox[0]
            line_h = bbox[3] - bbox[1]
        except Exception:  # Fallback for older PIL or issues
            (line_w, line_h) = draw.textsize(line, font=font)
        line_widths.append(line_w)
        line_heights.append(line_h)
    dynamic_line_spacing = int(font_size * line_spacing_ratio)
    total_text_height = sum(line_heights) + (len(lines) - 1) * dynamic_line_spacing
    max_line_width = max(line_widths) if line_widths else 0
    if custom_position:
        base_x, base_y = custom_position
    else:  # Automatic positioning based on position_key
        margin_v_factor = 0.10; margin_h_factor = 0.05
        if position_key == 'center': base_x = (width - max_line_width) // 2; base_y = (height - total_text_height) // 2
        elif position_key == 'bottom': base_x = (width - max_line_width) // 2; base_y = height - total_text_height - int(height * margin_v_factor)
        elif position_key == 'top': base_x = (width - max_line_width) // 2; base_y = int(height * margin_v_factor)
        elif position_key == 'right': base_x = width - max_line_width - int(width*margin_h_factor); base_y = (height - total_text_height) // 2
        elif position_key == 'left': base_x = int(width*margin_h_factor); base_y = (height - total_text_height) // 2
        else: base_x = (width - max_line_width) // 2; base_y = (height - total_text_height) // 2  # Default center
    # Draw every line, shadow first so the main text sits on top of it.
    current_y = base_y
    for i, line in enumerate(lines):
        line_w_current = line_widths[i]
        if effective_align == "center": line_x = base_x + (max_line_width - line_w_current) // 2
        elif effective_align == "right": line_x = base_x + (max_line_width - line_w_current)
        else: line_x = base_x  # Left align
        shadow_offset = max(1, font_size // 30)
        shadow_color = (0, 0, 0, 100)  # Semi-transparent black shadow
        draw.text((line_x + shadow_offset, current_y + shadow_offset), line, font=font, fill=shadow_color, direction="rtl" if is_rtl else None, features=["raqm"] if is_rtl else None)
        draw.text((line_x, current_y), line, font=font, fill=text_color_rgba_pil, direction="rtl" if is_rtl else None, features=["raqm"] if is_rtl else None)
        current_y += line_heights[i] + dynamic_line_spacing
    if apply_glow_effect:
        glow_rgb = hex_to_rgb(glow_color_hex if glow_color_hex else text_color_hex)
        glow_rgba_for_effect = glow_rgb + (100,)  # Use fixed alpha 100 for glow, as in user's example
        final_glow_radius = max(3, int(font_size * glow_radius_ratio))
        img = create_glow_effect(img, glow_radius=final_glow_radius, glow_color=glow_rgba_for_effect)
    img_array = np.array(img)
    img_clip = mp.ImageClip(img_array).set_duration(duration).set_start(start_time)
    if effect_type == "fade_in": img_clip = img_clip.fadein(min(1.0, duration / 3.0))
    elif effect_type == "fade_out": img_clip = img_clip.fadeout(min(1.0, duration / 3.0))
    elif effect_type == "fade_in_out":
        fade_dur = min(1.0, duration / 4.0)
        img_clip = img_clip.fadein(fade_dur).fadeout(fade_dur)
    elif effect_type == "slide_up":
        # Basic slide up: final_pos needs to be defined by how ImageClip positions itself.
        # This is tricky without knowing the clip's size relative to video_size if it's not full screen.
        # Assuming the `img` is the size of `video_size`, and text is drawn onto it.
        # The position set by set_position below acts on the whole `img_clip`.
        img_clip = img_clip.set_position(lambda t: ('center', height - (height - base_y + total_text_height/2) * (t / min(1.0, duration * 0.3))))
        img_clip = img_clip.fadein(0.5)
    return img_clip
def create_divider_line(width, line_type="simple", color_rgba=(255, 255, 255, 150), thickness=2, margin_ratio=0.1):
    """Creates a PIL Image of a divider line.

    Args:
        width: Full width of the output image in pixels.
        line_type: Only "simple" draws anything; other values yield a blank strip.
        color_rgba: Line color as an RGBA tuple.
        thickness: Line thickness in pixels.
        margin_ratio: Fraction of *width* left empty on each side.

    Returns:
        A transparent RGBA PIL Image of size (width, thickness + 10) with the
        line centered vertically.
    """
    # (Removed the unused `line_width_actual` local from the original.)
    img_height = thickness + 10  # Padding above and below the line
    img = Image.new('RGBA', (width, img_height), (0,0,0,0))
    draw = ImageDraw.Draw(img)
    start_x = int(width * margin_ratio)
    end_x = int(width * (1 - margin_ratio))
    y_pos = img_height // 2
    if line_type == "simple":
        draw.line([(start_x, y_pos), (end_x, y_pos)], fill=color_rgba, width=thickness)
    # Add dotted/dashed if needed, similar to previous versions
    return img
def create_decorative_frame(width, height, frame_type="simple", color_rgba=(255, 255, 255, 100), thickness=5, margin_percent=0.02):
    """Return a transparent RGBA image with a rectangular frame drawn near its edges."""
    canvas = Image.new('RGBA', (width, height), (0,0,0,0))
    pen = ImageDraw.Draw(canvas)
    # Inset the frame from the edges by a fraction of the shorter side.
    inset = int(min(width, height) * margin_percent)
    if frame_type == "simple":
        top_left = (inset, inset)
        bottom_right = (width - inset - 1, height - inset - 1)
        pen.rectangle([top_left, bottom_right], outline=color_rgba, width=thickness)
    return canvas
def create_background_overlay(video_size, overlay_type="vignette", opacity=0.3, color_hex="#000000"):
    """Build a full-frame RGBA overlay image for compositing over the video.

    Args:
        video_size: (width, height) in pixels.
        overlay_type: "vignette" for a radial dark falloff (always black),
            "solid_color" for a uniform tint of *color_hex*; anything else
            yields a fully transparent image.
        opacity: Peak overlay opacity in [0, 1].
        color_hex: Tint color, used only by "solid_color".

    Returns:
        A transparent RGBA PIL Image of size *video_size*.
    """
    width, height = video_size
    img = Image.new('RGBA', (width, height), (0,0,0,0))
    overlay_rgb = hex_to_rgb(color_hex)
    alpha_int = int(opacity * 255)
    if overlay_type == "vignette":
        center_x, center_y = width / 2, height / 2
        max_dist = np.sqrt(center_x**2 + center_y**2) if center_x > 0 and center_y > 0 else 1
        # Vectorized replacement for the original per-pixel Python loop
        # (O(width*height) PIL pixel accesses), producing the same values:
        # alpha = clamp(int(alpha_int * (dist/max_dist)**1.5), max=alpha_int).
        ys, xs = np.ogrid[0:height, 0:width]
        dist = np.sqrt((xs - center_x) ** 2 + (ys - center_y) ** 2)
        alpha = (alpha_int * (dist / max_dist) ** 1.5).astype(np.int32)  # int() truncation, values >= 0
        alpha = np.minimum(alpha_int, alpha).astype(np.uint8)
        arr = np.zeros((height, width, 4), dtype=np.uint8)  # vignette base color is black
        arr[..., 3] = alpha
        img = Image.fromarray(arr, mode='RGBA')
    elif overlay_type == "solid_color":
        draw = ImageDraw.Draw(img)
        draw.rectangle([(0,0), (width, height)], fill=overlay_rgb + (alpha_int,))
    return img
| # --- Main Video Processing Function --- | |
| def process_video(video_url, music_url, poem_verse, poet_name, username, | |
| auto_font_size_enabled, poem_font_size_manual, poet_font_size_manual, username_font_size_manual, | |
| poem_color, poet_color, username_color, | |
| poem_effect, poet_effect, username_effect, | |
| enable_glow_effect, glow_color_main, # Single glow color for all text if enabled | |
| add_divider_line, divider_type_style, | |
| add_frame_overlay, frame_type_style, | |
| background_overlay_type, overlay_opacity_value, overlay_color_value, | |
| poem_text_position, poet_original_position, username_text_position, | |
| progress=gr.Progress(track_tqdm=True)): | |
| progress(0, desc="Initializing...") | |
| # Temp file storage | |
| downloaded_video_path = None | |
| downloaded_audio_path = None | |
| final_output_path = None # This will be the path returned | |
| # Lists for resource cleanup | |
| clips_to_close = [] | |
| files_to_delete = [] | |
| try: | |
| # --- Input Validation (Basic) --- | |
| if not video_url: return "Error: Video URL is required.", None | |
| if not poem_verse: return "Error: Poem verse is required.", None | |
| if not poet_name: return "Error: Poet name is required.", None | |
| if not username: return "Error: Username is required.", None | |
| progress(0.1, desc="Downloading video...") | |
| if "instagram.com" in video_url or "instagr.am" in video_url: | |
| INSTA_API_KEY =os.environ.get('INSTAONEAPIKEY') | |
| if not INSTA_API_KEY or INSTA_API_KEY == "YOUR_API_KEY_HERE": # Basic check | |
| return "Error: Instagram API Key (INSTA_ONEAPI_KEY) not configured in environment.", None | |
| downloaded_video_path = insta_oneapi(video_url, INSTA_API_KEY) | |
| elif "drive.google.com" in video_url: | |
| downloaded_video_path = download_from_google_drive(video_url) | |
| else: # Assume direct link | |
| downloaded_video_path = download_from_google_drive(video_url) # User's GDrive func also handles direct links | |
| if not downloaded_video_path or not os.path.exists(downloaded_video_path): | |
| return f"Error: Failed to download video from {video_url}. Check URL and logs.", None | |
| files_to_delete.append(downloaded_video_path) | |
| progress(0.2, desc="Loading video...") | |
| video = mp.VideoFileClip(downloaded_video_path) | |
| clips_to_close.append(video) | |
| video_duration = video.duration | |
| if video_duration is None or video_duration <= 0: | |
| return "Error: Video has invalid duration.", None | |
| video_w, video_h = video.w, video.h | |
| # --- Audio Processing --- | |
| processed_audio_clip = None | |
| if music_url and music_url.strip(): | |
| progress(0.25, desc="Downloading audio...") | |
| downloaded_audio_path = download_audio_file(music_url) | |
| if downloaded_audio_path and os.path.exists(downloaded_audio_path): | |
| files_to_delete.append(downloaded_audio_path) | |
| progress(0.3, desc="Processing audio...") | |
| try: | |
| audio_clip_temp = mp.AudioFileClip(downloaded_audio_path) | |
| clips_to_close.append(audio_clip_temp) | |
| if audio_clip_temp.duration > video_duration: | |
| processed_audio_clip = audio_clip_temp.subclip(0, video_duration) | |
| elif audio_clip_temp.duration < video_duration: | |
| if audio_clip_temp.duration > 0: | |
| num_loops = int(np.ceil(video_duration / audio_clip_temp.duration)) | |
| looped_clips = [audio_clip_temp] * num_loops | |
| concatenated_audio = mp.concatenate_audioclips(looped_clips) | |
| clips_to_close.append(concatenated_audio) | |
| processed_audio_clip = concatenated_audio.subclip(0, video_duration) | |
| else: processed_audio_clip = video.audio # Original if downloaded audio is silent | |
| else: | |
| processed_audio_clip = audio_clip_temp | |
| # If processed_audio_clip is a subclip or new clip, original audio_clip_temp might not be needed further by this var | |
| if processed_audio_clip != audio_clip_temp and audio_clip_temp not in clips_to_close: | |
| clips_to_close.append(audio_clip_temp) # Ensure it's closed if different | |
| elif processed_audio_clip == audio_clip_temp and audio_clip_temp in clips_to_close: | |
| clips_to_close.remove(audio_clip_temp) # Avoid double closing if assigned directly | |
| clips_to_close.append(processed_audio_clip) | |
| except Exception as e: | |
| print(f"Audio processing failed: {e}. Using video's original audio.") | |
| traceback.print_exc() | |
| processed_audio_clip = video.audio # Fallback | |
| else: | |
| print("Audio download failed or no audio URL. Using video's original audio.") | |
| processed_audio_clip = video.audio | |
| else: | |
| processed_audio_clip = video.audio | |
| progress(0.4, desc="Calculating font sizes...") | |
| # Use user's calculate_font_size function | |
| poem_fs = calculate_font_size(poem_verse, video_w, video_h, custom_size=None if auto_font_size_enabled else poem_font_size_manual) | |
| poet_fs = calculate_font_size(poet_name, video_w, video_h, custom_size=None if auto_font_size_enabled else poet_font_size_manual) | |
| username_text_for_calc = f"@{username}" | |
| username_fs = calculate_font_size(username_text_for_calc, video_w, video_h, custom_size=None if auto_font_size_enabled else username_font_size_manual) | |
| print(f"Font sizes: Poem={poem_fs}, Poet={poet_fs}, Username={username_fs}") | |
| # --- Create Text & Decorative Clips --- | |
| text_and_deco_clips = [] # To be composited over the main video | |
| progress(0.45, desc="Creating username overlay...") | |
| # Username typically at top or bottom, less dynamic start time | |
| username_clip = create_advanced_text_clip( | |
| username_text_for_calc, username_fs, (video_w, video_h), username_text_position, video_duration - 0.5, start_time=0.25, | |
| text_color_hex=username_color, effect_type=username_effect, apply_glow_effect=enable_glow_effect, glow_color_hex=glow_color_main, | |
| text_align="center" # Usernames usually centered or simple alignment | |
| ) | |
| text_and_deco_clips.append(username_clip) | |
| progress(0.5, desc="Creating poem verse overlay...") | |
| poem_start_time = 0.5; poem_duration = video_duration - 1.0 | |
| verse_clip = create_advanced_text_clip( | |
| poem_verse, poem_fs, (video_w, video_h), poem_text_position, poem_duration, start_time=poem_start_time, | |
| text_color_hex=poem_color, effect_type=poem_effect, apply_glow_effect=enable_glow_effect, glow_color_hex=glow_color_main, | |
| text_align="right" if detect_rtl_text(poem_verse) else "center" | |
| ) | |
| text_and_deco_clips.append(verse_clip) | |
| # Create poet clip first, its position might be adjusted by divider | |
| poet_start_time = poem_start_time + 0.5 # Slightly after poem | |
| poet_duration = poem_duration - 0.5 | |
| poet_text_content = f"- {poet_name}" | |
| # Create poet clip (initial position) | |
| poet_clip = create_advanced_text_clip( | |
| poet_text_content, poet_fs, (video_w, video_h), poet_original_position, poet_duration, start_time=poet_start_time, | |
| text_color_hex=poet_color, effect_type=poet_effect, apply_glow_effect=enable_glow_effect, glow_color_hex=glow_color_main, | |
| text_align="right" if detect_rtl_text(poet_name) else "center" | |
| ) | |
| if add_divider_line: | |
| progress(0.55, desc="Adding divider line...") | |
| divider_color_rgba = hex_to_rgb(poet_color) + (200,) # Use poet color, semi-transparent | |
| divider_thickness = max(2, int(poet_fs * 0.08)) | |
| divider_margin_ratio = 0.15 | |
| divider_pil_img = create_divider_line(video_w, divider_type_style, divider_color_rgba, divider_thickness, divider_margin_ratio) | |
| divider_img_h_pil = divider_pil_img.height | |
| divider_clip = mp.ImageClip(np.array(divider_pil_img)).set_duration(poet_duration).set_start(poet_start_time).fadein(0.5) | |
| # Position divider: Let's place it a bit above the default "bottom" area or poet's original spot. | |
| # Example: 75% down the screen, or above poet's original position. | |
| # For this version, let's try a fixed relative position for simplicity if poet_original_position is bottom. | |
| if poet_original_position == 'bottom': | |
| # Place divider above typical bottom text area | |
| divider_y_center_target_abs = video_h * 0.78 | |
| else: # If poet is 'center', place divider below poem or fixed. | |
| divider_y_center_target_abs = video_h * 0.70 # A general lower-middle position | |
| divider_y_top_for_moviepy = divider_y_center_target_abs - (divider_img_h_pil / 2.0) | |
| divider_clip = divider_clip.set_position(('center', divider_y_top_for_moviepy)) | |
| text_and_deco_clips.append(divider_clip) | |
| # --- Reposition poet name to be centered horizontally, just below the divider --- | |
| # Get actual height of the poet text from a temporary render for precise placement | |
| temp_poet_draw_img = Image.new('RGBA', (1,1)) # Minimal image for textbbox | |
| temp_poet_font = ImageFont.truetype(poet_clip.text_options['font'], int(poet_fs)) # Use actual font and size | |
| poet_bbox = ImageDraw.Draw(temp_poet_draw_img).textbbox((0,0), poet_text_content, font=temp_poet_font, direction="rtl" if detect_rtl_text(poet_name) else None) | |
| actual_poet_text_height = poet_bbox[3] - poet_bbox[1] if poet_bbox else poet_fs | |
| poet_gap_below_divider = int(poet_fs * 0.2) # Small gap | |
| poet_y_top_target_abs = divider_y_top_for_moviepy + divider_img_h_pil + poet_gap_below_divider | |
| # Create new poet_clip with this specific custom_position (top-left of text block) | |
| # To center it horizontally, we need its width. Max line width is already calculated in create_advanced_text_clip. | |
| # We can pass custom_position = (x_centered, poet_y_top_target_abs) | |
| # This is tricky because create_advanced_text_clip calculates its own x. | |
| # Alternative: use set_position on the existing poet_clip. | |
| # poet_clip.w is the width of the *entire image clip*, not just text. | |
| # For 'center', Y means center of the clip. If clip height is full video height, this is not text center. | |
| # The easiest is to re-create poet_clip using custom_position focusing on Y, and let create_advanced_text_clip handle X centering. | |
| # Or, set position as ('center', Y_for_center_of_text_block) | |
| # Let's set the position of the *existing* poet_clip (which is full video canvas size with text drawn on it) | |
| # The 'y' in .set_position(('center', y)) refers to the y of the center of the clip. | |
| # If poet_clip is full video height, its center is video_h/2. | |
| # We want the center of the *text block* to be at a certain Y. | |
| # poet_y_center_of_text_target = poet_y_top_target_abs + actual_poet_text_height / 2 | |
| # We need to find where to place the full-canvas clip so its internal text appears there. | |
| # This means adjusting the original base_y inside create_advanced_text_clip. | |
| # The current `poet_clip` was made with position_key. We just need to adjust its Y. | |
| poet_clip = poet_clip.set_position(('center', poet_y_top_target_abs + actual_poet_text_height / 2 - poet_clip.h / 2)) | |
| # The above is complicated if poet_clip.h is video_h. | |
| # Simpler: create poet_clip with custom_position Y and let internal logic center X. | |
| # The poet_clip is already made. Let's just try setting its Y position. | |
| # We need the Y for the *top* of the poet_clip, assuming text is drawn near top if custom_pos. | |
| # Let's try setting the top of the poet_clip (which is an ImageClip of video_size) | |
| # such that the text *within it* appears where we want. | |
| # This requires knowing where create_advanced_text_clip placed text if custom_pos is used. | |
| # Re-create poet_clip for precise custom Y positioning, allowing its internal X centering to work. | |
| # We need to calculate the X for the text block if we give a custom_position=(x,y) | |
| # The current create_advanced_text_clip uses position_key OR custom_position. | |
| # For this case, we'll set the Y position using moviepy's set_position on the already created clip. | |
| # Assume poet_clip (the transparent canvas) will be positioned such that its drawn text is correct. | |
| # The (x,y) in set_position is top-left of the clip. | |
| # We want the text's top at `poet_y_top_target_abs`. | |
| # The poet_clip is an ImageClip, likely full video height. Its text is drawn at some `base_y` within it. | |
| # This is hard. Let's simplify: position the poet_clip (canvas) such that the center of its text block is at the desired Y. | |
| # We want the TOP of the poet TEXT to be at `poet_y_top_target_abs`. | |
| # The `poet_clip` already has the text rendered on it. Its `pos` can be set. | |
| # The text is at some `y_offset_within_clip` from the top of `poet_clip`. | |
| # We want `y_of_poet_clip_top + y_offset_within_clip = poet_y_top_target_abs`. | |
| # This internal y_offset is `base_y` from `create_advanced_text_clip`. | |
| # This is too complex. Let's use a simpler MoviePy positioning: | |
| # Position the poet_clip (which is a full canvas with text drawn) such that its *center* | |
| # is at `divider_y_top_for_moviepy + divider_img_h_pil + poet_gap_below_divider + actual_poet_text_height / 2`. | |
| # This sets the *center of the text block* correctly. | |
| desired_y_for_center_of_poet_text_block = divider_y_top_for_moviepy + divider_img_h_pil + poet_gap_below_divider + (actual_poet_text_height / 2.0) | |
| poet_clip = poet_clip.set_position(('center', desired_y_for_center_of_poet_text_block)) | |
| text_and_deco_clips.append(poet_clip) # Add the (potentially re-positioned) poet_clip | |
| if add_frame_overlay: | |
| progress(0.6, desc="Adding frame...") | |
| frame_color_rgba = hex_to_rgb(poem_color) + (120,) # Use poem color, semi-transparent | |
| frame_thickness = max(3, video_w // 250) | |
| frame_img = create_decorative_frame(video_w, video_h, frame_type_style, frame_color_rgba, frame_thickness) | |
| frame_clip = mp.ImageClip(np.array(frame_img)).set_duration(video_duration).set_start(0).fadein(0.5) | |
| text_and_deco_clips.insert(0, frame_clip) # Frame at the bottom of overlays | |
| if background_overlay_type != "none" and background_overlay_type: | |
| progress(0.65, desc="Adding background overlay...") | |
| overlay_img = create_background_overlay((video_w, video_h), background_overlay_type, overlay_opacity_value, overlay_color_value) | |
| overlay_clip = mp.ImageClip(np.array(overlay_img)).set_duration(video_duration) | |
| insert_idx = 1 if add_frame_overlay else 0 # Behind text, above video (and frame if exists) | |
| text_and_deco_clips.insert(insert_idx, overlay_clip) | |
| progress(0.7, desc="Applying video effects...") | |
| # Enhanced Zoom/Pan Effect | |
| zoom_factor_max = 1.03 # Subtle zoom | |
| pan_amplitude = max(5, video_w // 200) # Subtle pan | |
| def zoom_pan_effect(get_frame, t): | |
| frame = get_frame(t) | |
| pil_img = Image.fromarray(frame) | |
| orig_w, orig_h = pil_img.size | |
| current_zoom = 1 + (zoom_factor_max - 1) * (0.5 * (1 - np.cos(np.pi * t / video_duration))) | |
| new_w, new_h = int(orig_w * current_zoom), int(orig_h * current_zoom) | |
| resized_pil = pil_img.resize((new_w, new_h), Image.Resampling.LANCZOS) | |
| angle = 2 * np.pi * t / (video_duration / 1.5) # Pan cycle 1.5 times | |
| pan_x = int(pan_amplitude * np.sin(angle * 0.6)) | |
| pan_y = int(pan_amplitude * np.cos(angle)) | |
| crop_x = (new_w - orig_w) // 2 + pan_x; crop_y = (new_h - orig_h) // 2 + pan_y | |
| crop_x = max(0, min(crop_x, new_w - orig_w)); crop_y = max(0, min(crop_y, new_h - orig_h)) | |
| cropped_pil = resized_pil.crop((crop_x, crop_y, crop_x + orig_w, crop_y + orig_h)) | |
| return np.array(cropped_pil) | |
| video_with_anim_effects = video.fl(zoom_pan_effect) | |
| progress(0.8, desc="Compositing final video...") | |
| # Add all text and decor clips to the list of clips to close | |
| for tc in text_and_deco_clips: clips_to_close.append(tc) | |
| final_composite = mp.CompositeVideoClip([video_with_anim_effects] + text_and_deco_clips, size=(video_w, video_h)) | |
| clips_to_close.append(final_composite) | |
| if processed_audio_clip: | |
| final_composite = final_composite.set_audio(processed_audio_clip) | |
| progress(0.9, desc="Rendering final video...") | |
| temp_final_output = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) | |
| final_output_path = temp_final_output.name | |
| temp_final_output.close() | |
| # No need to add final_output_path to files_to_delete, it's the return value. | |
| output_fps = video.fps if video.fps and video.fps > 0 else 24 | |
| output_fps = min(output_fps, 30) # Cap FPS | |
| final_composite.write_videofile( | |
| final_output_path, | |
| codec='libx264', | |
| audio_codec='aac', | |
| fps=output_fps, | |
| preset='medium', | |
| threads=os.cpu_count(), # Use available cores | |
| ffmpeg_params=['-crf', '23', '-pix_fmt', 'yuv420p'] # CRF 23 good balance | |
| ) | |
| progress(1.0, desc="Process complete!") | |
| return final_output_path, f"Video processed successfully: {os.path.basename(final_output_path)}" | |
| except Exception as e: | |
| print(f"An error occurred in process_video: {e}") | |
| traceback.print_exc() | |
| # Ensure temp output file is cleaned if error occurs before return | |
| if final_output_path and os.path.exists(final_output_path): | |
| try: os.remove(final_output_path) | |
| except: pass | |
| return None, f"Error: {str(e)}. Check console for details." | |
| finally: | |
| print("--- Cleaning up resources ---") | |
| for clip in clips_to_close: | |
| try: | |
| if clip: clip.close() | |
| except Exception as e_clean: | |
| print(f"Error closing a clip: {e_clean}") | |
| for file_path in files_to_delete: | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| print(f"Cleaned up temporary file: {file_path}") | |
| except Exception as e_clean: | |
| print(f"Error cleaning up temp file {file_path}: {e_clean}") | |
| print("--- Cleanup finished ---") | |
# --- Gradio Interface Definition ---
# Declarative UI layout. The widget variables created below are wired
# positionally into process_video via the `inputs` list of
# process_button.click at the bottom of this block.
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown("# Persian Poetry Video Creator ✨")
    gr.Markdown("Overlay Persian poetry onto videos with Instagram/Google Drive/Direct URL support. Ensure `INSTA_ONEAPI_KEY` is set for Instagram.")
    # Main inputs (left, wider column) and output preview/status (right column).
    with gr.Row():
        with gr.Column(scale=2):
            video_url = gr.Textbox(label="Video URL (Instagram, Google Drive, Direct Link)", placeholder="e.g., https://www.instagram.com/p/...")
            music_url = gr.Textbox(label="Background Music URL (Direct Link, Optional)", placeholder="e.g., https://example.com/music.mp3")
            poem_verse = gr.TextArea(label="Poem Verse (use '\\n' for new lines)", lines=3, placeholder="مثال:\nاین قافله عمر عجب میگذرد\nدریاب دمی که با طرب میگذرد")
            poet_name = gr.Textbox(label="Poet Name", placeholder="e.g., خیام")
            username = gr.Textbox(label="Your Username (will be displayed with @)", placeholder="e.g., persian_poetry_lover")
        with gr.Column(scale=1):
            gr.Markdown("### Output")
            output_video = gr.Video(label="Processed Video")
            status_message = gr.Textbox(label="Status", interactive=False)
            process_button = gr.Button("Create Video", variant="primary")
    # Collapsed-by-default advanced options; defaults match typical usage.
    with gr.Accordion("⚙️ Customization Options", open=False):
        gr.Markdown("#### Font Settings")
        with gr.Row():
            # Manual sizes below only take effect when auto-sizing is unchecked.
            auto_font_size_enabled = gr.Checkbox(label="Auto Adjust Font Sizes", value=True)
            poem_font_size_manual = gr.Number(label="Poem Font Size (if Auto off)", value=60, minimum=10, maximum=200, step=1)
        with gr.Row():
            poet_font_size_manual = gr.Number(label="Poet Font Size (if Auto off)", value=40, minimum=10, maximum=150, step=1)
            username_font_size_manual = gr.Number(label="Username Font Size (if Auto off)", value=30, minimum=10, maximum=100, step=1)
        gr.Markdown("#### Text Colors")
        with gr.Row():
            poem_color = gr.ColorPicker(label="Poem Color", value="#FFFFFF")
            poet_color = gr.ColorPicker(label="Poet Color", value="#E0E0E0")  # Slightly off-white for poet
            username_color = gr.ColorPicker(label="Username Color", value="#B0B0B0")  # Greyish for username
        gr.Markdown("#### Text Effects & Styling")
        with gr.Row():
            poem_effect = gr.Radio(["none", "fade_in", "fade_out", "fade_in_out", "slide_up"], label="Poem Animation", value="fade_in")
            poet_effect = gr.Radio(["none", "fade_in", "fade_out", "fade_in_out"], label="Poet Animation", value="fade_in")  # Slide up for poet might conflict with divider
            username_effect = gr.Radio(["none", "fade_in", "fade_out", "fade_in_out"], label="Username Animation", value="fade_in")
        with gr.Row():
            enable_glow_effect = gr.Checkbox(label="Enable Text Glow Effect", value=True)
            glow_color_main = gr.ColorPicker(label="Glow Color (if enabled)", value="#FFFFFF", info="Applied to all text if glow is on")
        gr.Markdown("#### Decorative Elements")
        with gr.Row():
            add_divider_line = gr.Checkbox(label="Add Divider Line (below poem, above poet)", value=True)
            divider_type_style = gr.Radio(["simple"], label="Divider Type", value="simple", info="More types can be added.")  # Kept simple for now
        with gr.Row():
            add_frame_overlay = gr.Checkbox(label="Add Decorative Frame", value=False)
            frame_type_style = gr.Radio(["simple"], label="Frame Type", value="simple", info="More types can be added.")
        gr.Markdown("#### Background Overlay")
        with gr.Row():
            background_overlay_type = gr.Radio(["none", "vignette", "solid_color"], label="Background Overlay Type", value="vignette")
            overlay_opacity_value = gr.Slider(minimum=0.0, maximum=1.0, step=0.05, label="Overlay Opacity", value=0.3)
            overlay_color_value = gr.ColorPicker(label="Solid Overlay Color", value="#000000", info="Used if 'solid_color' overlay is chosen")
        gr.Markdown("#### Text Positioning (Advanced)")
        with gr.Row():
            poem_text_position = gr.Radio(["center", "top", "bottom"], label="Poem Base Position", value="center")
            poet_original_position = gr.Radio(["bottom", "center"], label="Poet Original Anchor (influences divider if active)", value="bottom")
            username_text_position = gr.Radio(["top", "bottom"], label="Username Position", value="top")
    # Connect button to function.
    # NOTE: the order of `inputs` must match process_video's positional
    # parameter order exactly — Gradio binds them by position, not by name.
    process_button.click(
        fn=process_video,
        inputs=[
            video_url, music_url, poem_verse, poet_name, username,
            auto_font_size_enabled, poem_font_size_manual, poet_font_size_manual, username_font_size_manual,
            poem_color, poet_color, username_color,
            poem_effect, poet_effect, username_effect,
            enable_glow_effect, glow_color_main,
            add_divider_line, divider_type_style,
            add_frame_overlay, frame_type_style,
            background_overlay_type, overlay_opacity_value, overlay_color_value,
            poem_text_position, poet_original_position, username_text_position
        ],
        outputs=[output_video, status_message]
    )
# Entry point: only launch the UI when executed as a script (not on import).
if __name__ == "__main__":
    iface.launch(debug=True)  # debug=True surfaces tracebacks in the UI/console