y2mateee / main.py
Akane710's picture
Update main.py
bce8e49 verified
import os
import requests
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
import yt_dlp
from yt_dlp import YoutubeDL
from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel
import assemblyai as aai
import uvicorn
import tempfile
import imageio
import argparse
import uuid
import subprocess
import shutil
# --- Application setup ------------------------------------------------------
app = FastAPI()

# Generated media files are written here and served publicly under /static.
STATIC_DIR = os.path.join(os.getcwd(), "static")
# exist_ok=True replaces the exists()-then-makedirs() pair, which could fail
# if another process created the directory between the two calls.
os.makedirs(STATIC_DIR, exist_ok=True)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# The AssemblyAI key is mandatory: refuse to start without it.
API_KEY = os.getenv("ASSEMBLYAI_API_KEY")
if not API_KEY:
    raise ValueError("ASSEMBLYAI_API_KEY not set in OS secrets")
aai.settings.api_key = API_KEY
transcriber = aai.Transcriber()

# Optional cookie-jar contents handed to yt-dlp (see create_cookie_file).
YOUTUBE_COOKIES = os.getenv("YOUTUBE_COOKIES", "")
def create_cookie_file():
    """
    Create a temporary cookie file from the YOUTUBE_COOKIES environment value.
    Expected format: Netscape format cookies as a multi-line string.

    Returns:
    - Path of the created file, or None when no cookies are configured or the
      file could not be written. The caller is responsible for deleting it.
    """
    if not YOUTUBE_COOKIES:
        return None
    cookie_path = None
    try:
        # delete=False: the file has to outlive this function so that the
        # path can be handed to yt-dlp. The context manager guarantees the
        # handle is closed even if the write fails.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as cookie_file:
            cookie_path = cookie_file.name
            cookie_file.write(YOUTUBE_COOKIES)
        return cookie_path
    except Exception as e:
        print(f"Warning: Failed to create cookie file: {e}")
        # Do not leave a half-written cookie file behind.
        if cookie_path is not None:
            try:
                os.unlink(cookie_path)
            except OSError:
                pass
        return None
def get_ydl_opts():
    """
    Build the YoutubeDL options shared by every extraction/download call.

    The result always contains 'quiet' and 'user_agent'. A 'cookiefile' entry
    is added only when create_cookie_file() returns a path.
    """
    options = {
        'quiet': True,
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    }
    cookie_path = create_cookie_file()
    if not cookie_path:
        return options
    options['cookiefile'] = cookie_path
    return options
def cleanup_cookie_file(ydl_opts):
    """
    Delete the temporary cookie file referenced by ydl_opts, if any.

    Parameters:
    - ydl_opts: YoutubeDL options dict; only its 'cookiefile' entry is read.

    Cleanup is best effort: a missing entry, an empty value, or a file that
    is already gone or cannot be removed is silently ignored.
    """
    cookie_path = ydl_opts.get('cookiefile')
    if not cookie_path:
        return
    try:
        os.unlink(cookie_path)
    except (OSError, TypeError):
        # OSError: already removed / not removable.
        # TypeError: the value is not a filesystem path.
        pass
