|
|
import os |
|
|
import requests |
|
|
from fastapi import FastAPI, HTTPException, Query |
|
|
from fastapi.responses import JSONResponse, FileResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
import yt_dlp |
|
|
from yt_dlp import YoutubeDL |
|
|
from urllib.parse import urlparse, parse_qs |
|
|
from pydantic import BaseModel |
|
|
import assemblyai as aai |
|
|
import uvicorn |
|
|
import tempfile |
|
|
import imageio |
|
|
import argparse |
|
|
import uuid |
|
|
import subprocess |
|
|
import shutil |
|
|
|
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
|
|
|
STATIC_DIR = os.path.join(os.getcwd(), "static") |
|
|
if not os.path.exists(STATIC_DIR): |
|
|
os.makedirs(STATIC_DIR) |
|
|
|
|
|
|
|
|
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") |
|
|
|
|
|
|
|
|
API_KEY = os.getenv("ASSEMBLYAI_API_KEY") |
|
|
if not API_KEY: |
|
|
raise ValueError("ASSEMBLYAI_API_KEY not set in OS secrets") |
|
|
|
|
|
aai.settings.api_key = API_KEY |
|
|
transcriber = aai.Transcriber() |
|
|
|
|
|
|
|
|
YOUTUBE_COOKIES = os.getenv("YOUTUBE_COOKIES", "") |
|
|
|
|
|
def create_cookie_file(): |
|
|
""" |
|
|
Create a temporary cookie file from environment variable |
|
|
Expected format: Netscape format cookies as a multi-line string |
|
|
""" |
|
|
if not YOUTUBE_COOKIES: |
|
|
return None |
|
|
|
|
|
try: |
|
|
|
|
|
cookie_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) |
|
|
|
|
|
|
|
|
cookie_file.write(YOUTUBE_COOKIES) |
|
|
cookie_file.flush() |
|
|
cookie_file.close() |
|
|
|
|
|
return cookie_file.name |
|
|
except Exception as e: |
|
|
print(f"Warning: Failed to create cookie file: {e}") |
|
|
return None |
|
|
|
|
|
def get_ydl_opts(): |
|
|
""" |
|
|
Get YoutubeDL options with cookies from environment variables |
|
|
""" |
|
|
base_opts = { |
|
|
'quiet': True, |
|
|
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', |
|
|
} |
|
|
|
|
|
|
|
|
cookie_file = create_cookie_file() |
|
|
if cookie_file: |
|
|
base_opts['cookiefile'] = cookie_file |
|
|
|
|
|
return base_opts |
|
|
|
|
|
def cleanup_cookie_file(ydl_opts): |
|
|
""" |
|
|
Clean up temporary cookie file if it was created |
|
|
""" |
|
|
if 'cookiefile' in ydl_opts: |
|
|
try: |
|
|
os.unlink(ydl_opts['cookiefile']) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def extract_video_info(url: str): |
|
|
ydl_opts = { |
|
|
**get_ydl_opts(), |
|
|
'extract_flat': False, |
|
|
} |
|
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
|
try: |
|
|
info = ydl.extract_info(url, download=False) |
|
|
title = info.get('title', 'Unknown Title') |
|
|
formats = info.get('formats', []) |
|
|
|
|
|
|
|
|
thumbnails = info.get('thumbnails', []) |
|
|
best_thumbnail = None |
|
|
max_width = 0 |
|
|
|
|
|
for thumb in thumbnails: |
|
|
width = thumb.get('width', 0) |
|
|
if width > max_width: |
|
|
max_width = width |
|
|
best_thumbnail = thumb.get('url') |
|
|
|
|
|
|
|
|
default_thumbnail = info.get('thumbnail', None) |
|
|
|
|
|
|
|
|
audio_video_formats = [ |
|
|
{ |
|
|
'format_id': fmt['format_id'], |
|
|
'resolution': fmt.get('resolution', 'N/A'), |
|
|
'filesize': fmt.get('filesize') or fmt.get('filesize_approx'), |
|
|
'url': fmt['url'] |
|
|
} |
|
|
for fmt in formats if fmt.get('acodec') != 'none' and fmt.get('vcodec') != 'none' |
|
|
] |
|
|
|
|
|
|
|
|
highest_audio = max( |
|
|
(fmt for fmt in formats if fmt.get('acodec') != 'none' and fmt.get('vcodec') == 'none' and fmt.get('ext') == 'm4a'), |
|
|
key=lambda x: x.get('abr', 0), |
|
|
default=None |
|
|
) |
|
|
|
|
|
highest_audio_info = { |
|
|
'format_id': highest_audio['format_id'], |
|
|
'bitrate': highest_audio.get('abr', 'Unknown'), |
|
|
'filesize': highest_audio.get('filesize') or highest_audio.get('filesize_approx'), |
|
|
'url': highest_audio['url'] |
|
|
} if highest_audio else None |
|
|
|
|
|
result = { |
|
|
"title": title, |
|
|
"formats": audio_video_formats, |
|
|
"highest_audio": highest_audio_info, |
|
|
"thumbnail": best_thumbnail or default_thumbnail |
|
|
} |
|
|
|
|
|
|
|
|
cleanup_cookie_file(ydl_opts) |
|
|
|
|
|
return result |
|
|
except Exception as e: |
|
|
cleanup_cookie_file(ydl_opts) |
|
|
raise HTTPException(status_code=500, detail=f"Error extracting video info: {str(e)}") |
|
|
|
|
|
def search_video_by_title(title: str): |
|
|
ydl_opts = { |
|
|
**get_ydl_opts(), |
|
|
'extract_flat': True, |
|
|
'force_generic_extractor': True, |
|
|
} |
|
|
|
|
|
with YoutubeDL(ydl_opts) as ydl: |
|
|
try: |
|
|
search_results = ydl.extract_info(f"ytsearch:{title}", download=False) |
|
|
if not search_results or 'entries' not in search_results: |
|
|
cleanup_cookie_file(ydl_opts) |
|
|
raise HTTPException(status_code=404, detail="No videos found for the given title.") |
|
|
|
|
|
|
|
|
first_entry = search_results['entries'][0] |
|
|
result = first_entry['url'] |
|
|
|
|
|
|
|
|
cleanup_cookie_file(ydl_opts) |
|
|
|
|
|
return result |
|
|
except Exception as e: |
|
|
cleanup_cookie_file(ydl_opts) |
|
|
raise HTTPException(status_code=500, detail=f"Error searching video by title: {str(e)}") |
|
|
|
|
|
def extract_playlist_id(url: str) -> str: |
|
|
""" |
|
|
Extracts the playlist ID from a YouTube URL. |
|
|
""" |
|
|
parsed_url = urlparse(url) |
|
|
query_params = parse_qs(parsed_url.query) |
|
|
playlist_id = query_params.get('list', [None])[0] |
|
|
if not playlist_id: |
|
|
raise HTTPException(status_code=400, detail="No playlist ID found in the URL.") |
|
|
return playlist_id |
|
|
|
|
|
def extract_playlist_video_urls(playlist_url: str): |
|
|
""" |
|
|
Extracts video URLs and metadata from a YouTube playlist. |
|
|
""" |
|
|
|
|
|
if "watch?v=" in playlist_url and "list=" in playlist_url: |
|
|
playlist_id = extract_playlist_id(playlist_url) |
|
|
playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}" |
|
|
|
|
|
ydl_opts = { |
|
|
**get_ydl_opts(), |
|
|
'extract_flat': True, |
|
|
'force_generic_extractor': True, |
|
|
'extractor_args': { |
|
|
'youtubetab': {'skip': ['authcheck']} |
|
|
} |
|
|
} |
|
|
|
|
|
with YoutubeDL(ydl_opts) as ydl: |
|
|
try: |
|
|
info = ydl.extract_info(playlist_url, download=False) |
|
|
if not info or 'entries' not in info: |
|
|
raise HTTPException(status_code=404, detail="No videos found in the playlist.") |
|
|
|
|
|
video_urls = [ |
|
|
{ |
|
|
'title': entry.get('title', 'Unknown Title'), |
|
|
'url': entry.get('url') or entry.get('webpage_url', 'N/A'), |
|
|
'thumbnail': entry.get('thumbnail', None) |
|
|
} |
|
|
for entry in info['entries'] if entry |
|
|
] |
|
|
|
|
|
return { |
|
|
"playlist_title": info.get('title', 'Unknown Playlist'), |
|
|
"playlist_thumbnail": info.get('thumbnail', None), |
|
|
"video_urls": video_urls |
|
|
} |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=f"Error extracting playlist video URLs: {str(e)}") |
|
|
|
|
|
def download_and_merge_video(url: str, resolution: str): |
|
|
""" |
|
|
Downloads video and audio separately and merges with ffmpeg. |
|
|
Returns the path to the merged file and thumbnail URL. |
|
|
|
|
|
If the specified resolution (720p/1080p) isn't available, falls back to best available quality. |
|
|
""" |
|
|
try: |
|
|
|
|
|
file_id = str(uuid.uuid4()) |
|
|
output_dir = os.path.join(STATIC_DIR, file_id) |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
|
|
|
common_opts = get_ydl_opts() |
|
|
|
|
|
|
|
|
with YoutubeDL({**common_opts, 'noplaylist': True}) as ydl: |
|
|
info = ydl.extract_info(url, download=False) |
|
|
title = info.get('title', 'video') |
|
|
|
|
|
|
|
|
thumbnails = info.get('thumbnails', []) |
|
|
best_thumbnail = None |
|
|
max_width = 0 |
|
|
|
|
|
for thumb in thumbnails: |
|
|
width = thumb.get('width', 0) |
|
|
if width > max_width: |
|
|
max_width = width |
|
|
best_thumbnail = thumb.get('url') |
|
|
|
|
|
if not best_thumbnail: |
|
|
best_thumbnail = info.get('thumbnail', None) |
|
|
|
|
|
target_height = int(resolution.replace("p", "")) |
|
|
actual_resolution = resolution |
|
|
|
|
|
title = "".join(c for c in title if c.isalnum() or c in [' ', '-', '_']).strip() |
|
|
title = title.replace(' ', '_') |
|
|
|
|
|
output_path = os.path.join(STATIC_DIR, f"{title}_{resolution}_{file_id}.mp4") |
|
|
|
|
|
video_path = os.path.join(output_dir, f"{file_id}_video.mp4") |
|
|
audio_path = os.path.join(output_dir, f"{file_id}_audio.m4a") |
|
|
|
|
|
|
|
|
if resolution == "720p": |
|
|
|
|
|
video_format = ( |
|
|
"bestvideo[height<=720][ext=mp4]/bestvideo[height<=720][ext=webm]/" |
|
|
"bestvideo[height<=720]/best[height<=720][ext=mp4]/best[height<=720][ext=webm]/" |
|
|
"best[height<=720]/bestvideo[height<=800]/best[height<=800]/" |
|
|
"bestvideo/best" |
|
|
) |
|
|
height_filter = 720 |
|
|
else: |
|
|
|
|
|
video_format = ( |
|
|
"bestvideo[height<=1080][ext=mp4]/bestvideo[height<=1080][ext=webm]/" |
|
|
"bestvideo[height<=1080]/best[height<=1080][ext=mp4]/best[height<=1080][ext=webm]/" |
|
|
"best[height<=1080]/bestvideo[height<=1200]/best[height<=1200]/" |
|
|
"bestvideo/best" |
|
|
) |
|
|
height_filter = 1080 |
|
|
|
|
|
|
|
|
video_opts = { |
|
|
**common_opts, |
|
|
'format': video_format, |
|
|
'outtmpl': video_path, |
|
|
'format_sort': ['res', 'ext:mp4:m4a', 'ext:webm'], |
|
|
} |
|
|
|
|
|
|
|
|
video_downloaded = False |
|
|
fallback_formats = [ |
|
|
video_format, |
|
|
f"best[height<={height_filter}]", |
|
|
"bestvideo/best", |
|
|
"best" |
|
|
] |
|
|
|
|
|
for fmt in fallback_formats: |
|
|
try: |
|
|
video_opts['format'] = fmt |
|
|
with YoutubeDL(video_opts) as ydl: |
|
|
ydl.download([url]) |
|
|
video_downloaded = True |
|
|
break |
|
|
except Exception as e: |
|
|
print(f"Video download failed with format '{fmt}': {e}") |
|
|
continue |
|
|
|
|
|
if not video_downloaded: |
|
|
raise Exception("Failed to download video with any format") |
|
|
|
|
|
|
|
|
audio_downloaded = False |
|
|
temp_audio_path = os.path.join(output_dir, f"{file_id}_temp_audio") |
|
|
|
|
|
|
|
|
audio_strategies = [ |
|
|
|
|
|
{ |
|
|
'format': 'bestaudio/best', |
|
|
'outtmpl': temp_audio_path, |
|
|
'postprocessors': [{ |
|
|
'key': 'FFmpegExtractAudio', |
|
|
'preferredcodec': 'm4a', |
|
|
'preferredquality': '192', |
|
|
}], |
|
|
}, |
|
|
|
|
|
{ |
|
|
'format': 'bestaudio[ext=m4a]/bestaudio[ext=aac]/bestaudio[ext=mp3]/bestaudio', |
|
|
'outtmpl': audio_path, |
|
|
}, |
|
|
|
|
|
{ |
|
|
'format': 'bestaudio', |
|
|
'outtmpl': audio_path, |
|
|
}, |
|
|
|
|
|
{ |
|
|
'format': 'best', |
|
|
'outtmpl': temp_audio_path, |
|
|
'postprocessors': [{ |
|
|
'key': 'FFmpegExtractAudio', |
|
|
'preferredcodec': 'm4a', |
|
|
'preferredquality': '192', |
|
|
}], |
|
|
} |
|
|
] |
|
|
|
|
|
for i, strategy in enumerate(audio_strategies): |
|
|
try: |
|
|
audio_opts = {**common_opts, **strategy} |
|
|
with YoutubeDL(audio_opts) as ydl: |
|
|
ydl.download([url]) |
|
|
|
|
|
|
|
|
if 'postprocessors' in strategy: |
|
|
audio_files = [f for f in os.listdir(output_dir) if 'temp_audio' in f] |
|
|
if audio_files: |
|
|
actual_audio_path = os.path.join(output_dir, audio_files[0]) |
|
|
shutil.move(actual_audio_path, audio_path) |
|
|
audio_downloaded = True |
|
|
break |
|
|
else: |
|
|
|
|
|
if os.path.exists(audio_path): |
|
|
audio_downloaded = True |
|
|
break |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Audio download strategy {i+1} failed: {e}") |
|
|
continue |
|
|
|
|
|
if not audio_downloaded: |
|
|
raise Exception("Failed to download audio with any strategy") |
|
|
|
|
|
|
|
|
probe_cmd = [ |
|
|
'ffprobe', |
|
|
'-v', 'error', |
|
|
'-select_streams', 'v:0', |
|
|
'-show_entries', 'stream=width,height', |
|
|
'-of', 'csv=p=0', |
|
|
video_path |
|
|
] |
|
|
|
|
|
try: |
|
|
probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) |
|
|
width, height = map(int, probe_result.stdout.strip().split(',')) |
|
|
actual_resolution = f"{height}p" |
|
|
except Exception: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
cmd = [ |
|
|
'ffmpeg', |
|
|
'-i', video_path, |
|
|
'-i', audio_path, |
|
|
'-c:v', 'copy', |
|
|
'-c:a', 'aac', |
|
|
'-map', '0:v:0', |
|
|
'-map', '1:a:0', |
|
|
output_path |
|
|
] |
|
|
|
|
|
|
|
|
subprocess.run(cmd, check=True) |
|
|
|
|
|
|
|
|
if actual_resolution != resolution: |
|
|
new_output_path = os.path.join(STATIC_DIR, f"{title}_{actual_resolution}_{file_id}.mp4") |
|
|
os.rename(output_path, new_output_path) |
|
|
output_path = new_output_path |
|
|
|
|
|
|
|
|
shutil.rmtree(output_dir) |
|
|
|
|
|
|
|
|
relative_path = os.path.basename(output_path) |
|
|
return relative_path, best_thumbnail |
|
|
|
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=f"Error processing video: {str(e)}") |
|
|
|
|
|
import os |
|
|
import uuid |
|
|
import shutil |
|
|
import requests |
|
|
from mutagen.mp3 import MP3 |
|
|
from mutagen.mp4 import MP4 |
|
|
from mutagen.id3 import ID3, APIC, TIT2, TPE1, TALB |
|
|
from yt_dlp import YoutubeDL |
|
|
|
|
|
def download_audio(url: str, audio_format: str): |
|
|
""" |
|
|
Downloads audio from a video URL in the specified format and adds metadata. |
|
|
Returns the path to the downloaded audio file. |
|
|
|
|
|
Parameters: |
|
|
- url: URL of the video |
|
|
- audio_format: Format of the audio (mp3 or m4a) |
|
|
""" |
|
|
try: |
|
|
file_id = str(uuid.uuid4()) |
|
|
output_dir = os.path.join(STATIC_DIR, file_id) |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
common_opts = get_ydl_opts() |
|
|
|
|
|
|
|
|
with YoutubeDL({**common_opts, 'noplaylist': True}) as ydl: |
|
|
info = ydl.extract_info(url, download=False) |
|
|
title = info.get('title', 'audio') |
|
|
uploader = info.get('uploader', 'Unknown Artist') |
|
|
channel = info.get('channel', uploader) |
|
|
description = info.get('description', '') |
|
|
|
|
|
|
|
|
thumbnails = info.get('thumbnails', []) |
|
|
best_thumbnail = None |
|
|
max_width = 0 |
|
|
|
|
|
for thumb in thumbnails: |
|
|
width = thumb.get('width', 0) |
|
|
if width > max_width: |
|
|
max_width = width |
|
|
best_thumbnail = thumb.get('url') |
|
|
|
|
|
if not best_thumbnail: |
|
|
best_thumbnail = info.get('thumbnail', None) |
|
|
|
|
|
|
|
|
clean_title = "".join(c for c in title if c.isalnum() or c in [' ', '-', '_']).strip() |
|
|
clean_title = clean_title.replace(' ', '_') |
|
|
|
|
|
|
|
|
temp_audio_path = os.path.join(output_dir, f"{file_id}_temp.{audio_format}") |
|
|
output_path = os.path.join(STATIC_DIR, f"{clean_title}_{audio_format}_{file_id}.{audio_format}") |
|
|
thumbnail_path = os.path.join(output_dir, f"{file_id}_thumb.jpg") |
|
|
|
|
|
|
|
|
thumbnail_data = None |
|
|
if best_thumbnail: |
|
|
try: |
|
|
response = requests.get(best_thumbnail, timeout=10) |
|
|
if response.status_code == 200: |
|
|
with open(thumbnail_path, 'wb') as f: |
|
|
f.write(response.content) |
|
|
thumbnail_data = response.content |
|
|
except Exception as e: |
|
|
print(f"Failed to download thumbnail: {e}") |
|
|
|
|
|
|
|
|
if audio_format == "mp3": |
|
|
format_option = "bestaudio/best" |
|
|
postprocessors = [{ |
|
|
'key': 'FFmpegExtractAudio', |
|
|
'preferredcodec': 'mp3', |
|
|
'preferredquality': '192', |
|
|
}] |
|
|
else: |
|
|
format_option = "bestaudio[ext=m4a]/bestaudio" |
|
|
postprocessors = [{ |
|
|
'key': 'FFmpegExtractAudio', |
|
|
'preferredcodec': 'm4a', |
|
|
'preferredquality': '192', |
|
|
}] |
|
|
|
|
|
audio_opts = { |
|
|
**common_opts, |
|
|
'format': format_option, |
|
|
'outtmpl': temp_audio_path, |
|
|
'postprocessors': postprocessors, |
|
|
} |
|
|
|
|
|
with YoutubeDL(audio_opts) as ydl: |
|
|
ydl.download([url]) |
|
|
|
|
|
|
|
|
downloaded_files = os.listdir(output_dir) |
|
|
audio_files = [f for f in downloaded_files if f.endswith(('.mp3', '.m4a'))] |
|
|
|
|
|
if not audio_files: |
|
|
raise Exception("No audio file was downloaded") |
|
|
|
|
|
downloaded_file = os.path.join(output_dir, audio_files[0]) |
|
|
shutil.move(downloaded_file, output_path) |
|
|
|
|
|
|
|
|
add_metadata(output_path, audio_format, title, uploader, channel, thumbnail_data) |
|
|
|
|
|
|
|
|
shutil.rmtree(output_dir) |
|
|
|
|
|
relative_path = os.path.basename(output_path) |
|
|
return relative_path, best_thumbnail |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
if os.path.exists(output_dir): |
|
|
shutil.rmtree(output_dir) |
|
|
raise HTTPException(status_code=500, detail=f"Error processing audio: {str(e)}") |
|
|
|
|
|
|
|
|
def add_metadata(file_path: str, audio_format: str, title: str, artist: str, album: str, thumbnail_data: bytes = None): |
|
|
""" |
|
|
Adds metadata to the audio file including thumbnail as cover art. |
|
|
|
|
|
Parameters: |
|
|
- file_path: Path to the audio file |
|
|
- audio_format: Format of the audio (mp3 or m4a) |
|
|
- title: Track title |
|
|
- artist: Artist name |
|
|
- album: Album name (using channel name) |
|
|
- thumbnail_data: Binary data of the thumbnail image |
|
|
""" |
|
|
try: |
|
|
if audio_format == "mp3": |
|
|
|
|
|
audio = MP3(file_path, ID3=ID3) |
|
|
|
|
|
|
|
|
if audio.tags is None: |
|
|
audio.add_tags() |
|
|
|
|
|
|
|
|
audio.tags.add(TIT2(encoding=3, text=title)) |
|
|
audio.tags.add(TPE1(encoding=3, text=artist)) |
|
|
audio.tags.add(TALB(encoding=3, text=album)) |
|
|
|
|
|
|
|
|
if thumbnail_data: |
|
|
audio.tags.add(APIC( |
|
|
encoding=3, |
|
|
mime='image/jpeg', |
|
|
type=3, |
|
|
desc='Cover', |
|
|
data=thumbnail_data |
|
|
)) |
|
|
|
|
|
audio.save() |
|
|
|
|
|
elif audio_format == "m4a": |
|
|
|
|
|
audio = MP4(file_path) |
|
|
|
|
|
|
|
|
audio['\xa9nam'] = [title] |
|
|
audio['\xa9ART'] = [artist] |
|
|
audio['\xa9alb'] = [album] |
|
|
|
|
|
|
|
|
if thumbnail_data: |
|
|
audio['covr'] = [thumbnail_data] |
|
|
|
|
|
audio.save() |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Failed to add metadata: {e}") |
|
|
|
|
|
|
|
|
@app.get("/download-audio") |
|
|
async def download_audio_endpoint( |
|
|
query: str = Query(..., description="URL or title of the video"), |
|
|
format: str = Query("mp3", description="Audio format (mp3 or m4a)") |
|
|
): |
|
|
try: |
|
|
|
|
|
if format not in ["mp3", "m4a"]: |
|
|
raise HTTPException(status_code=400, detail="Format must be either mp3 or m4a") |
|
|
|
|
|
|
|
|
parsed_url = urlparse(query) |
|
|
if parsed_url.scheme and parsed_url.netloc: |
|
|
video_url = query |
|
|
else: |
|
|
video_url = search_video_by_title(query) |
|
|
|
|
|
|
|
|
output_file, thumbnail_url = download_audio(video_url, format) |
|
|
|
|
|
|
|
|
file_url = f"/static/{output_file}" |
|
|
server_url = os.getenv("SERVER_URL", "https://akane710-y2mateee.hf.space") |
|
|
download_url = f"{server_url}{file_url}" |
|
|
|
|
|
return JSONResponse(content={ |
|
|
"download_url": download_url, |
|
|
"filename": output_file, |
|
|
"thumbnail": thumbnail_url |
|
|
}) |
|
|
|
|
|
except HTTPException as e: |
|
|
raise e |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@app.get("/video-info") |
|
|
async def get_video_info(query: str = Query(..., description="URL or title of the video")): |
|
|
try: |
|
|
|
|
|
parsed_url = urlparse(query) |
|
|
if parsed_url.scheme and parsed_url.netloc: |
|
|
video_url = query |
|
|
else: |
|
|
video_url = search_video_by_title(query) |
|
|
|
|
|
video_data = extract_video_info(video_url) |
|
|
return JSONResponse(content=video_data) |
|
|
except HTTPException as e: |
|
|
raise e |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@app.get("/download-video") |
|
|
async def download_video( |
|
|
query: str = Query(..., description="URL or title of the video"), |
|
|
resolution: str = Query("720p", description="Video resolution (720p or 1080p)") |
|
|
): |
|
|
try: |
|
|
|
|
|
if resolution not in ["720p", "1080p"]: |
|
|
raise HTTPException(status_code=400, detail="Resolution must be either 720p or 1080p") |
|
|
|
|
|
|
|
|
parsed_url = urlparse(query) |
|
|
if parsed_url.scheme and parsed_url.netloc: |
|
|
video_url = query |
|
|
else: |
|
|
video_url = search_video_by_title(query) |
|
|
|
|
|
|
|
|
output_file, thumbnail_url = download_and_merge_video(video_url, resolution) |
|
|
|
|
|
|
|
|
actual_resolution = resolution |
|
|
if "_best_" in output_file: |
|
|
actual_resolution = "best quality available" |
|
|
elif "_" in output_file: |
|
|
parts = output_file.split("_") |
|
|
for part in parts: |
|
|
if part.endswith("p") and part[:-1].isdigit(): |
|
|
actual_resolution = part |
|
|
break |
|
|
|
|
|
|
|
|
file_url = f"/static/{output_file}" |
|
|
server_url = os.getenv("SERVER_URL", "https://akane710-y2mateee.hf.space") |
|
|
download_url = f"{server_url}{file_url}" |
|
|
|
|
|
return JSONResponse(content={ |
|
|
"download_url": download_url, |
|
|
"filename": output_file, |
|
|
"thumbnail": thumbnail_url, |
|
|
"actual_resolution": actual_resolution |
|
|
}) |
|
|
|
|
|
except HTTPException as e: |
|
|
raise e |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@app.get("/direct-download/{filename}") |
|
|
async def get_file(filename: str): |
|
|
file_path = os.path.join(STATIC_DIR, filename) |
|
|
if not os.path.exists(file_path): |
|
|
raise HTTPException(status_code=404, detail="File not found") |
|
|
|
|
|
return FileResponse( |
|
|
path=file_path, |
|
|
filename=filename, |
|
|
media_type="video/mp4" |
|
|
) |
|
|
|
|
|
|
|
|
class TranscriptionRequest(BaseModel): |
|
|
audio_url: str |
|
|
|
|
|
@app.get("/transcribe") |
|
|
async def transcribe_audio(audio_url: str = Query(..., description="Publicly accessible URL for audio file")): |
|
|
try: |
|
|
config = aai.TranscriptionConfig( |
|
|
speaker_labels=True, |
|
|
speech_model=aai.SpeechModel.nano, |
|
|
language_detection=True |
|
|
) |
|
|
transcript = transcriber.transcribe(audio_url, config) |
|
|
|
|
|
if transcript.status == aai.TranscriptStatus.error: |
|
|
raise HTTPException(status_code=500, detail=f"Transcription failed: {transcript.error}") |
|
|
|
|
|
return {"transcription": transcript.text} |
|
|
|
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@app.get("/playlist-download-urls") |
|
|
async def get_playlist_video_urls(playlist_url: str = Query(..., description="URL of the YouTube playlist")): |
|
|
try: |
|
|
result = extract_playlist_video_urls(playlist_url) |
|
|
return JSONResponse(content=result) |
|
|
except HTTPException as e: |
|
|
raise e |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
async def process_and_upload_webp(url: str) -> str: |
|
|
try: |
|
|
|
|
|
parsed_url = urlparse(url) |
|
|
if not parsed_url.scheme or not parsed_url.netloc: |
|
|
raise ValueError("Invalid URL") |
|
|
|
|
|
|
|
|
response = requests.get(url) |
|
|
if response.status_code != 200: |
|
|
raise HTTPException(status_code=400, detail="Failed to download the file") |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix=".webp", delete=False) as webp_file: |
|
|
webp_file.write(response.content) |
|
|
webp_file_path = webp_file.name |
|
|
|
|
|
|
|
|
output_path = webp_file_path.replace(".webp", ".mp4") |
|
|
reader = imageio.get_reader(webp_file_path) |
|
|
writer = imageio.get_writer(output_path, fps=30) |
|
|
for frame in reader: |
|
|
writer.append_data(frame) |
|
|
writer.close() |
|
|
|
|
|
|
|
|
with open(output_path, 'rb') as mp4_file: |
|
|
upload_response = requests.post( |
|
|
'https://uguu.se/upload', |
|
|
files={"files[]": mp4_file} |
|
|
) |
|
|
if upload_response.status_code != 200: |
|
|
raise HTTPException(status_code=500, detail="Failed to upload the file") |
|
|
|
|
|
file_url = upload_response.json()["files"][0]["url"] |
|
|
|
|
|
|
|
|
os.remove(webp_file_path) |
|
|
os.remove(output_path) |
|
|
|
|
|
return file_url |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
@app.get("/convert") |
|
|
async def convert_webp_to_mp4(url: str): |
|
|
file_url = await process_and_upload_webp(url) |
|
|
return JSONResponse(content={"url": file_url}) |
|
|
|
|
|
|
|
|
def cli(): |
|
|
parser = argparse.ArgumentParser(description="Convert and upload WEBP to MP4.") |
|
|
parser.add_argument("--url", type=str, required=True, help="URL of the .webp file.") |
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
import asyncio |
|
|
result = asyncio.run(process_and_upload_webp(args.url)) |
|
|
print(f"Uploaded file link: {result}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |