Spaces:
Paused
Paused
| import os | |
| import json | |
| import uuid | |
| import re | |
| import unicodedata | |
| import subprocess | |
| from urllib.parse import urlparse, parse_qs, urlencode, urlunparse | |
| from flask import Flask, request, jsonify, render_template, session, send_from_directory | |
| from werkzeug.exceptions import BadRequest | |
| from yt_dlp import YoutubeDL, DownloadError | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# Key used by Flask to sign session cookies.  A fixed, publicly known fallback
# would let anyone forge a session, so when FLASK_SECRET_KEY is unset (or
# empty) a random per-process key is generated instead.  Sessions then do not
# survive a restart and are not shared between worker processes; set
# FLASK_SECRET_KEY in the environment for stable sessions.
FLASK_SECRET_KEY = os.getenv('FLASK_SECRET_KEY') or os.urandom(32).hex()

# Directory that receives every downloaded / converted audio file; created at
# import time if it does not exist yet.
OUTPUT_FOLDER = os.path.join(os.path.expanduser("~"), "Desktop", "myUniqSong")
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

# JSON file (relative to the working directory) holding the per-user records.
USER_DATA_FILE = 'users.json'

app = Flask(__name__)
app.secret_key = FLASK_SECRET_KEY
def load_users():
    """Load the per-user records stored in ``USER_DATA_FILE``.

    Returns:
        The decoded JSON object as a ``dict``.  An empty dict is returned
        when the file does not exist, cannot be decoded as JSON (for example
        an empty or truncated file), or does not contain a JSON object.
    """
    import logging

    if not os.path.isfile(USER_DATA_FILE):
        return {}
    try:
        with open(USER_DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except json.JSONDecodeError as exc:
        # A damaged file must not stop the application from starting; the
        # records are reset and the problem is reported in the log.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable user data file %s: %s", USER_DATA_FILE, exc
        )
        return {}
    if not isinstance(data, dict):
        logging.getLogger(__name__).warning(
            "Ignoring user data file %s: expected a JSON object", USER_DATA_FILE
        )
        return {}
    return data
def save_users(data):
    """Write *data* to ``USER_DATA_FILE`` as indented UTF-8 JSON.

    The JSON is first written to a uniquely named temporary file next to the
    target and then moved into place with ``os.replace``.  If serialisation
    or writing fails, the previous contents of ``USER_DATA_FILE`` are left
    untouched instead of being truncated.

    Args:
        data: JSON-serialisable object to store.

    Raises:
        TypeError / ValueError: if *data* cannot be serialised to JSON.
        OSError: if the file cannot be written or replaced.
    """
    tmp_path = f"{USER_DATA_FILE}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, USER_DATA_FILE)
    except BaseException:
        # Best-effort cleanup of the partial temporary file; the original
        # error is what the caller needs to see.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
# One or more consecutive characters that are either filename-hostile
# (\ / : " * ? < > |), whitespace, or an underscore.
_TITLE_SEPARATOR_RUN = re.compile(r'[\\/:"*?<>|\s_]+')


def sanitize_title(title):
    """Turn a media title into an underscore-separated token.

    The title is stripped and NFC-normalised; every run of reserved
    characters, whitespace and underscores becomes a single ``_``; leading
    and trailing underscores are removed.  The result may be empty.
    """
    normalized = unicodedata.normalize('NFC', title.strip())
    collapsed = _TITLE_SEPARATOR_RUN.sub('_', normalized)
    return collapsed.strip('_')
def clean_youtube_url(url):
    """Reduce a YouTube watch URL to its ``v`` query parameter.

    All other query parameters (playlist, timestamp, tracking, ...) are
    dropped; path, params and fragment are kept.  A URL given without a
    scheme (``youtube.com/watch?v=...``) is treated as ``https``.

    Args:
        url: The URL as entered by the user.

    Returns:
        The rebuilt URL when a non-empty ``v`` parameter is present,
        otherwise *url* unchanged (e.g. ``youtu.be`` short links).
    """
    parsed = urlparse(url)
    if not parsed.netloc:
        # Without "//" urlparse puts the host into the path, and rebuilding
        # such a result yields a broken "https:///host/..." URL.  Re-parse
        # with an explicit scheme so the host lands in netloc.
        parsed = urlparse('https://' + url.lstrip('/'))

    video_ids = parse_qs(parsed.query).get('v')
    if not video_ids:
        return url

    return urlunparse(parsed._replace(
        scheme=parsed.scheme or 'https',
        query=urlencode({'v': video_ids[0]}),
    ))