def extract_video_info(url: str):
    """
    Fetch metadata for a single video without downloading it.

    Parameters:
    - url: URL of the video

    Returns a dict with:
    - "title": video title ('Unknown Title' when absent)
    - "formats": formats that carry both audio and video, each with
      format_id, resolution, filesize and url
    - "highest_audio": the m4a audio-only format with the highest bitrate,
      or None when there is none
    - "thumbnail": URL of the widest thumbnail, falling back to the default one

    Raises:
    - HTTPException(500) when extraction fails.
    """
    ydl_opts = {
        **get_ydl_opts(),
        'extract_flat': False,
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)

        formats = info.get('formats') or []
        thumbnails = info.get('thumbnails') or []

        # Widest thumbnail wins. 'width' may be missing or None, so normalise
        # it to 0 before comparing; zero-width entries are never selected.
        sized_thumbnails = [t for t in thumbnails if (t.get('width') or 0) > 0]
        widest = max(sized_thumbnails, key=lambda t: t['width'], default=None)
        best_thumbnail = widest.get('url') if widest else None
        default_thumbnail = info.get('thumbnail', None)

        # Formats without a direct URL cannot be offered to the client.
        audio_video_formats = [
            {
                'format_id': fmt.get('format_id'),
                'resolution': fmt.get('resolution', 'N/A'),
                'filesize': fmt.get('filesize') or fmt.get('filesize_approx'),
                'url': fmt['url']
            }
            for fmt in formats
            if fmt.get('url') and fmt.get('acodec') != 'none' and fmt.get('vcodec') != 'none'
        ]

        # Get the highest audio-only format in m4a. 'abr' can be None, which
        # is not comparable with numbers, hence the `or 0`.
        highest_audio = max(
            (
                fmt for fmt in formats
                if fmt.get('url')
                and fmt.get('acodec') != 'none'
                and fmt.get('vcodec') == 'none'
                and fmt.get('ext') == 'm4a'
            ),
            key=lambda fmt: fmt.get('abr') or 0,
            default=None
        )
        highest_audio_info = {
            'format_id': highest_audio.get('format_id'),
            'bitrate': highest_audio.get('abr', 'Unknown'),
            'filesize': highest_audio.get('filesize') or highest_audio.get('filesize_approx'),
            'url': highest_audio['url']
        } if highest_audio else None

        return {
            "title": info.get('title', 'Unknown Title'),
            "formats": audio_video_formats,
            "highest_audio": highest_audio_info,
            "thumbnail": best_thumbnail or default_thumbnail
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting video info: {str(e)}") from e
    finally:
        # Delete the cookie file only after the YoutubeDL context has closed:
        # yt-dlp may write the cookie jar back to that path on close, which
        # would recreate a file removed earlier.
        cleanup_cookie_file(ydl_opts)
def search_video_by_title(title: str):
    """
    Resolve a free-text title to the URL of the first search hit.

    Parameters:
    - title: text to search for (sent to yt-dlp as "ytsearch:<title>")

    Returns the 'url' of the first search entry.

    Raises:
    - HTTPException(404) when the search yields no usable entry.
    - HTTPException(500) when the search itself fails.
    """
    ydl_opts = {
        **get_ydl_opts(),
        'extract_flat': True,
        'force_generic_extractor': True,
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            search_results = ydl.extract_info(f"ytsearch:{title}", download=False)
            entries = (search_results or {}).get('entries') or []
            # next(iter(...)) copes with an empty result instead of raising
            # IndexError like entries[0] would.
            first_entry = next(iter(entries), None)
        video_url = first_entry.get('url') if first_entry else None
        if not video_url:
            raise HTTPException(status_code=404, detail="No videos found for the given title.")
        return video_url
    except HTTPException:
        # Keep the 404 intact; the generic handler below would turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error searching video by title: {str(e)}") from e
    finally:
        # Single cleanup point, executed after the YoutubeDL context closed.
        cleanup_cookie_file(ydl_opts)
def extract_playlist_id(url: str) -> str:
    """
    Return the value of the first `list` query parameter of a YouTube URL.

    Raises HTTPException(400) when the URL carries no such parameter.
    """
    list_values = parse_qs(urlparse(url).query).get('list', [None])
    candidate = list_values[0]
    if candidate:
        return candidate
    raise HTTPException(status_code=400, detail="No playlist ID found in the URL.")
def extract_playlist_video_urls(playlist_url: str):
    """
    Extracts video URLs and metadata from a YouTube playlist.

    Parameters:
    - playlist_url: playlist URL, or a watch URL that carries a `list=` parameter

    Returns a dict with "playlist_title", "playlist_thumbnail" and
    "video_urls" (a list of dicts with title, url and thumbnail).

    Raises:
    - HTTPException(400) when a watch URL has no playlist ID.
    - HTTPException(404) when the playlist has no entries.
    - HTTPException(500) when extraction fails.
    """
    # A watch URL that also names a playlist is rewritten to the playlist page
    # so that the whole list is extracted rather than the single video.
    if "watch?v=" in playlist_url and "list=" in playlist_url:
        playlist_id = extract_playlist_id(playlist_url)
        playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}"
    ydl_opts = {
        **get_ydl_opts(),
        'extract_flat': True,
        'force_generic_extractor': True,
        'extractor_args': {
            'youtubetab': {'skip': ['authcheck']}
        }
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(playlist_url, download=False)
            if not info or 'entries' not in info:
                raise HTTPException(status_code=404, detail="No videos found in the playlist.")
            # Entries are consumed while the YoutubeDL instance is still open.
            video_urls = [
                {
                    'title': entry.get('title', 'Unknown Title'),
                    'url': entry.get('url') or entry.get('webpage_url', 'N/A'),
                    'thumbnail': entry.get('thumbnail', None)
                }
                for entry in info['entries'] if entry
            ]
        return {
            "playlist_title": info.get('title', 'Unknown Playlist'),
            "playlist_thumbnail": info.get('thumbnail', None),
            "video_urls": video_urls
        }
    except HTTPException:
        # Keep the 404 intact; the generic handler below would turn it into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error extracting playlist video URLs: {str(e)}") from e
    finally:
        # get_ydl_opts() may have created a temporary cookie file; remove it
        # once the YoutubeDL context has closed.
        cleanup_cookie_file(ydl_opts)
def download_and_merge_video(url: str, resolution: str):
    """
    Downloads video and audio separately and merges with ffmpeg.
    If the specified resolution (720p/1080p) isn't available, falls back to
    best available quality.

    Parameters:
    - url: URL of the video
    - resolution: requested resolution such as "720p"; anything other than
      "720p" is treated as a 1080p request

    Returns:
    - (file name of the merged mp4 inside STATIC_DIR, thumbnail URL or None).
      The file name has the form "<title>_<height>p_<uuid>.mp4", where the
      height is the one reported by ffprobe when probing succeeds and the
      requested resolution otherwise.

    Raises:
    - HTTPException(500) on any failure. The per-request working directory
      and the temporary cookie file are removed in every case.
    """
    output_dir = None
    output_path = None
    common_opts = {}
    try:
        # Kept as input validation: a non-numeric resolution is rejected here.
        int(resolution.replace("p", ""))

        file_id = str(uuid.uuid4())
        output_dir = os.path.join(STATIC_DIR, file_id)
        os.makedirs(output_dir, exist_ok=True)
        common_opts = get_ydl_opts()

        with YoutubeDL({**common_opts, 'noplaylist': True}) as ydl:
            info = ydl.extract_info(url, download=False)

        # Widest thumbnail wins; 'width' may be missing or None.
        thumbnails = info.get('thumbnails') or []
        sized_thumbnails = [t for t in thumbnails if (t.get('width') or 0) > 0]
        widest = max(sized_thumbnails, key=lambda t: t['width'], default=None)
        best_thumbnail = widest.get('url') if widest else None
        if not best_thumbnail:
            best_thumbnail = info.get('thumbnail', None)

        # Reduce the title to characters that are safe in a file name.
        title = info.get('title', 'video')
        title = "".join(c for c in title if c.isalnum() or c in [' ', '-', '_']).strip()
        title = title.replace(' ', '_')

        video_path = os.path.join(output_dir, f"{file_id}_video.mp4")
        audio_path = os.path.join(output_dir, f"{file_id}_audio.m4a")

        # Format selector: exact cap first, then a slightly relaxed cap
        # (800 / 1200), then whatever is best.
        height_filter = 720 if resolution == "720p" else 1080
        relaxed_height = 800 if height_filter == 720 else 1200
        video_format = (
            f"bestvideo[height<={height_filter}][ext=mp4]/bestvideo[height<={height_filter}][ext=webm]/"
            f"bestvideo[height<={height_filter}]/best[height<={height_filter}][ext=mp4]/best[height<={height_filter}][ext=webm]/"
            f"best[height<={height_filter}]/bestvideo[height<={relaxed_height}]/best[height<={relaxed_height}]/"
            "bestvideo/best"
        )

        # --- video stream: try progressively less specific selectors ---
        video_opts = {
            **common_opts,
            'format': video_format,
            'outtmpl': video_path,
            'format_sort': ['res', 'ext:mp4:m4a', 'ext:webm'],
        }
        fallback_formats = [
            video_format,
            f"best[height<={height_filter}]",
            "bestvideo/best",
            "best"
        ]
        video_downloaded = False
        for fmt in fallback_formats:
            try:
                video_opts['format'] = fmt
                with YoutubeDL(video_opts) as ydl:
                    ydl.download([url])
                video_downloaded = True
                break
            except Exception as e:
                print(f"Video download failed with format '{fmt}': {e}")
        if not video_downloaded:
            raise Exception("Failed to download video with any format")

        # --- audio stream: several strategies, with and without re-encoding ---
        temp_audio_path = os.path.join(output_dir, f"{file_id}_temp_audio")
        extract_m4a = [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'm4a',
            'preferredquality': '192',
        }]
        audio_strategies = [
            {
                'format': 'bestaudio/best',
                'outtmpl': temp_audio_path,
                'postprocessors': extract_m4a,
            },
            {
                'format': 'bestaudio[ext=m4a]/bestaudio[ext=aac]/bestaudio[ext=mp3]/bestaudio',
                'outtmpl': audio_path,
            },
            {
                'format': 'bestaudio',
                'outtmpl': audio_path,
            },
            {
                'format': 'best',
                'outtmpl': temp_audio_path,
                'postprocessors': extract_m4a,
            }
        ]
        audio_downloaded = False
        for i, strategy in enumerate(audio_strategies):
            try:
                audio_opts = {**common_opts, **strategy}
                with YoutubeDL(audio_opts) as ydl:
                    ydl.download([url])
                if 'postprocessors' in strategy:
                    # The post-processor picks the final extension, so find
                    # the produced file by its name stem.
                    audio_files = [f for f in os.listdir(output_dir) if 'temp_audio' in f]
                    if audio_files:
                        shutil.move(os.path.join(output_dir, audio_files[0]), audio_path)
                        audio_downloaded = True
                        break
                elif os.path.exists(audio_path):
                    audio_downloaded = True
                    break
            except Exception as e:
                print(f"Audio download strategy {i+1} failed: {e}")
        if not audio_downloaded:
            raise Exception("Failed to download audio with any strategy")

        # --- determine the height that was actually downloaded (best effort) ---
        actual_resolution = resolution
        probe_cmd = [
            'ffprobe',
            '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height',
            '-of', 'csv=p=0',
            video_path
        ]
        try:
            probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
            _width, height = map(int, probe_result.stdout.strip().split(','))
            actual_resolution = f"{height}p"
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            # ffprobe missing/failing or unexpected output: keep the requested label.
            print(f"Could not probe video resolution: {e}")

        # --- merge: copy the video stream, encode audio as AAC ---
        output_path = os.path.join(STATIC_DIR, f"{title}_{actual_resolution}_{file_id}.mp4")
        cmd = [
            'ffmpeg',
            '-i', video_path,
            '-i', audio_path,
            '-c:v', 'copy',
            '-c:a', 'aac',
            '-map', '0:v:0',
            '-map', '1:a:0',
            output_path
        ]
        subprocess.run(cmd, check=True)

        return os.path.basename(output_path), best_thumbnail
    except HTTPException:
        raise
    except Exception as e:
        # Do not leave a partially written merge result in the public directory.
        if output_path and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass
        raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}") from e
    finally:
        # The working directory lives inside the publicly served STATIC_DIR,
        # so it is removed on failure as well as on success.
        if output_dir:
            shutil.rmtree(output_dir, ignore_errors=True)
        cleanup_cookie_file(common_opts)
