Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import logging | |
| import re | |
| import random | |
| import time | |
| import zipfile | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.base import MIMEBase | |
| from email import encoders | |
| from dotenv import load_dotenv | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import multiprocessing | |
| from flask import Flask, render_template, request, jsonify, send_from_directory | |
| from googleapiclient.discovery import build | |
| import yt_dlp | |
| from pydub import AudioSegment | |
# Log everything from DEBUG upwards with a timestamped, levelled format.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

app = Flask(__name__, static_folder='static')

# Read the YouTube API key and the mail credentials from the .env file.
load_dotenv()
api_key = os.getenv('YOUTUBE_API_KEY')
sender_email = os.getenv('SENDER_EMAIL')
email_password = os.getenv('EMAIL_PASSWORD')

# Refuse to start unless every required setting is present and non-empty.
if not (api_key and sender_email and email_password):
    logging.error("Missing environment variables. Please check your .env file.")
    raise ValueError("Missing environment variables. Please check your .env file.")

# Number of CPU cores; used as the size of the download thread pool.
num_cores = multiprocessing.cpu_count()
| # Function to validate email | |
def is_valid_email(email):
    """Return True when *email* has the basic ``local@domain.tld`` shape.

    This is a syntactic sanity check only: it accepts word characters,
    dots and hyphens on either side of a single ``@`` and requires a dot
    followed by word characters at the end of the domain.

    Args:
        email: The candidate address. Anything that is not a ``str``
            (for example ``None``) is rejected instead of raising.

    Returns:
        bool: True if the whole string matches the pattern, else False.
    """
    if not isinstance(email, str):
        return False
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    # fullmatch (not match): with match(), ``$`` also succeeds just before a
    # trailing newline, so "user@example.com\n" would be accepted and could
    # later end up inside a mail header.
    return re.fullmatch(pattern, email) is not None
| # Function to get YouTube links | |
def get_youtube_links(api_key, query, max_results=20):
    """Search YouTube for videos matching *query*.

    Args:
        api_key: YouTube Data API key passed to the client as ``developerKey``.
        query: Free-text search string.
        max_results: Maximum number of videos wanted. Values above 50 are
            clamped to 50 (the largest page size ``search.list`` accepts);
            zero or negative values yield an empty list.

    Returns:
        list[tuple[str, str]]: ``(title, watch_url)`` pairs in the order the
        API returned them. An empty list is returned when nothing was found
        or when the request failed (the failure is logged, not raised).
    """
    # search.list rejects maxResults outside 0-50, which previously turned a
    # large request into a failed search and an empty result.
    api_page_limit = 50
    try:
        page_size = min(max_results, api_page_limit)
        if page_size <= 0:
            return []

        youtube = build('youtube', 'v3', developerKey=api_key)
        search_response = youtube.search().list(
            q=query,
            part='snippet',
            type='video',
            maxResults=page_size,
        ).execute()

        videos = []
        for item in search_response.get('items', []):
            video_id = item.get('id', {}).get('videoId')
            if not video_id:
                # Skip a malformed entry instead of discarding every result.
                continue
            video_title = item.get('snippet', {}).get('title', '')
            video_url = f"https://www.youtube.com/watch?v={video_id}"
            videos.append((video_title, video_url))
        return videos
    except Exception as e:
        logging.error(f"Failed to fetch YouTube links: {e}")
        return []
