"""Chrunos Downloader API.

A small FastAPI service (designed for Hugging Face Spaces) that resolves
direct download URLs for videos/audio via yt-dlp, lists playlist contents,
and proxies SoundCloud's v2 API for track listings.  A naive per-IP daily
rate limit is persisted to a JSON file under /tmp.
"""

import json
import os
import time
import urllib.parse
from datetime import datetime, timedelta, timezone
from typing import Dict, Union

import httpx
import uvicorn
from fastapi import FastAPI, HTTPException, Request, status
from starlette.responses import JSONResponse
from yt_dlp import YoutubeDL
from yt_dlp.version import __version__ as yt_dlp_version

app = FastAPI(docs_url=None, redoc_url=None)

# Set cache directory to /tmp which is writable in Hugging Face containers
os.environ["XDG_CACHE_HOME"] = "/tmp"

# Rate limiting configuration
DAILY_LIMIT = 100  # Maximum requests per IP per day
RATE_LIMIT_FILE = "/tmp/rate_limits.json"

# Sniffed from the browser network tab on soundcloud.com.
SOUNDCLOUD_CLIENT_ID = "khI8ciOiYPX6UVGInQY5zA0zvTkfzuuC"


def load_rate_limits() -> Dict[str, Dict]:
    """Load rate limit data from file.

    Best-effort: a missing, unreadable, or corrupt file simply resets all
    counters by returning an empty mapping.
    """
    try:
        if os.path.exists(RATE_LIMIT_FILE):
            with open(RATE_LIMIT_FILE, 'r') as f:
                return json.load(f)
    except Exception:
        pass
    return {}


def save_rate_limits(rate_limits: Dict[str, Dict]) -> None:
    """Save rate limit data to file (best-effort; failures are ignored)."""
    try:
        with open(RATE_LIMIT_FILE, 'w') as f:
            json.dump(rate_limits, f)
    except Exception:
        pass