import os
import uuid
import shutil
import requests
from mutagen.mp3 import MP3
from mutagen.mp4 import MP4
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB
from yt_dlp import YoutubeDL
def download_audio(url: str, audio_format: str):
    """
    Downloads audio from a video URL in the specified format and adds metadata.

    Parameters:
    - url: URL of the video
    - audio_format: Format of the audio (mp3 or m4a)

    Returns:
    - (file name of the audio file inside STATIC_DIR, thumbnail URL or None)

    Raises:
    - HTTPException(500) on any failure. The per-request working directory
      and the temporary cookie file are removed in every case.
    """
    # Initialised up front so the cleanup code never touches unbound names.
    output_dir = None
    common_opts = {}
    try:
        file_id = str(uuid.uuid4())
        output_dir = os.path.join(STATIC_DIR, file_id)
        os.makedirs(output_dir, exist_ok=True)
        common_opts = get_ydl_opts()

        with YoutubeDL({**common_opts, 'noplaylist': True}) as ydl:
            info = ydl.extract_info(url, download=False)
        title = info.get('title', 'audio')
        uploader = info.get('uploader', 'Unknown Artist')
        channel = info.get('channel', uploader)

        # Widest thumbnail wins; 'width' may be missing or None.
        thumbnails = info.get('thumbnails') or []
        sized_thumbnails = [t for t in thumbnails if (t.get('width') or 0) > 0]
        widest = max(sized_thumbnails, key=lambda t: t['width'], default=None)
        best_thumbnail = widest.get('url') if widest else None
        if not best_thumbnail:
            best_thumbnail = info.get('thumbnail', None)

        # Reduce the title to characters that are safe in a file name.
        clean_title = "".join(c for c in title if c.isalnum() or c in [' ', '-', '_']).strip()
        clean_title = clean_title.replace(' ', '_')
        temp_audio_path = os.path.join(output_dir, f"{file_id}_temp.{audio_format}")
        output_path = os.path.join(STATIC_DIR, f"{clean_title}_{audio_format}_{file_id}.{audio_format}")

        # Cover art is optional: a failed thumbnail download must not fail
        # the request. Only the bytes are needed, so nothing is written to disk.
        thumbnail_data = None
        if best_thumbnail:
            try:
                response = requests.get(best_thumbnail, timeout=10)
                if response.status_code == 200:
                    thumbnail_data = response.content
            except Exception as e:
                print(f"Failed to download thumbnail: {e}")

        if audio_format == "mp3":
            format_option = "bestaudio/best"
            preferred_codec = 'mp3'
        else:  # m4a
            format_option = "bestaudio[ext=m4a]/bestaudio"
            preferred_codec = 'm4a'
        audio_opts = {
            **common_opts,
            'format': format_option,
            'outtmpl': temp_audio_path,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': preferred_codec,
                'preferredquality': '192',
            }],
        }
        with YoutubeDL(audio_opts) as ydl:
            ydl.download([url])

        audio_files = [f for f in os.listdir(output_dir) if f.endswith(('.mp3', '.m4a'))]
        if not audio_files:
            raise Exception("No audio file was downloaded")
        # os.listdir() order is arbitrary: prefer a file in the requested format.
        audio_files.sort(key=lambda name: not name.endswith(f".{audio_format}"))
        shutil.move(os.path.join(output_dir, audio_files[0]), output_path)

        add_metadata(output_path, audio_format, title, uploader, channel, thumbnail_data)
        return os.path.basename(output_path), best_thumbnail
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing audio: {str(e)}") from e
    finally:
        if output_dir:
            shutil.rmtree(output_dir, ignore_errors=True)
        cleanup_cookie_file(common_opts)
