| import sys |
| import time |
| from fastapi import FastAPI, BackgroundTasks, Request, HTTPException, Security |
| from fastapi.security import APIKeyHeader |
| from fastapi.responses import FileResponse |
| from fastapi.concurrency import run_in_threadpool |
| import yt_dlp |
| import ffmpeg |
| import urllib.parse |
| import os |
| from datetime import datetime, timedelta |
| import schedule |
| import requests |
| import uvicorn |
| import subprocess |
| import json |
| from dotenv import load_dotenv |
| import mimetypes |
| import tempfile |
| from PIL import Image |
| from io import BytesIO |
| from pathlib import Path |
| from fastapi.staticfiles import StaticFiles |
| from collections import defaultdict |
| from starlette.responses import JSONResponse |
| import logging |
| import gc |
| from typing import Dict, Any |
| import re |
| import asyncio |
| import cloudscraper |
|
|
# System temp directory (available for scratch artifacts such as cookie files).
tmp_dir = tempfile.gettempdir()
# Public base URL used to build the download links returned to clients.
BASE_URL = "https://chrunos-zam.hf.space"
|
|
|
|
def env_to_cookies(env_content: str, output_file: str) -> None:
    """Decode an env-var assignment string and write it out as a cookie file.

    Expects ``NAME="...payload..."`` where the payload stores newlines as the
    two-character escape ``\\n``. Raises ValueError on any failure.
    """
    try:
        # Anything without the '="' delimiter is not an assignment string.
        if '="' not in env_content:
            raise ValueError("Invalid env content format")

        # Keep everything after the first '="' and drop the closing quote.
        payload = env_content.split('="', 1)[1].strip('"')

        # Restore real line breaks that were flattened for env-var storage.
        decoded = payload.replace('\\n', '\n')

        with open(output_file, 'w') as handle:
            handle.write(decoded)
    except Exception as exc:
        raise ValueError(f"Error converting to cookie file: {str(exc)}")
|
|
def save_to_env_file(env_content: str, env_file: str = '.env') -> None:
    """Persist the given text verbatim to a .env file (overwrites existing)."""
    try:
        with open(env_file, 'w') as handle:
            handle.write(env_content)
    except Exception as exc:
        # Normalise any I/O failure into a ValueError for the callers.
        raise ValueError(f"Error saving to env file: {str(exc)}")
|
|
def env_to_cookies_from_env(output_file: str) -> None:
    """Materialize the FIREFOX_COOKIES variable from .env as a cookie file."""
    try:
        load_dotenv()
        raw = os.getenv('FIREFOX_COOKIES')
        if not raw:
            raise ValueError("FIREFOX_COOKIES not found in .env file")
        # Re-wrap as an assignment string so env_to_cookies can parse it.
        env_to_cookies(f'FIREFOX_COOKIES="{raw}"', output_file)
    except Exception as exc:
        raise ValueError(f"Error converting to cookie file: {str(exc)}")
|
|
def get_cookies():
    """Return the raw FIREFOX_COOKIES value, loading .env first.

    Raises ValueError when the variable is missing or empty.
    """
    load_dotenv()
    raw = os.getenv('FIREFOX_COOKIES')
    if raw:
        return raw
    raise ValueError("FIREFOX_COOKIES environment variable not set")
|
|
def create_temp_cookie_file():
    """Write the FIREFOX_COOKIES env var to a temp file and return its Path.

    The file is created with delete=False, so the caller owns its cleanup.
    Raises ValueError (from get_cookies) when the variable is not set.
    """
    temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt')
    try:
        cookie_content = get_cookies()
        # Escaped newlines were stored flat in the env var; restore them.
        cookie_content = cookie_content.replace('\\n', '\n')
        # BUG FIX: write() was previously called with no argument, which
        # raised TypeError and always left the temp file empty.
        temp_cookie.write(cookie_content)
        temp_cookie.flush()
        return Path(temp_cookie.name)
    finally:
        # Close the handle in all cases; the file itself persists on disk.
        temp_cookie.close()
