myfy-server / app.py
Shirpi's picture
Update app.py
8cec65f verified
from flask import Flask, jsonify, request, send_file, Response
import yt_dlp
import os
import tempfile
import threading
app = Flask(__name__)
# Cache — same song again search பண்ணா fast!
_cache = {}
_cache_lock = threading.Lock()
def _get_audio_url(query):
with _cache_lock:
if query in _cache:
return _cache[query]
ydl_opts = {
'format': 'bestaudio[ext=m4a]/bestaudio/best',
'quiet': True,
'no_warnings': True,
'socket_timeout': 10,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(
f'ytsearch1:{query}',
download=False
)
if 'entries' in info and info['entries']:
video = info['entries'][0]
else:
video = info
formats = video.get('formats', [])
# m4a format prefer பண்றோம் — faster!
m4a = [f for f in formats
if f.get('ext') == 'm4a'
and f.get('acodec') != 'none'
and f.get('vcodec') == 'none']
if m4a:
best = max(m4a, key=lambda f: f.get('abr', 0) or 0)
else:
audio = [f for f in formats
if f.get('acodec') != 'none'
and f.get('vcodec') == 'none']
if not audio:
return None
best = max(audio, key=lambda f: f.get('abr', 0) or 0)
result = {
'url': best['url'],
'title': video.get('title', ''),
'bitrate': int(best.get('abr', 0) or 128),
'ext': best.get('ext', 'm4a'),
}
with _cache_lock:
_cache[query] = result
# Cache 50 songs மட்டும்
if len(_cache) > 50:
oldest = next(iter(_cache))
del _cache[oldest]
return result
@app.route('/audio', methods=['GET'])
def get_audio():
query = request.args.get('q', '')
if not query:
return jsonify({'error': 'No query'}), 400
try:
result = _get_audio_url(query)
if not result:
return jsonify({'error': 'Not found'}), 404
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/stream', methods=['GET'])
def stream_audio():
query = request.args.get('q', '')
if not query:
return jsonify({'error': 'No query'}), 400
try:
result = _get_audio_url(query)
if not result:
return jsonify({'error': 'Not found'}), 404
import requests as req
# Range header support — resume download!
range_header = request.headers.get('Range', '')
headers = {
'User-Agent': 'Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36',
'Accept': '*/*',
'Accept-Encoding': 'identity',
'Referer': 'https://www.youtube.com/',
}
if range_header:
headers['Range'] = range_header
yt_resp = req.get(
result['url'],
headers=headers,
stream=True,
timeout=30,
)
ext = result.get('ext', 'm4a')
content_type = 'audio/mp4' if ext == 'm4a' else 'audio/mpeg'
response_headers = {
'Content-Type': content_type,
'Accept-Ranges': 'bytes',
'X-Song-Title': result.get('title', ''),
'X-Bitrate': str(result.get('bitrate', 128)),
}
if 'Content-Length' in yt_resp.headers:
response_headers['Content-Length'] = \
yt_resp.headers['Content-Length']
if 'Content-Range' in yt_resp.headers:
response_headers['Content-Range'] = \
yt_resp.headers['Content-Range']
def generate():
for chunk in yt_resp.iter_content(chunk_size=65536):
if chunk:
yield chunk
status = yt_resp.status_code
return Response(
generate(),
status=status,
headers=response_headers,
)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
return jsonify({'status': 'ok', 'cache': len(_cache)})
if __name__ == '__main__':
port = int(os.environ.get('PORT', 7860))
app.run(host='0.0.0.0', port=port, threaded=True)