def _sniff_image_mime(data: bytes) -> str:
    """Guess an image MIME type from its leading bytes; defaults to JPEG."""
    if data.startswith(b'\x89PNG\r\n\x1a\n'):
        return 'image/png'
    if data[:4] == b'RIFF' and data[8:12] == b'WEBP':
        return 'image/webp'
    return 'image/jpeg'


def add_metadata(file_path: str, audio_format: str, title: str, artist: str, album: str, thumbnail_data: bytes = None):
    """
    Adds metadata to the audio file including thumbnail as cover art.

    Parameters:
    - file_path: Path to the audio file
    - audio_format: Format of the audio (mp3 or m4a); other values are ignored
    - title: Track title
    - artist: Artist name
    - album: Album name (using channel name)
    - thumbnail_data: Binary data of the thumbnail image

    Tagging is best effort: any failure is printed and swallowed so that the
    already-downloaded audio file is still returned to the caller.
    """
    try:
        if audio_format == "mp3":
            audio = MP3(file_path, ID3=ID3)
            if audio.tags is None:
                audio.add_tags()
            audio.tags.add(TIT2(encoding=3, text=title))
            audio.tags.add(TPE1(encoding=3, text=artist))
            audio.tags.add(TALB(encoding=3, text=album))
            if thumbnail_data:
                audio.tags.add(APIC(
                    encoding=3,
                    # Declare the real image type instead of always claiming JPEG.
                    mime=_sniff_image_mime(thumbnail_data),
                    type=3,
                    desc='Cover',
                    data=thumbnail_data
                ))
            audio.save()
        elif audio_format == "m4a":
            audio = MP4(file_path)
            audio['\xa9nam'] = [title]
            audio['\xa9ART'] = [artist]
            audio['\xa9alb'] = [album]
            if thumbnail_data:
                if _sniff_image_mime(thumbnail_data) == 'image/png':
                    # Raw bytes are stored as JPEG; PNG has to be flagged explicitly.
                    from mutagen.mp4 import MP4Cover
                    cover = MP4Cover(thumbnail_data, imageformat=MP4Cover.FORMAT_PNG)
                else:
                    cover = thumbnail_data
                audio['covr'] = [cover]
            audio.save()
    except Exception as e:
        print(f"Failed to add metadata: {e}")
