"""Flask service that extracts audio from YouTube/TikTok links using yt-dlp.

The app exposes three routes: ``/`` (renders ``index.html``), ``/service``
(downloads a video's audio as MP3 and optionally converts it to M4A) and
``/download/<filename>`` (serves a produced file from ``OUTPUT_FOLDER``).
Per-session usage counters are persisted in ``USER_DATA_FILE``.
"""

import json
import os
import re
import subprocess
import unicodedata
import uuid
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    render_template,
    request,
    send_from_directory,
    session,
)
from werkzeug.exceptions import BadRequest
from yt_dlp import YoutubeDL, DownloadError

load_dotenv()

# NOTE(review): the fallback value is a publicly known constant; sessions signed
# with it can be forged. Set FLASK_SECRET_KEY in the environment for real use.
FLASK_SECRET_KEY = os.getenv('FLASK_SECRET_KEY', 'default_secret_key')
OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), "Desktop", "myUniqSong")
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

USER_DATA_FILE = 'users.json'

# Matches a leading "scheme://" so scheme-less links can be normalised.
_SCHEME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9+.\-]*://')

# Platform name -> registrable domains accepted for that platform.
_SUPPORTED_HOSTS = {
    'youtube': ('youtube.com', 'youtu.be'),
    'tiktok': ('tiktok.com',),
}

app = Flask(__name__)
app.secret_key = FLASK_SECRET_KEY


def load_users():
    """Return the persisted user dictionary, or ``{}`` if the file is absent."""
    if os.path.isfile(USER_DATA_FILE):
        with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}


def save_users(data):
    """Write *data* to ``USER_DATA_FILE`` as indented UTF-8 JSON."""
    with open(USER_DATA_FILE, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def sanitize_title(title):
    """Return *title* normalised for use as a file-name stem.

    The title is stripped, NFC-normalised, has the characters
    ``\\ / : " * ? < > |`` and whitespace runs replaced by a single
    underscore, and loses leading/trailing underscores.
    """
    title = unicodedata.normalize('NFC', title.strip())
    title = re.sub(r'[\\/:"*?<>|]', '_', title)
    title = re.sub(r'\s+', '_', title)
    title = re.sub(r'_+', '_', title)
    return title.strip('_')


def _with_scheme(url):
    """Return *url* with ``https://`` prepended when it carries no scheme."""
    if _SCHEME_RE.match(url):
        return url
    return 'https://' + url.lstrip('/')


def clean_youtube_url(url):
    """Reduce a YouTube URL to its ``v`` query parameter only.

    Scheme-less input (``youtube.com/watch?v=ID``) is given an ``https``
    scheme *before* parsing; parsing it first would leave the host in the
    path component and rebuild a malformed ``https:///youtube.com/...`` URL.
    URLs without a ``v`` parameter (for example ``youtu.be/ID``) are returned
    with only the scheme normalisation applied.
    """
    url = _with_scheme(url)
    parsed = urlparse(url)
    video_ids = parse_qs(parsed.query).get('v')
    if not video_ids:
        return url
    return urlunparse((
        parsed.scheme,
        parsed.netloc,
        parsed.path,
        parsed.params,
        urlencode({'v': video_ids[0]}),
        parsed.fragment,
    ))


def _detect_platform(url):
    """Return ``'youtube'``, ``'tiktok'`` or ``None`` based on the URL's host.

    The decision uses the parsed hostname rather than a substring test so a
    link such as ``http://other-host/?x=youtube.com`` is not accepted.
    """
    try:
        host = (urlparse(_with_scheme(url)).hostname or '').lower()
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. broken IPv6 brackets).
        return None
    for platform, domains in _SUPPORTED_HOSTS.items():
        if any(host == d or host.endswith('.' + d) for d in domains):
            return platform
    return None


def _compact(text):
    """Lower-case *text* and drop underscores/spaces for loose matching."""
    return text.lower().replace('_', '').replace(' ', '')


