Anchit-mashup / app.py
Keys
Added Cookies to make it smoother
a112445
import os
import tempfile
import logging
import re
import random
import time
import zipfile
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
from flask import Flask, render_template, request, jsonify, send_from_directory
from googleapiclient.discovery import build
import yt_dlp
from pydub import AudioSegment
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
app = Flask(__name__, static_folder='static')
# Load API key and email credentials from .env file
load_dotenv()
api_key = os.getenv('YOUTUBE_API_KEY')
sender_email = os.getenv('SENDER_EMAIL')
email_password = os.getenv('EMAIL_PASSWORD')
# Validate environment variables
if not all([api_key, sender_email, email_password]):
logging.error("Missing environment variables. Please check your .env file.")
raise ValueError("Missing environment variables. Please check your .env file.")
# Determine the number of CPU cores available
num_cores = multiprocessing.cpu_count()
# Function to validate email
def is_valid_email(email):
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
return re.match(pattern, email) is not None
# Function to get YouTube links
def get_youtube_links(api_key, query, max_results=20):
try:
youtube = build('youtube', 'v3', developerKey=api_key)
search_response = youtube.search().list(
q=query,
part='snippet',
type='video',
maxResults=max_results
).execute()
videos = []
for item in search_response['items']:
video_id = item['id']['videoId']
video_title = item['snippet']['title']
video_url = f"https://www.youtube.com/watch?v={video_id}"
videos.append((video_title, video_url))
return videos
except Exception as e:
logging.error(f"Failed to fetch YouTube links: {e}")
return []
# Function to download audio from YouTube
def download_single_audio(url, index, download_path):
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': f'{download_path}/song_{index}.%(ext)s',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'retries': 3,
'fragment_retries': 3,
'cookies': './cookies.txt', # Path to your cookies file
}
# Ensure the cookies file exists
if not os.path.exists(ydl_opts['cookies']):
logging.error("Cookies file not found. Please ensure './cookies.txt' exists and is valid.")
return None
max_attempts = 3
for attempt in range(max_attempts):
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
downloaded_files = [f for f in os.listdir(download_path) if f.startswith(f"song_{index}.") and f.endswith(".mp3")]
if downloaded_files:
return os.path.join(download_path, downloaded_files[0])
else:
logging.error(f"Downloaded file not found for {url}")
return None
except Exception as e:
logging.error(f"Error downloading audio (attempt {attempt + 1}/{max_attempts}): {e}")
if "Sign in to confirm you're not a bot" in str(e):
sleep_time = random.uniform(5, 15)
logging.info(f"Detected anti-bot measure. Waiting for {sleep_time:.2f} seconds before retrying...")
time.sleep(sleep_time) # Introduce wait time before retrying
else:
return None
logging.error(f"Failed to download audio after {max_attempts} attempts: {url}")
return None
# Function to download all audio files in parallel
def download_all_audio(video_urls, download_path):
downloaded_files = []
with ThreadPoolExecutor(max_workers=num_cores) as executor:
futures = {
executor.submit(download_single_audio, url, index, download_path): index
for index, url in enumerate(video_urls, start=1)
}
for future in as_completed(futures):
try:
mp3_file = future.result()
if mp3_file:
downloaded_files.append(mp3_file)
except Exception as e:
logging.error(f"Error occurred: {e}")
return downloaded_files
# Function to create a mashup from audio files
def create_mashup(audio_files, output_file, trim_duration):
mashup = AudioSegment.silent(duration=0)
total_trim_duration_per_file = trim_duration * 1000 # Convert to milliseconds
for file in audio_files:
try:
audio = AudioSegment.from_file(file)
if len(audio) < total_trim_duration_per_file:
logging.warning(f"Audio file {file} is shorter than trim duration. Using full length.")
part = audio
else:
part = audio[:total_trim_duration_per_file]
mashup += part
except Exception as e:
logging.error(f"Error processing file {file}: {e}")
if len(mashup) == 0:
logging.error("No audio files were successfully processed.")
return None
expected_mashup_duration = total_trim_duration_per_file * len(audio_files)
if len(mashup) < expected_mashup_duration:
logging.warning(f"Mashup duration ({len(mashup)}ms) is less than expected ({expected_mashup_duration}ms).")
else:
mashup = mashup[:expected_mashup_duration]
mashup.export(output_file, format="mp3", bitrate="128k")
return output_file
def create_zip_file(file_path, zip_path):
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.write(file_path, os.path.basename(file_path))
return zip_path
# Function to send email
def send_email(sender_email, receiver_email, subject, body, attachment_path, password):
try:
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = receiver_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
with open(attachment_path, 'rb') as attachment:
part = MIMEBase('application', 'zip')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f"attachment; filename= {os.path.basename(attachment_path)}")
msg.attach(part)
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender_email, password)
text = msg.as_string()
server.sendmail(sender_email, receiver_email, text)
server.quit()
logging.info("Email sent successfully!")
return True
except Exception as e:
logging.error(f"Failed to send email: {e}")
return False
@app.route('/')
def index():
return render_template('index.html')
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory(app.static_folder, filename)
@app.errorhandler(404)
def page_not_found(e):
logging.error(f"404 error: {request.url}")
return jsonify(error=str(e)), 404
@app.errorhandler(500)
def internal_server_error(e):
logging.error(f"500 error: {str(e)}")
return jsonify(error="Internal Server Error"), 500
@app.route('/create_mashup', methods=['POST'])
def create_mashup_route():
try:
# Log incoming request data
logging.info(f"Received create_mashup request: {request.form}")
# Validate and extract form data
singer_name = request.form.get('singer_name')
num_videos = request.form.get('num_videos')
trim_duration = request.form.get('trim_duration')
receiver_email = request.form.get('receiver_email')
# Validate required fields
if not all([singer_name, num_videos, trim_duration, receiver_email]):
missing_fields = [field for field in ['singer_name', 'num_videos', 'trim_duration', 'receiver_email'] if not request.form.get(field)]
logging.error(f"Missing required fields: {missing_fields}")
return jsonify({'status': 'error', 'message': f'Missing required fields: {", ".join(missing_fields)}'})
# Validate and convert numeric fields
try:
num_videos = max(int(num_videos), 10)
trim_duration = max(int(trim_duration), 20)
except ValueError as e:
logging.error(f"Invalid numeric values: {str(e)}")
return jsonify({'status': 'error', 'message': 'Invalid numeric values for num_videos or trim_duration'})
# Validate email
if not is_valid_email(receiver_email):
logging.error(f"Invalid email address: {receiver_email}")
return jsonify({'status': 'error', 'message': 'Please enter a valid email address.'})
# Fetch YouTube links
logging.info(f"Fetching YouTube links for {singer_name}")
videos = get_youtube_links(api_key, singer_name, max_results=num_videos)
if not videos:
logging.warning(f"No videos found for {singer_name}")
return jsonify({'status': 'error', 'message': f'No videos found for {singer_name}. Please try a different singer name.'})
# Create temporary directory
with tempfile.TemporaryDirectory() as download_path:
logging.info(f"Created temporary directory: {download_path}")
# Download audio files
video_urls = [url for _, url in videos]
logging.info(f"Downloading {len(video_urls)} audio files")
audio_files = download_all_audio(video_urls, download_path)
if not audio_files:
logging.error("Failed to download audio files")
return jsonify({'status': 'error', 'message': 'Failed to download audio files. Please try again.'})
# Create mashup
logging.info("Creating mashup")
output_file = os.path.join(download_path, "mashup.mp3")
mashup_file = create_mashup(audio_files, output_file, trim_duration)
if not mashup_file:
logging.error("Failed to create mashup")
return jsonify({'status': 'error', 'message': 'Failed to create mashup. Please try again.'})
# Create zip file
zip_file = os.path.join(download_path, "mashup.zip")
create_zip_file(mashup_file, zip_file)
# Send email
logging.info(f"Sending email to {receiver_email}")
subject = f"Your {singer_name} YouTube Mashup"
body = f"Please find attached your custom YouTube mashup of {singer_name} songs. Duration: {trim_duration * len(audio_files)} seconds."
email_sent = send_email(sender_email, receiver_email, subject, body, zip_file, email_password)
if email_sent:
logging.info("Mashup created and sent successfully")
return jsonify({'status': 'success', 'message': 'Mashup created and sent successfully! Check your email.'})
else:
logging.error("Failed to send email")
return jsonify({'status': 'error', 'message': 'Mashup created but failed to send email. Please try again.'})
except Exception as e:
logging.error(f"Unexpected error in create_mashup_route: {str(e)}", exc_info=True)
return jsonify({'status': 'error', 'message': f'An unexpected error occurred: {str(e)}'})
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(host='0.0.0.0', port=port, debug=True)