|
|
# Load .env at import time so API_KEY / FIREFOX_COOKIES are available globally.
load_dotenv()
app = FastAPI()
|
|
|
|
|
|
@app.get('/')
def main():
    """Health-check / landing route."""
    banner = "API Is Running. If you want to use this API, contact Cody from chrunos.com"
    return banner
|
|
@app.get("/get_video_url")
async def get_video_url(youtube_url: str):
    """Return raw yt-dlp metadata (no download) for the given YouTube URL.

    Raises HTTPException(500) with the underlying error message on failure.
    """
    try:
        cookiefile = "firefox-cookies.txt"
        env_to_cookies_from_env(cookiefile)

        # BUG FIX: ydl_opts was referenced here without ever being defined,
        # so every request died with a NameError. Define it explicitly.
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'cookiefile': cookiefile,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=False)
        return info
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
| |
# Shared scratch directory where all downloads land; served via the /file mount.
global_download_dir = tempfile.mkdtemp()
|
|
|
|
|
|
class RateLimiter:
    """In-memory sliding-window rate limiter keyed by client IP."""

    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        # Maps IP -> list of UNIX timestamps of recorded attempts.
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Drop timestamps that have aged out of the window."""
        cutoff = time.time() - self.time_window.total_seconds()
        self.requests[user_ip] = [ts for ts in self.requests[user_ip] if ts > cutoff]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Record this attempt and report whether the IP is over its limit."""
        self._cleanup_old_requests(user_ip)
        prior = len(self.requests[user_ip])

        # Denied attempts are recorded too, so hammering while limited
        # keeps counting against the window.
        self.requests[user_ip].append(time.time())

        return prior + 1 > self.max_requests

    def get_current_count(self, user_ip: str) -> int:
        """Number of still-live recorded attempts for this IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])
|
|
|
|
| |
# Global limiter: 20 requests per client IP per rolling 24 hours.
rate_limiter = RateLimiter(
    max_requests=20,
    time_window=timedelta(days=1)
)
|
|
def get_user_ip(request: Request) -> str:
    """Resolve the client IP, honouring proxy forwarding headers."""
    forwarded_chain = request.headers.get("X-Forwarded-For")
    # Behind a proxy the first entry in the chain is the originating client.
    return forwarded_chain.split(",")[0] if forwarded_chain else request.client.host
|
|
|
|
class ApiRotator:
    """Priority list of API endpoints that fronts the last one that worked."""

    def __init__(self, apis):
        self.apis = apis
        # Index of the endpoint that most recently succeeded, if any.
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the API list with the last-successful endpoint moved first."""
        idx = self.last_successful_index
        if idx is None:
            return self.apis
        # Winner first, then the remaining endpoints in original order.
        return [self.apis[idx]] + self.apis[:idx] + self.apis[idx + 1:]

    def update_last_successful(self, index):
        """Remember which endpoint just succeeded so it is tried first next time."""
        self.last_successful_index = index
|
|
| |
# Cobalt-compatible download APIs, tried in rotation (see ApiRotator).
api_rotator = ApiRotator([
    "https://dwnld.nichind.dev",
    "https://cobalt-api.kwiatekmiki.com",
    "https://yt.edd1e.xyz/",
    "https://cobalt-api.ayo.tf",
    "https://cblt.fariz.dev"
])
|
|
|
|
|
|
async def get_track_download_url(video_url: str) -> str:
    """Ask each cobalt-style API, in priority order, for a direct download URL.

    Returns the URL string on success. On total failure returns an
    ``{"error": ...}`` dict (callers probe with ``in``, so both shapes work).
    """
    prioritized = api_rotator.get_prioritized_apis()
    session = cloudscraper.create_scraper()
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    for i, api_url in enumerate(prioritized):
        try:
            logger.info(f"Attempting to get download URL from: {api_url}")
            y_url = video_url
            response = session.post(
                api_url,
                timeout=20,
                json={"url": y_url, "videoQuality": "720", "filenameStyle": "pretty"},
                headers=headers
            )
            logger.info(f"Response status: {response.status_code}")
            logger.info(f"Response content: {response.content}")

            # Non-JSON replies are useless; move on to the next endpoint.
            if not response.headers.get('content-type', '').startswith('application/json'):
                continue

            payload = response.json()
            code = payload.get("error", {}).get("code", "")
            if code == "error.api.content.video.unavailable":
                logger.warning(f"Video unavailable error from {api_url}")
                # The video itself is dead; no point trying mirrors.
                break

            if "url" in payload:
                api_rotator.update_last_successful(i)
                return payload["url"]
        except Exception as e:
            logger.error(f"Failed with {api_url}: {str(e)}")
            continue

    logger.error(f"No download URL found")
    return {"error": "Download URL not found"}