@app.get("/download-audio")
async def download_audio_endpoint(
    query: str = Query(..., description="URL or title of the video"),
    format: str = Query("mp3", description="Audio format (mp3 or m4a)")
):
    """
    Download the audio track of a video and return a link to the file.

    `query` is used as the video URL when it has both a scheme and a host;
    otherwise it is treated as a title and resolved through a search.

    Responds with JSON containing "download_url", "filename" and "thumbnail".
    Raises HTTPException(400) for an unsupported format and HTTPException(500)
    for unexpected failures; HTTPExceptions from helpers pass through unchanged.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        if format not in ["mp3", "m4a"]:
            raise HTTPException(status_code=400, detail="Format must be either mp3 or m4a")
        parsed_url = urlparse(query)
        # The helpers block on network and ffmpeg work, so they run in the
        # thread pool instead of stalling the event loop for other requests.
        if parsed_url.scheme and parsed_url.netloc:
            video_url = query
        else:
            video_url = await run_in_threadpool(search_video_by_title, query)
        output_file, thumbnail_url = await run_in_threadpool(download_audio, video_url, format)
        file_url = f"/static/{output_file}"
        server_url = os.getenv("SERVER_URL", "https://akane710-y2mateee.hf.space")
        download_url = f"{server_url}{file_url}"
        return JSONResponse(content={
            "download_url": download_url,
            "filename": output_file,
            "thumbnail": thumbnail_url
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/video-info")
async def get_video_info(query: str = Query(..., description="URL or title of the video")):
    """
    Return title, formats, best m4a audio and thumbnail for a video.

    `query` is used as the video URL when it has both a scheme and a host;
    otherwise it is treated as a title and resolved through a search.

    Raises HTTPException(500) for unexpected failures; HTTPExceptions from
    helpers pass through unchanged.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        parsed_url = urlparse(query)
        # yt-dlp calls block, so they run in the thread pool rather than on
        # the event loop.
        if parsed_url.scheme and parsed_url.netloc:
            video_url = query
        else:
            video_url = await run_in_threadpool(search_video_by_title, query)
        video_data = await run_in_threadpool(extract_video_info, video_url)
        return JSONResponse(content=video_data)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/download-video")
