Spaces:
Build error
Build error
| """ | |
| SongLab AI - Gradio Interface | |
| Modern, minimalist music generation interface with WordPress integration | |
| Includes: Music, Vocals, Video Generation, Audio Effects | |
| """ | |
| import os | |
| import io | |
| import json | |
| import uuid | |
| import base64 | |
| import time | |
| import tempfile | |
| import subprocess | |
| import threading | |
| from typing import Optional, Tuple, Dict, Any, List | |
| from datetime import datetime | |
| import gradio as gr | |
| import requests | |
| import numpy as np | |
| from pydub import AudioSegment | |
| from pydub.effects import low_pass_filter, high_pass_filter | |
| from gradio_client import Client | |
| import jwt | |
| try: | |
| from huggingface_hub import HfApi, upload_file | |
| HF_AVAILABLE = True | |
| except ImportError: | |
| HF_AVAILABLE = False | |
| try: | |
| from gtts import gTTS | |
| GTTS_AVAILABLE = True | |
| except ImportError: | |
| GTTS_AVAILABLE = False | |
# Configuration
WORDPRESS_BASE_URL = "https://songlabai.com"
# Lyrics/vocals endpoint and its bearer token; both are None when unset.
LYRICS_API_URL = os.getenv("LYRICS_API_URL")
BEARER_TOKEN = os.getenv("BEARER_TOKEN")

# Video Configuration (each value falls back to the default shown when unset)
VIDEO_ENDPOINT_URL = os.getenv("VIDEO_ENDPOINT_URL", "")
VIDEO_HF_TOKEN = os.getenv("VIDEO_HF_TOKEN", "")
VIDEO_REPO_ID = os.getenv("VIDEO_REPO_ID", "Phoenixak99/savedvideos")
HF_TOKEN_DOWNLOAD = os.getenv("HF_TOKEN_DOWNLOAD", "")

# Global state management: per-session state dicts keyed by session id
user_sessions = dict()
# Example commercial ads.
# Maps a display name to three preset prompts: "jingle" (music style),
# "lyrics" and "video". The first entry is an empty placeholder.
EXAMPLE_COMMERCIAL_ADS = {
    "Select an example...": dict(jingle="", lyrics="", video=""),
    "Luxury Car Launch": dict(
        jingle="cinematic orchestral, powerful strings, epic drums, sophisticated and premium, rising crescendo",
        lyrics="Drive the future, feel the power, elegance in motion, every hour",
        video="Sleek black luxury sedan driving through mountain roads at golden hour, dramatic low angle shots, cinematic camera movements, reflections on polished surface, mist rolling through valleys",
    ),
    "Coffee Shop Morning": dict(
        jingle="acoustic guitar, warm and cozy, jazzy piano, uplifting morning vibes, folk pop",
        lyrics="Morning starts with you, fresh and true, every sip a new day, brewing your way",
        video="Steaming coffee cup on rustic wooden table by cafe window, warm morning sunlight streaming through, barista pouring latte art, cozy atmosphere with bokeh background",
    ),
    "Fitness App Energy": dict(
        jingle="electronic dance, energetic beat, motivational synths, upbeat tempo, powerful bass",
        lyrics="Push your limits, break the wall, stronger faster, give your all",
        video="Dynamic fitness montage with athletes training, slow motion sweat drops, vibrant gym lighting, rapid cuts between exercises, motivational energy",
    ),
    "Gourmet Restaurant": dict(
        jingle="smooth jazz, elegant piano, sophisticated strings, upscale dining ambiance",
        lyrics="Taste perfection, pure delight, culinary art, every bite",
        video="Close-up of gourmet dish being plated with artistic precision, chef's hands in motion, golden lighting on fine china, wine being poured, elegant restaurant atmosphere",
    ),
    "Adventure Travel": dict(
        jingle="world music fusion, adventurous drums, uplifting melodies, wanderlust vibes",
        lyrics="Explore the world, discover more, adventure waits, through every door",
        video="Epic landscape shots transitioning through mountains, beaches, cities, aerial drone views, backpacker silhouettes at sunset, dynamic travel montage",
    ),
    "Tech Innovation": dict(
        jingle="futuristic electronic, pulsing synths, modern tech sounds, innovative vibes",
        lyrics="Future is here, innovation clear, technology near, progress we steer",
        video="Sleek technology interfaces with holographic displays, circuit board macro shots, blue LED lighting, smooth product reveals, modern minimalist aesthetic",
    ),
    "Summer Beach Party": dict(
        jingle="tropical house, beach vibes, summer energy, catchy melody, feel-good beats",
        lyrics="Sun and sand, fun at hand, summer days, ocean waves",
        video="Beach party scenes with friends laughing, sunset over ocean, volleyball in slow motion, colorful beach umbrellas, festive summer atmosphere",
    ),
    "Luxury Perfume": dict(
        jingle="elegant orchestral, mysterious atmosphere, sophisticated strings, sensual rhythm",
        lyrics="Essence of elegance, fragrance divine, timeless beauty, forever shine",
        video="Perfume bottle rotating on reflective black surface, dramatic lighting with golden highlights, elegant hand reaching for bottle, misty atmospheric effects, luxury aesthetic",
    ),
}
class AudioProcessor:
    """Handles all audio processing operations.

    The class keeps no state. Every operation is a ``@staticmethod`` so it
    works both as ``AudioProcessor.method(audio)`` and through an instance
    (``AudioProcessor().method(audio)``); without the decorator the instance
    form would pass the instance as ``audio`` and fail.

    ``audio`` arguments are pydub ``AudioSegment`` objects; the annotations
    are quoted so they are not evaluated when the class is defined.
    """

    @staticmethod
    def apply_stereo_effect(audio: "AudioSegment", separation: int = 30) -> "AudioSegment":
        """Apply stereo widening effect.

        Mono input is first converted to two channels. The left channel is
        high-pass filtered at 200 Hz and the right channel low-pass filtered
        at 8000 Hz, then both are recombined into one stereo segment.

        Args:
            audio: Source segment.
            separation: Currently unused; kept so existing callers that pass
                it keep working.

        Returns:
            A new stereo segment built from the two filtered channels.
        """
        if audio.channels == 1:
            audio = audio.set_channels(2)
        # Split once and reuse the result instead of splitting per channel.
        mono_tracks = audio.split_to_mono()
        left = mono_tracks[0]
        # For anything other than exactly two channels the left track is
        # reused as the right one.
        right = mono_tracks[1] if audio.channels == 2 else left
        left_filtered = high_pass_filter(left, 200)
        right_filtered = low_pass_filter(right, 8000)
        return AudioSegment.from_mono_audiosegments(left_filtered, right_filtered)

    @staticmethod
    def apply_reverse(audio: "AudioSegment") -> "AudioSegment":
        """Return ``audio`` reversed (delegates to ``audio.reverse()``)."""
        return audio.reverse()

    @staticmethod
    def adjust_volume(audio: "AudioSegment", db_change: float) -> "AudioSegment":
        """Adjust volume by ``db_change`` dB (negative values attenuate).

        Implemented with the segment's ``+`` operator.
        """
        return audio + db_change

    @staticmethod
    def change_pitch(audio: "AudioSegment", semitones: int) -> "AudioSegment":
        """Change pitch by ``semitones``; 0 returns ``audio`` unchanged.

        The raw samples are re-labelled with a frame rate scaled by
        ``2 ** (semitones / 12)`` and the result is then converted back to
        the original frame rate.
        """
        if semitones == 0:
            return audio
        new_sample_rate = int(audio.frame_rate * (2.0 ** (semitones / 12.0)))
        shifted = audio._spawn(audio.raw_data, overrides={'frame_rate': new_sample_rate})
        return shifted.set_frame_rate(audio.frame_rate)

    @staticmethod
    def trim_audio(audio: "AudioSegment", duration_seconds: int) -> "AudioSegment":
        """Trim audio to its first ``duration_seconds`` seconds.

        The segment is sliced with a millisecond index.
        """
        return audio[:duration_seconds * 1000]