|
|
|
|
async def extract_video_info(video_url: str) -> str:
    """Fetch video metadata from the yt-dl-web helper API.

    Returns the parsed JSON payload when it contains a download URL, else an
    ``{"error": ...}`` dict.
    """
    api_url = f'https://yt-dl-web.vercel.app/api/info?url={video_url}'
    session = cloudscraper.create_scraper()
    response = session.get(api_url, timeout=20)
    # BUG FIX: the original tested `"url" in response`, which iterates the
    # raw byte stream of the Response object and can never match a str —
    # the success branch was unreachable. Test the decoded body instead.
    if "url" in response.text:
        json_response = response.json()
        return json_response
    else:
        return {"error": "Download URL not found"}
|
|
|
|
@app.post("/test")
async def test_download(request: Request):
    """Debug endpoint: proxy raw metadata for the posted 'url' field."""
    payload = await request.json()
    target_url = payload.get('url')
    return await extract_video_info(target_url)
|
|
| |
|
|
@app.post("/maxs")
async def download_high_quality_video(request: Request):
    """Download a video at up to 1080p and return a link served under /file.

    YouTube URLs are delegated to the rotating cobalt-style APIs; all other
    sites are downloaded locally with yt-dlp. Rate limited per client IP;
    raises HTTPException(429) when the daily quota is exhausted.
    """
    user_ip = get_user_ip(request)

    if rate_limiter.is_rate_limited(user_ip):
        current_count = rate_limiter.get_current_count(user_ip)
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    data = await request.json()
    restricted_domain = "chrunos.com"
    video_url = data.get('url')

    # BUG FIX: guard before re.search — a request without 'url' previously
    # crashed with TypeError (re.search on None) and returned a 500.
    if not video_url:
        return {"error": "No URL provided."}
    if restricted_domain in video_url:
        return {"error": "What is wrong with you?", "url": "https://t.me/chrunoss"}

    is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None

    quality = data.get('quality', '720')
    if int(quality) > 1080:
        error_message = "Quality above 1080p is for Premium Members Only. Please check the URL for more information."
        help_url = "https://chrunos.com/premium-shortcuts/"
        return {"error": error_message, "url": help_url}

    cookiefile = "firefox-cookies.txt"
    env_to_cookies_from_env(cookiefile)

    # Timestamp suffix makes the output file findable and collision-free.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    # Cap the stream height; unknown quality strings fall back to 1080.
    height_map = {
        '480': 480,
        '720': 720,
        '1080': 1080
    }
    max_height = height_map.get(quality, 1080)

    # Prefer H.264 (avc) video so the merged MP4 plays everywhere.
    format_str = f'bestvideo[height<={max_height}][vcodec^=avc]+bestaudio/best'

    ydl_opts = {
        'format': format_str,
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4',
        # BUG FIX: the cookie file was created above but never handed to
        # yt-dlp here, unlike the /audio and /search endpoints.
        'cookiefile': cookiefile
    }

    if is_youtube_url:
        dl_url = await get_track_download_url(video_url)
        # get_track_download_url returns a str URL on success, or an error
        # dict on failure — check the type explicitly instead of relying on
        # dict-key membership.
        if isinstance(dl_url, str) and "http" in dl_url:
            return {
                "url": dl_url,
                "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)
            }
        return {
            "error": "Failed to Fetch the video."
        }

    await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))

    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
    if not downloaded_files:
        return {"error": "Download failed"}

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{BASE_URL}/file/{encoded_filename}"

    gc.collect()

    return {
        "url": download_url,
        "requests_remaining": rate_limiter.max_requests - rate_limiter.get_current_count(user_ip)
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Header-based auth scheme for the private /audio and /search endpoints.
api_key_header = APIKeyHeader(name="X-API-Key")
|
|
| |
# Expected key, supplied via environment (.env is loaded at startup).
API_KEY = os.getenv("API_KEY")
|
|
async def verify_api_key(api_key: str = Security(api_key_header)):
    """FastAPI dependency: reject with 403 unless X-API-Key matches API_KEY."""
    if api_key == API_KEY:
        return api_key
    raise HTTPException(
        status_code=403,
        detail="Invalid API key"
    )
|
|
@app.post("/audio")
async def download_audio(
    request: Request,
    api_key: str = Security(verify_api_key)
):
    """Download the audio track of the posted 'url' as MP3 and return a /file link.

    Protected by the X-API-Key header (see verify_api_key).
    """
    data = await request.json()
    video_url = data.get('url')
    cookiefile = "firefox-cookies.txt"
    env_to_cookies_from_env("firefox-cookies.txt")

    # Timestamp suffix makes the output file findable and collision-free.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'cookiefile': cookiefile,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192'
        }]
    }

    await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))

    # BUG FIX: glob only the final .mp3 (as /search does) instead of "*.*",
    # so leftover intermediates (e.g. .webm.part) are never linked to clients.
    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
    if not downloaded_files:
        return {"error": "Download failed"}

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{BASE_URL}/file/{encoded_filename}"
    gc.collect()
    return {"url": download_url}