async def download_video(
    query: str = Query(..., description="URL or title of the video"),
    resolution: str = Query("720p", description="Video resolution (720p or 1080p)")
):
    """
    Download a video, merge it with its audio and return a link to the file.

    `query` is used as the video URL when it has both a scheme and a host;
    otherwise it is treated as a title and resolved through a search.

    Responds with JSON containing "download_url", "filename", "thumbnail" and
    "actual_resolution" (the resolution encoded in the produced file name,
    or the requested one when it cannot be read back).
    Raises HTTPException(400) for an unsupported resolution and
    HTTPException(500) for unexpected failures.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        if resolution not in ["720p", "1080p"]:
            raise HTTPException(status_code=400, detail="Resolution must be either 720p or 1080p")
        parsed_url = urlparse(query)
        # Downloading and merging block for a long time, so they run in the
        # thread pool rather than on the event loop.
        if parsed_url.scheme and parsed_url.netloc:
            video_url = query
        else:
            video_url = await run_in_threadpool(search_video_by_title, query)
        output_file, thumbnail_url = await run_in_threadpool(
            download_and_merge_video, video_url, resolution
        )

        # The file name is "<title>_<resolution>_<uuid>.mp4". The title may
        # itself contain underscores or tokens such as "1080p" or "best", so
        # the resolution is read from the right, where the layout is fixed.
        actual_resolution = resolution
        name_parts = output_file.rsplit("_", 2)
        if len(name_parts) == 3:
            candidate = name_parts[1]
            if candidate.endswith("p") and candidate[:-1].isdigit():
                actual_resolution = candidate

        file_url = f"/static/{output_file}"
        server_url = os.getenv("SERVER_URL", "https://akane710-y2mateee.hf.space")
        download_url = f"{server_url}{file_url}"
        return JSONResponse(content={
            "download_url": download_url,
            "filename": output_file,
            "thumbnail": thumbnail_url,
            "actual_resolution": actual_resolution
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/direct-download/{filename}")
async def get_file(filename: str):
    """
    Serve a previously generated file from STATIC_DIR as a download.

    Parameters:
    - filename: plain file name (no directory components) inside STATIC_DIR

    Raises HTTPException(404) when the name is not a plain file name or does
    not refer to an existing regular file.
    """
    import mimetypes

    # Only plain names are accepted, so the path can never leave STATIC_DIR.
    if not filename or filename in (".", "..") or os.path.basename(filename) != filename:
        raise HTTPException(status_code=404, detail="File not found")
    file_path = os.path.join(STATIC_DIR, filename)
    # isfile() rather than exists(): directories must not be served.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    # This directory holds audio (mp3/m4a) as well as mp4 files, so the
    # content type is derived from the extension instead of being fixed.
    guessed_type, _ = mimetypes.guess_type(filename)
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type=guessed_type or "application/octet-stream"
    )
class TranscriptionRequest(BaseModel):
    """Pydantic model with a single required string field, `audio_url`."""
    # NOTE(review): no route visible in this file references this model;
    # /transcribe takes `audio_url` as a query parameter instead. Confirm
    # whether it is still needed.
    audio_url: str
@app.get("/transcribe")
async def transcribe_audio(audio_url: str = Query(..., description="Publicly accessible URL for audio file")):
    """
    Transcribe the audio at `audio_url` with AssemblyAI.

    The request enables speaker labels and language detection and uses the
    `nano` speech model.

    Returns {"transcription": <text>}.
    Raises HTTPException(500) when the transcript reports an error status or
    the request fails.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            speech_model=aai.SpeechModel.nano,
            language_detection=True
        )
        # transcribe() is a blocking call here; run it in the thread pool so
        # the event loop keeps serving other requests.
        transcript = await run_in_threadpool(transcriber.transcribe, audio_url, config)
        if transcript.status == aai.TranscriptStatus.error:
            raise HTTPException(status_code=500, detail=f"Transcription failed: {transcript.error}")
        return {"transcription": transcript.text}
    except HTTPException:
        # Without this clause the handler below would re-wrap the exception
        # and replace its detail with str(HTTPException).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/playlist-download-urls")