| # Function to download audio from YouTube | |
def download_single_audio(url, index, download_path):
    """Download one video's audio track and convert it to MP3.

    The file is written as ``song_<index>.mp3`` inside *download_path*.
    A download is attempted up to three times, but only the
    "Sign in to confirm you're not a bot" failure is retried (after a random
    5-15 second pause); any other error aborts immediately.

    Args:
        url: Video URL handed to yt-dlp.
        index: Number used to build a unique output file name.
        download_path: Directory that receives the MP3 file.

    Returns:
        str | None: Path of the resulting MP3, or None when the cookies file
        is missing, the download failed, or no MP3 was produced.
    """
    cookies_path = './cookies.txt'  # Path to your cookies file
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': os.path.join(download_path, f'song_{index}.%(ext)s'),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'retries': 3,
        'fragment_retries': 3,
        # The YoutubeDL option is named 'cookiefile'; a 'cookies' key is not
        # a recognised option, so the file was never actually loaded.
        # NOTE(review): yt-dlp may also write updated cookies back to this
        # file - confirm that is acceptable with concurrent downloads.
        'cookiefile': cookies_path,
    }

    # Ensure the cookies file exists
    if not os.path.exists(cookies_path):
        logging.error("Cookies file not found. Please ensure './cookies.txt' exists and is valid.")
        return None

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
            downloaded_files = sorted(
                f for f in os.listdir(download_path)
                if f.startswith(f"song_{index}.") and f.endswith(".mp3")
            )
            if downloaded_files:
                return os.path.join(download_path, downloaded_files[0])
            logging.error(f"Downloaded file not found for {url}")
            return None
        except Exception as e:
            logging.error(f"Error downloading audio (attempt {attempt}/{max_attempts}): {e}")
            if "Sign in to confirm you're not a bot" not in str(e):
                return None
            # Only wait when another attempt will follow; sleeping after the
            # last attempt just delays the failure.
            if attempt < max_attempts:
                sleep_time = random.uniform(5, 15)
                logging.info(f"Detected anti-bot measure. Waiting for {sleep_time:.2f} seconds before retrying...")
                time.sleep(sleep_time)

    logging.error(f"Failed to download audio after {max_attempts} attempts: {url}")
    return None
| # Function to download all audio files in parallel | |
def download_all_audio(video_urls, download_path):
    """Download the audio of every URL in parallel.

    Each URL is handed to ``download_single_audio`` on a thread pool sized
    by ``num_cores``. Results are collected in the same order as
    *video_urls*, so the list is deterministic rather than depending on
    which download happens to finish first.

    Args:
        video_urls: Iterable of video URLs; they are numbered from 1 and the
            number becomes part of each output file name.
        download_path: Directory that receives the MP3 files.

    Returns:
        list[str]: Paths of the files that downloaded successfully, in input
        order. Failed downloads are logged and omitted.
    """
    downloaded_files = []
    with ThreadPoolExecutor(max_workers=num_cores) as executor:
        futures = [
            executor.submit(download_single_audio, url, index, download_path)
            for index, url in enumerate(video_urls, start=1)
        ]
        # Walk the futures in submission order (not completion order) so the
        # output order follows the input order.
        for future in futures:
            try:
                mp3_file = future.result()
            except Exception as e:
                logging.error(f"Error occurred: {e}")
                continue
            if mp3_file:
                downloaded_files.append(mp3_file)
    return downloaded_files
| # Function to create a mashup from audio files | |
def create_mashup(audio_files, output_file, trim_duration):
    """Concatenate the opening of each audio file into one MP3.

    Args:
        audio_files: Paths of the audio files to combine, in playback order.
        output_file: Destination path of the exported MP3.
        trim_duration: Seconds to take from the start of each file; a file
            shorter than this is used in full.

    Returns:
        The value of *output_file* after a successful export, or None when
        no file contributed any audio. Files that fail to load are logged
        and skipped.
    """
    # Convert to milliseconds
    total_trim_duration_per_file = trim_duration * 1000
    mashup = AudioSegment.silent(duration=0)

    for file in audio_files:
        try:
            audio = AudioSegment.from_file(file)
            if len(audio) >= total_trim_duration_per_file:
                part = audio[:total_trim_duration_per_file]
            else:
                logging.warning(f"Audio file {file} is shorter than trim duration. Using full length.")
                part = audio
            mashup += part
        except Exception as e:
            logging.error(f"Error processing file {file}: {e}")

    if len(mashup) == 0:
        logging.error("No audio files were successfully processed.")
        return None

    # Length the mashup would have if every file supplied a full-length clip.
    expected_mashup_duration = total_trim_duration_per_file * len(audio_files)
    if len(mashup) >= expected_mashup_duration:
        mashup = mashup[:expected_mashup_duration]
    else:
        logging.warning(f"Mashup duration ({len(mashup)}ms) is less than expected ({expected_mashup_duration}ms).")

    mashup.export(output_file, format="mp3", bitrate="128k")
    return output_file