def _find_downloaded_mp3(info, expected_path, title):
    """Return the path of the MP3 produced for *info*, or ``None``.

    Candidates are tried in order of reliability:

    1. the ``filepath`` entries yt-dlp reports in ``requested_downloads``;
    2. *expected_path*, the output-template path with an ``.mp3`` extension;
    3. any ``.mp3`` in ``OUTPUT_FOLDER`` whose name contains *title* once
       case, spaces and underscores are ignored.
    """
    for download in info.get('requested_downloads') or []:
        filepath = download.get('filepath')
        if filepath and filepath.lower().endswith('.mp3') and os.path.exists(filepath):
            return filepath

    if expected_path and os.path.exists(expected_path):
        return expected_path

    needle = title.lower().replace('_', '').replace(' ', '')
    if not needle:
        # An empty needle is a substring of every name and would return an
        # unrelated file.
        return None
    for name in os.listdir(OUTPUT_FOLDER):
        if not name.lower().endswith('.mp3'):
            continue
        if needle in name.lower().replace('_', '').replace(' ', ''):
            return os.path.join(OUTPUT_FOLDER, name)
    return None


def _download_audio(url, quality, default_title, missing_label, extra_opts=None):
    """Download *url* with yt-dlp, extract MP3 audio, and locate the result.

    Args:
        url: Media URL handed to yt-dlp.
        quality: Value passed as the extractor's ``preferredquality``.
        default_title: Title used when yt-dlp reports none.
        missing_label: Leading words of the ``FileNotFoundError`` message.
        extra_opts: Optional yt-dlp options merged over the common ones.

    Returns:
        ``(mp3_path, sanitized_title)``.

    Raises:
        FileNotFoundError: if no MP3 can be located after the download.
    """
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': quality,
        }],
        'outtmpl': os.path.join(OUTPUT_FOLDER, '%(title)s.%(ext)s'),
        'quiet': True,
        'noplaylist': True,
    }
    ydl_opts.update(extra_opts or {})

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        template_path = ydl.prepare_filename(info)

    # `or` also covers a title that is present but None or empty.
    title = sanitize_title(info.get('title') or default_title)

    # The post-processor keeps the base name and swaps the extension.
    expected_path = os.path.splitext(template_path)[0] + '.mp3' if template_path else None

    mp3_path = _find_downloaded_mp3(info, expected_path, title)
    if mp3_path is None:
        raise FileNotFoundError(
            f"{missing_label} not found after download: {os.path.join(OUTPUT_FOLDER, title + '.mp3')}"
        )
    return mp3_path, title


def download_youtube_audio(url, quality='192'):
    """Download the audio of a YouTube video as MP3 into ``OUTPUT_FOLDER``.

    Args:
        url: YouTube video URL.
        quality: Value passed as the extractor's ``preferredquality``.

    Returns:
        ``(mp3_path, sanitized_title)``.

    Raises:
        FileNotFoundError: if the MP3 cannot be located after the download.
        Errors raised by yt-dlp itself propagate unchanged.
    """
    return _download_audio(url, quality, 'audio', 'MP3 file')


def convert_to_m4a(mp3_path):
    """Transcode *mp3_path* to AAC in an ``.m4a`` file beside it using ffmpeg.

    An existing output file is overwritten (``-y``).

    Returns:
        Path of the ``.m4a`` file.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable cannot be started.
    """
    # splitext only looks at the final path component, so a dot in a
    # directory name is never mistaken for the extension separator.
    m4a_path = os.path.splitext(mp3_path)[0] + '.m4a'
    subprocess.run(
        ['ffmpeg', '-y', '-i', mp3_path, '-c:a', 'aac', m4a_path],
        check=True,
        # ffmpeg reads interactive commands from stdin; keep it away from
        # the server process's stdin.
        stdin=subprocess.DEVNULL,
    )
    return m4a_path


def download_tiktok_audio(url, quality='192'):
    """Download the audio of a TikTok video as MP3 into ``OUTPUT_FOLDER``.

    Uses the same options as the YouTube download plus two TikTok-specific
    ones.

    Args:
        url: TikTok video URL.
        quality: Value passed as the extractor's ``preferredquality``.

    Returns:
        ``(mp3_path, sanitized_title)``.

    Raises:
        FileNotFoundError: if the MP3 cannot be located after the download.
        Errors raised by yt-dlp itself propagate unchanged.
    """
    tiktok_opts = {
        # To avoid common TikTok issues
        'merge_output_format': 'mp4',
        # NOTE(review): this turns off TLS certificate verification for the
        # download; confirm it is still required.
        'nocheckcertificate': True,
    }
    return _download_audio(url, quality, 'tiktok_audio', 'TikTok MP3 file', tiktok_opts)
# Per-user records, loaded once when the module is imported.  run_service()
# reads and updates this object and writes it back with save_users().
users = load_users()
def home():
    """Render and return the ``index.html`` template.

    NOTE(review): no route registration for this view is visible in this
    part of the file; confirm where it is attached to ``app``.
    """
    page = render_template('index.html')
    return page
# Domains accepted for each platform.  A host matches when it equals a domain
# or is a subdomain of it (www., m., music., vm., ...).
_PLATFORM_DOMAINS = {
    'youtube': ('youtube.com', 'youtu.be'),
    'tiktok': ('tiktok.com',),
}