def _download_audio(url, quality, *, default_title, not_found_prefix,
                    extra_opts=None):
    """Download *url* with yt-dlp, extract MP3 audio and locate the file.

    Args:
        url: Link handed to yt-dlp.
        quality: Value passed as the extractor's ``preferredquality``.
        default_title: Title used when yt-dlp reports none.
        not_found_prefix: Leading words of the ``FileNotFoundError`` message.
        extra_opts: Optional extra yt-dlp options merged over the defaults.

    Returns:
        ``(mp3_path, sanitized_title)``.

    Raises:
        FileNotFoundError: If no matching MP3 can be found after download.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': quality,
        }],
        'outtmpl': os.path.join(OUTPUT_FOLDER, '%(title)s.%(ext)s'),
        'quiet': True,
        'noplaylist': True,
    }
    if extra_opts:
        ydl_opts.update(extra_opts)

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)

    # `or` also covers a title that is present but None/empty.
    title = sanitize_title(info.get('title') or default_title)

    # Preferred: the path yt-dlp itself reports for the finished download.
    for download in info.get('requested_downloads') or []:
        filepath = download.get('filepath')
        if (filepath and filepath.lower().endswith('.mp3')
                and os.path.exists(filepath)):
            return filepath, title

    # Fallback: loose title match against MP3s in the output folder. Skipped
    # for an empty key, because '' is a substring of every file name and
    # would return an unrelated file.
    wanted = _compact(title)
    if wanted:
        for name in os.listdir(OUTPUT_FOLDER):
            if name.lower().endswith('.mp3') and wanted in _compact(name):
                return os.path.join(OUTPUT_FOLDER, name), title

    expected = os.path.join(OUTPUT_FOLDER, title + '.mp3')
    raise FileNotFoundError(
        f"{not_found_prefix} file not found after download: {expected}"
    )


def download_youtube_audio(url, quality='192'):
    """Download a YouTube video's audio as MP3.

    Returns ``(mp3_path, sanitized_title)``; raises ``FileNotFoundError``
    when the resulting MP3 cannot be located.
    """
    return _download_audio(
        url,
        quality,
        default_title='audio',
        not_found_prefix='MP3',
    )


def convert_to_m4a(mp3_path):
    """Convert *mp3_path* to AAC in an ``.m4a`` file next to it using ffmpeg.

    Returns the path of the M4A file. ``subprocess.CalledProcessError`` is
    raised if ffmpeg exits with a non-zero status.
    """
    # splitext only looks at the final path component, so a dot in a
    # directory name cannot be mistaken for the extension separator.
    m4a_path = os.path.splitext(mp3_path)[0] + '.m4a'
    subprocess.run(
        ['ffmpeg', '-y', '-i', mp3_path, '-c:a', 'aac', m4a_path],
        check=True,
    )
    return m4a_path


def download_tiktok_audio(url, quality='192'):
    """Download a TikTok video's audio as MP3.

    Uses the same options as the YouTube download plus
    ``merge_output_format='mp4'`` and ``nocheckcertificate=True``.
    Returns ``(mp3_path, sanitized_title)``; raises ``FileNotFoundError``
    when the resulting MP3 cannot be located.
    """
    return _download_audio(
        url,
        quality,
        default_title='tiktok_audio',
        not_found_prefix='TikTok MP3',
        extra_opts={
            # To avoid common TikTok issues
            'merge_output_format': 'mp4',
            'nocheckcertificate': True,
        },
    )


users = load_users()


@app.route('/')
def home():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/service', methods=['POST'])
def run_service():
    """Download the requested video's audio and return download links.

    Expects a JSON object with ``input`` (video URL) and optional ``format``
    (``mp3`` or ``m4a``) and ``quality``. Responds with a JSON status
    document; validation and download failures yield HTTP 400, anything
    else HTTP 500.
    """
    try:
        if not request.is_json:
            raise BadRequest("請用 JSON 格式送出請求")

        data = request.get_json()
        # A JSON array/string/number has no .get(); reject it as a bad
        # request instead of failing with AttributeError (HTTP 500).
        if not isinstance(data, dict):
            raise BadRequest("請用 JSON 格式送出請求")

        raw_url = data.get('input', '')
        url = raw_url.strip() if isinstance(raw_url, str) else ''
        if not url:
            raise BadRequest("❌ 請輸入影片連結")

        # Determine platform and clean URL if needed
        platform = _detect_platform(url)
        if platform == "youtube":
            url = clean_youtube_url(url)
        elif platform != "tiktok":
            raise BadRequest("目前只支援 YouTube 和 TikTok 影片連結!")

        fmt = data.get('format', 'mp3')
        quality = data.get('quality', '192')

        if fmt not in ['mp3', 'm4a']:
            raise BadRequest("不支援的格式,僅支援 mp3 與 m4a")

        user_id = session.setdefault('user_id', str(uuid.uuid4()))
        user = users.get(user_id, {"usage": 0})

        links = {}

        if platform == "youtube":
            mp3_path, title = download_youtube_audio(url, quality)
        else:  # tiktok
            mp3_path, title = download_tiktok_audio(url, quality)

        if fmt == 'mp3':
            links['mp3'] = f"/download/{os.path.basename(mp3_path)}"
        elif fmt == 'm4a':
            m4a_path = convert_to_m4a(mp3_path)
            links['m4a'] = f"/download/{os.path.basename(m4a_path)}"

        user["usage"] += 1
        users[user_id] = user
        save_users(users)

        return jsonify({
            'status': 'success',
            'message': f"{title} 已完成!",
            'download_links': links,
        })

    except DownloadError as e:
        app.logger.error(f"DownloadError: {e}")
        return jsonify({'status': 'error', 'message': f"下載錯誤: {str(e)}"}), 400
    except BadRequest as e:
        app.logger.warning(f"BadRequest: {e}")
        return jsonify({'status': 'error', 'message': str(e)}), 400
    except Exception as e:
        app.logger.error(f"Exception: {e}", exc_info=True)
        return jsonify({'status': 'error', 'message': f"伺服器發生錯誤: {str(e)}"}), 500


# The URL variable is required: without it Flask calls the view with no
# arguments and every download request fails with TypeError.
@app.route('/download/<filename>')
def download_file(filename):
    """Send *filename* from ``OUTPUT_FOLDER`` as an attachment, or JSON 404."""
    path = os.path.join(OUTPUT_FOLDER, filename)
    if os.path.isfile(path):
        return send_from_directory(OUTPUT_FOLDER, filename, as_attachment=True)
    return jsonify({'status': 'error', 'message': '找不到檔案'}), 404


if __name__ == '__main__':
    import logging

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    port = int(os.getenv('PORT', 5000))
    app.logger.info(
        f"Flask app running on port {port}, saving files to {OUTPUT_FOLDER}"
    )
    app.run(host='0.0.0.0', port=port, debug=False)