# Source: Songlabaimain / app_gradio.py — Hugging Face Space by Phoenixak99
# (commit e707630, "Update app_gradio.py")
"""
SongLab AI - Gradio Interface
Modern, minimalist music generation interface with WordPress integration
Includes: Music, Vocals, Video Generation, Audio Effects
"""
import os
import io
import json
import uuid
import base64
import time
import tempfile
import subprocess
import threading
from typing import Optional, Tuple, Dict, Any, List
from datetime import datetime
import gradio as gr
import requests
import numpy as np
from pydub import AudioSegment
from pydub.effects import low_pass_filter, high_pass_filter
from gradio_client import Client
import jwt
try:
from huggingface_hub import HfApi, upload_file
HF_AVAILABLE = True
except ImportError:
HF_AVAILABLE = False
try:
from gtts import gTTS
GTTS_AVAILABLE = True
except ImportError:
GTTS_AVAILABLE = False
# Configuration
WORDPRESS_BASE_URL = "https://songlabai.com"
LYRICS_API_URL = os.environ.get("LYRICS_API_URL")
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
# Video Configuration
VIDEO_ENDPOINT_URL = os.environ.get("VIDEO_ENDPOINT_URL", "")
VIDEO_HF_TOKEN = os.environ.get("VIDEO_HF_TOKEN", "")
VIDEO_REPO_ID = os.environ.get("VIDEO_REPO_ID", "Phoenixak99/savedvideos")
HF_TOKEN_DOWNLOAD = os.environ.get("HF_TOKEN_DOWNLOAD", "")
# Global state management
user_sessions = {}
# Example commercial ads
# Preset prompt bundles for the commercial-ad workflow. Each label maps to the
# three prompts the UI pre-fills:
#   jingle - MusicGen style prompt,
#   lyrics - short vocal hook for the vocals generator,
#   video  - scene description for the text-to-video endpoint.
# The first entry is a placeholder so the dropdown can be reset to empty.
EXAMPLE_COMMERCIAL_ADS = {
    "Select an example...": {"jingle": "", "lyrics": "", "video": ""},
    "Luxury Car Launch": {
        "jingle": "cinematic orchestral, powerful strings, epic drums, sophisticated and premium, rising crescendo",
        "lyrics": "Drive the future, feel the power, elegance in motion, every hour",
        "video": "Sleek black luxury sedan driving through mountain roads at golden hour, dramatic low angle shots, cinematic camera movements, reflections on polished surface, mist rolling through valleys"
    },
    "Coffee Shop Morning": {
        "jingle": "acoustic guitar, warm and cozy, jazzy piano, uplifting morning vibes, folk pop",
        "lyrics": "Morning starts with you, fresh and true, every sip a new day, brewing your way",
        "video": "Steaming coffee cup on rustic wooden table by cafe window, warm morning sunlight streaming through, barista pouring latte art, cozy atmosphere with bokeh background"
    },
    "Fitness App Energy": {
        "jingle": "electronic dance, energetic beat, motivational synths, upbeat tempo, powerful bass",
        "lyrics": "Push your limits, break the wall, stronger faster, give your all",
        "video": "Dynamic fitness montage with athletes training, slow motion sweat drops, vibrant gym lighting, rapid cuts between exercises, motivational energy"
    },
    "Gourmet Restaurant": {
        "jingle": "smooth jazz, elegant piano, sophisticated strings, upscale dining ambiance",
        "lyrics": "Taste perfection, pure delight, culinary art, every bite",
        "video": "Close-up of gourmet dish being plated with artistic precision, chef's hands in motion, golden lighting on fine china, wine being poured, elegant restaurant atmosphere"
    },
    "Adventure Travel": {
        "jingle": "world music fusion, adventurous drums, uplifting melodies, wanderlust vibes",
        "lyrics": "Explore the world, discover more, adventure waits, through every door",
        "video": "Epic landscape shots transitioning through mountains, beaches, cities, aerial drone views, backpacker silhouettes at sunset, dynamic travel montage"
    },
    "Tech Innovation": {
        "jingle": "futuristic electronic, pulsing synths, modern tech sounds, innovative vibes",
        "lyrics": "Future is here, innovation clear, technology near, progress we steer",
        "video": "Sleek technology interfaces with holographic displays, circuit board macro shots, blue LED lighting, smooth product reveals, modern minimalist aesthetic"
    },
    "Summer Beach Party": {
        "jingle": "tropical house, beach vibes, summer energy, catchy melody, feel-good beats",
        "lyrics": "Sun and sand, fun at hand, summer days, ocean waves",
        "video": "Beach party scenes with friends laughing, sunset over ocean, volleyball in slow motion, colorful beach umbrellas, festive summer atmosphere"
    },
    "Luxury Perfume": {
        "jingle": "elegant orchestral, mysterious atmosphere, sophisticated strings, sensual rhythm",
        "lyrics": "Essence of elegance, fragrance divine, timeless beauty, forever shine",
        "video": "Perfume bottle rotating on reflective black surface, dramatic lighting with golden highlights, elegant hand reaching for bottle, misty atmospheric effects, luxury aesthetic"
    }
}
class AudioProcessor:
    """Stateless helpers that transform pydub AudioSegment objects."""

    @staticmethod
    def apply_stereo_effect(audio: AudioSegment, separation: int = 30) -> AudioSegment:
        """Widen the stereo image by filtering each channel differently.

        The left channel keeps content above 200 Hz, the right keeps content
        below 8 kHz; recombining the two yields a wider-sounding mix.
        NOTE(review): `separation` is currently unused by the implementation.
        """
        if audio.channels == 1:
            audio = audio.set_channels(2)
        channels = audio.split_to_mono()
        left = channels[0]
        right = channels[1] if audio.channels == 2 else left
        return AudioSegment.from_mono_audiosegments(
            high_pass_filter(left, 200),
            low_pass_filter(right, 8000),
        )

    @staticmethod
    def apply_reverse(audio: AudioSegment) -> AudioSegment:
        """Return the audio played backwards."""
        return audio.reverse()

    @staticmethod
    def adjust_volume(audio: AudioSegment, db_change: float) -> AudioSegment:
        """Shift loudness by `db_change` decibels (negative attenuates)."""
        return audio + db_change

    @staticmethod
    def change_pitch(audio: AudioSegment, semitones: int) -> AudioSegment:
        """Shift pitch by `semitones` via resampling (tempo shifts too)."""
        if semitones == 0:
            return audio
        shifted_rate = int(audio.frame_rate * (2.0 ** (semitones / 12.0)))
        resampled = audio._spawn(audio.raw_data, overrides={'frame_rate': shifted_rate})
        # Restore the original playback rate so players interpret it normally.
        return resampled.set_frame_rate(audio.frame_rate)

    @staticmethod
    def trim_audio(audio: AudioSegment, duration_seconds: int) -> AudioSegment:
        """Keep only the first `duration_seconds` seconds of audio."""
        return audio[:duration_seconds * 1000]