def create_zip_file(file_path, zip_path):
    """Create a zip archive at *zip_path* holding the single file *file_path*.

    The file is stored under its base name only, without its directory
    components. An existing archive at *zip_path* is overwritten.

    Args:
        file_path: Path of the file to archive.
        zip_path: Path of the archive to create.

    Returns:
        The value of *zip_path*.
    """
    entry_name = os.path.basename(file_path)
    with zipfile.ZipFile(zip_path, mode='w') as archive:
        archive.write(file_path, arcname=entry_name)
    return zip_path
| # Function to send email | |
def send_email(sender_email, receiver_email, subject, body, attachment_path, password):
    """Send a plain-text email with one zip attachment through Gmail SMTP.

    Connects to ``smtp.gmail.com`` on port 587, upgrades the connection
    with STARTTLS and logs in as *sender_email*.

    Args:
        sender_email: Address used both as the ``From`` header and as the
            SMTP login name.
        receiver_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        attachment_path: Path of the file to attach; it is sent with the
            MIME type ``application/zip`` under its base name.
        password: SMTP password for *sender_email*.

    Returns:
        bool: True when the message was handed to the server, False on any
        failure (the error is logged, not raised).
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = receiver_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        part = MIMEBase('application', 'zip')
        with open(attachment_path, 'rb') as attachment:
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        # Let the email package build the parameter: the hand-written header
        # had a stray space after "filename=" and no quoting.
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(attachment_path),
        )
        msg.attach(part)

        # The context manager closes the connection even when starttls,
        # login or sendmail raises; previously the socket leaked on failure.
        with smtplib.SMTP('smtp.gmail.com', 587) as server:
            server.starttls()
            server.login(sender_email, password)
            server.sendmail(sender_email, receiver_email, msg.as_string())

        logging.info("Email sent successfully!")
        return True
    except Exception as e:
        logging.error(f"Failed to send email: {e}")
        return False
def index():
    """Render the ``index.html`` template and return the result.

    NOTE(review): no route decorator is visible on this function in this
    view of the file - confirm how it is registered with ``app``.
    """
    page = render_template('index.html')
    return page
def serve_static(filename):
    """Return *filename* from the application's static folder.

    NOTE(review): no route decorator is visible on this function in this
    view of the file - confirm how it is registered with ``app``.
    """
    static_dir = app.static_folder
    return send_from_directory(static_dir, filename)
def page_not_found(e):
    """Log the requested URL and return a JSON error body with status 404.

    NOTE(review): presumably registered as the app's 404 handler, but no
    decorator is visible in this view of the file - confirm.
    """
    logging.error(f"404 error: {request.url}")
    payload = jsonify(error=str(e))
    return payload, 404
def internal_server_error(e):
    """Log the error and return a generic JSON error body with status 500.

    The exception text is written to the log only; the response carries a
    fixed message.

    NOTE(review): presumably registered as the app's 500 handler, but no
    decorator is visible in this view of the file - confirm.
    """
    logging.error(f"500 error: {str(e)}")
    payload = jsonify(error="Internal Server Error")
    return payload, 500
def create_mashup_route():
    """Handle a mashup request end to end.

    Reads ``singer_name``, ``num_videos``, ``trim_duration`` and
    ``receiver_email`` from the submitted form, searches YouTube, downloads
    the audio into a temporary directory, builds the mashup, zips it and
    emails it to the requester.

    Returns:
        A JSON response with ``status`` (``'success'`` or ``'error'``) and a
        human-readable ``message``. Every failure, including an unexpected
        exception, is reported through this JSON body.

    NOTE(review): no route decorator is visible on this function in this
    view of the file - confirm how it is registered with ``app``.
    """
    def error_response(message):
        # Every failure path returns the same JSON shape.
        return jsonify({'status': 'error', 'message': message})

    try:
        # Log incoming request data
        logging.info(f"Received create_mashup request: {request.form}")

        # Extract form data
        form = request.form
        singer_name = form.get('singer_name')
        num_videos = form.get('num_videos')
        trim_duration = form.get('trim_duration')
        receiver_email = form.get('receiver_email')

        # Every field is required; report those that are absent or empty.
        submitted = {
            'singer_name': singer_name,
            'num_videos': num_videos,
            'trim_duration': trim_duration,
            'receiver_email': receiver_email,
        }
        missing_fields = [field for field, value in submitted.items() if not value]
        if missing_fields:
            logging.error(f"Missing required fields: {missing_fields}")
            return error_response(f'Missing required fields: {", ".join(missing_fields)}')

        # Convert the numeric fields. Values below the minimum (10 videos,
        # 20 seconds) are raised to it rather than rejected.
        try:
            num_videos = int(num_videos)
            if num_videos < 10:
                num_videos = 10
            trim_duration = int(trim_duration)
            if trim_duration < 20:
                trim_duration = 20
        except ValueError as e:
            logging.error(f"Invalid numeric values: {str(e)}")
            return error_response('Invalid numeric values for num_videos or trim_duration')

        # Validate email
        if not is_valid_email(receiver_email):
            logging.error(f"Invalid email address: {receiver_email}")
            return error_response('Please enter a valid email address.')

        # Fetch YouTube links
        logging.info(f"Fetching YouTube links for {singer_name}")
        videos = get_youtube_links(api_key, singer_name, max_results=num_videos)
        if not videos:
            logging.warning(f"No videos found for {singer_name}")
            return error_response(f'No videos found for {singer_name}. Please try a different singer name.')

        # All intermediate files live in a temporary directory that is
        # removed when the block exits, so the email must be sent inside it.
        with tempfile.TemporaryDirectory() as download_path:
            logging.info(f"Created temporary directory: {download_path}")

            # Download audio files
            video_urls = [link for _title, link in videos]
            logging.info(f"Downloading {len(video_urls)} audio files")
            audio_files = download_all_audio(video_urls, download_path)
            if not audio_files:
                logging.error("Failed to download audio files")
                return error_response('Failed to download audio files. Please try again.')

            # Create mashup
            logging.info("Creating mashup")
            mashup_target = os.path.join(download_path, "mashup.mp3")
            mashup_path = create_mashup(audio_files, mashup_target, trim_duration)
            if not mashup_path:
                logging.error("Failed to create mashup")
                return error_response('Failed to create mashup. Please try again.')

            # Create zip file
            archive_path = os.path.join(download_path, "mashup.zip")
            create_zip_file(mashup_path, archive_path)

            # Send email
            logging.info(f"Sending email to {receiver_email}")
            subject = f"Your {singer_name} YouTube Mashup"
            body = f"Please find attached your custom YouTube mashup of {singer_name} songs. Duration: {trim_duration * len(audio_files)} seconds."
            delivered = send_email(sender_email, receiver_email, subject, body, archive_path, email_password)
            if not delivered:
                logging.error("Failed to send email")
                return error_response('Mashup created but failed to send email. Please try again.')

            logging.info("Mashup created and sent successfully")
            return jsonify({'status': 'success', 'message': 'Mashup created and sent successfully! Check your email.'})
    except Exception as e:
        logging.error(f"Unexpected error in create_mashup_route: {str(e)}", exc_info=True)
        return error_response(f'An unexpected error occurred: {str(e)}')
if __name__ == '__main__':
    # Port comes from the PORT environment variable, defaulting to 5000.
    port = int(os.environ.get('PORT', 5000))
    # The server listens on every interface, so the interactive debugger must
    # not be enabled unconditionally: it allows code execution from the
    # browser. Debug mode is now opt-in by setting FLASK_DEBUG to 1/true/yes/on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=port, debug=debug_enabled)