def _detect_platform(url):
    """Return ``'youtube'``, ``'tiktok'`` or ``None`` based on the URL's host."""
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or '').lower()
    except ValueError:
        # urlparse rejects some malformed authorities (e.g. a broken IPv6 literal).
        return None
    if parsed.scheme not in ('http', 'https'):
        return None
    for platform, domains in _PLATFORM_DOMAINS.items():
        if any(host == domain or host.endswith('.' + domain) for domain in domains):
            return platform
    return None


def run_service():
    """Handle a JSON download request and return download links.

    Request body (JSON object):
        input:   video URL (YouTube or TikTok) - required.
        format:  ``'mp3'`` (default) or ``'m4a'``.
        quality: audio quality passed to the downloader, default ``'192'``.

    Responses (JSON):
        200 ``{'status': 'success', 'message': ..., 'download_links': {...}}``
        400 for invalid requests and yt-dlp download errors.
        500 for any other failure.

    On success the caller's usage counter is incremented and persisted.

    NOTE(review): no route registration for this view is visible in this
    part of the file; confirm where it is attached to ``app``.
    """
    from urllib.parse import quote

    try:
        if not request.is_json:
            raise BadRequest("請用 JSON 格式送出請求")
        data = request.get_json()
        if not isinstance(data, dict):
            # A JSON array / string / number has no fields to read.
            raise BadRequest("請用 JSON 格式送出請求")

        raw_url = data.get('input', '')
        url = raw_url.strip() if isinstance(raw_url, str) else ''
        if not url:
            raise BadRequest("❌ 請輸入影片連結")
        if not re.match(r'[A-Za-z][A-Za-z0-9+.-]*://', url):
            # Accept links pasted without a scheme ("youtube.com/watch?v=...").
            url = 'https://' + url.lstrip('/')

        # Decide by the parsed host rather than by substring, so that a URL
        # merely mentioning a supported domain is not passed to the downloader.
        platform = _detect_platform(url)
        if platform is None:
            raise BadRequest("目前只支援 YouTube 和 TikTok 影片連結!")
        if platform == "youtube":
            url = clean_youtube_url(url)

        fmt = data.get('format', 'mp3')
        quality = data.get('quality', '192')
        if fmt not in ('mp3', 'm4a'):
            raise BadRequest("不支援的格式,僅支援 mp3 與 m4a")

        user_id = session.setdefault('user_id', str(uuid.uuid4()))
        user = users.get(user_id, {"usage": 0})

        if platform == "youtube":
            mp3_path, title = download_youtube_audio(url, quality)
        else:  # tiktok
            mp3_path, title = download_tiktok_audio(url, quality)

        # File names come from video titles and may contain '#', '%' or
        # spaces, so they are percent-encoded before being placed in a URL.
        links = {}
        if fmt == 'mp3':
            links['mp3'] = f"/download/{quote(os.path.basename(mp3_path))}"
        else:  # m4a
            m4a_path = convert_to_m4a(mp3_path)
            links['m4a'] = f"/download/{quote(os.path.basename(m4a_path))}"

        user["usage"] = user.get("usage", 0) + 1
        users[user_id] = user
        save_users(users)
        return jsonify({'status': 'success', 'message': f"{title} 已完成!", 'download_links': links})
    except DownloadError as e:
        app.logger.error(f"DownloadError: {e}")
        return jsonify({'status': 'error', 'message': f"下載錯誤: {str(e)}"}), 400
    except BadRequest as e:
        app.logger.warning(f"BadRequest: {e}")
        # str(e) would prepend "400 Bad Request: "; the client only needs the
        # message itself.
        return jsonify({'status': 'error', 'message': e.description}), 400
    except Exception as e:
        app.logger.error(f"Exception: {e}", exc_info=True)
        return jsonify({'status': 'error', 'message': f"伺服器發生錯誤: {str(e)}"}), 500
def download_file(filename):
    """Send *filename* from ``OUTPUT_FOLDER`` as an attachment.

    Path validation and the existence check are both left to
    ``send_from_directory``, so a missing file and a name that would escape
    ``OUTPUT_FOLDER`` produce the same JSON 404 response.

    Args:
        filename: Name of the file to send, relative to ``OUTPUT_FOLDER``.
            ``run_service`` emits links of the form ``/download/<name>``.

    NOTE(review): no route registration for this view is visible in this
    part of the file; confirm where it is attached to ``app``.
    """
    from werkzeug.exceptions import NotFound

    try:
        return send_from_directory(OUTPUT_FOLDER, filename, as_attachment=True)
    except NotFound:
        return jsonify({'status': 'error', 'message': '找不到檔案'}), 404
if __name__ == '__main__':
    # Script entry point: set up logging, then serve on all interfaces on
    # the port given by the PORT environment variable (default 5000).
    import logging

    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    listen_port = int(os.getenv('PORT', 5000))
    startup_message = f"Flask app running on port {listen_port}, saving files to {OUTPUT_FOLDER}"
    app.logger.info(startup_message)
    app.run(host='0.0.0.0', port=listen_port, debug=False)