myVideoAudio / app.py
shiue20's picture
Update app.py
b00c37a verified
import os
import json
import uuid
import re
import unicodedata
import subprocess
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from flask import Flask, request, jsonify, render_template, session, send_from_directory
from werkzeug.exceptions import BadRequest
from yt_dlp import YoutubeDL, DownloadError
from dotenv import load_dotenv
load_dotenv()
FLASK_SECRET_KEY = os.getenv('FLASK_SECRET_KEY', 'default_secret_key')
OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), "Desktop", "myUniqSong")
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
USER_DATA_FILE = 'users.json'
app = Flask(__name__)
app.secret_key = FLASK_SECRET_KEY
def load_users():
if os.path.isfile(USER_DATA_FILE):
with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_users(data):
with open(USER_DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def sanitize_title(title):
title = unicodedata.normalize('NFC', title.strip())
title = re.sub(r'[\\/:"*?<>|]', '_', title)
title = re.sub(r'\s+', '_', title)
title = re.sub(r'_+', '_', title)
return title.strip('_')
def clean_youtube_url(url):
parsed = urlparse(url)
qs = parse_qs(parsed.query)
v = qs.get('v')
if v:
new_qs = {'v': v[0]}
cleaned = urlunparse((
parsed.scheme if parsed.scheme else 'https',
parsed.netloc,
parsed.path,
parsed.params,
urlencode(new_qs),
parsed.fragment
))
return cleaned
return url
def download_youtube_audio(url, quality='192'):
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': quality,
}],
'outtmpl': os.path.join(OUTPUT_FOLDER, '%(title)s.%(ext)s'),
'quiet': True,
'noplaylist': True,
}
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
raw_title = info.get('title', 'audio')
title = sanitize_title(raw_title)
if 'requested_downloads' in info and info['requested_downloads']:
for download in info['requested_downloads']:
filepath = download.get('filepath')
if filepath and filepath.lower().endswith('.mp3') and os.path.exists(filepath):
return filepath, title
mp3_files = [f for f in os.listdir(OUTPUT_FOLDER) if f.lower().endswith('.mp3')]
cleaned_title = title.lower().replace('_', '').replace(' ', '')
for f in mp3_files:
cleaned_f = f.lower().replace('_', '').replace(' ', '')
if cleaned_title in cleaned_f:
return os.path.join(OUTPUT_FOLDER, f), title
raise FileNotFoundError(f"MP3 file not found after download: {os.path.join(OUTPUT_FOLDER, title + '.mp3')}")
def convert_to_m4a(mp3_path):
m4a_path = mp3_path.rsplit('.', 1)[0] + '.m4a'
subprocess.run(['ffmpeg', '-y', '-i', mp3_path, '-c:a', 'aac', m4a_path], check=True)
return m4a_path
def download_tiktok_audio(url, quality='192'):
# TikTok video audio download, similar options to YouTube audio
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': quality,
}],
'outtmpl': os.path.join(OUTPUT_FOLDER, '%(title)s.%(ext)s'),
'quiet': True,
'noplaylist': True,
# To avoid common TikTok issues
'merge_output_format': 'mp4',
'nocheckcertificate': True,
}
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
raw_title = info.get('title', 'tiktok_audio')
title = sanitize_title(raw_title)
if 'requested_downloads' in info and info['requested_downloads']:
for download in info['requested_downloads']:
filepath = download.get('filepath')
if filepath and filepath.lower().endswith('.mp3') and os.path.exists(filepath):
return filepath, title
mp3_files = [f for f in os.listdir(OUTPUT_FOLDER) if f.lower().endswith('.mp3')]
cleaned_title = title.lower().replace('_', '').replace(' ', '')
for f in mp3_files:
cleaned_f = f.lower().replace('_', '').replace(' ', '')
if cleaned_title in cleaned_f:
return os.path.join(OUTPUT_FOLDER, f), title
raise FileNotFoundError(f"TikTok MP3 file not found after download: {os.path.join(OUTPUT_FOLDER, title + '.mp3')}")
users = load_users()
@app.route('/')
def home():
return render_template('index.html')
@app.route('/service', methods=['POST'])
def run_service():
try:
if not request.is_json:
raise BadRequest("請用 JSON 格式送出請求")
data = request.get_json()
url = data.get('input', '').strip()
if not url:
raise BadRequest("❌ 請輸入影片連結")
# Determine platform and clean URL if needed
if "youtube.com" in url or "youtu.be" in url:
url = clean_youtube_url(url)
platform = "youtube"
elif "tiktok.com" in url:
platform = "tiktok"
else:
raise BadRequest("目前只支援 YouTube 和 TikTok 影片連結!")
fmt = data.get('format', 'mp3')
quality = data.get('quality', '192')
if fmt not in ['mp3', 'm4a']:
raise BadRequest("不支援的格式,僅支援 mp3 與 m4a")
user_id = session.setdefault('user_id', str(uuid.uuid4()))
user = users.get(user_id, {"usage": 0})
links = {}
if platform == "youtube":
mp3_path, title = download_youtube_audio(url, quality)
else: # tiktok
mp3_path, title = download_tiktok_audio(url, quality)
if fmt == 'mp3':
links['mp3'] = f"/download/{os.path.basename(mp3_path)}"
elif fmt == 'm4a':
m4a_path = convert_to_m4a(mp3_path)
links['m4a'] = f"/download/{os.path.basename(m4a_path)}"
user["usage"] += 1
users[user_id] = user
save_users(users)
return jsonify({'status': 'success', 'message': f"{title} 已完成!", 'download_links': links})
except DownloadError as e:
app.logger.error(f"DownloadError: {e}")
return jsonify({'status': 'error', 'message': f"下載錯誤: {str(e)}"}), 400
except BadRequest as e:
app.logger.warning(f"BadRequest: {e}")
return jsonify({'status': 'error', 'message': str(e)}), 400
except Exception as e:
app.logger.error(f"Exception: {e}", exc_info=True)
return jsonify({'status': 'error', 'message': f"伺服器發生錯誤: {str(e)}"}), 500
@app.route('/download/<filename>')
def download_file(filename):
path = os.path.join(OUTPUT_FOLDER, filename)
if os.path.isfile(path):
return send_from_directory(OUTPUT_FOLDER, filename, as_attachment=True)
return jsonify({'status': 'error', 'message': '找不到檔案'}), 404
if __name__ == '__main__':
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
port = int(os.getenv('PORT', 5000))
app.logger.info(f"Flask app running on port {port}, saving files to {OUTPUT_FOLDER}")
app.run(host='0.0.0.0', port=port, debug=False)