def get_current_date() -> str:
    """Get current UTC date as string in YYYY-MM-DD format."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%d')


def _seconds_until_utc_midnight() -> int:
    """Seconds remaining until the daily counters reset (next UTC midnight)."""
    now = datetime.now(timezone.utc)
    next_midnight = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return int((next_midnight - now).total_seconds())


def cleanup_old_entries(rate_limits: Dict[str, Dict]) -> Dict[str, Dict]:
    """Remove entries whose recorded date is not today (UTC)."""
    current_date = get_current_date()
    return {
        ip: data
        for ip, data in rate_limits.items()
        if data.get('date') == current_date
    }


def check_rate_limit(ip: str) -> tuple[bool, int]:
    """Check if IP has exceeded the daily limit and count this request.

    Returns:
        (is_allowed, remaining_requests)

    NOTE(review): the read-modify-write of the JSON file is not atomic, so
    concurrent workers can under-count — acceptable for a soft limit.
    """
    rate_limits = load_rate_limits()
    rate_limits = cleanup_old_entries(rate_limits)

    current_date = get_current_date()
    if ip not in rate_limits:
        rate_limits[ip] = {'date': current_date, 'count': 0}

    ip_data = rate_limits[ip]

    # Reset count if it's a new day
    if ip_data.get('date') != current_date:
        ip_data['date'] = current_date
        ip_data['count'] = 0

    current_count = ip_data['count']
    if current_count >= DAILY_LIMIT:
        return False, 0

    # Increment count and persist
    ip_data['count'] = current_count + 1
    rate_limits[ip] = ip_data
    save_rate_limits(rate_limits)

    remaining = DAILY_LIMIT - ip_data['count']
    return True, remaining


def get_client_ip(request: Request) -> str:
    """Extract client IP from request, handling proxies."""
    # Check for common proxy headers
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for:
        # Take the first IP in the chain
        return forwarded_for.split(",")[0].strip()

    real_ip = request.headers.get("x-real-ip")
    if real_ip:
        return real_ip.strip()

    # Fallback to direct client IP
    return request.client.host if request.client else "unknown"


@app.get("/api/version")
async def version_info():
    """Report the installed yt-dlp version."""
    return JSONResponse({"yt_dlp": yt_dlp_version})


@app.get('/')
def main():
    """Health-check / landing endpoint."""
    return "Chrunos Downloader API Is Running well on Hugging Face."


@app.get("/api/info")
async def get_info(
    request: Request,
    url: str,
    quality: str = "1080",
    audio_only: bool = False  # <-- Added audio toggle
):
    """
    Resolves a video or audio URL and returns a simplified JSON payload.
    Auto-detects SoundCloud to prevent video-filter errors.
    """
    client_ip = get_client_ip(request)
    is_allowed, remaining = check_rate_limit(client_ip)
    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.",
            # Counters reset at UTC midnight, not 24h from now.
            headers={"X-RateLimit-Reset": str(int(time.time()) + _seconds_until_utc_midnight())}
        )

    # --- SMART FORMAT SELECTION ---
    # Auto-detect SoundCloud, or check if the client explicitly wants audio
    if "soundcloud.com" in url or audio_only:
        format_selector = "bestaudio/best"
    else:
        # Custom H.264-first, AV1-second video logic
        format_selector = (
            f"best[height<={quality}][vcodec^=avc][ext=mp4]/"
            f"best[height<={quality}][vcodec^=av01][ext=mp4]/"
            f"best[height<={quality}][ext=mp4]/"
            f"bestvideo[height<={quality}]+bestaudio/best"
        )

    ydl_options = {
        "format": format_selector,
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "noplaylist": True,
        "cachedir": "/tmp/yt-dlp-cache",
        # NOTE(review): confirm yt-dlp's Python API accepts "js-runtimes"
        # as an option key (CLI flags usually map to underscored keys).
        "js-runtimes": "node"
    }

    with YoutubeDL(ydl_options) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            download_url = info.get("url")
            http_headers = info.get("http_headers", {})

            # Fallback for split formats (bestvideo+bestaudio)
            if not download_url and info.get("requested_formats"):
                # If audio_only is True, requested_formats[0] might be audio.
                # If video, requested_formats[0] is video, [1] is audio.
                # We grab the first one to ensure we get a valid URL.
                fmt = info["requested_formats"][0]
                download_url = fmt.get("url")
                http_headers = fmt.get("http_headers", http_headers)

            if not download_url:
                raise HTTPException(
                    status_code=400,
                    detail="ダウンロードURLを取得できませんでした",
                    headers={"Cache-Control": "no-store, max-age=0"}
                )

            title = info.get("title", "audio" if audio_only else "video")
            ext = info.get("ext", "mp3" if audio_only else "mp4")
            filename = f"{title}.{ext}"
            filesize = info.get("filesize") or info.get("filesize_approx")

            response_data = {
                "status": "ok",
                "url": download_url,
                "title": title,
                "filename": filename,
                "ext": ext,
                "filesize": filesize,
                "headers": http_headers,
            }
            return JSONResponse(
                response_data,
                headers={
                    "Cache-Control": "s-maxage=2592000, stale-while-revalidate",
                    "X-RateLimit-Limit": str(DAILY_LIMIT),
                    "X-RateLimit-Remaining": str(remaining)
                }
            )
        except HTTPException:
            # Don't let the broad handler below re-wrap our own 4xx responses.
            raise
        except Exception as e:
            error_msg = str(e)
            if "DownloadError" in str(type(e)):
                error_msg = f"メディアの取得に失敗: {error_msg}"  # Changed 'video' to 'media' in JP
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=error_msg,
                headers={"Cache-Control": "no-store, max-age=0"}
            )


@app.get("/api/playlist")
async def get_playlist_info(
    request: Request,
    url: str,
    start: int = 1,
    end: int = 50
):
    """
    Fetches paginated items from a playlist or user profile.
    Strictly enforces a maximum of 50 items per request and provides
    a next_page URL.
    """
    if start < 1:
        raise HTTPException(status_code=400, detail="'start' must be 1 or greater.")
    if end < start:
        raise HTTPException(status_code=400, detail="'end' must be greater than or equal to 'start'.")

    # Clamp the window to at most 50 items.
    requested_count = end - start + 1
    if requested_count > 50:
        end = start + 49
        requested_count = 50

    client_ip = get_client_ip(request)
    is_allowed, remaining = check_rate_limit(client_ip)
    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.",
            headers={
                "X-RateLimit-Limit": str(DAILY_LIMIT),
                "X-RateLimit-Remaining": "0",
                # Counters reset at UTC midnight, not 24h from now.
                "X-RateLimit-Reset": str(int(time.time()) + _seconds_until_utc_midnight()),
                "Cache-Control": "no-store, max-age=0"
            }
        )

    ydl_options = {
        "retries": 3,
        "encoding": "utf8",
        "extract_flat": "in_playlist",
        "dump_single_json": True,
        "ignoreerrors": True,
        "cachedir": "/tmp/yt-dlp-cache",
        # NOTE(review): see the same key in /api/info — confirm it is valid.
        "js-runtimes": "node",
        "playliststart": start,
        "playlistend": end
    }

    with YoutubeDL(ydl_options) as ytdl:
        try:
            response = ytdl.extract_info(url, download=False)
            if not response:
                raise HTTPException(status_code=404, detail="Playlist or profile not found.")

            raw_entries = response.get("entries") or []
            # ignoreerrors=True can leave None placeholders for failed items.
            valid_entries = [e for e in raw_entries if e is not None]

            # Offer a next page only when this window came back full.
            next_page_url = None
            if len(raw_entries) >= requested_count:
                next_start = end + 1
                next_end = next_start + 49
                encoded_url = urllib.parse.quote(url)
                base_url = str(request.base_url).rstrip('/')
                next_page_url = f"{base_url}/api/playlist?url={encoded_url}&start={next_start}&end={next_end}"

            clean_response = {
                "id": response.get("id"),
                "title": response.get("title", "Unknown Playlist"),
                "uploader": response.get("uploader"),
                "items_returned": len(valid_entries),
                "next_page": next_page_url,
                "entries": valid_entries
            }
            return JSONResponse(
                clean_response,
                headers={
                    "Cache-Control": "s-maxage=2592000, stale-while-revalidate",
                    "X-RateLimit-Limit": str(DAILY_LIMIT),
                    "X-RateLimit-Remaining": str(remaining),
                    "X-RateLimit-Reset": str(int(time.time()) + _seconds_until_utc_midnight())
                }
            )
        except HTTPException:
            # Don't let the broad handler below re-wrap our own 404/4xx responses.
            raise
        except Exception as e:
            print(f"Error extracting playlist: {e}")
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=repr(e),
                headers={"Cache-Control": "no-store, max-age=0"},
            )


@app.get("/api/list")
async def get_sound_playlist_info(request: Request, url: str, start: int = 1, end: int = 50):
    """Paginated SoundCloud playlist/user track listing via the api-v2 API."""
    limit = min(end - start + 1, 50)
    offset = start - 1

    async with httpx.AsyncClient(timeout=15) as client:
        # 1. Resolve URL
        resolve_res = await client.get(
            "https://api-v2.soundcloud.com/resolve",
            params={"url": url, "client_id": SOUNDCLOUD_CLIENT_ID}
        )
        if resolve_res.status_code != 200:
            raise HTTPException(status_code=404, detail=f"Could not resolve URL: {resolve_res.text}")
        data = resolve_res.json()

        # 2. Handle both playlists and user profiles
        if data.get("kind") == "playlist":
            playlist_id = data["id"]
            # Playlists return a `tracks` array but lazy tracks only have `id`
            # Collect IDs from the paginated slice first
            all_track_ids = [t["id"] for t in data.get("tracks", [])]
            paginated_ids = all_track_ids[offset: offset + limit]
            if not paginated_ids:
                return JSONResponse({"entries": [], "items_returned": 0})

            # 3. Fetch full track details in one batch request
            tracks_res = await client.get(
                "https://api-v2.soundcloud.com/tracks",
                params={
                    "ids": ",".join(str(i) for i in paginated_ids),
                    "client_id": SOUNDCLOUD_CLIENT_ID
                }
            )
            if tracks_res.status_code != 200:
                raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}")
            tracks = tracks_res.json()

        elif data.get("kind") == "user":
            # For user profiles, fetch their tracks directly
            tracks_res = await client.get(
                f"https://api-v2.soundcloud.com/users/{data['id']}/tracks",
                params={
                    "client_id": SOUNDCLOUD_CLIENT_ID,
                    "limit": limit,
                    "offset": offset
                }
            )
            if tracks_res.status_code != 200:
                raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}")
            tracks = tracks_res.json().get("collection", [])

        else:
            raise HTTPException(status_code=400, detail=f"Unsupported kind: {data.get('kind')}")

        entries = [
            {
                "id": t.get("id"),
                "title": t.get("title"),
                "url": t.get("permalink_url"),
                "duration": t.get("duration"),
                "uploader": t.get("user", {}).get("username"),
            }
            for t in tracks
        ]

        # Next page
        next_page_url = None
        total = data.get("track_count") or data.get("likes_count")
        if total and (offset + limit) < total:
            next_start = end + 1
            next_end = next_start + 49
            encoded_url = urllib.parse.quote(url)
            base_url = str(request.base_url).rstrip('/')
            next_page_url = f"{base_url}/api/list?url={encoded_url}&start={next_start}&end={next_end}"

        return JSONResponse({
            "id": data.get("id"),
            "title": data.get("title") or data.get("username"),
            "uploader": data.get("username") or data.get("uploader"),
            "items_returned": len(entries),
            "next_page": next_page_url,
            "entries": entries
        })


@app.get("/api/rate-limit-status")
async def get_rate_limit_status(request: Request):
    """Report the caller's current daily usage without consuming a request."""
    client_ip = get_client_ip(request)
    rate_limits = load_rate_limits()
    rate_limits = cleanup_old_entries(rate_limits)
    current_date = get_current_date()

    if client_ip in rate_limits and rate_limits[client_ip].get('date') == current_date:
        used = rate_limits[client_ip]['count']
        remaining = DAILY_LIMIT - used
    else:
        used = 0
        remaining = DAILY_LIMIT

    # Counters reset at the NEXT UTC midnight (today's has already passed).
    reset_date = (datetime.now(timezone.utc) + timedelta(days=1)).strftime('%Y-%m-%d')
    return JSONResponse({
        "daily_limit": DAILY_LIMIT,
        "used": used,
        "remaining": remaining,
        "reset_time": f"{reset_date}T00:00:00Z"
    })


# Add the Hugging Face compatible execution block
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)