class WordPressAPI:
    """Client for the WordPress audio/video credit and upload REST APIs.

    Every network method is defensive: failures are logged and a safe
    fallback value is returned instead of raising, so the Gradio UI keeps
    working even when WordPress is unreachable.
    """

    def __init__(self, base_url: str):
        # Root of the WordPress site exposing the audio-api/video-api namespaces.
        self.base_url = base_url

    def _get_headers(self, jwt_token: str) -> Dict[str, str]:
        """Generate API headers with JWT token."""
        return {
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/json",
            "Cache-Control": "no-store"
        }

    @staticmethod
    def _empty_audio_credits() -> Dict[str, int]:
        """Zeroed credit structure used as a safe fallback on any failure."""
        return {
            'remaining_credits': 0,
            'free_samples_remaining': 0,
            'total_credits': 0,
            'used_credits': 0,
            'free_samples_used': 0
        }

    def get_user_credits(self, jwt_token: str) -> Dict[str, int]:
        """Fetch user's audio credits from new audio API.

        Returns a dict of integer counters; all-zero dict on any failure.
        """
        try:
            response = requests.get(
                f"{self.base_url}/wp-json/audio-api/v1/audio-credits",
                headers=self._get_headers(jwt_token),
                timeout=10
            )
            # Don't use raise_for_status() - handle status codes manually
            if response.status_code != 200:
                print(f"[CREDIT DEBUG] API returned status {response.status_code}: {response.text}")
                return self._empty_audio_credits()
            data = response.json()
            print(f"[CREDIT DEBUG] Raw API response: {data}")
            print(f"[CREDIT DEBUG] remaining_credits: {data.get('remaining_credits', 'MISSING')}")
            print(f"[CREDIT DEBUG] free_samples_remaining: {data.get('free_samples_remaining', 'MISSING')}")
            print(f"[CREDIT DEBUG] total_credits: {data.get('total_credits', 'MISSING')}")
            # New API returns different structure
            # IMPORTANT: Convert string values to int (API returns strings)
            result = {
                'remaining_credits': int(data.get('remaining_credits', 0)),
                'free_samples_remaining': int(data.get('free_samples_remaining', 0)),
                'total_credits': int(data.get('total_credits', 0)),
                'used_credits': int(data.get('used_credits', 0)),
                'free_samples_used': int(data.get('free_samples_used', 0))
            }
            print(f"[CREDIT DEBUG] Final parsed result: {result}")
            return result
        except Exception as e:
            print(f"[CREDIT DEBUG] Error fetching credits: {e}")
            import traceback
            traceback.print_exc()
            return self._empty_audio_credits()

    def check_audio_eligibility(self, jwt_token: str, duration: int, has_lyrics: bool = False) -> Dict[str, Any]:
        """Check if user can generate audio (includes 1 free sample).

        On API failure, deliberately falls back to allowing one free sample
        so an outage never blocks first-time users.
        """
        try:
            response = requests.post(
                f"{self.base_url}/wp-json/audio-api/v1/check-eligibility",
                headers=self._get_headers(jwt_token),
                json={'duration': duration, 'has_lyrics': has_lyrics},
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error checking audio eligibility: {e}")
            # On error, allow 1 free sample as fallback
            return {
                'can_generate': True,
                'reason': 'error_fallback',
                'uses_credit': False,
                'free_samples_remaining': 1,
                'remaining_credits': 0
            }

    def get_video_credits(self, jwt_token: str) -> Dict[str, Any]:
        """Fetch user's video credits; empty dict on failure."""
        try:
            response = requests.get(
                f"{self.base_url}/wp-json/video-api/v1/video-credits",
                headers=self._get_headers(jwt_token),
                timeout=10
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching video credits: {e}")
            return {}

    def check_video_eligibility(self, jwt_token: str, is_custom_mode: bool = False) -> Dict[str, Any]:
        """Check if user can generate videos (includes 1 free preset generation).

        Preset mode may consume a free generation; custom mode always needs
        paid credits. Admin user IDs bypass all checks.
        """
        try:
            # Get video credits
            video_credits = self.get_video_credits(jwt_token)
            if not video_credits:
                # If API fails, allow 1 free generation as fallback
                return {
                    'can_generate': True,
                    'free_remaining': 1,
                    'credits': 0,
                    'reason': 'free_generation'
                }
            # Check user ID for admin status
            try:
                # Signature not verified here: the token is only decoded to
                # read the user id; WordPress enforces auth server-side.
                decoded = jwt.decode(jwt_token, options={"verify_signature": False})
                user_id = decoded.get('data', {}).get('user', {}).get('id')
                is_admin = user_id in [1, 206]  # Admin IDs
            except Exception:
                # Fixed: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit.
                is_admin = False
            # Admins can always generate
            if is_admin:
                return {
                    'can_generate': True,
                    'free_remaining': 999,
                    'credits': 999,
                    'reason': 'admin'
                }
            free_remaining = video_credits.get('free_generations_remaining', 0)
            credits_remaining = video_credits.get('remaining_credits', 0)
            # PRESET MODE (Non-custom): Allow if free generations remain
            if not is_custom_mode:
                if free_remaining > 0:
                    return {
                        'can_generate': True,
                        'free_remaining': free_remaining,
                        'credits': credits_remaining,
                        'reason': 'free_preset'
                    }
            # CUSTOM MODE or NO FREE GENERATIONS: Require credits
            if credits_remaining > 0:
                return {
                    'can_generate': True,
                    'free_remaining': free_remaining,
                    'credits': credits_remaining,
                    'reason': 'credits'
                }
            # No eligibility
            return {
                'can_generate': False,
                'free_remaining': free_remaining,
                'credits': credits_remaining,
                'reason': 'no_credits_or_free_gens'
            }
        except Exception as e:
            print(f"Error checking video eligibility: {e}")
            # On error, allow 1 free generation as fallback
            return {
                'can_generate': True,
                'free_remaining': 1,
                'credits': 0,
                'reason': 'error_fallback'
            }

    def use_video_credit(self, jwt_token: str, generation_data: Dict[str, Any]) -> bool:
        """Deduct video credit after successful generation. Returns success."""
        try:
            response = requests.post(
                f"{self.base_url}/wp-json/video-api/v1/use-video-credit",
                headers=self._get_headers(jwt_token),
                json=generation_data,
                timeout=10
            )
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"Error using video credit: {e}")
            return False

    def use_audio_credit(self, jwt_token: str, duration: int, has_lyrics: bool = False,
                         metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Deduct an audio credit or free sample based on duration.

        `metadata` is forwarded verbatim for server-side bookkeeping.
        """
        try:
            payload = {
                'duration': duration,
                'has_lyrics': has_lyrics,
                'metadata': metadata or {}
            }
            response = requests.post(
                f"{self.base_url}/wp-json/audio-api/v1/use-audio-credit",
                headers=self._get_headers(jwt_token),
                json=payload,
                timeout=10
            )
            response.raise_for_status()
            return response.json().get('success', False)
        except Exception as e:
            print(f"Error using credit: {e}")
            return False

    def upload_track(self, jwt_token: str, audio_bytes: bytes, metadata: Dict[str, Any]) -> bool:
        """Upload a generated WAV track to WordPress via admin-ajax."""
        try:
            # Decode JWT to get user info (unverified; server re-checks auth).
            decoded = jwt.decode(jwt_token, options={"verify_signature": False})
            user_info = decoded.get('data', {}).get('user', {})
            files = {'audio_file': ('track.wav', audio_bytes, 'audio/wav')}
            data = {
                'user_id': str(user_info.get('id', '')),
                'user_email': user_info.get('user_email', ''),
                'user_name': user_info.get('display_name', ''),
                'subscription_tier': 'free',
                'saved_by_user': '1',
                **metadata
            }
            response = requests.post(
                f"{self.base_url}/wp-admin/admin-ajax.php?action=upload_audio_direct",
                files=files,
                data=data,
                headers={"Authorization": f"Bearer {jwt_token}"},
                timeout=30
            )
            response.raise_for_status()
            return True
        except Exception as e:
            print(f"Error uploading track: {e}")
            return False
class MusicGenerator:
    """Handles music generation via MusicGen.

    Wraps a gradio_client connection to a remote MusicGen Space and a
    separate HTTP endpoint for lyrics/vocal generation, normalizing the
    several response formats either backend may return.
    NOTE(review): the 120s/180s sleeps assume a cold-starting HF Space /
    inference endpoint — confirm against actual boot times.
    """
    def __init__(self):
        # Lazily created gradio_client.Client; None until first use.
        self.client = None
        # HF Space hosting the MusicGen model.
        self.space_name = "Healthydater/musicgen"
    def get_client(self) -> Client:
        """Get or create MusicGen client with retry logic.

        Tries up to 3 times, sleeping 120s between attempts so the Space
        can cold-start; raises after the final failure.
        """
        if self.client is None:
            for attempt in range(3):
                try:
                    self.client = Client(self.space_name)
                    return self.client
                except Exception as e:
                    if attempt < 2:
                        # Give the Space time to boot before retrying.
                        time.sleep(120)
                    else:
                        raise Exception(f"Failed to connect to MusicGen: {e}")
        return self.client
    def generate_music(self, prompt, duration, progress=gr.Progress()):
        """Generate music from prompt.

        Returns (sample_rate, samples) with samples as a float32 array.
        Retries on timeouts and on 503/"loading"/"starting" errors.
        """
        max_retries = 3
        retry_delay = 180  # 3 minutes for cold start
        for attempt in range(max_retries):
            try:
                progress(0.1 + (attempt * 0.05), desc=f"Connecting to MusicGen (attempt {attempt + 1}/{max_retries})...")
                client = self.get_client()
                progress(0.3, desc="Generating music...")
                result = client.predict(
                    prompt=prompt,
                    duration=duration,
                    temperature=1.0,
                    top_k=250,
                    top_p=0.0,
                    cfg_coef=3.0,
                    use_sampling=True,
                    extend_stride=18.0,
                    api_name="/generate_music"
                )
                progress(0.9, desc="Processing audio...")
                # Handle different response formats
                print(f"[DEBUG] MusicGen result type: {type(result)}")
                # Format 1: Tuple (sample_rate, audio_array or dict)
                if isinstance(result, tuple) and len(result) == 2:
                    sample_rate, samples = result
                    # Samples might be numpy array directly
                    if isinstance(samples, np.ndarray):
                        print(f"[DEBUG] Tuple with numpy array")
                        return sample_rate, samples
                    # OR samples might be a dict with audio data
                    elif isinstance(samples, dict):
                        print(f"[DEBUG] Tuple with dict, keys: {samples.keys()}")
                        if 'error' in samples:
                            raise Exception(f"API error: {samples['error']}")
                        # Extract audio from dict
                        if 'generated_audio' in samples:
                            audio_list = samples['generated_audio']
                            # Prefer the dict's own sample rate when present.
                            actual_sample_rate = samples.get('sample_rate', sample_rate)
                            audio_array = np.array(audio_list, dtype=np.float32)
                            print(f"[DEBUG] Extracted audio: {len(audio_array)} samples at {actual_sample_rate}Hz")
                            return actual_sample_rate, audio_array
                        else:
                            raise Exception(f"Dict in tuple missing 'generated_audio'. Keys: {list(samples.keys())}")
                    else:
                        raise Exception(f"Expected numpy array or dict in tuple, got: {type(samples)}")
                # Format 2: Dict with audio data (no tuple)
                elif isinstance(result, dict):
                    print(f"[DEBUG] Direct dict, keys: {result.keys()}")
                    # Check for error
                    if 'error' in result:
                        raise Exception(f"API error: {result['error']}")
                    # Extract audio from dict
                    if 'generated_audio' in result and 'sample_rate' in result:
                        audio_list = result['generated_audio']
                        sample_rate = result['sample_rate']
                        samples = np.array(audio_list, dtype=np.float32)
                        return sample_rate, samples
                    else:
                        raise Exception(f"Unexpected dict format. Keys: {list(result.keys())}")
                # Format 3: File path (fallback)
                elif isinstance(result, str):
                    print(f"[DEBUG] File path result")
                    audio = AudioSegment.from_file(result)
                    # Scale 16-bit PCM into float32 in [-1, 1).
                    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0
                    if audio.channels == 2:
                        # Interleaved stereo -> (n_frames, 2).
                        samples = samples.reshape((-1, 2))
                    return audio.frame_rate, samples
                else:
                    raise Exception(f"Unexpected result format: {type(result)}")
            except (requests.exceptions.Timeout, requests.exceptions.ReadTimeout, TimeoutError) as e:
                error_msg = str(e)
                if attempt < max_retries - 1:
                    progress(0.2, desc=f"Timeout - model loading... retrying in {retry_delay}s")
                    time.sleep(retry_delay)
                    continue
                else:
                    raise Exception(f"Generation timed out after {max_retries} attempts. Model may be starting up - please try again in a few minutes.")
            except Exception as e:
                error_msg = str(e)
                # Check if it's a 503 or loading error
                if "503" in error_msg or "loading" in error_msg.lower() or "starting" in error_msg.lower():
                    if attempt < max_retries - 1:
                        progress(0.2, desc=f"Model loading... retrying in {retry_delay}s")
                        time.sleep(retry_delay)
                        continue
                # Not a loading error - raise it
                raise Exception(f"Generation failed: {str(e)}")
    def generate_with_lyrics(self, prompt, lyrics, duration, progress=gr.Progress()):
        """Generate music with vocals/lyrics.

        Posts to the configured lyrics endpoint; accepts either a JSON
        response with base64 audio or raw audio bytes. Returns
        (frame_rate, samples) as float32. Raises if the endpoint is not
        configured or all retries fail.
        """
        if not LYRICS_API_URL or not BEARER_TOKEN:
            raise Exception("Lyrics API not configured")
        try:
            progress(0.1, desc="Preparing lyrics generation...")
            payload = {
                "inputs": {
                    "prompt": prompt,
                    "lyrics": lyrics
                },
                "parameters": {
                    "duration": duration,
                    "infer_step": 60,
                    "guidance_scale": 15,
                    "scheduler_type": "euler",
                    "guidance_interval": 0.5,
                    "guidance_interval_decay": 0.0,
                    "min_guidance_scale": 3.0,
                    "use_erg_tag": True,
                    "use_erg_lyric": False,
                    "use_erg_diffusion": True
                }
            }
            headers = {
                "Authorization": f"Bearer {BEARER_TOKEN}",
                "Content-Type": "application/json"
            }
            # Retry logic for 503 errors (endpoint warming up)
            max_retries = 5
            retry_delay = 180  # Start with 3 minutes for cold start
            for attempt in range(max_retries):
                try:
                    wait_time = retry_delay if attempt == 0 else retry_delay // 2  # 3min first, then 90s
                    progress(0.2 + (attempt * 0.1), desc=f"Generating with vocals (attempt {attempt + 1}/{max_retries})...")
                    response = requests.post(
                        LYRICS_API_URL,
                        headers=headers,
                        json=payload,
                        timeout=300
                    )
                    # Handle 503 - endpoint warming up
                    if response.status_code == 503:
                        if attempt < max_retries - 1:
                            progress(0.3, desc=f"Endpoint warming up... retrying in {wait_time}s")
                            time.sleep(wait_time)
                            continue
                        else:
                            raise Exception("Lyrics endpoint unavailable after all retries. Please try again in a few minutes.")
                    response.raise_for_status()
                    # The jingles handler returns JSON with base64 encoded audio
                    content_type = response.headers.get('Content-Type', '')
                    if 'application/json' in content_type:
                        # JSON response with base64 audio
                        json_data = response.json()
                        if 'error' in json_data:
                            raise Exception(f"API error: {json_data['error']}")
                        if 'audio' not in json_data:
                            raise Exception(f"Invalid JSON response: missing 'audio' field. Keys: {list(json_data.keys())}")
                        # Decode base64 audio
                        audio_base64 = json_data['audio']
                        audio_bytes = base64.b64decode(audio_base64)
                        # Verify audio bytes are valid
                        if len(audio_bytes) < 1000:
                            raise Exception("Received invalid audio data (too small)")
                    elif 'audio' in content_type or 'octet-stream' in content_type:
                        # Raw audio bytes (old format)
                        audio_bytes = response.content
                        if len(audio_bytes) < 1000:
                            raise Exception("Received invalid audio data (too small)")
                    else:
                        raise Exception(f"Invalid response from API. Expected audio or JSON, got: {content_type}")
                    progress(0.9, desc="Processing audio...")
                    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
                    # Scale 16-bit PCM into float32 in [-1, 1).
                    samples = np.array(audio.get_array_of_samples()).astype(np.float32) / 32768.0
                    if audio.channels == 2:
                        samples = samples.reshape((-1, 2))
                    return audio.frame_rate, samples
                except requests.exceptions.Timeout:
                    if attempt < max_retries - 1:
                        progress(0.3, desc=f"Request timeout, retrying in {wait_time}s...")
                        time.sleep(wait_time)
                        continue
                    else:
                        raise Exception("Request timed out after all retries")
        except Exception as e:
            # Wraps every failure (including our own raises) in one message.
            raise Exception(f"Lyrics generation failed: {str(e)}")
class VideoGenerator:
    """Handles video generation with retry logic.

    Generation is asynchronous: a request is fired at the inference
    endpoint, then completion is detected by polling a HuggingFace dataset
    repo for a newly uploaded .mp4. FFmpeg is used for audio muxing and
    concatenation.
    """

    def __init__(self):
        # HfApi is optional; without it dataset polling is unavailable.
        self.api = HfApi() if HF_AVAILABLE else None

    @staticmethod
    def _temp_path(suffix: str) -> str:
        """Create a closed, securely-created temp file and return its path.

        Replaces the deprecated, race-prone tempfile.mktemp(); ffmpeg's
        -y flag happily overwrites the pre-created empty file.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    def calculate_temp(self, duration: float, fps: int) -> int:
        """Calculate temp value based on duration and FPS."""
        # Reference: 31 temp = 10s at 24fps (240 frames)
        return int((duration * fps * 31) / 240)

    def warmup_endpoint(self):
        """Warmup video endpoint (non-blocking, best-effort)."""
        try:
            warmup_payload = {
                "inputs": {
                    "prompt": "warmup",
                    "mode": "text_to_video",
                    "width": 640,
                    "height": 384,
                    "temp": 16,
                    "guidance_scale": 7.0,
                    "video_guidance_scale": 5.0,
                    "num_inference_steps": [5, 5, 5],
                    "video_num_inference_steps": [3, 3, 3],
                    "fps": 24
                }
            }
            headers = {
                "Authorization": f"Bearer {VIDEO_HF_TOKEN}",
                "Content-Type": "application/json"
            }
            requests.post(VIDEO_ENDPOINT_URL, headers=headers, json=warmup_payload, timeout=10)
        except Exception:
            # Fixed: was a bare `except:`. Warmup failures are irrelevant.
            pass

    def generate_video(self, mode, prompt, duration, fps,
                       guidance_scale, video_guidance_scale,
                       image_base64: Optional[str] = None, progress=gr.Progress()):
        """Generate video with retry logic.

        Fires the request with a deliberately short timeout (the endpoint
        works asynchronously) and then polls the dataset for the result.
        Returns a local path to the downloaded .mp4.
        """
        if not VIDEO_ENDPOINT_URL or not VIDEO_HF_TOKEN:
            raise Exception("Video generation not configured")
        temp_value = self.calculate_temp(duration, fps)
        payload = {
            "inputs": {
                "prompt": prompt,
                "mode": mode,
                "width": 1280,
                "height": 768,
                "temp": temp_value,
                "guidance_scale": guidance_scale,
                "video_guidance_scale": video_guidance_scale,
                "num_inference_steps": [20, 20, 20],
                "video_num_inference_steps": [10, 10, 10],
                "fps": fps
            }
        }
        if mode == "image_to_video" and image_base64:
            payload["inputs"]["image"] = image_base64
        headers = {
            "Authorization": f"Bearer {VIDEO_HF_TOKEN}",
            "Content-Type": "application/json"
        }
        # Send async request - don't wait for response, poll dataset instead
        # Video generation happens in background and uploads to dataset
        progress(0.1, desc="Sending video generation request...")
        try:
            # Fire and forget - use short timeout since we'll poll the dataset
            response = requests.post(
                VIDEO_ENDPOINT_URL,
                headers=headers,
                json=payload,
                timeout=5  # Short timeout - just to send the request
            )
            if response.status_code == 503:
                progress(0.15, desc="Endpoint warming up, waiting 60s...")
                time.sleep(60)
                # Try one more time
                response = requests.post(VIDEO_ENDPOINT_URL, headers=headers, json=payload, timeout=5)
        except requests.exceptions.Timeout:
            # Expected - request sent, video generating in background
            progress(0.2, desc="Request sent, monitoring dataset...")
        except Exception as e:
            # Non-timeout error - still proceed to monitor in case request went through
            print(f"Request error (non-fatal): {e}")
            progress(0.2, desc="Monitoring dataset for video...")
        # Monitor for video completion by polling the dataset
        return self.monitor_video_completion(duration, fps, progress)

    def monitor_video_completion(self, duration, fps, progress=gr.Progress()):
        """Monitor HuggingFace dataset for video completion.

        Polls the dataset repo until a new .mp4 appears, then downloads it
        to a temp file and returns the path. Raises on timeout or if the
        detected video cannot be downloaded.
        """
        if not self.api or not HF_TOKEN_DOWNLOAD:
            raise Exception("HuggingFace API not available")
        # Snapshot the current video count so a new upload is detectable.
        try:
            files = self.api.list_repo_files(
                repo_id=VIDEO_REPO_ID,
                repo_type="dataset",
                token=HF_TOKEN_DOWNLOAD
            )
            initial_count = len([f for f in files if f.endswith('.mp4')])
        except Exception as e:
            raise Exception(f"Failed to access video repository: {e}")
        # Rough ETA (~16 frames rendered per minute), padded by 3 min + 20%.
        estimated_minutes = (duration * fps) / 16
        max_wait_seconds = int((estimated_minutes + 3) * 60 * 1.2)
        check_interval = 10
        start_time = time.time()
        progress(0.3, desc=f"Generating video (est: ~{int(estimated_minutes)} min)...")
        while True:
            elapsed = time.time() - start_time
            if elapsed > max_wait_seconds:
                raise Exception(f"Video generation timeout ({int(max_wait_seconds/60)} minutes)")
            # Update progress
            progress_pct = min(0.3 + (elapsed / (max_wait_seconds * 0.95)) * 0.6, 0.9)
            progress(progress_pct, desc=f"Generating... {elapsed/60:.1f} min elapsed")
            # Poll the dataset; transient API errors only delay the next poll.
            try:
                files = self.api.list_repo_files(
                    repo_id=VIDEO_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN_DOWNLOAD
                )
                video_files = [f for f in files if f.endswith('.mp4')]
            except Exception as e:
                print(f"Error checking for video: {e}")
                video_files = None
            if video_files is not None and len(video_files) > initial_count:
                # New video detected
                latest_video = sorted(video_files)[-1]
                video_url = f"https://huggingface.co/datasets/{VIDEO_REPO_ID}/resolve/main/{latest_video}"
                progress(0.95, desc="Downloading video...")
                # Download video
                download_headers = {"Authorization": f"Bearer {HF_TOKEN_DOWNLOAD}"}
                video_response = requests.get(video_url, headers=download_headers, timeout=120)
                if video_response.status_code == 200:
                    # Save to temp file
                    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f:
                        f.write(video_response.content)
                    progress(1.0, desc="Video complete!")
                    return f.name
                # Bug fix: previously this raise sat inside the polling
                # try-block, got swallowed by its except-clause, and the loop
                # retried silently until the overall timeout.
                raise Exception("Failed to download video")
            time.sleep(check_interval)

    def merge_audio_video(self, video_path: str, audio_path: str) -> str:
        """Merge jingle audio with video using FFmpeg (video stream copied)."""
        output_path = self._temp_path("_final.mp4")
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-c:a", "aac",
            "-b:a", "192k",
            "-shortest",
            output_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"FFmpeg merge failed: {result.stderr}")
        return output_path

    def generate_intro_with_voiceover(self, intro_prompt, company_name,
                                      progress=gr.Progress()):
        """Generate a 5-second intro video, optionally with a gTTS voiceover.

        Falls back to the plain intro video when TTS is unavailable, the
        company name is empty, or muxing fails.
        """
        # Generate 5s intro video at 8fps
        progress(0.1, desc="Generating intro video...")
        intro_path = self.generate_video(
            mode="text_to_video",
            prompt=intro_prompt,
            duration=5.0,
            fps=8,
            guidance_scale=7.0,
            video_guidance_scale=5.0,
            progress=progress
        )
        if not intro_path or not GTTS_AVAILABLE or not company_name:
            return intro_path
        # Generate voiceover
        try:
            progress(0.8, desc="Adding voiceover...")
            tts = gTTS(text=company_name, lang='en', slow=False)
            voiceover_path = self._temp_path("_voice.mp3")
            tts.save(voiceover_path)
            # Merge intro + voiceover with a 0.5s fade in/out on the voice.
            output_path = self._temp_path("_intro_final.mp4")
            cmd = [
                'ffmpeg', '-y',
                '-i', intro_path,
                '-i', voiceover_path,
                '-c:v', 'copy',
                '-c:a', 'aac',
                '-b:a', '192k',
                '-filter_complex', '[1:a]afade=t=in:st=0:d=0.5,afade=t=out:st=4.5:d=0.5[a]',
                '-map', '0:v',
                '-map', '[a]',
                '-shortest',
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                os.remove(intro_path)
                os.remove(voiceover_path)
                return output_path
            # Bug fix: previously fell off the end and returned None when
            # ffmpeg exited non-zero; keep the un-voiced intro instead.
            print(f"Voiceover mux failed: {result.stderr}")
            return intro_path
        except Exception as e:
            print(f"Voiceover generation failed: {e}")
            return intro_path

    def concatenate_videos(self, intro_path: str, main_path: str) -> str:
        """Concatenate intro + main video using ffmpeg's concat demuxer."""
        concat_list = self._temp_path("_concat.txt")
        output_path = self._temp_path("_combined.mp4")
        with open(concat_list, 'w') as f:
            f.write(f"file '{intro_path}'\n")
            f.write(f"file '{main_path}'\n")
        cmd = [
            'ffmpeg', '-y',
            '-f', 'concat',
            '-safe', '0',
            '-i', concat_list,
            '-c', 'copy',
            output_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise Exception(f"Video concatenation failed: {result.stderr}")
        os.remove(concat_list)
        return output_path
# Initialize global instances
# Module-level singletons shared by every Gradio callback below; created once
# at import time (none of these constructors performs network I/O).
wp_api = WordPressAPI(WORDPRESS_BASE_URL)
music_gen = MusicGenerator()
audio_processor = AudioProcessor()
video_gen = VideoGenerator()
def get_session(session_id: str) -> Dict[str, Any]:
    """Return the per-user session dict, creating a default one on first use."""
    existing = user_sessions.get(session_id)
    if existing is not None:
        return existing
    # First sight of this session id: seed an empty, logged-out session.
    fresh_session = {
        'jwt_token': None,
        'user_info': None,
        'credits': {
            'remaining_credits': 0,
            'free_samples_remaining': 0,
            'total_credits': 0,
            'used_credits': 0,
            'free_samples_used': 0
        },
        'video_credits': {},
        'generated_audio': None,
        'processed_audio': None,
        'metadata': {},
        'video_path': None,
        'jingle_path': None
    }
    user_sessions[session_id] = fresh_session
    return fresh_session
def check_download_access(session_id):
    """Return (has_access, message) for downloading the session's generated track."""
    session = get_session(session_id)
    # Guard clauses: must be authenticated and have a generated track.
    if not session['jwt_token']:
        return False, "Please log in first"
    if session['generated_audio'] is None:
        return False, "No track generated yet"
    # generation_id is optional; the server handles None.
    generation_id = session.get('metadata', {}).get('generation_id', None)
    try:
        resp = requests.post(
            f"{WORDPRESS_BASE_URL}/wp-json/audio-api/v1/check-download-unlock",
            headers={
                'Authorization': f'Bearer {session["jwt_token"]}',
                'Content-Type': 'application/json'
            },
            json={'generation_id': generation_id},
            timeout=10
        )
        if resp.status_code != 200:
            return False, f"Error checking access: {resp.status_code}"
        data = resp.json()
        return data.get('has_access', False), data.get('message', 'Check complete')
    except Exception as e:
        return False, f"Error checking download access: {str(e)}"
def purchase_download_unlock(session_id):
    """Create a $5 checkout to unlock download; returns (message, checkout_url)."""
    session = get_session(session_id)
    # Guard clauses: must be authenticated and have a generated track.
    if not session['jwt_token']:
        return "Please log in first", None
    if session['generated_audio'] is None:
        return "No track generated yet", None
    # generation_id is optional; the server handles None.
    generation_id = session.get('metadata', {}).get('generation_id', None)
    try:
        resp = requests.post(
            f"{WORDPRESS_BASE_URL}/wp-json/audio-api/v1/purchase-download-unlock",
            headers={
                'Authorization': f'Bearer {session["jwt_token"]}',
                'Content-Type': 'application/json'
            },
            json={
                'generation_id': generation_id,
                'unlock_type': 'single'
            },
            timeout=10
        )
        if resp.status_code != 200:
            return f"Error creating checkout: {resp.status_code}", None
        data = resp.json()
        if data.get('success'):
            return data.get('message', 'Checkout created'), data.get('checkout_url')
        return data.get('message', 'Purchase failed'), None
    except Exception as e:
        return f"Error purchasing unlock: {str(e)}", None
def authenticate_user(jwt_token, user_id, session_id) -> Tuple[str, str, str]:
    """Authenticate user via JWT token from WordPress.

    Stores the token and decoded user info on the session, fetches audio
    and video credit balances (both best-effort), and returns three UI
    strings: (status banner, credit summary, welcome/help message).
    NOTE(review): the token signature is NOT verified here; WordPress is
    assumed to enforce auth server-side — confirm.
    """
    print(f"[AUTH DEBUG] authenticate_user called with jwt_token: {jwt_token[:50] if jwt_token else 'None'}...")
    print(f"[AUTH DEBUG] user_id: {user_id}, session_id: {session_id}")
    # The frontend may send the literal string "null" when logged out.
    if not jwt_token or jwt_token == "null":
        print("[AUTH DEBUG] No JWT token provided")
        return (
            "⚠️ Not Logged In",
            "Please log in to WordPress to use SongLab AI",
            "Login required to access generation features"
        )
    try:
        # Decode JWT (without verification for client-side)
        print(f"[AUTH DEBUG] Decoding JWT...")
        decoded = jwt.decode(jwt_token, options={"verify_signature": False})
        user_info = decoded.get('data', {}).get('user', {})
        print(f"[AUTH DEBUG] Decoded user_info: {user_info}")
        # Store in session
        session = get_session(session_id)
        session['jwt_token'] = jwt_token
        session['user_info'] = user_info
        print(f"[AUTH DEBUG] Session updated with JWT")
        # Fetch credits (handle 403 gracefully)
        print(f"[AUTH DEBUG] Fetching user credits...")
        try:
            credits = wp_api.get_user_credits(jwt_token)
            print(f"[AUTH DEBUG] Credits received: {credits}")
        except Exception as credit_error:
            # Credit lookup failure must not block login; fall back to zeros.
            print(f"[AUTH DEBUG] Credit fetch failed (non-fatal): {credit_error}")
            credits = {
                'remaining_credits': 0,
                'free_samples_remaining': 0,
                'total_credits': 0,
                'used_credits': 0,
                'free_samples_used': 0
            }
        session['credits'] = credits
        # Fetch video credits (handle 403 gracefully)
        print(f"[AUTH DEBUG] Fetching video credits...")
        try:
            video_credits = wp_api.get_video_credits(jwt_token)
            print(f"[AUTH DEBUG] Video credits received: {video_credits}")
        except Exception as video_error:
            print(f"[AUTH DEBUG] Video credit fetch failed (non-fatal): {video_error}")
            video_credits = {'remaining_credits': 0, 'package_tier': 'free'}
        session['video_credits'] = video_credits
        display_name = user_info.get('display_name', 'User')
        credit_info = f"Audio: {credits['remaining_credits']} credits | {credits['free_samples_remaining']} free samples (30s) | Video: {video_credits.get('remaining_credits', 0)}"
        print(f"[AUTH DEBUG] Authentication successful for {display_name}")
        return (
            f"✓ Logged in as {display_name}",
            credit_info,
            f"Welcome, {display_name}! Credits are charged per generation."
        )
    except Exception as e:
        print(f"[AUTH DEBUG] Authentication failed with error: {type(e).__name__}: {str(e)}")
        import traceback
        traceback.print_exc()
        return (
            "⚠️ Authentication Error",
            f"Error: {type(e).__name__} - {str(e)}",
            "Please refresh and try again"
        )
def format_prompt(genre: str, energy: str, tempo: int, description: str) -> str:
    """Compose the MusicGen prompt string from the four UI fields."""
    template = "Genre: {}, Energy Level: {}, Tempo: {} bpm, Description: {}"
    return template.format(genre, energy, tempo, description)
def generate_track(genre, energy, tempo, description, duration, session_id, progress=gr.Progress()):
    """Generate an instrumental music track and store it in the user session.

    Args:
        genre, energy, tempo, description: user-selected generation controls.
        duration: requested track length in seconds.
        session_id: opaque key identifying the per-user session.
        progress: Gradio progress callback forwarded to the generator.

    Returns:
        ((sample_rate, np.ndarray), status_md, info_md) on success, or
        (None, error_md, "") on failure. Credit deduction errors never fail
        the generation itself.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first", ""
    if not description or len(description.strip()) < 10:
        return None, "❌ Please provide a detailed description (at least 10 characters)", ""
    try:
        prompt = format_prompt(genre, energy, tempo, description)
        sample_rate, audio_data = music_gen.generate_music(prompt, float(duration), progress)
        # Convert float samples to int16. Clip first: a full-scale sample
        # (1.0 * 32768 == 32768) would otherwise wrap to -32768 in int16.
        audio_int = np.clip(audio_data * 32768, -32768, 32767).astype(np.int16)
        audio_seg = AudioSegment(
            audio_int.tobytes(),
            frame_rate=sample_rate,
            sample_width=2,
            channels=2 if len(audio_int.shape) > 1 else 1
        )
        # Store in session for later download / effects processing
        # (removed a dead BytesIO export whose result was never used)
        session['generated_audio'] = audio_seg
        session['metadata'] = {
            'genre': genre,
            'energy': energy,
            'tempo': tempo,
            'description': description,
            'duration': duration,
            'has_lyrics': False
        }
        # DEDUCT CREDIT/FREE SAMPLE AFTER SUCCESSFUL GENERATION
        try:
            user_id = session.get('user_info', {}).get('id')  # user id lives in user_info
            jwt_token = session.get('jwt_token')
            if user_id and jwt_token and user_id not in [1, 206]:  # Skip for admin accounts
                print(f"[CREDIT] Deducting credit for normal track generation: user={user_id}, duration={duration}, has_lyrics=False")
                credit_used = wp_api.use_audio_credit(
                    jwt_token,
                    int(duration),
                    has_lyrics=False,
                    metadata={'type': 'normal_generation', 'duration': f'{duration}s'}
                )
                if credit_used:
                    print("[CREDIT] ✅ Credit/free sample deducted successfully")
                else:
                    print("[CREDIT] ⚠️ Warning: Credit deduction failed (generation still successful)")
        except Exception as e:
            # Don't fail the generation if credit deduction fails
            print(f"[CREDIT] Error deducting credit: {e}")
        return (sample_rate, audio_data), "✓ Generation complete!", f"Generated {duration}s track: {genre} ({energy} energy, {tempo} bpm)"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}", ""
def generate_with_vocals(genre, energy, tempo, description, lyrics, duration, session_id, progress=gr.Progress()):
    """Generate a music track with vocals/lyrics and store it in the session.

    Args:
        genre, energy, tempo, description: generation controls.
        lyrics: lyric text to be sung.
        duration: requested track length in seconds.
        session_id: opaque key identifying the per-user session.
        progress: Gradio progress callback.

    Returns:
        ((sample_rate, np.ndarray), status_md, info_md) on success, or
        (None, error_md, "") on failure.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first", ""
    if not description or not lyrics:
        return None, "❌ Please provide both description and lyrics", ""
    try:
        prompt = format_prompt(genre, energy, tempo, description)
        sample_rate, audio_data = music_gen.generate_with_lyrics(prompt, lyrics, float(duration), progress)
        # Clip before int16 cast so a full-scale sample cannot wrap around.
        audio_int = np.clip(audio_data * 32768, -32768, 32767).astype(np.int16)
        audio_seg = AudioSegment(
            audio_int.tobytes(),
            frame_rate=sample_rate,
            sample_width=2,
            channels=2 if len(audio_int.shape) > 1 else 1
        )
        # (removed a dead BytesIO export whose result was never used)
        session['generated_audio'] = audio_seg
        session['metadata'] = {
            'genre': genre,
            'energy': energy,
            'tempo': tempo,
            'description': description,
            'lyrics': lyrics,
            'duration': duration,
            'has_lyrics': True
        }
        # DEDUCT CREDIT/FREE SAMPLE AFTER SUCCESSFUL GENERATION
        try:
            user_id = session.get('user_info', {}).get('id')
            jwt_token = session.get('jwt_token')
            if user_id and jwt_token and user_id not in [1, 206]:  # Skip for admin accounts
                print(f"[CREDIT] Deducting credit for lyrics generation: user={user_id}, duration={duration}, has_lyrics=True")
                credit_used = wp_api.use_audio_credit(
                    jwt_token,
                    int(duration),
                    has_lyrics=True,
                    metadata={'type': 'lyrics_generation', 'duration': f'{duration}s'}
                )
                if credit_used:
                    print("[CREDIT] ✅ Credit/free sample deducted successfully")
                else:
                    print("[CREDIT] ⚠️ Warning: Credit deduction failed (generation still successful)")
        except Exception as e:
            # Don't fail the generation if credit deduction fails
            print(f"[CREDIT] Error deducting credit: {e}")
        return (sample_rate, audio_data), "✓ Generation with vocals complete!", f"Generated {duration}s track with vocals"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}", ""
def download_track(session_id):
    """Export the session's generated track to a file for download.

    Free-sample users must first pay the $5 unlock (checked via
    check_download_access).

    Returns:
        (file_path, status_md) on success, (None, error_md) otherwise.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first"
    if session['generated_audio'] is None:
        return None, "❌ No track generated yet"
    # Check if user has download access
    has_access, message = check_download_access(session_id)
    if not has_access:
        return None, f"🔒 {message}\n\nPay $5 to unlock download, or purchase credits for unlimited downloads."
    # Export full generated audio to the platform's temp directory
    # (tempfile.gettempdir() instead of a hard-coded "/tmp" for portability).
    output_path = os.path.join(tempfile.gettempdir(), f"songlab_track_{uuid.uuid4().hex}.wav")
    session['generated_audio'].export(output_path, format="wav")
    return output_path, f"✓ Downloaded track!"
def unlock_download(session_id):
    """Start the $5 download-unlock purchase flow and format the outcome."""
    message, checkout_url = purchase_download_unlock(session_id)
    if not checkout_url:
        return f"❌ {message}"
    return (
        f"✓ {message}\n\n**[Click here to complete payment]({checkout_url})**"
        "\n\nAfter payment, you can download your track!"
    )
def apply_effects(stereo, reverse, volume, pitch, session_id):
    """Apply the selected post-processing effects to the session's track.

    Effects run in a fixed order (stereo widen, reverse, volume, pitch);
    the result is cached in the session and returned for playback.
    """
    session = get_session(session_id)
    if session['generated_audio'] is None:
        return None, "❌ No track generated yet"
    try:
        processed = session['generated_audio']
        if stereo:
            processed = audio_processor.apply_stereo_effect(processed)
        if reverse:
            processed = audio_processor.apply_reverse(processed)
        if volume != 0:
            processed = audio_processor.adjust_volume(processed, volume)
        if pitch != 0:
            processed = audio_processor.change_pitch(processed, pitch)
        session['processed_audio'] = processed
        # Convert to float32 numpy samples in [-1, 1) for the Gradio player.
        raw = np.array(processed.get_array_of_samples(), dtype=np.float32) / 32768.0
        if processed.channels == 2:
            raw = raw.reshape((-1, 2))
        return (processed.frame_rate, raw), "✓ Effects applied!"
    except Exception as e:
        return None, f"❌ Effect processing failed: {str(e)}"
def generate_commercial_ad(
    mode,
    video_prompt,
    jingle_prompt,
    jingle_lyrics,
    uploaded_image,
    duration,
    fps,
    guidance_scale,
    video_guidance_scale,
    is_custom,
    add_intro,
    intro_prompt,
    company_name,
    session_id,
    progress=gr.Progress()
):
    """Generate commercial ad (video, jingle+video, or image-to-video).

    Modes:
        text_to_video  - video only from a text prompt.
        image_to_video - video animated from an uploaded image.
        jingle_video   - generate a jingle first, then merge it with the video.

    Returns:
        (video_path, status_md) on success, (None, error_md) on failure.
        A video credit is deducted only after a successful generation.
    """
    session = get_session(session_id)
    if not session['jwt_token']:
        return None, "❌ Please log in first"
    # Check eligibility (include is_custom mode to allow 2 free preset generations)
    eligibility = wp_api.check_video_eligibility(session['jwt_token'], is_custom_mode=is_custom)
    if not eligibility.get('can_generate', False):
        reason = eligibility.get('reason', 'unknown')
        free_remaining = eligibility.get('free_remaining', 0)
        credits = eligibility.get('credits', 0)
        error_msg = f"❌ Cannot generate video: {reason}"
        if free_remaining == 0 and credits == 0:
            error_msg += "\n\n💡 You have 1 FREE generation with PRESET mode (non-custom settings)!"
            error_msg += "\n📦 Or purchase credits at https://songlabai.com/credit-pricing/"
        return None, error_msg
    try:
        video_path = None
        jingle_path = None
        # Generate jingle if needed
        if mode == "jingle_video":
            if not jingle_prompt:
                return None, "❌ Please provide jingle description"
            progress(0.1, desc="Generating jingle...")
            # Generate jingle with retry logic (endpoint may be cold: HTTP 503)
            max_retries = 5
            retry_delay = 60
            for attempt in range(max_retries):
                try:
                    jingle_payload = {
                        "inputs": {"prompt": jingle_prompt, "lyrics": jingle_lyrics or ""},
                        "parameters": {
                            "duration": float(duration),
                            "infer_step": 60,
                            "guidance_scale": 15,
                            "scheduler_type": "euler",
                            "cfg_type": "apg",
                            "omega_scale": 10
                        }
                    }
                    response = requests.post(
                        LYRICS_API_URL,
                        headers={"Authorization": f"Bearer {BEARER_TOKEN}", "Content-Type": "application/json"},
                        json=jingle_payload,
                        timeout=180
                    )
                    if response.status_code == 200:
                        # Write the returned WAV to a temp file.
                        # NamedTemporaryFile(delete=False) replaces the
                        # deprecated, race-prone tempfile.mktemp().
                        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                            f.write(response.content)
                            jingle_path = f.name
                        break
                    elif response.status_code == 503 and attempt < max_retries - 1:
                        progress(0.15, desc=f"Jingle service warming up... waiting {retry_delay}s")
                        time.sleep(retry_delay)
                    else:
                        raise Exception(f"Jingle API error: HTTP {response.status_code}")
                except requests.exceptions.Timeout:
                    if attempt < max_retries - 1:
                        progress(0.15, desc=f"Timeout, retrying...")
                        time.sleep(retry_delay)
                    else:
                        raise Exception("Jingle generation timed out")
            progress(0.3, desc="Jingle complete! Starting video...")
        # Warmup video endpoint in background
        warmup_thread = threading.Thread(target=video_gen.warmup_endpoint)
        warmup_thread.daemon = True
        warmup_thread.start()
        # Generate intro if Expert tier
        intro_path = None
        if add_intro and intro_prompt:
            progress(0.35, desc="Generating 5s intro...")
            intro_path = video_gen.generate_intro_with_voiceover(intro_prompt, company_name, progress)
            progress(0.5, desc="Intro complete! Generating main video...")
        # Generate main video
        if mode == "text_to_video" or mode == "jingle_video":
            if not video_prompt:
                return None, "❌ Please provide video description"
            video_path = video_gen.generate_video(
                mode="text_to_video",
                prompt=video_prompt,
                duration=float(duration),
                fps=fps,
                guidance_scale=guidance_scale,
                video_guidance_scale=video_guidance_scale,
                progress=progress
            )
        elif mode == "image_to_video":
            if not uploaded_image:
                return None, "❌ Please upload an image"
            # Convert uploaded image to base64
            image_bytes = uploaded_image.read() if hasattr(uploaded_image, 'read') else uploaded_image
            image_base64 = base64.b64encode(image_bytes).decode('utf-8')
            video_path = video_gen.generate_video(
                mode="image_to_video",
                prompt=video_prompt,
                duration=float(duration),
                fps=fps,
                guidance_scale=guidance_scale,
                video_guidance_scale=video_guidance_scale,
                image_base64=image_base64,
                progress=progress
            )
        if not video_path:
            return None, "❌ Video generation failed"
        # Combine intro + main if needed
        if intro_path:
            progress(0.95, desc="Combining intro + main video...")
            video_path = video_gen.concatenate_videos(intro_path, video_path)
        # Merge jingle + video if needed
        if jingle_path:
            progress(0.98, desc="Merging jingle with video...")
            video_path = video_gen.merge_audio_video(video_path, jingle_path)
        # Deduct credit
        generation_data = {
            'mode': mode,
            'duration': duration,
            'fps': fps,
            'guidance_scale': guidance_scale,
            'video_guidance_scale': video_guidance_scale,
            'is_preset': not is_custom,
            'is_expert_intro': add_intro,
            'prompt': video_prompt
        }
        wp_api.use_video_credit(session['jwt_token'], generation_data)
        progress(1.0, desc="Complete!")
        return video_path, f"✓ {'Expert commercial' if add_intro else 'Commercial ad'} generated successfully!"
    except Exception as e:
        return None, f"❌ Generation failed: {str(e)}"
def load_example_ad(example_name):
    """Return the (jingle, lyrics, video) prompts for a named example ad.

    Unknown names yield three empty strings.
    """
    example = EXAMPLE_COMMERCIAL_ADS.get(example_name)
    if example is None:
        return "", "", ""
    return example["jingle"], example["lyrics"], example["video"]
# Custom CSS for Suno-style theming.
# Injected into the Gradio page; hides the built-in download controls and
# restyles inputs, buttons, sliders, tabs, and media players for a dark theme.
custom_css = """
/* Hide Gradio's built-in download button */
.download-link,
a[download],
.icon-button[aria-label="Download"],
button[aria-label="Download"] {
display: none !important;
}
/* Suno AI Dark Theme */
.gradio-container {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif !important;
}
/* Dark background */
body, .gradio-container {
background-color: #212126 !important;
}
/* Card styling */
.gr-box, .gr-form, .gr-panel {
background-color: var(--surface-color) !important;
border: 1px solid #334155 !important;
border-radius: 12px !important;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.2) !important;
}
/* Input fields */
.gr-input, .gr-text-input, .gr-text-area, .gr-dropdown {
background-color: #0f172a !important;
border: 1px solid #334155 !important;
border-radius: 8px !important;
color: var(--text-primary) !important;
padding: 10px 12px !important;
}
/* Buttons */
.gr-button {
background: linear-gradient(135deg, var(--primary-color) 0%, var(--secondary-color) 100%) !important;
border: none !important;
border-radius: 8px !important;
color: white !important;
font-weight: 600 !important;
padding: 12px 24px !important;
transition: all 0.3s ease !important;
}
.gr-button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 6px 12px rgba(99, 102, 241, 0.4) !important;
}
.gr-button-primary {
background: linear-gradient(135deg, #10b981 0%, #059669 100%) !important;
}
.gr-button-secondary {
background: linear-gradient(135deg, #f59e0b 0%, #d97706 100%) !important;
}
/* Sliders */
.gr-slider input[type="range"] {
background: #3a3a3f !important;
}
.gr-slider input[type="range"]::-webkit-slider-thumb {
background: #6366f1 !important;
}
/* Audio/Video players */
audio, video {
width: 100% !important;
background-color: #1a1a1e !important;
border-radius: 8px !important;
}
/* Text colors */
.gr-label, .gr-text, .markdown-text {
color: #ffffff !important;
}
/* Header styling */
h1, h2, h3 {
color: #ffffff !important;
font-weight: 700 !important;
}
/* Tabs */
.tab-nav button {
background-color: #26262B !important;
border: 1px solid #3a3a3f !important;
color: #999 !important;
border-radius: 8px 8px 0 0 !important;
transition: all 0.3s ease !important;
}
.tab-nav button.selected {
background-color: #6366f1 !important;
color: white !important;
border-bottom: 3px solid #6366f1 !important;
}
/* Loading animation */
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.generating {
animation: pulse 2s ease-in-out infinite;
}
"""
def create_ui():
    """Create the main Gradio interface.

    Builds the full Blocks app (auth header, music / vocals / effects /
    commercial-ad tabs) and wires every event handler. Returns the
    unlaunched gr.Blocks instance; the caller runs .queue() / .launch().
    """
    # css= and theme= belong on the gr.Blocks constructor — launch() does
    # not accept them, so passing them there would have no effect (or fail).
    with gr.Blocks(title="SongLab AI", css=custom_css, theme=gr.themes.Base()) as app:
        # Session state
        session_id = gr.State(value=str(uuid.uuid4()))
        # URL parameters for WordPress authentication
        # Using hidden textboxes instead of State because JS can update textboxes but not State
        jwt_token_param = gr.Textbox(value="", visible=False, elem_id="jwt_token_hidden")
        user_id_param = gr.Textbox(value="", visible=False, elem_id="user_id_hidden")
        # Header
        gr.Markdown("# 🎵 SongLab AI", elem_classes=["header"])
        gr.Markdown("*Professional Music & Video Generation - Powered by AI*")
        # Authentication section
        with gr.Row():
            with gr.Column(scale=2):
                auth_status = gr.Markdown("⚠️ Checking authentication...")
            with gr.Column(scale=2):
                credit_display = gr.Markdown("Credits: Loading...")
            with gr.Column(scale=1):
                gr.HTML(
                    '<a href="https://songlabai.com/credit-pricing/" target="_blank" '
                    'style="display: inline-block; padding: 8px 16px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); '
                    'color: white; text-decoration: none; border-radius: 6px; font-weight: 600; text-align: center;">'
                    '💳 Buy Credits</a>'
                )
        auth_message = gr.Markdown("")
        # Main tabs
        with gr.Tabs() as tabs:
            # Tab 1: Music Generation
            with gr.Tab("🎵 Generate Music"):
                gr.Markdown("### Create Your Track")
                gr.Markdown("*🎁 1 free sample! Pay $5 to download or purchase credits for unlimited downloads.*")
                with gr.Row():
                    with gr.Column(scale=2):
                        genre = gr.Dropdown(
                            choices=[
                                "Pop", "Rock", "Hip-Hop", "Electronic", "Jazz",
                                "Classical", "R&B", "Country", "Reggae", "Blues",
                                "Metal", "Funk", "Soul", "Disco", "House",
                                "Techno", "Ambient", "Lo-fi", "Trap", "Dubstep",
                                "Indie", "Folk", "Latin", "Gospel", "Afrobeat"
                            ],
                            label="Genre",
                            value="Pop"
                        )
                        energy = gr.Radio(
                            choices=["Low", "Medium", "High"],
                            label="Energy Level",
                            value="Medium"
                        )
                        tempo = gr.Slider(
                            minimum=40,
                            maximum=200,
                            value=120,
                            step=1,
                            label="Tempo (BPM)"
                        )
                        duration = gr.Slider(
                            minimum=15,
                            maximum=140,
                            value=30,
                            step=5,
                            label="Duration (seconds)"
                        )
                    with gr.Column(scale=3):
                        description = gr.Textbox(
                            label="Track Description",
                            placeholder="Describe your track in detail... (e.g., 'Upbeat summer pop song with catchy chorus and bright synths')",
                            lines=4
                        )
                        generate_btn = gr.Button("🎵 Generate Track", variant="primary", size="lg")
                        status_music = gr.Markdown("")
                        info_music = gr.Markdown("")
                audio_output = gr.Audio(label="Generated Track", interactive=False)
                download_btn = gr.Button("⬇️ Download Track", variant="primary", size="lg")
                unlock_btn = gr.Button("🔓 Unlock Download - $5", variant="secondary", size="lg")
                download_file = gr.File(label="Your Download")
                download_status = gr.Markdown("")
                unlock_status = gr.Markdown("")
            # Tab 2: Music with Vocals
            with gr.Tab("🎤 Generate with Vocals"):
                gr.Markdown("### Create Track with Lyrics")
                gr.Markdown("*Add vocals and lyrics to your music*")
                with gr.Row():
                    with gr.Column(scale=2):
                        genre_v = gr.Dropdown(
                            choices=[
                                "Pop", "Rock", "Hip-Hop", "Electronic", "Jazz",
                                "Classical", "R&B", "Country", "Reggae", "Blues",
                                "Metal", "Funk", "Soul", "Disco", "House",
                                "Techno", "Ambient", "Lo-fi", "Trap", "Dubstep",
                                "Indie", "Folk", "Latin", "Gospel", "Afrobeat"
                            ],
                            label="Genre",
                            value="Pop"
                        )
                        energy_v = gr.Radio(
                            choices=["Low", "Medium", "High"],
                            label="Energy Level",
                            value="Medium"
                        )
                        tempo_v = gr.Slider(
                            minimum=40,
                            maximum=200,
                            value=120,
                            step=1,
                            label="Tempo (BPM)"
                        )
                        duration_v = gr.Slider(
                            minimum=15,
                            maximum=140,
                            value=30,
                            step=5,
                            label="Duration (seconds)"
                        )
                    with gr.Column(scale=3):
                        description_v = gr.Textbox(
                            label="Track Description",
                            placeholder="Describe your track...",
                            lines=3
                        )
                        lyrics_v = gr.Textbox(
                            label="Lyrics",
                            placeholder="Enter your lyrics here...\n\nVerse 1:\n...\n\nChorus:\n...",
                            lines=8
                        )
                        generate_vocals_btn = gr.Button("🎤 Generate with Vocals", variant="primary", size="lg")
                        status_vocals = gr.Markdown("")
                        info_vocals = gr.Markdown("")
                audio_output_v = gr.Audio(label="Generated Track with Vocals", interactive=False)
                download_btn_v = gr.Button("⬇️ Download Track", variant="primary", size="lg")
                unlock_btn_v = gr.Button("🔓 Unlock Download - $5", variant="secondary", size="lg")
                download_file_v = gr.File(label="Your Download")
                download_status_v = gr.Markdown("")
                unlock_status_v = gr.Markdown("")
            # Tab 3: Audio Effects
            with gr.Tab("🎚️ Audio Effects"):
                gr.Markdown("### Process Your Track")
                gr.Markdown("*Apply effects to your generated music*")
                with gr.Row():
                    stereo_effect = gr.Checkbox(label="Stereo Widening", value=False)
                    reverse_effect = gr.Checkbox(label="Reverse", value=False)
                with gr.Row():
                    volume_slider = gr.Slider(
                        minimum=-20,
                        maximum=20,
                        value=0,
                        step=1,
                        label="Volume (dB)"
                    )
                    pitch_slider = gr.Slider(
                        minimum=-12,
                        maximum=12,
                        value=0,
                        step=1,
                        label="Pitch (semitones)"
                    )
                apply_effects_btn = gr.Button("✨ Apply Effects", variant="primary", size="lg")
                status_effects = gr.Markdown("")
                audio_processed = gr.Audio(label="Processed Track", interactive=False)
                reset_btn = gr.Button("🔄 Reset Effects")
            # Tab 4: Commercial Ad (VIDEO)
            with gr.Tab("🎬 Commercial Ad"):
                gr.Markdown("### Generate Professional Commercial Ads")
                gr.Markdown("*Create video ads with optional jingles*")
                # Mode selector
                ad_mode = gr.Radio(
                    choices=["text_to_video", "image_to_video", "jingle_video"],
                    label="Generation Mode",
                    value="jingle_video",  # Default to full ad with jingle
                    info="Choose how to create your ad"
                )
                # Example selector
                example_selector = gr.Dropdown(
                    choices=list(EXAMPLE_COMMERCIAL_ADS.keys()),
                    label="📋 Example Commercial Ads",
                    value="Select an example..."
                )
                use_example_btn = gr.Button("Use This Example")
                # Inputs (conditional based on mode)
                with gr.Row():
                    with gr.Column():
                        video_prompt_input = gr.Textbox(
                            label="Video Description",
                            placeholder="Describe your video scene...",
                            lines=4
                        )
                        image_upload = gr.File(
                            label="Upload Image (for Image-to-Video)",
                            file_types=["image"],
                            visible=False
                        )
                        jingle_prompt_input = gr.Textbox(
                            label="Jingle Description (for Jingle+Video)",
                            placeholder="energetic corporate music, upbeat tempo, inspiring melody...",
                            lines=3,
                            visible=True  # Visible by default
                        )
                        jingle_lyrics_input = gr.Textbox(
                            label="Jingle Lyrics (Optional)",
                            placeholder="Your brand, your way, every single day...",
                            lines=2,
                            visible=True  # Visible by default
                        )
                    with gr.Column():
                        # Settings (locked until "customize" is enabled)
                        custom_mode = gr.Checkbox(label="🎨 Customize Settings (Requires Credits)", value=False)
                        duration_video = gr.Slider(
                            minimum=2,
                            maximum=15,
                            value=3,
                            step=1,
                            label="Duration (seconds)",
                            interactive=False
                        )
                        fps_selector = gr.Dropdown(
                            choices=["Standard (16fps)"],
                            label="Quality (FPS)",
                            value="Standard (16fps)",
                            interactive=False
                        )
                        creativity_slider = gr.Slider(
                            minimum=1.0,
                            maximum=15.0,
                            value=7.0,
                            step=0.5,
                            label="🎨 Creativity",
                            interactive=False
                        )
                        motion_slider = gr.Slider(
                            minimum=1.0,
                            maximum=10.0,
                            value=5.0,
                            step=0.5,
                            label="⚡ Motion",
                            interactive=False
                        )
                # Expert intro (hidden by default)
                expert_intro_check = gr.Checkbox(
                    label="🎬 Add 5s Intro with Voiceover (Expert Tier)",
                    value=False,
                    visible=False
                )
                intro_prompt_input = gr.Textbox(
                    label="Intro Description",
                    placeholder="Logo on gradient background...",
                    visible=False
                )
                company_name_input = gr.Textbox(
                    label="Company Name (for voiceover)",
                    placeholder="YourBrand Inc.",
                    visible=False
                )
                generate_ad_btn = gr.Button("🎬 Generate Commercial Ad", variant="primary", size="lg")
                ad_status = gr.Markdown("")
                ad_video_output = gr.Video(label="Your Commercial Ad")
                download_ad_btn = gr.Button("⬇️ Download Ad (MP4)", variant="secondary")
        # Event handlers - Mode switcher for video
        def update_video_ui(mode):
            # Show the image uploader only for image_to_video; jingle fields
            # only for jingle_video.
            return {
                image_upload: gr.update(visible=(mode == "image_to_video")),
                jingle_prompt_input: gr.update(visible=(mode == "jingle_video")),
                jingle_lyrics_input: gr.update(visible=(mode == "jingle_video"))
            }
        ad_mode.change(
            fn=update_video_ui,
            inputs=[ad_mode],
            outputs=[image_upload, jingle_prompt_input, jingle_lyrics_input]
        )
        # Custom mode toggle - unlock the advanced sliders
        def update_custom_ui(is_custom):
            return {
                duration_video: gr.update(interactive=is_custom),
                fps_selector: gr.update(interactive=is_custom),
                creativity_slider: gr.update(interactive=is_custom),
                motion_slider: gr.update(interactive=is_custom)
            }
        custom_mode.change(
            fn=update_custom_ui,
            inputs=[custom_mode],
            outputs=[duration_video, fps_selector, creativity_slider, motion_slider]
        )
        # Example loader
        use_example_btn.click(
            fn=load_example_ad,
            inputs=[example_selector],
            outputs=[jingle_prompt_input, jingle_lyrics_input, video_prompt_input]
        )
        # Generate music
        generate_btn.click(
            fn=generate_track,
            inputs=[genre, energy, tempo, description, duration, session_id],
            outputs=[audio_output, status_music, info_music]
        )
        # Generate with vocals
        generate_vocals_btn.click(
            fn=generate_with_vocals,
            inputs=[genre_v, energy_v, tempo_v, description_v, lyrics_v, duration_v, session_id],
            outputs=[audio_output_v, status_vocals, info_vocals]
        )
        # Download handlers
        download_btn.click(
            fn=download_track,
            inputs=[session_id],
            outputs=[download_file, download_status]
        )
        download_btn_v.click(
            fn=download_track,
            inputs=[session_id],
            outputs=[download_file_v, download_status_v]
        )
        # Unlock download handlers
        unlock_btn.click(
            fn=unlock_download,
            inputs=[session_id],
            outputs=[unlock_status]
        )
        unlock_btn_v.click(
            fn=unlock_download,
            inputs=[session_id],
            outputs=[unlock_status_v]
        )
        # Effects
        apply_effects_btn.click(
            fn=apply_effects,
            inputs=[stereo_effect, reverse_effect, volume_slider, pitch_slider, session_id],
            outputs=[audio_processed, status_effects]
        )
        reset_btn.click(
            fn=lambda: (False, False, 0, 0, None, "✓ Effects reset"),
            outputs=[stereo_effect, reverse_effect, volume_slider, pitch_slider, audio_processed, status_effects]
        )
        # Generate commercial ad
        def generate_ad_wrapper(mode, video_prompt, jingle_prompt, jingle_lyrics, image, duration, fps_str, creativity, motion, is_custom, add_intro, intro_prompt, company_name, session_id, progress=gr.Progress()):
            # Parse FPS from the dropdown's display string
            fps = 16  # default
            if "8" in fps_str:
                fps = 8
            elif "24" in fps_str:
                fps = 24
            return generate_commercial_ad(
                mode, video_prompt, jingle_prompt, jingle_lyrics, image,
                duration, fps, creativity, motion, is_custom,
                add_intro, intro_prompt, company_name, session_id, progress
            )
        generate_ad_btn.click(
            fn=generate_ad_wrapper,
            inputs=[
                ad_mode, video_prompt_input, jingle_prompt_input, jingle_lyrics_input,
                image_upload, duration_video, fps_selector, creativity_slider, motion_slider,
                custom_mode, expert_intro_check, intro_prompt_input, company_name_input, session_id
            ],
            outputs=[ad_video_output, ad_status]
        )
        # Auto-authenticate on page load using URL parameters
        app.load(
            fn=authenticate_user,
            inputs=[jwt_token_param, user_id_param, session_id],
            outputs=[auth_status, credit_display, auth_message],
            js="""
            function() {
                const urlParams = new URLSearchParams(window.location.search);
                const jwt = urlParams.get('jwt_token') || urlParams.get('token') || '';
                const userId = urlParams.get('user_id') || '';
                console.log('SongLab AI: Auto-authenticating with JWT:', jwt ? 'present' : 'missing');
                console.log('SongLab AI: Extracted values - JWT length:', jwt.length, 'UserID:', userId);
                // CRITICAL: Return as array for Gradio to pass to Python function
                return [jwt, userId];
            }
            """
        )
    return app
if __name__ == "__main__":
    demo = create_ui()
    demo.queue(max_size=20)
    # NOTE(review): theme=, css=, and footer_links= were previously passed to
    # launch(), but Blocks.launch() does not accept those keyword arguments
    # (theme/css belong on the gr.Blocks constructor), so they would raise a
    # TypeError at startup. They are dropped here; styling should be applied
    # where the Blocks app is constructed.
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces (container/Space deployment)
        server_port=7860,
        share=False,
        show_error=True,
    )