class WordPressAPI:
    """Handles all WordPress API interactions.

    Wraps the credit, eligibility and upload endpoints under ``base_url``.
    Requests carry the caller's JWT as a Bearer token. Request and parsing
    failures are caught inside each method and turned into a fallback return
    value (documented per method) rather than propagated.
    """

    # Keys of the audio-credit summary, in the order they are reported.
    _CREDIT_FIELDS = (
        'remaining_credits',
        'free_samples_remaining',
        'total_credits',
        'used_credits',
        'free_samples_used',
    )

    # User IDs treated as administrators by check_video_eligibility.
    _ADMIN_USER_IDS = (1, 206)

    def __init__(self, base_url: str):
        """Store the site root URL (no trailing slash is added or removed)."""
        self.base_url = base_url

    def _get_headers(self, jwt_token: str) -> Dict[str, str]:
        """Generate JSON API headers carrying ``jwt_token`` as Bearer auth."""
        return {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json",
            "Cache-Control": "no-store"
        }

    @classmethod
    def _empty_credits(cls) -> Dict[str, int]:
        """Return a fresh all-zero audio-credit summary."""
        return dict.fromkeys(cls._CREDIT_FIELDS, 0)

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Coerce ``value`` to int, returning ``default`` if it cannot be parsed."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _is_admin(self, jwt_token: str) -> bool:
        """Report whether the token's user id is one of the admin IDs.

        NOTE(review): the token is decoded WITHOUT signature verification, so
        the user id is whatever the token claims. Anything that trusts this
        result should be backed by a server-side check - confirm.
        """
        try:
            decoded = jwt.decode(jwt_token, options={"verify_signature": False})
            user_id = decoded.get('data', {}).get('user', {}).get('id')
            return user_id in self._ADMIN_USER_IDS
        except Exception:
            # Undecodable or unexpectedly shaped token: treat as non-admin.
            return False

    def get_user_credits(self, jwt_token: str) -> Dict[str, int]:
        """Fetch user's audio credits from the audio API.

        Returns:
            A dict with the integer fields ``remaining_credits``,
            ``free_samples_remaining``, ``total_credits``, ``used_credits``
            and ``free_samples_used``. All fields are 0 when the API answers
            with a non-200 status or any error occurs.
        """
        try:
            response = requests.get(
                f"{self.base_url}/wp-json/audio-api/v1/audio-credits",
                headers=self._get_headers(jwt_token),
                timeout=10
            )
            # Status is checked by hand (no raise_for_status) so the response
            # body can be logged.
            if response.status_code != 200:
                print(f"[CREDIT DEBUG] API returned status {response.status_code}: {response.text}")
                return self._empty_credits()
            data = response.json()
            print(f"[CREDIT DEBUG] Raw API response: {data}")
            print(f"[CREDIT DEBUG] remaining_credits: {data.get('remaining_credits', 'MISSING')}")
            print(f"[CREDIT DEBUG] free_samples_remaining: {data.get('free_samples_remaining', 'MISSING')}")
            print(f"[CREDIT DEBUG] total_credits: {data.get('total_credits', 'MISSING')}")
            # IMPORTANT: convert values to int (the API returns strings).
            # A value that is not int-convertible raises and is handled below.
            result = {field: int(data.get(field, 0)) for field in self._CREDIT_FIELDS}
            print(f"[CREDIT DEBUG] Final parsed result: {result}")
            return result
        except Exception as e:
            print(f"[CREDIT DEBUG] Error fetching credits: {e}")
            import traceback
            traceback.print_exc()
            return self._empty_credits()

    def check_audio_eligibility(self, jwt_token: str, duration: int, has_lyrics: bool = False) -> Dict[str, Any]:
        """Check if user can generate audio (includes 1 free sample).

        Returns:
            The endpoint's JSON response. On any error a permissive fallback
            is returned (``can_generate`` True, ``reason`` 'error_fallback').
        """
        try:
            response = requests.post(
                f"{self.base_url}/wp-json/audio-api/v1/check-eligibility",
                headers=self._get_headers(jwt_token),
                json={'duration': duration, 'has_lyrics': has_lyrics},
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error checking audio eligibility: {e}")
            # On error, allow 1 free sample as fallback
            return {
                'can_generate': True,
                'reason': 'error_fallback',
                'uses_credit': False,
                'free_samples_remaining': 1,
                'remaining_credits': 0
            }

    def get_video_credits(self, jwt_token: str) -> Dict[str, Any]:
        """Fetch user's video credits; returns ``{}`` on any error."""
        try:
            response = requests.get(
                f"{self.base_url}/wp-json/video-api/v1/video-credits",
                headers=self._get_headers(jwt_token),
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching video credits: {e}")
            return {}

    def check_video_eligibility(self, jwt_token: str, is_custom_mode: bool = False) -> Dict[str, Any]:
        """Check if user can generate videos (includes 1 free preset generation).

        Args:
            jwt_token: The user's JWT.
            is_custom_mode: When True, free generations are not considered
                and paid credits are required.

        Returns:
            A dict with ``can_generate`` (bool), ``free_remaining`` (int),
            ``credits`` (int) and ``reason``: 'free_generation' (credits
            could not be fetched), 'admin', 'free_preset', 'credits',
            'no_credits_or_free_gens' or 'error_fallback'.
        """
        try:
            video_credits = self.get_video_credits(jwt_token)
            if not video_credits:
                # If the credits lookup fails, allow 1 free generation as fallback
                return {
                    'can_generate': True,
                    'free_remaining': 1,
                    'credits': 0,
                    'reason': 'free_generation'
                }
            # Admins can always generate
            if self._is_admin(jwt_token):
                return {
                    'can_generate': True,
                    'free_remaining': 999,
                    'credits': 999,
                    'reason': 'admin'
                }
            # Coerce to int before comparing: a string count such as "1"
            # would make `> 0` raise and send every call into the permissive
            # error fallback below.
            free_remaining = self._to_int(video_credits.get('free_generations_remaining', 0))
            credits_remaining = self._to_int(video_credits.get('remaining_credits', 0))
            # PRESET MODE (non-custom): allow if free generations remain
            if not is_custom_mode and free_remaining > 0:
                return {
                    'can_generate': True,
                    'free_remaining': free_remaining,
                    'credits': credits_remaining,
                    'reason': 'free_preset'
                }
            # CUSTOM MODE or NO FREE GENERATIONS: require credits
            if credits_remaining > 0:
                return {
                    'can_generate': True,
                    'free_remaining': free_remaining,
                    'credits': credits_remaining,
                    'reason': 'credits'
                }
            # No eligibility
            return {
                'can_generate': False,
                'free_remaining': free_remaining,
                'credits': credits_remaining,
                'reason': 'no_credits_or_free_gens'
            }
        except Exception as e:
            print(f"Error checking video eligibility: {e}")
            # On error, allow 1 free generation as fallback
            return {
                'can_generate': True,
                'free_remaining': 1,
                'credits': 0,
                'reason': 'error_fallback'
            }

    def use_video_credit(self, jwt_token: str, generation_data: Dict[str, Any]) -> bool:
        """Deduct video credit after successful generation.

        Returns:
            True when the endpoint answered without an HTTP error status,
            False on any error.
        """
        try:
            response = requests.post(
                f"{self.base_url}/wp-json/video-api/v1/use-video-credit",
                headers=self._get_headers(jwt_token),
                json=generation_data,
                timeout=10
            )
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"Error using video credit: {e}")
            return False

    def use_audio_credit(self, jwt_token: str, duration: int, has_lyrics: bool = False,
                         metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Deduct an audio credit or free sample based on duration.

        Args:
            jwt_token: The user's JWT.
            duration: Sent to the endpoint as ``duration``.
            has_lyrics: Sent to the endpoint as ``has_lyrics``.
            metadata: Optional extra data; sent as ``{}`` when omitted.

        Returns:
            The response's ``success`` field (False if absent), or False on
            any error.
        """
        try:
            payload = {
                'duration': duration,
                'has_lyrics': has_lyrics,
                'metadata': metadata or {}
            }
            response = requests.post(
                f"{self.base_url}/wp-json/audio-api/v1/use-audio-credit",
                headers=self._get_headers(jwt_token),
                json=payload,
                timeout=10
            )
            response.raise_for_status()
            return response.json().get('success', False)
        except Exception as e:
            print(f"Error using credit: {e}")
            return False

    def upload_track(self, jwt_token: str, audio_bytes: bytes, metadata: Dict[str, Any]) -> bool:
        """Upload track to WordPress as a multipart form post.

        User id, email and display name are read from the (unverified) JWT
        payload. Entries in ``metadata`` are merged last into the form
        fields, so they override the defaults on key collisions.

        Returns:
            True when the upload answered without an HTTP error status,
            False on any error.
        """
        try:
            # Decode JWT to get user info
            decoded = jwt.decode(jwt_token, options={"verify_signature": False})
            user_info = decoded.get('data', {}).get('user', {})
            files = {'audio_file': ('track.wav', audio_bytes, 'audio/wav')}
            data = {
                'user_id': str(user_info.get('id', '')),
                'user_email': user_info.get('user_email', ''),
                'user_name': user_info.get('display_name', ''),
                'subscription_tier': 'free',
                'saved_by_user': '1',
                **metadata
            }
            response = requests.post(
                f"{self.base_url}/wp-admin/admin-ajax.php?action=upload_audio_direct",
                files=files,
                data=data,
                headers={"Authorization": f"Bearer {jwt_token}"},
                timeout=30
            )
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"Error uploading track: {e}")
            return False
class MusicGenerator:
    """Handles music generation via MusicGen.

    ``generate_music`` calls a remote Space through ``gradio_client``;
    ``generate_with_lyrics`` posts to the HTTP endpoint configured in
    ``LYRICS_API_URL``. Both return ``(sample_rate, samples)``.
    """

    def __init__(self):
        # The client is created on first use by get_client().
        self.client = None
        self.space_name = "Healthydater/musicgen"

    @staticmethod
    def _segment_to_float_array(audio):
        """Convert a pydub segment to a float32 array normalised by bit depth.

        Samples are divided by the full-scale value for the segment's
        ``sample_width`` (32768 for 16-bit audio) instead of a hard-coded
        16-bit constant. Multi-channel audio is reshaped to
        ``(frames, channels)``; mono stays one-dimensional.
        Assumes signed integer PCM samples - TODO confirm for 8-bit input.
        """
        full_scale = float(1 << (8 * audio.sample_width - 1))
        samples = np.array(audio.get_array_of_samples()).astype(np.float32) / full_scale
        if audio.channels > 1:
            samples = samples.reshape((-1, audio.channels))
        return samples

    def get_client(self) -> Client:
        """Get or create MusicGen client with retry logic.

        Makes up to three connection attempts, sleeping 120 s between them.

        Raises:
            Exception: If the third attempt also fails.
        """
        if self.client is None:
            for attempt in range(3):
                try:
                    self.client = Client(self.space_name)
                    return self.client
                except Exception as e:
                    if attempt < 2:
                        time.sleep(120)
                    else:
                        raise Exception(f"Failed to connect to MusicGen: {e}") from e
        return self.client

    def _parse_generation_result(self, result):
        """Normalise a prediction result into ``(sample_rate, samples)``.

        Accepted shapes: a 2-tuple ``(sample_rate, ndarray)``, a 2-tuple
        ``(sample_rate, dict)``, a dict with 'generated_audio' and
        'sample_rate', or a path to an audio file.

        Raises:
            Exception: If the result reports an error or has another shape.
        """
        print(f"[DEBUG] MusicGen result type: {type(result)}")
        # Format 1: Tuple (sample_rate, audio_array or dict)
        if isinstance(result, tuple) and len(result) == 2:
            sample_rate, samples = result
            if isinstance(samples, np.ndarray):
                print("[DEBUG] Tuple with numpy array")
                return sample_rate, samples
            if isinstance(samples, dict):
                print(f"[DEBUG] Tuple with dict, keys: {samples.keys()}")
                if 'error' in samples:
                    raise Exception(f"API error: {samples['error']}")
                if 'generated_audio' not in samples:
                    raise Exception(f"Dict in tuple missing 'generated_audio'. Keys: {list(samples.keys())}")
                # The dict's own sample rate wins over the tuple's.
                actual_sample_rate = samples.get('sample_rate', sample_rate)
                audio_array = np.array(samples['generated_audio'], dtype=np.float32)
                print(f"[DEBUG] Extracted audio: {len(audio_array)} samples at {actual_sample_rate}Hz")
                return actual_sample_rate, audio_array
            raise Exception(f"Expected numpy array or dict in tuple, got: {type(samples)}")
        # Format 2: Dict with audio data (no tuple)
        if isinstance(result, dict):
            print(f"[DEBUG] Direct dict, keys: {result.keys()}")
            if 'error' in result:
                raise Exception(f"API error: {result['error']}")
            if 'generated_audio' in result and 'sample_rate' in result:
                samples = np.array(result['generated_audio'], dtype=np.float32)
                return result['sample_rate'], samples
            raise Exception(f"Unexpected dict format. Keys: {list(result.keys())}")
        # Format 3: File path (fallback)
        if isinstance(result, str):
            print("[DEBUG] File path result")
            audio = AudioSegment.from_file(result)
            return audio.frame_rate, self._segment_to_float_array(audio)
        raise Exception(f"Unexpected result format: {type(result)}")

    def generate_music(self, prompt, duration, progress=gr.Progress()):
        """Generate music from prompt.

        Makes up to three attempts. A timeout, or an error whose message
        contains "503", "loading" or "starting", is retried after 180 s;
        any other error fails immediately.

        Args:
            prompt: Text prompt passed to the remote ``/generate_music`` API.
            duration: Passed to the API as ``duration``.
            progress: Gradio progress callback.

        Returns:
            ``(sample_rate, samples)`` with ``samples`` a numpy array.

        Raises:
            Exception: With a "Generation timed out ..." or
                "Generation failed: ..." message.
        """
        max_retries = 3
        retry_delay = 180  # 3 minutes for cold start
        for attempt in range(max_retries):
            try:
                progress(0.1 + (attempt * 0.05), desc=f"Connecting to MusicGen (attempt {attempt + 1}/{max_retries})...")
                client = self.get_client()
                progress(0.3, desc="Generating music...")
                result = client.predict(
                    prompt=prompt,
                    duration=duration,
                    temperature=1.0,
                    top_k=250,
                    top_p=0.0,
                    cfg_coef=3.0,
                    use_sampling=True,
                    extend_stride=18.0,
                    api_name="/generate_music"
                )
                progress(0.9, desc="Processing audio...")
                # Parsing stays inside the try so its errors get the same
                # wrapping / retry treatment as request errors.
                return self._parse_generation_result(result)
            except (requests.exceptions.Timeout, requests.exceptions.ReadTimeout, TimeoutError) as e:
                if attempt < max_retries - 1:
                    progress(0.2, desc=f"Timeout - model loading... retrying in {retry_delay}s")
                    time.sleep(retry_delay)
                    continue
                raise Exception(f"Generation timed out after {max_retries} attempts. Model may be starting up - please try again in a few minutes.") from e
            except Exception as e:
                error_msg = str(e)
                # Check if it's a 503 or loading error
                is_loading_error = (
                    "503" in error_msg
                    or "loading" in error_msg.lower()
                    or "starting" in error_msg.lower()
                )
                if is_loading_error and attempt < max_retries - 1:
                    progress(0.2, desc=f"Model loading... retrying in {retry_delay}s")
                    time.sleep(retry_delay)
                    continue
                # Not a loading error, or retries exhausted - raise it
                raise Exception(f"Generation failed: {str(e)}") from e

    @staticmethod
    def _extract_audio_bytes(response) -> bytes:
        """Pull encoded audio bytes out of a lyrics-endpoint response.

        JSON responses carry base64 audio under 'audio'; responses whose
        Content-Type contains 'audio' or 'octet-stream' are used as raw
        bytes. Payloads under 1000 bytes are rejected as invalid.
        """
        content_type = response.headers.get('Content-Type', '')
        if 'application/json' in content_type:
            json_data = response.json()
            if 'error' in json_data:
                raise Exception(f"API error: {json_data['error']}")
            if 'audio' not in json_data:
                raise Exception(f"Invalid JSON response: missing 'audio' field. Keys: {list(json_data.keys())}")
            audio_bytes = base64.b64decode(json_data['audio'])
        elif 'audio' in content_type or 'octet-stream' in content_type:
            # Raw audio bytes (old format)
            audio_bytes = response.content
        else:
            raise Exception(f"Invalid response from API. Expected audio or JSON, got: {content_type}")
        if len(audio_bytes) < 1000:
            raise Exception("Received invalid audio data (too small)")
        return audio_bytes

    def generate_with_lyrics(self, prompt, lyrics, duration, progress=gr.Progress()):
        """Generate music with vocals/lyrics.

        Posts to ``LYRICS_API_URL`` with up to five attempts. A 503 response
        or a request timeout is retried after a wait (180 s after the first
        attempt, 90 s afterwards); other failures abort immediately.

        Returns:
            ``(frame_rate, samples)`` with ``samples`` a float32 numpy array.

        Raises:
            Exception: "Lyrics API not configured" when the endpoint URL or
                token is missing, otherwise "Lyrics generation failed: ...".
        """
        if not LYRICS_API_URL or not BEARER_TOKEN:
            raise Exception("Lyrics API not configured")
        try:
            progress(0.1, desc="Preparing lyrics generation...")
            payload = {
                "inputs": {
                    "prompt": prompt,
                    "lyrics": lyrics
                },
                "parameters": {
                    "duration": duration,
                    "infer_step": 60,
                    "guidance_scale": 15,
                    "scheduler_type": "euler",
                    "guidance_interval": 0.5,
                    "guidance_interval_decay": 0.0,
                    "min_guidance_scale": 3.0,
                    "use_erg_tag": True,
                    "use_erg_lyric": False,
                    "use_erg_diffusion": True
                }
            }
            headers = {
                "Authorization": f"Bearer {BEARER_TOKEN}",
                "Content-Type": "application/json"
            }
            # Retry logic for 503 errors (endpoint warming up)
            max_retries = 5
            retry_delay = 180  # Start with 3 minutes for cold start
            for attempt in range(max_retries):
                # 3 min after the first attempt, then 90 s
                wait_time = retry_delay if attempt == 0 else retry_delay // 2
                try:
                    progress(0.2 + (attempt * 0.1), desc=f"Generating with vocals (attempt {attempt + 1}/{max_retries})...")
                    response = requests.post(
                        LYRICS_API_URL,
                        headers=headers,
                        json=payload,
                        timeout=300
                    )
                    # Handle 503 - endpoint warming up
                    if response.status_code == 503:
                        if attempt < max_retries - 1:
                            progress(0.3, desc=f"Endpoint warming up... retrying in {wait_time}s")
                            time.sleep(wait_time)
                            continue
                        raise Exception("Lyrics endpoint unavailable after all retries. Please try again in a few minutes.")
                    response.raise_for_status()
                    audio_bytes = self._extract_audio_bytes(response)
                    progress(0.9, desc="Processing audio...")
                    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
                    return audio.frame_rate, self._segment_to_float_array(audio)
                except requests.exceptions.Timeout:
                    if attempt < max_retries - 1:
                        progress(0.3, desc=f"Request timeout, retrying in {wait_time}s...")
                        time.sleep(wait_time)
                        continue
                    raise Exception("Request timed out after all retries")
        except Exception as e:
            raise Exception(f"Lyrics generation failed: {str(e)}") from e
class VideoGenerator:
    """Handles video generation with retry logic.

    A generation request is posted to ``VIDEO_ENDPOINT_URL``; the finished
    video is then discovered by polling the ``VIDEO_REPO_ID`` dataset for a
    new ``.mp4`` file. FFmpeg (invoked as ``ffmpeg``) is used for merging
    and concatenation.
    """

    def __init__(self):
        # None when huggingface_hub could not be imported.
        self.api = HfApi() if HF_AVAILABLE else None

    @staticmethod
    def _new_temp_path(suffix: str) -> str:
        """Create an empty temp file and return its path.

        Replaces ``tempfile.mktemp``, which only returns a name and is
        therefore open to a race between naming and creation. FFmpeg is
        always run with ``-y``, so the pre-created file is overwritten.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    @staticmethod
    def _remove_quietly(path) -> None:
        """Delete ``path`` if set, ignoring filesystem errors."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    def calculate_temp(self, duration: float, fps: int) -> int:
        """Calculate temp value based on duration and FPS.

        Scales linearly from the reference point 31 temp = 10 s at 24 fps
        (240 frames); the result is truncated to int.
        """
        return int((duration * fps * 31) / 240)

    def warmup_endpoint(self):
        """Send a small, low-step request to wake the video endpoint.

        Best effort: the response is ignored and any error is swallowed.
        The request uses a 10 s timeout.
        """
        try:
            warmup_payload = {
                "inputs": {
                    "prompt": "warmup",
                    "mode": "text_to_video",
                    "width": 640,
                    "height": 384,
                    "temp": 16,
                    "guidance_scale": 7.0,
                    "video_guidance_scale": 5.0,
                    "num_inference_steps": [5, 5, 5],
                    "video_num_inference_steps": [3, 3, 3],
                    "fps": 24
                }
            }
            headers = {
                "Authorization": f"Bearer {VIDEO_HF_TOKEN}",
                "Content-Type": "application/json"
            }
            requests.post(VIDEO_ENDPOINT_URL, headers=headers, json=warmup_payload, timeout=10)
        except Exception:
            # Silent fail by design; Exception (not a bare except) so
            # KeyboardInterrupt / SystemExit still propagate.
            pass

    def generate_video(self, mode, prompt, duration, fps,
                       guidance_scale, video_guidance_scale,
                       image_base64: Optional[str] = None, progress=gr.Progress()):
        """Generate video with retry logic.

        Posts the request with a short timeout and then waits for the result
        via ``monitor_video_completion``. A request timeout is expected and
        any other request error is only logged; polling starts in every case.

        Args:
            mode: Endpoint mode; "image_to_video" additionally sends the image.
            prompt: Text prompt.
            duration: Length in seconds (used for ``temp`` and the wait limit).
            fps: Frames per second.
            guidance_scale: Sent to the endpoint unchanged.
            video_guidance_scale: Sent to the endpoint unchanged.
            image_base64: Source image, only sent in "image_to_video" mode.
            progress: Gradio progress callback.

        Returns:
            Path of the downloaded video file.

        Raises:
            Exception: If video generation is not configured, or polling fails.
        """
        if not VIDEO_ENDPOINT_URL or not VIDEO_HF_TOKEN:
            raise Exception("Video generation not configured")
        temp_value = self.calculate_temp(duration, fps)
        payload = {
            "inputs": {
                "prompt": prompt,
                "mode": mode,
                "width": 1280,
                "height": 768,
                "temp": temp_value,
                "guidance_scale": guidance_scale,
                "video_guidance_scale": video_guidance_scale,
                "num_inference_steps": [20, 20, 20],
                "video_num_inference_steps": [10, 10, 10],
                "fps": fps
            }
        }
        if mode == "image_to_video" and image_base64:
            payload["inputs"]["image"] = image_base64
        headers = {
            "Authorization": f"Bearer {VIDEO_HF_TOKEN}",
            "Content-Type": "application/json"
        }
        # Send the request without waiting for the result; the finished
        # video is picked up by polling the dataset instead.
        progress(0.1, desc="Sending video generation request...")
        try:
            # Short timeout - just enough to send the request
            response = requests.post(
                VIDEO_ENDPOINT_URL,
                headers=headers,
                json=payload,
                timeout=5
            )
            if response.status_code == 503:
                progress(0.15, desc="Endpoint warming up, waiting 60s...")
                time.sleep(60)
                # Try one more time
                requests.post(VIDEO_ENDPOINT_URL, headers=headers, json=payload, timeout=5)
        except requests.exceptions.Timeout:
            # Expected - request sent, video generating in background
            progress(0.2, desc="Request sent, monitoring dataset...")
        except Exception as e:
            # Non-timeout error - still proceed to monitor in case the
            # request went through
            print(f"Request error (non-fatal): {e}")
        progress(0.2, desc="Monitoring dataset for video...")
        return self.monitor_video_completion(duration, fps, progress)

    def monitor_video_completion(self, duration, fps, progress=gr.Progress()):
        """Monitor HuggingFace dataset for video completion.

        Records the ``.mp4`` files present at the start, then polls every
        10 s until a file name appears that was not in that snapshot, and
        downloads it to a temp file.

        NOTE(review): any new file in the dataset counts as "the" result, so
        concurrent generations could pick up each other's videos - confirm.

        Returns:
            Path of the downloaded ``.mp4`` temp file.

        Raises:
            Exception: If the HF API or download token is unavailable, the
                repository cannot be listed initially, or the wait limit
                derived from ``duration * fps`` is exceeded.
        """
        if not self.api or not HF_TOKEN_DOWNLOAD:
            raise Exception("HuggingFace API not available")
        # Snapshot of the videos that already exist
        try:
            files = self.api.list_repo_files(
                repo_id=VIDEO_REPO_ID,
                repo_type="dataset",
                token=HF_TOKEN_DOWNLOAD
            )
            known_videos = {f for f in files if f.endswith('.mp4')}
        except Exception as e:
            raise Exception(f"Failed to access video repository: {e}") from e
        # Calculate timeout
        estimated_minutes = (duration * fps) / 16
        max_wait_seconds = int((estimated_minutes + 3) * 60 * 1.2)
        check_interval = 10
        start_time = time.time()
        progress(0.3, desc=f"Generating video (est: ~{int(estimated_minutes)} min)...")
        while True:
            elapsed = time.time() - start_time
            if elapsed > max_wait_seconds:
                raise Exception(f"Video generation timeout ({int(max_wait_seconds/60)} minutes)")
            # Update progress
            progress_pct = min(0.3 + (elapsed / (max_wait_seconds * 0.95)) * 0.6, 0.9)
            progress(progress_pct, desc=f"Generating... {elapsed/60:.1f} min elapsed")
            # Check for new video
            try:
                files = self.api.list_repo_files(
                    repo_id=VIDEO_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN_DOWNLOAD
                )
                # Compare names rather than counts: taking the last entry of
                # the sorted full listing could return an old file when the
                # new name sorts before existing ones.
                new_videos = sorted(
                    f for f in files if f.endswith('.mp4') and f not in known_videos
                )
                if new_videos:
                    latest_video = new_videos[-1]
                    video_url = f"https://huggingface.co/datasets/{VIDEO_REPO_ID}/resolve/main/{latest_video}"
                    progress(0.95, desc="Downloading video...")
                    download_headers = {"Authorization": f"Bearer {HF_TOKEN_DOWNLOAD}"}
                    video_response = requests.get(video_url, headers=download_headers, timeout=120)
                    if video_response.status_code == 200:
                        # Save to temp file
                        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
                            f.write(video_response.content)
                        progress(1.0, desc="Video complete!")
                        return f.name
                    # Caught by the handler below, so the download is
                    # retried on the next poll until the wait limit.
                    raise Exception("Failed to download video")
            except Exception as e:
                print(f"Error checking for video: {e}")
            time.sleep(check_interval)

    def merge_audio_video(self, video_path: str, audio_path: str) -> str:
        """Merge jingle audio with video using FFmpeg.

        The video stream is copied, the audio is encoded as 192k AAC and the
        output ends with the shorter input (``-shortest``).

        Returns:
            Path of the merged temp file.

        Raises:
            Exception: If FFmpeg exits with a non-zero status.
        """
        output_path = self._new_temp_path("_final.mp4")
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-shortest",
            output_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            self._remove_quietly(output_path)
            raise Exception(f"FFmpeg merge failed: {result.stderr}")
        return output_path

    def generate_intro_with_voiceover(self, intro_prompt, company_name,
                                      progress=gr.Progress()):
        """Generate 5-second intro with voiceover.

        Generates a 5 s video at 8 fps, then (when gTTS is available and
        ``company_name`` is non-empty) speaks the company name and muxes it
        over the video with 0.5 s fades in and out.

        Returns:
            Path of the intro with voiceover, or the silent intro path when
            the voiceover step is skipped or fails.
        """
        progress(0.1, desc="Generating intro video...")
        intro_path = self.generate_video(
            mode="text_to_video",
            prompt=intro_prompt,
            duration=5.0,
            fps=8,
            guidance_scale=7.0,
            video_guidance_scale=5.0,
            progress=progress
        )
        if not intro_path or not GTTS_AVAILABLE or not company_name:
            return intro_path
        voiceover_path = None
        output_path = None
        try:
            progress(0.8, desc="Adding voiceover...")
            tts = gTTS(text=company_name, lang='en', slow=False)
            voiceover_path = self._new_temp_path("_voice.mp3")
            tts.save(voiceover_path)
            # Merge intro + voiceover
            output_path = self._new_temp_path("_intro_final.mp4")
            cmd = [
                'ffmpeg', '-y',
                '-i', intro_path,
                '-i', voiceover_path,
                '-c:v', 'copy',
                '-c:a', 'aac',
                '-b:a', '192k',
                '-filter_complex', '[1:a]afade=t=in:st=0:d=0.5,afade=t=out:st=4.5:d=0.5[a]',
                '-map', '0:v',
                '-map', '[a]',
                '-shortest',
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                # Inputs are no longer needed once the merge succeeded.
                self._remove_quietly(intro_path)
                self._remove_quietly(voiceover_path)
                return output_path
        except Exception as e:
            print(f"Voiceover generation failed: {e}")
        # Failure path: drop the intermediates and fall back to the silent intro.
        self._remove_quietly(voiceover_path)
        self._remove_quietly(output_path)
        return intro_path

    def concatenate_videos(self, intro_path: str, main_path: str) -> str:
        """Concatenate intro + main video with FFmpeg's concat demuxer.

        Streams are copied without re-encoding. The temporary list file is
        removed whether or not FFmpeg succeeds.

        Returns:
            Path of the combined temp file.

        Raises:
            Exception: If FFmpeg exits with a non-zero status.
        """
        concat_list = self._new_temp_path("_concat.txt")
        output_path = self._new_temp_path("_combined.mp4")
        try:
            with open(concat_list, 'w') as f:
                for path in (intro_path, main_path):
                    # A single quote inside a quoted concat entry must be
                    # written as '\'' (close quote, escaped quote, reopen).
                    escaped = path.replace("'", "'\\''")
                    f.write(f"file '{escaped}'\n")
            cmd = [
                'ffmpeg', '-y',
                '-f', 'concat',
                '-safe', '0',
                '-i', concat_list,
                '-c', 'copy',
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
        finally:
            self._remove_quietly(concat_list)
        if result.returncode != 0:
            self._remove_quietly(output_path)
            raise Exception(f"Video concatenation failed: {result.stderr}")
        return output_path
# Shared module-level service instances
video_gen = VideoGenerator()
audio_processor = AudioProcessor()
music_gen = MusicGenerator()
wp_api = WordPressAPI(base_url=WORDPRESS_BASE_URL)
def get_session(session_id: str) -> Dict[str, Any]:
    """Return the state dict stored for ``session_id``.

    The first lookup of an id registers a blank state in ``user_sessions``:
    no token or user info, zeroed audio credits, empty video credits and
    metadata, and no generated/processed audio or media paths. Later
    lookups return that same dict object.
    """
    try:
        return user_sessions[session_id]
    except KeyError:
        pass

    blank_state = {
        'jwt_token': None,
        'user_info': None,
        'credits': {
            'remaining_credits': 0,
            'free_samples_remaining': 0,
            'total_credits': 0,
            'used_credits': 0,
            'free_samples_used': 0
        },
        'video_credits': {},
        'generated_audio': None,
        'processed_audio': None,
        'metadata': {},
        'video_path': None,
        'jingle_path': None
    }
    user_sessions[session_id] = blank_state
    return blank_state
def check_download_access(session_id):
    """Ask WordPress whether the session's generated track may be downloaded.

    Returns:
        ``(has_access, message)``. ``has_access`` is False when the session
        has no token or no generated audio, when the endpoint answers with a
        non-200 status, or when the request fails; ``message`` says which.
    """
    state = get_session(session_id)
    token = state['jwt_token']
    if not token:
        return False, "Please log in first"
    if state['generated_audio'] is None:
        return False, "No track generated yet"

    # The generation id is optional; None is sent when it is not recorded.
    generation_id = state.get('metadata', {}).get('generation_id', None)
    endpoint = f"{WORDPRESS_BASE_URL}/wp-json/audio-api/v1/check-download-unlock"
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    try:
        response = requests.post(
            endpoint,
            headers=request_headers,
            json={'generation_id': generation_id},
            timeout=10
        )
        if response.status_code != 200:
            return False, f"Error checking access: {response.status_code}"
        body = response.json()
        return body.get('has_access', False), body.get('message', 'Check complete')
    except Exception as e:
        return False, f"Error checking download access: {str(e)}"
def purchase_download_unlock(session_id):
    """Create a checkout for the $5 single-track download unlock.

    Returns:
        ``(message, checkout_url)``. ``checkout_url`` is None whenever the
        checkout could not be created (not logged in, nothing generated,
        non-200 response, unsuccessful payload, or a raised exception).
    """
    session = get_session(session_id)
    token = session['jwt_token']
    if not token:
        return "Please log in first", None
    if session['generated_audio'] is None:
        return "No track generated yet", None

    # generation_id is optional; None is sent when the metadata has no such key
    payload = {
        'generation_id': session.get('metadata', {}).get('generation_id', None),
        'unlock_type': 'single'
    }
    request_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    endpoint = f"{WORDPRESS_BASE_URL}/wp-json/audio-api/v1/purchase-download-unlock"

    try:
        response = requests.post(endpoint, headers=request_headers, json=payload, timeout=10)
        if response.status_code != 200:
            return f"Error creating checkout: {response.status_code}", None
        data = response.json()
        if not data.get('success'):
            return data.get('message', 'Purchase failed'), None
        return data.get('message', 'Checkout created'), data.get('checkout_url')
    except Exception as e:
        return f"Error purchasing unlock: {str(e)}", None
def authenticate_user(jwt_token, user_id, session_id):
    """Authenticate a user from the JWT handed over by WordPress.

    Decodes the token, stores the token and its user claims in the session,
    then fetches audio and video credits. A failed credit fetch is not fatal:
    zeroed defaults are stored instead.

    Args:
        jwt_token: Encoded JWT; empty, None or the string "null" means
            "not logged in".
        user_id: User id from the URL; only logged, never used for decisions.
        session_id: Key of the session to populate.

    Returns:
        A 3-tuple of display strings: (auth status, credit summary, message).
    """
    # Token material and decoded claims are credentials/PII, so only their
    # presence is logged, never their content.
    print(f"[AUTH DEBUG] authenticate_user called (jwt_token present: {bool(jwt_token)})")
    print(f"[AUTH DEBUG] user_id: {user_id}, session_id: {session_id}")
    if not jwt_token or jwt_token == "null":
        print("[AUTH DEBUG] No JWT token provided")
        return (
            "⚠️ Not Logged In",
            "Please log in to WordPress to use SongLab AI",
            "Login required to access generation features"
        )
    try:
        # NOTE(security): the signature is NOT verified here, so every claim in
        # `user_info` is client-controlled. Anything that relies on those claims
        # must be re-validated by the WordPress API using the token itself.
        print("[AUTH DEBUG] Decoding JWT...")
        decoded = jwt.decode(jwt_token, options={"verify_signature": False})
        user_info = decoded.get('data', {}).get('user', {})
        print("[AUTH DEBUG] JWT decoded")

        # Store in session
        session = get_session(session_id)
        session['jwt_token'] = jwt_token
        session['user_info'] = user_info
        print("[AUTH DEBUG] Session updated with JWT")

        # Fetch credits (handle 403 gracefully)
        print("[AUTH DEBUG] Fetching user credits...")
        try:
            audio_credits = wp_api.get_user_credits(jwt_token)
            print(f"[AUTH DEBUG] Credits received: {audio_credits}")
        except Exception as credit_error:
            print(f"[AUTH DEBUG] Credit fetch failed (non-fatal): {credit_error}")
            audio_credits = {
                'remaining_credits': 0,
                'free_samples_remaining': 0,
                'total_credits': 0,
                'used_credits': 0,
                'free_samples_used': 0
            }
        session['credits'] = audio_credits

        # Fetch video credits (handle 403 gracefully)
        print("[AUTH DEBUG] Fetching video credits...")
        try:
            video_credits = wp_api.get_video_credits(jwt_token)
            print(f"[AUTH DEBUG] Video credits received: {video_credits}")
        except Exception as video_error:
            print(f"[AUTH DEBUG] Video credit fetch failed (non-fatal): {video_error}")
            video_credits = {'remaining_credits': 0, 'package_tier': 'free'}
        session['video_credits'] = video_credits

        display_name = user_info.get('display_name', 'User')
        # .get() with defaults: a credit payload missing a key must not turn a
        # successful login into an "Authentication Error".
        credit_info = (
            f"Audio: {audio_credits.get('remaining_credits', 0)} credits | "
            f"{audio_credits.get('free_samples_remaining', 0)} free samples (30s) | "
            f"Video: {video_credits.get('remaining_credits', 0)}"
        )
        print(f"[AUTH DEBUG] Authentication successful for {display_name}")
        return (
            f"✓ Logged in as {display_name}",
            credit_info,
            f"Welcome, {display_name}! Credits are charged per generation."
        )
    except Exception as e:
        print(f"[AUTH DEBUG] Authentication failed with error: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        return (
            "⚠️ Authentication Error",
            f"Error: {type(e).__name__} - {str(e)}",
            "Please refresh and try again"
        )
def format_prompt(genre: str, energy: str, tempo: int, description: str) -> str:
    """Build the comma-separated text prompt sent to the music generator."""
    fields = (
        f"Genre: {genre}",
        f"Energy Level: {energy}",
        f"Tempo: {tempo} bpm",
        f"Description: {description}",
    )
    return ", ".join(fields)
def _to_audio_segment(audio_data, sample_rate):
    """Convert float audio samples to a 16-bit PCM ``AudioSegment``."""
    scaled = np.asarray(audio_data) * 32768
    # Clip before the cast: a full-scale sample (1.0) scales to 32768, which
    # does not fit in int16 and would otherwise wrap around to -32768.
    pcm = np.clip(scaled, -32768, 32767).astype(np.int16)
    return AudioSegment(
        pcm.tobytes(),
        frame_rate=sample_rate,
        sample_width=2,
        # Any 2-D array is treated as stereo, as the original code did.
        # NOTE(review): assumes a (samples, 2) layout - confirm against MusicGenerator.
        channels=2 if pcm.ndim > 1 else 1
    )


def _deduct_audio_credit(session, duration, has_lyrics, generation_type, label):
    """Charge one audio credit/free sample; never raises, so a finished track is kept."""
    try:
        # 'user_info' exists in every session but may hold None, so `or {}` is
        # needed; a .get() default would not apply.
        user_id = (session.get('user_info') or {}).get('id')
        jwt_token = session.get('jwt_token')
        if not (user_id and jwt_token):
            print("[CREDIT] ⚠️ Warning: no user id/token in session, credit not deducted")
            return
        # Skip for admin accounts. Compared as strings so the check works
        # whether the id claim arrives as an int or as a string.
        if str(user_id) in ("1", "206"):
            return
        print(f"[CREDIT] Deducting credit for {label}: user={user_id}, duration={duration}, has_lyrics={has_lyrics}")
        credit_used = wp_api.use_audio_credit(
            jwt_token,
            int(duration),
            has_lyrics=has_lyrics,
            metadata={'type': generation_type, 'duration': f'{duration}s'}
        )
        if credit_used:
            print("[CREDIT] ✅ Credit/free sample deducted successfully")
        else:
            print("[CREDIT] ⚠️ Warning: Credit deduction failed (generation still successful)")
    except Exception as e:
        # Don't fail the generation if credit deduction fails
        print(f"[CREDIT] Error deducting credit: {e}")


def generate_track(genre, energy, tempo, description, duration, session_id, progress=gr.Progress()):
    """Generate an instrumental track and store it in the session.

    Returns:
        ``(audio, status, info)`` where ``audio`` is ``(sample_rate, samples)``
        on success and None on failure.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first", ""
    if not description or len(description.strip()) < 10:
        return None, "❌ Please provide a detailed description (at least 10 characters)", ""
    try:
        prompt = format_prompt(genre, energy, tempo, description)
        sample_rate, audio_data = music_gen.generate_music(prompt, float(duration), progress)

        # Keep an AudioSegment copy for the download and effects handlers
        session['generated_audio'] = _to_audio_segment(audio_data, sample_rate)
        session['metadata'] = {
            'genre': genre,
            'energy': energy,
            'tempo': tempo,
            'description': description,
            'duration': duration,
            'has_lyrics': False
        }

        # Deduct credit/free sample only after a successful generation
        _deduct_audio_credit(session, duration, False, 'normal_generation', 'normal track generation')
        return (sample_rate, audio_data), "✓ Generation complete!", f"Generated {duration}s track: {genre} ({energy} energy, {tempo} bpm)"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}", ""


def generate_with_vocals(genre, energy, tempo, description, lyrics, duration, session_id, progress=gr.Progress()):
    """Generate a track with sung lyrics and store it in the session.

    Returns:
        ``(audio, status, info)`` where ``audio`` is ``(sample_rate, samples)``
        on success and None on failure.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first", ""
    if not description or not lyrics:
        return None, "❌ Please provide both description and lyrics", ""
    try:
        prompt = format_prompt(genre, energy, tempo, description)
        sample_rate, audio_data = music_gen.generate_with_lyrics(prompt, lyrics, float(duration), progress)

        # Keep an AudioSegment copy for the download and effects handlers
        session['generated_audio'] = _to_audio_segment(audio_data, sample_rate)
        session['metadata'] = {
            'genre': genre,
            'energy': energy,
            'tempo': tempo,
            'description': description,
            'lyrics': lyrics,
            'duration': duration,
            'has_lyrics': True
        }

        # Deduct credit/free sample only after a successful generation
        _deduct_audio_credit(session, duration, True, 'lyrics_generation', 'lyrics generation')
        return (sample_rate, audio_data), "✓ Generation with vocals complete!", f"Generated {duration}s track with vocals"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}", ""
def download_track(session_id):
    """Export the session's generated track to a WAV file for download.

    The export only happens when the user is logged in, a track exists and
    ``check_download_access`` grants access.

    Returns:
        ``(file_path, status)``; ``file_path`` is None when nothing was exported.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first"
    if session['generated_audio'] is None:
        return None, "❌ No track generated yet"

    # Free samples need the paid unlock before they can be downloaded
    has_access, message = check_download_access(session_id)
    if has_access:
        # Export the full generated audio under a unique file name
        output_path = f"/tmp/songlab_track_{uuid.uuid4().hex}.wav"
        session['generated_audio'].export(output_path, format="wav")
        return output_path, "✓ Downloaded track!"

    return None, f"🔒 {message}\n\nPay $5 to unlock download, or purchase credits for unlimited downloads."
def unlock_download(session_id):
    """Handle the unlock button: return Markdown with a payment link or an error."""
    message, checkout_url = purchase_download_unlock(session_id)
    if not checkout_url:
        return f"❌ {message}"
    return f"✓ {message}\n\n**[Click here to complete payment]({checkout_url})**\n\nAfter payment, you can download your track!"
def apply_effects(stereo, reverse, volume, pitch, session_id):
    """Run the selected effects over the session's generated track.

    Effects are applied in a fixed order (stereo, reverse, volume, pitch) and
    the result is stored in the session as ``processed_audio``.

    Returns:
        ``(audio, status)`` where ``audio`` is ``(frame_rate, samples)`` for
        playback, or None when there is no track or processing failed.
    """
    session = get_session(session_id)
    if session['generated_audio'] is None:
        return None, "❌ No track generated yet"
    try:
        audio = session['generated_audio']
        # (enabled, effect) pairs; the lambdas defer the processor lookup until
        # the effect is actually selected.
        effect_chain = [
            (bool(stereo), lambda seg: audio_processor.apply_stereo_effect(seg)),
            (bool(reverse), lambda seg: audio_processor.apply_reverse(seg)),
            (volume != 0, lambda seg: audio_processor.adjust_volume(seg, volume)),
            (pitch != 0, lambda seg: audio_processor.change_pitch(seg, pitch)),
        ]
        for enabled, effect in effect_chain:
            if enabled:
                audio = effect(audio)
        session['processed_audio'] = audio

        # Convert to numpy for playback
        pcm = np.array(audio.get_array_of_samples())
        samples = pcm.astype(np.float32) / 32768.0
        if audio.channels == 2:
            samples = samples.reshape((-1, 2))
        return (audio.frame_rate, samples), "✓ Effects applied!"
    except Exception as e:
        return None, f"❌ Effect processing failed: {str(e)}"
def _request_jingle(jingle_prompt, jingle_lyrics, duration, progress, max_retries=5, retry_delay=60):
    """Request a jingle from the lyrics API, retrying on 503/timeouts; return the saved WAV path."""
    jingle_payload = {
        "inputs": {"prompt": jingle_prompt, "lyrics": jingle_lyrics or ""},
        "parameters": {
            "duration": float(duration),
            "infer_step": 60,
            "guidance_scale": 15,
            "scheduler_type": "euler",
            "cfg_type": "apg",
            "omega_scale": 10
        }
    }
    request_headers = {"Authorization": f"Bearer {BEARER_TOKEN}", "Content-Type": "application/json"}
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            response = requests.post(
                LYRICS_API_URL,
                headers=request_headers,
                json=jingle_payload,
                timeout=180
            )
        except requests.exceptions.Timeout:
            if is_last_attempt:
                raise Exception("Jingle generation timed out")
            progress(0.15, desc="Timeout, retrying...")
            time.sleep(retry_delay)
            continue
        if response.status_code == 200:
            # mkstemp creates the file itself; mktemp only returns a name and is
            # race-prone and deprecated.
            fd, jingle_path = tempfile.mkstemp(suffix=".wav")
            with os.fdopen(fd, 'wb') as f:
                f.write(response.content)
            return jingle_path
        if response.status_code == 503 and not is_last_attempt:
            progress(0.15, desc=f"Jingle service warming up... waiting {retry_delay}s")
            time.sleep(retry_delay)
            continue
        raise Exception(f"Jingle API error: HTTP {response.status_code}")
    # Only reachable when max_retries <= 0
    raise Exception("Jingle generation failed")


def _read_image_bytes(uploaded_image):
    """Return the raw bytes of an upload given as bytes, a file-like object, or a path."""
    if isinstance(uploaded_image, (bytes, bytearray)):
        return bytes(uploaded_image)
    if hasattr(uploaded_image, 'read'):
        return uploaded_image.read()
    # A filepath string (or an object exposing the path as `.name`) must be
    # opened and read; base64-encoding the path itself raises TypeError.
    image_path = getattr(uploaded_image, 'name', uploaded_image)
    with open(image_path, 'rb') as f:
        return f.read()


def generate_commercial_ad(
    mode,
    video_prompt,
    jingle_prompt,
    jingle_lyrics,
    uploaded_image,
    duration,
    fps,
    guidance_scale,
    video_guidance_scale,
    is_custom,
    add_intro,
    intro_prompt,
    company_name,
    session_id,
    progress=gr.Progress()
):
    """Generate commercial ad (video, jingle+video, or image-to-video).

    Steps: check login and eligibility, validate the inputs for ``mode``,
    optionally generate a jingle and an intro, generate the main video,
    stitch the pieces together, then deduct one video credit.

    Args:
        mode: "text_to_video", "image_to_video" or "jingle_video".
        uploaded_image: Image for "image_to_video"; bytes, a file-like object
            or a file path.
        is_custom: True when the user changed the preset settings.
        add_intro / intro_prompt / company_name: Optional intro with voiceover.

    Returns:
        ``(video_path, status)``; ``video_path`` is None on any failure.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first"

    # Check eligibility (include is_custom mode to allow 2 free preset generations)
    try:
        eligibility = wp_api.check_video_eligibility(session['jwt_token'], is_custom_mode=is_custom)
    except Exception as e:
        return None, f"❌ Could not verify video eligibility: {str(e)}"
    if not eligibility.get('can_generate', False):
        reason = eligibility.get('reason', 'unknown')
        free_remaining = eligibility.get('free_remaining', 0)
        credits_left = eligibility.get('credits', 0)
        error_msg = f"❌ Cannot generate video: {reason}"
        if free_remaining == 0 and credits_left == 0:
            error_msg += "\n\n💡 You have 1 FREE generation with PRESET mode (non-custom settings)!"
            error_msg += "\n📦 Or purchase credits at https://songlabai.com/credit-pricing/"
        return None, error_msg

    # Validate every required input before any slow request is made, so a
    # missing video prompt is reported before the jingle is generated.
    if mode == "jingle_video" and not jingle_prompt:
        return None, "❌ Please provide jingle description"
    if mode in ("text_to_video", "jingle_video") and not video_prompt:
        return None, "❌ Please provide video description"
    if mode == "image_to_video" and not uploaded_image:
        return None, "❌ Please upload an image"

    try:
        video_path = None
        jingle_path = None

        # Generate jingle if needed
        if mode == "jingle_video":
            progress(0.1, desc="Generating jingle...")
            jingle_path = _request_jingle(jingle_prompt, jingle_lyrics, duration, progress)
            progress(0.3, desc="Jingle complete! Starting video...")

        # Warmup video endpoint in background
        warmup_thread = threading.Thread(target=video_gen.warmup_endpoint)
        warmup_thread.daemon = True
        warmup_thread.start()

        # Generate intro if Expert tier
        intro_path = None
        if add_intro and intro_prompt:
            progress(0.35, desc="Generating 5s intro...")
            intro_path = video_gen.generate_intro_with_voiceover(intro_prompt, company_name, progress)
            progress(0.5, desc="Intro complete! Generating main video...")

        # Generate main video
        if mode == "text_to_video" or mode == "jingle_video":
            video_path = video_gen.generate_video(
                mode="text_to_video",
                prompt=video_prompt,
                duration=float(duration),
                fps=fps,
                guidance_scale=guidance_scale,
                video_guidance_scale=video_guidance_scale,
                progress=progress
            )
        elif mode == "image_to_video":
            # Convert uploaded image to base64
            image_base64 = base64.b64encode(_read_image_bytes(uploaded_image)).decode('utf-8')
            video_path = video_gen.generate_video(
                mode="image_to_video",
                prompt=video_prompt,
                duration=float(duration),
                fps=fps,
                guidance_scale=guidance_scale,
                video_guidance_scale=video_guidance_scale,
                image_base64=image_base64,
                progress=progress
            )
        if not video_path:
            return None, "❌ Video generation failed"

        # Combine intro + main if needed
        if intro_path:
            progress(0.95, desc="Combining intro + main video...")
            video_path = video_gen.concatenate_videos(intro_path, video_path)

        # Merge jingle + video if needed
        if jingle_path:
            progress(0.98, desc="Merging jingle with video...")
            video_path = video_gen.merge_audio_video(video_path, jingle_path)

        # Deduct credit
        generation_data = {
            'mode': mode,
            'duration': duration,
            'fps': fps,
            'guidance_scale': guidance_scale,
            'video_guidance_scale': video_guidance_scale,
            'is_preset': not is_custom,
            'is_expert_intro': add_intro,
            'prompt': video_prompt
        }
        wp_api.use_video_credit(session['jwt_token'], generation_data)
        progress(1.0, desc="Complete!")
        return video_path, f"✓ {'Expert commercial' if add_intro else 'Commercial ad'} generated successfully!"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}"
def load_example_ad(example_name):
    """Return the (jingle, lyrics, video) texts of a named example ad.

    Unknown names yield three empty strings.
    """
    try:
        example = EXAMPLE_COMMERCIAL_ADS[example_name]
    except KeyError:
        return "", "", ""
    return example["jingle"], example["lyrics"], example["video"]
# Custom CSS for Suno-style theming
# The rules below reference four custom properties (--primary-color,
# --secondary-color, --surface-color, --text-primary). They are declared in
# :root here, because an undefined var() makes the whole declaration invalid
# and the card backgrounds, input text colour and button gradient are dropped.
custom_css = """
/* Theme variables used by the rules below */
:root {
    --primary-color: #6366f1;
    --secondary-color: #764ba2;
    --surface-color: #26262B;
    --text-primary: #ffffff;
}
/* Hide Gradio's built-in download button */
.download-link,
a[download],
.icon-button[aria-label="Download"],
button[aria-label="Download"] {
    display: none !important;
}
/* Suno AI Dark Theme */
.gradio-container {
    font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif !important;
}
/* Dark background */
body, .gradio-container {
    background-color: #212126 !important;
}
/* Card styling */
.gr-box, .gr-form, .gr-panel {
    background-color: var(--surface-color) !important;
    border: 1px solid #334155 !important;
    border-radius: 12px !important;
    box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2) !important;
}
/* Input fields */
.gr-input, .gr-text-input, .gr-text-area, .gr-dropdown {
    background-color: #0f172a !important;
    border: 1px solid #334155 !important;
    border-radius: 8px !important;
    color: var(--text-primary) !important;
    padding: 10px 12px !important;
}
/* Buttons */
.gr-button {
    background: linear-gradient(135deg, var(--primary-color) 0%, var(--secondary-color) 100%) !important;
    border: none !important;
    border-radius: 8px !important;
    color: white !important;
    font-weight: 600 !important;
    padding: 12px 24px !important;
    transition: all 0.3s ease !important;
}
.gr-button:hover {
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 12px rgba(99, 102, 241, 0.4) !important;
}
.gr-button-primary {
    background: linear-gradient(135deg, #10b981 0%, #059669 100%) !important;
}
.gr-button-secondary {
    background: linear-gradient(135deg, #f59e0b 0%, #d97706 100%) !important;
}
/* Sliders */
.gr-slider input[type="range"] {
    background: #3a3a3f !important;
}
.gr-slider input[type="range"]::-webkit-slider-thumb {
    background: #6366f1 !important;
}
/* Audio/Video players */
audio, video {
    width: 100% !important;
    background-color: #1a1a1e !important;
    border-radius: 8px !important;
}
/* Text colors */
.gr-label, .gr-text, .markdown-text {
    color: #ffffff !important;
}
/* Header styling */
h1, h2, h3 {
    color: #ffffff !important;
    font-weight: 700 !important;
}
/* Tabs */
.tab-nav button {
    background-color: #26262B !important;
    border: 1px solid #3a3a3f !important;
    color: #999 !important;
    border-radius: 8px 8px 0 0 !important;
    transition: all 0.3s ease !important;
}
.tab-nav button.selected {
    background-color: #6366f1 !important;
    color: white !important;
    border-bottom: 3px solid #6366f1 !important;
}
/* Loading animation */
@keyframes pulse {
    0%, 100% { opacity: 1; }
    50% { opacity: 0.5; }
}
.generating {
    animation: pulse 2s ease-in-out infinite;
}
"""
def create_ui():
    """Create the main Gradio interface.

    Builds four tabs (music, music with vocals, audio effects, commercial ad),
    wires every button to its module-level handler, and registers a page-load
    hook that reads the WordPress JWT from the URL query string.

    Returns:
        The assembled ``gr.Blocks`` app (not launched).
    """
    genre_choices = [
        "Pop", "Rock", "Hip-Hop", "Electronic", "Jazz",
        "Classical", "R&B", "Country", "Reggae", "Blues",
        "Metal", "Funk", "Soul", "Disco", "House",
        "Techno", "Ambient", "Lo-fi", "Trap", "Dubstep",
        "Indie", "Folk", "Latin", "Gospel", "Afrobeat"
    ]

    def build_music_controls():
        """Create the genre/energy/tempo/duration controls shared by both music tabs."""
        genre_dd = gr.Dropdown(choices=list(genre_choices), label="Genre", value="Pop")
        energy_radio = gr.Radio(
            choices=["Low", "Medium", "High"],
            label="Energy Level",
            value="Medium"
        )
        tempo_slider = gr.Slider(minimum=40, maximum=200, value=120, step=1, label="Tempo (BPM)")
        length_slider = gr.Slider(minimum=15, maximum=140, value=30, step=5, label="Duration (seconds)")
        return genre_dd, energy_radio, tempo_slider, length_slider

    def build_download_controls():
        """Create the download/unlock buttons and their outputs for a music tab."""
        dl_button = gr.Button("⬇️ Download Track", variant="primary", size="lg")
        unlock_button = gr.Button("🔓 Unlock Download - $5", variant="secondary", size="lg")
        dl_file = gr.File(label="Your Download")
        dl_status = gr.Markdown("")
        unlock_msg = gr.Markdown("")
        return dl_button, unlock_button, dl_file, dl_status, unlock_msg

    with gr.Blocks(title="SongLab AI") as app:
        # Session state. The id starts empty and is assigned per page load by
        # authenticate_on_load below. A uuid computed here would be evaluated
        # once at build time and shared by every visitor, so all users would
        # read and write the same entry in user_sessions.
        session_id = gr.State(value=None)

        # URL parameters for WordPress authentication
        # Using hidden textboxes instead of State because JS can update textboxes but not State
        jwt_token_param = gr.Textbox(value="", visible=False, elem_id="jwt_token_hidden")
        user_id_param = gr.Textbox(value="", visible=False, elem_id="user_id_hidden")

        # Header
        gr.Markdown("# 🎵 SongLab AI", elem_classes=["header"])
        gr.Markdown("*Professional Music & Video Generation - Powered by AI*")

        # Authentication section
        with gr.Row():
            with gr.Column(scale=2):
                auth_status = gr.Markdown("⚠️ Checking authentication...")
            with gr.Column(scale=2):
                credit_display = gr.Markdown("Credits: Loading...")
            with gr.Column(scale=1):
                gr.HTML(
                    '<a href="https://songlabai.com/credit-pricing/" target="_blank" '
                    'style="display: inline-block; padding: 8px 16px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); '
                    'color: white; text-decoration: none; border-radius: 6px; font-weight: 600; text-align: center;">'
                    '💳 Buy Credits</a>'
                )
        auth_message = gr.Markdown("")

        # Main tabs
        with gr.Tabs() as tabs:
            # Tab 1: Music Generation
            with gr.Tab("🎵 Generate Music"):
                gr.Markdown("### Create Your Track")
                gr.Markdown("*🎁 1 free sample! Pay $5 to download or purchase credits for unlimited downloads.*")
                with gr.Row():
                    with gr.Column(scale=2):
                        genre, energy, tempo, duration = build_music_controls()
                    with gr.Column(scale=3):
                        description = gr.Textbox(
                            label="Track Description",
                            placeholder="Describe your track in detail... (e.g., 'Upbeat summer pop song with catchy chorus and bright synths')",
                            lines=4
                        )
                generate_btn = gr.Button("🎵 Generate Track", variant="primary", size="lg")
                status_music = gr.Markdown("")
                info_music = gr.Markdown("")
                audio_output = gr.Audio(label="Generated Track", interactive=False)
                (download_btn, unlock_btn, download_file,
                 download_status, unlock_status) = build_download_controls()

            # Tab 2: Music with Vocals
            with gr.Tab("🎤 Generate with Vocals"):
                gr.Markdown("### Create Track with Lyrics")
                gr.Markdown("*Add vocals and lyrics to your music*")
                with gr.Row():
                    with gr.Column(scale=2):
                        genre_v, energy_v, tempo_v, duration_v = build_music_controls()
                    with gr.Column(scale=3):
                        description_v = gr.Textbox(
                            label="Track Description",
                            placeholder="Describe your track...",
                            lines=3
                        )
                        lyrics_v = gr.Textbox(
                            label="Lyrics",
                            placeholder="Enter your lyrics here...\n\nVerse 1:\n...\n\nChorus:\n...",
                            lines=8
                        )
                generate_vocals_btn = gr.Button("🎤 Generate with Vocals", variant="primary", size="lg")
                status_vocals = gr.Markdown("")
                info_vocals = gr.Markdown("")
                audio_output_v = gr.Audio(label="Generated Track with Vocals", interactive=False)
                (download_btn_v, unlock_btn_v, download_file_v,
                 download_status_v, unlock_status_v) = build_download_controls()

            # Tab 3: Audio Effects
            with gr.Tab("🎚️ Audio Effects"):
                gr.Markdown("### Process Your Track")
                gr.Markdown("*Apply effects to your generated music*")
                with gr.Row():
                    stereo_effect = gr.Checkbox(label="Stereo Widening", value=False)
                    reverse_effect = gr.Checkbox(label="Reverse", value=False)
                with gr.Row():
                    volume_slider = gr.Slider(
                        minimum=-20,
                        maximum=20,
                        value=0,
                        step=1,
                        label="Volume (dB)"
                    )
                    pitch_slider = gr.Slider(
                        minimum=-12,
                        maximum=12,
                        value=0,
                        step=1,
                        label="Pitch (semitones)"
                    )
                apply_effects_btn = gr.Button("✨ Apply Effects", variant="primary", size="lg")
                status_effects = gr.Markdown("")
                audio_processed = gr.Audio(label="Processed Track", interactive=False)
                reset_btn = gr.Button("🔄 Reset Effects")

            # Tab 4: Commercial Ad (VIDEO)
            with gr.Tab("🎬 Commercial Ad"):
                gr.Markdown("### Generate Professional Commercial Ads")
                gr.Markdown("*Create video ads with optional jingles*")

                # Mode selector
                ad_mode = gr.Radio(
                    choices=["text_to_video", "image_to_video", "jingle_video"],
                    label="Generation Mode",
                    value="jingle_video",  # Default to full ad with jingle
                    info="Choose how to create your ad"
                )

                # Example selector
                example_selector = gr.Dropdown(
                    choices=list(EXAMPLE_COMMERCIAL_ADS.keys()),
                    label="📋 Example Commercial Ads",
                    value="Select an example..."
                )
                use_example_btn = gr.Button("Use This Example")

                # Inputs (conditional based on mode)
                with gr.Row():
                    with gr.Column():
                        video_prompt_input = gr.Textbox(
                            label="Video Description",
                            placeholder="Describe your video scene...",
                            lines=4
                        )
                        image_upload = gr.File(
                            label="Upload Image (for Image-to-Video)",
                            file_types=["image"],
                            visible=False
                        )
                        jingle_prompt_input = gr.Textbox(
                            label="Jingle Description (for Jingle+Video)",
                            placeholder="energetic corporate music, upbeat tempo, inspiring melody...",
                            lines=3,
                            visible=True  # Visible by default
                        )
                        jingle_lyrics_input = gr.Textbox(
                            label="Jingle Lyrics (Optional)",
                            placeholder="Your brand, your way, every single day...",
                            lines=2,
                            visible=True  # Visible by default
                        )
                    with gr.Column():
                        # Settings (locked until "Customize Settings" is ticked)
                        custom_mode = gr.Checkbox(label="🎨 Customize Settings (Requires Credits)", value=False)
                        duration_video = gr.Slider(
                            minimum=2,
                            maximum=15,
                            value=3,
                            step=1,
                            label="Duration (seconds)",
                            interactive=False
                        )
                        fps_selector = gr.Dropdown(
                            choices=["Standard (16fps)"],
                            label="Quality (FPS)",
                            value="Standard (16fps)",
                            interactive=False
                        )
                        creativity_slider = gr.Slider(
                            minimum=1.0,
                            maximum=15.0,
                            value=7.0,
                            step=0.5,
                            label="🎨 Creativity",
                            interactive=False
                        )
                        motion_slider = gr.Slider(
                            minimum=1.0,
                            maximum=10.0,
                            value=5.0,
                            step=0.5,
                            label="⚡ Motion",
                            interactive=False
                        )
                        # Expert intro (hidden; nothing in this function makes it visible)
                        expert_intro_check = gr.Checkbox(
                            label="🎬 Add 5s Intro with Voiceover (Expert Tier)",
                            value=False,
                            visible=False
                        )
                        intro_prompt_input = gr.Textbox(
                            label="Intro Description",
                            placeholder="Logo on gradient background...",
                            visible=False
                        )
                        company_name_input = gr.Textbox(
                            label="Company Name (for voiceover)",
                            placeholder="YourBrand Inc.",
                            visible=False
                        )
                generate_ad_btn = gr.Button("🎬 Generate Commercial Ad", variant="primary", size="lg")
                ad_status = gr.Markdown("")
                ad_video_output = gr.Video(label="Your Commercial Ad")
                # NOTE(review): no click handler is attached to this button in this function.
                download_ad_btn = gr.Button("⬇️ Download Ad (MP4)", variant="secondary")

        # Event handlers - Mode switcher for video
        def update_video_ui(mode):
            return {
                image_upload: gr.update(visible=(mode == "image_to_video")),
                jingle_prompt_input: gr.update(visible=(mode == "jingle_video")),
                jingle_lyrics_input: gr.update(visible=(mode == "jingle_video"))
            }

        ad_mode.change(
            fn=update_video_ui,
            inputs=[ad_mode],
            outputs=[image_upload, jingle_prompt_input, jingle_lyrics_input]
        )

        # Custom mode toggle
        def update_custom_ui(is_custom):
            return {
                duration_video: gr.update(interactive=is_custom),
                fps_selector: gr.update(interactive=is_custom),
                creativity_slider: gr.update(interactive=is_custom),
                motion_slider: gr.update(interactive=is_custom)
            }

        custom_mode.change(
            fn=update_custom_ui,
            inputs=[custom_mode],
            outputs=[duration_video, fps_selector, creativity_slider, motion_slider]
        )

        # Example loader
        use_example_btn.click(
            fn=load_example_ad,
            inputs=[example_selector],
            outputs=[jingle_prompt_input, jingle_lyrics_input, video_prompt_input]
        )

        # Generate music
        generate_btn.click(
            fn=generate_track,
            inputs=[genre, energy, tempo, description, duration, session_id],
            outputs=[audio_output, status_music, info_music]
        )

        # Generate with vocals
        generate_vocals_btn.click(
            fn=generate_with_vocals,
            inputs=[genre_v, energy_v, tempo_v, description_v, lyrics_v, duration_v, session_id],
            outputs=[audio_output_v, status_vocals, info_vocals]
        )

        # Download handlers
        download_btn.click(
            fn=download_track,
            inputs=[session_id],
            outputs=[download_file, download_status]
        )
        download_btn_v.click(
            fn=download_track,
            inputs=[session_id],
            outputs=[download_file_v, download_status_v]
        )

        # Unlock download handlers
        unlock_btn.click(
            fn=unlock_download,
            inputs=[session_id],
            outputs=[unlock_status]
        )
        unlock_btn_v.click(
            fn=unlock_download,
            inputs=[session_id],
            outputs=[unlock_status_v]
        )

        # Effects
        apply_effects_btn.click(
            fn=apply_effects,
            inputs=[stereo_effect, reverse_effect, volume_slider, pitch_slider, session_id],
            outputs=[audio_processed, status_effects]
        )
        reset_btn.click(
            fn=lambda: (False, False, 0, 0, None, "✓ Effects reset"),
            outputs=[stereo_effect, reverse_effect, volume_slider, pitch_slider, audio_processed, status_effects]
        )

        # Generate commercial ad
        def generate_ad_wrapper(mode, video_prompt, jingle_prompt, jingle_lyrics, image, duration, fps_str, creativity, motion, is_custom, add_intro, intro_prompt, company_name, session_id, progress=gr.Progress()):
            # Parse FPS from the dropdown label
            fps = 16  # default
            if "8" in fps_str:
                fps = 8
            elif "24" in fps_str:
                fps = 24
            return generate_commercial_ad(
                mode, video_prompt, jingle_prompt, jingle_lyrics, image,
                duration, fps, creativity, motion, is_custom,
                add_intro, intro_prompt, company_name, session_id, progress
            )

        generate_ad_btn.click(
            fn=generate_ad_wrapper,
            inputs=[
                ad_mode, video_prompt_input, jingle_prompt_input, jingle_lyrics_input,
                image_upload, duration_video, fps_selector, creativity_slider, motion_slider,
                custom_mode, expert_intro_check, intro_prompt_input, company_name_input, session_id
            ],
            outputs=[ad_video_output, ad_status]
        )

        # Auto-authenticate on page load using URL parameters
        def authenticate_on_load(jwt_token, user_id):
            # Mint the session id here so each page load gets its own private
            # entry in user_sessions, then hand it back to the State component.
            new_session_id = str(uuid.uuid4())
            status, credit_line, message = authenticate_user(jwt_token, user_id, new_session_id)
            return new_session_id, status, credit_line, message

        app.load(
            fn=authenticate_on_load,
            inputs=[jwt_token_param, user_id_param],
            outputs=[session_id, auth_status, credit_display, auth_message],
            js="""
            function() {
                const urlParams = new URLSearchParams(window.location.search);
                const jwt = urlParams.get('jwt_token') || urlParams.get('token') || '';
                const userId = urlParams.get('user_id') || '';
                console.log('SongLab AI: Auto-authenticating with JWT:', jwt ? 'present' : 'missing');
                console.log('SongLab AI: Extracted values - JWT length:', jwt.length, 'UserID:', userId);
                // CRITICAL: Return as array for Gradio to pass to Python function
                return [jwt, userId];
            }
            """
        )
    return app
if __name__ == "__main__":
    # Build the UI, enable the request queue (at most 20 waiting jobs), then
    # serve on all interfaces at port 7860.
    demo = create_ui()
    demo.queue(max_size=20)
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "theme": gr.themes.Base(),
        "css": custom_css,
        "footer_links": ["gradio", "settings"],  # Hide API docs link
    }
    demo.launch(**launch_options)