async def get_playlist_video_urls(playlist_url: str = Query(..., description="URL of the YouTube playlist")):
    """
    Return the title, thumbnail and video entries of a YouTube playlist.

    Raises HTTPException(500) for unexpected failures; HTTPExceptions from
    extract_playlist_video_urls pass through unchanged.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        # Playlist extraction blocks on the network; keep it off the event loop.
        result = await run_in_threadpool(extract_playlist_video_urls, playlist_url)
        return JSONResponse(content=result)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def process_and_upload_webp(url: str) -> str:
    """
    Download a WEBP file, convert it to MP4 at 30 fps and upload the result.

    Parameters:
    - url: absolute URL (scheme and host required) of the .webp file

    Returns the URL reported by the upload service for the uploaded MP4.

    Raises:
    - HTTPException(400) when the URL is invalid or the download does not
      answer with status 200.
    - HTTPException(500) when conversion or upload fails.

    Both temporary files are removed whether or not the conversion succeeds.
    """
    # NOTE(review): `url` is caller-supplied and fetched server-side; consider
    # restricting the allowed hosts if this is reachable by untrusted clients.
    webp_file_path = None
    output_path = None
    try:
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            # A malformed URL is a client error, not a server failure.
            raise HTTPException(status_code=400, detail="Invalid URL")

        # Timeouts keep a stalled remote host from hanging the request forever.
        response = requests.get(url, timeout=30)
        if response.status_code != 200:
            raise HTTPException(status_code=400, detail="Failed to download the file")

        with tempfile.NamedTemporaryFile(suffix=".webp", delete=False) as webp_file:
            webp_file_path = webp_file.name
            webp_file.write(response.content)
        # Swap only the extension; str.replace() would also touch a ".webp"
        # occurring earlier in the path.
        output_path = os.path.splitext(webp_file_path)[0] + ".mp4"

        reader = imageio.get_reader(webp_file_path)
        try:
            writer = imageio.get_writer(output_path, fps=30)
            try:
                for frame in reader:
                    writer.append_data(frame)
            finally:
                writer.close()
        finally:
            reader.close()

        with open(output_path, 'rb') as mp4_file:
            upload_response = requests.post(
                'https://uguu.se/upload',
                files={"files[]": mp4_file},
                timeout=120
            )
        if upload_response.status_code != 200:
            raise HTTPException(status_code=500, detail="Failed to upload the file")
        return upload_response.json()["files"][0]["url"]
    except HTTPException:
        # Preserve the intended status code (e.g. the 400s above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        for temp_path in (webp_file_path, output_path):
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
@app.get("/convert")
async def convert_webp_to_mp4(url: str):
    """Convert the WEBP at `url` to MP4 and respond with {"url": <uploaded file URL>}."""
    uploaded_url = await process_and_upload_webp(url)
    payload = {"url": uploaded_url}
    return JSONResponse(content=payload)
def cli():
    """Command-line entry: convert the WEBP given by --url and print the uploaded link."""
    arg_parser = argparse.ArgumentParser(description="Convert and upload WEBP to MP4.")
    arg_parser.add_argument("--url", type=str, required=True, help="URL of the .webp file.")
    source_url = arg_parser.parse_args().url
    import asyncio
    # process_and_upload_webp is a coroutine function; drive it to completion here.
    uploaded_link = asyncio.run(process_and_upload_webp(source_url))
    print(f"Uploaded file link: {uploaded_link}")
# When executed as a script, serve the FastAPI app with uvicorn on all
# interfaces, port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)