|
|
| |
# Module-wide logging; `logger` is used by the cobalt API helper above.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
@app.post("/search")
async def search_and_download_song(request: Request,
    api_key: str = Security(verify_api_key)
):
    """Search YouTube for a song (optionally by artist) and return an MP3 link.

    Protected by the X-API-Key header (see verify_api_key).
    """
    data = await request.json()
    song_name = data.get('songname')
    artist_name = data.get('artist')
    # The artist, when present, narrows the ytsearch query.
    if artist_name:
        search_query = f"ytsearch:{song_name} {artist_name}"
    else:
        search_query = f"ytsearch:{song_name}"

    logging.info(f"Search query: {search_query}")
    cookiefile = "firefox-cookies.txt"
    env_to_cookies_from_env("firefox-cookies.txt")

    # Timestamp suffix makes the output file findable and collision-free.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    output_template = str(Path(global_download_dir) / f'%(title).70s_{timestamp}.%(ext)s')

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192'
        }],
        'cookiefile': cookiefile
    }

    try:
        logging.info("Starting yt-dlp search and download...")
        await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query]))
        logging.info("yt-dlp search and download completed")
    except yt_dlp.utils.DownloadError as err:
        logging.error(f"yt-dlp error: {str(err)}")
        return JSONResponse(content={"error": str(err)}, status_code=500)
    except Exception as err:
        logging.error(f"General error: {str(err)}")
        return JSONResponse(content={"error": str(err)}, status_code=500)

    matches = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
    if not matches:
        logging.error("Download failed: No MP3 files found")
        return JSONResponse(content={"error": "Download failed"}, status_code=500)

    mp3_path = matches[0]
    download_url = f"{BASE_URL}/file/{urllib.parse.quote(mp3_path.name)}"
    logging.info(f"Download URL: {download_url}")

    logging.info("Preparing to send response back to the client")
    gc.collect()
    return JSONResponse(content={"url": download_url}, status_code=200)
|
|
| |
# Serve the shared download directory; links built by the endpoints above
# (BASE_URL + "/file/<name>") resolve under this mount.
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.middleware("http")
async def set_mime_type_middleware(request: Request, call_next):
    """Force the correct Content-Type header on any served .mp4 file."""
    response = await call_next(request)
    serves_mp4 = request.url.path.endswith(".mp4")
    if serves_mp4:
        response.headers["Content-Type"] = "video/mp4"
    return response
|
|
|
|