| | import os |
| | import json |
| | import time |
| | import urllib.parse |
| | from datetime import datetime, timezone |
| | from starlette.responses import JSONResponse |
| | from fastapi import FastAPI, HTTPException, status, Request |
| | from yt_dlp import YoutubeDL |
| | from yt_dlp.version import __version__ as yt_dlp_version |
| | from typing import Union, Dict |
| | import uvicorn |
| |
|
# The ASGI application. Passing None for docs_url / redoc_url turns off the
# auto-generated interactive documentation routes.
app = FastAPI(docs_url=None, redoc_url=None)

# Redirect XDG-aware caches to /tmp.
# NOTE(review): presumably /tmp is the only writable location in the
# deployment container — confirm.
os.environ["XDG_CACHE_HOME"] = "/tmp"

# Maximum number of counted requests per client IP per UTC calendar day.
DAILY_LIMIT = 100
# JSON file holding the per-IP counters: {ip: {"date": ..., "count": ...}}.
RATE_LIMIT_FILE = "/tmp/rate_limits.json"
| |
|
def load_rate_limits() -> Dict[str, Dict]:
    """Load the per-IP rate limit table from ``RATE_LIMIT_FILE``.

    Loading is best-effort: a missing, unreadable or corrupt file yields an
    empty table instead of an exception, so a damaged file cannot take the
    API down.

    Returns:
        Mapping of client IP to its ``{"date": ..., "count": ...}`` record,
        or ``{}`` when nothing usable is stored.
    """
    try:
        with open(RATE_LIMIT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been recorded yet.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        print(f"Could not read rate limit file: {exc}")
        return {}

    # Valid JSON that is not an object (a list, null, ...) would crash the
    # callers, which iterate the result with .items().
    if not isinstance(data, dict):
        return {}
    return data
| |
|
def save_rate_limits(rate_limits: Dict[str, Dict]):
    """Persist the per-IP rate limit table to ``RATE_LIMIT_FILE``.

    The table is written to a temporary sibling file and then moved into
    place with ``os.replace``. A failed or interrupted write therefore leaves
    the previous file intact instead of a truncated one (which the loader
    would treat as "no data", resetting every client's quota).

    Saving stays best-effort: failures are reported but never raised.

    Args:
        rate_limits: Mapping of client IP to its counter record.
    """
    tmp_path = f"{RATE_LIMIT_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(rate_limits, f)
        os.replace(tmp_path, RATE_LIMIT_FILE)
    except (OSError, TypeError, ValueError) as exc:
        # TypeError / ValueError: the table contains something json cannot
        # serialise. OSError: the file system refused the write.
        print(f"Could not save rate limit file: {exc}")
        try:
            os.remove(tmp_path)
        except OSError:
            # The temporary file was never created or is already gone.
            pass
| |
|
def get_current_date() -> str:
    """Return today's UTC calendar date formatted as YYYY-MM-DD."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime('%Y-%m-%d')
| |
|
def cleanup_old_entries(rate_limits: Dict[str, Dict]) -> Dict[str, Dict]:
    """Return a new table holding only the records dated today (UTC).

    The input mapping is not modified; the surviving records are the same
    objects, not copies.
    """
    today = get_current_date()
    return {
        client: record
        for client, record in rate_limits.items()
        if record.get('date') == today
    }
| |
|
def check_rate_limit(ip: str) -> tuple[bool, int]:
    """
    Count one request against the daily quota of ``ip``.

    Returns: (is_allowed, remaining_requests). When the quota is already
    used up the result is (False, 0) and nothing is written back.
    """
    table = cleanup_old_entries(load_rate_limits())
    today = get_current_date()

    # First request of the day for this client: start a fresh counter.
    record = table.setdefault(ip, {'date': today, 'count': 0})

    # Guards against the date rolling over between the cleanup and here.
    if record.get('date') != today:
        record['date'] = today
        record['count'] = 0

    used = record['count']
    if used >= DAILY_LIMIT:
        return False, 0

    record['count'] = used + 1
    save_rate_limits(table)

    return True, DAILY_LIMIT - record['count']
| |
|
def get_client_ip(request: Request) -> str:
    """Extract the client IP from a request, honouring proxy headers.

    Lookup order:
      1. the first non-blank entry of ``X-Forwarded-For``,
      2. ``X-Real-IP``,
      3. the peer address of the connection,
      4. the literal ``"unknown"`` when none of the above is available.

    Blank values (an empty leading ``X-Forwarded-For`` token, a
    whitespace-only header) are skipped rather than returned, so an empty
    string is never used as a client key.

    NOTE(review): both headers are client-controlled unless a trusted proxy
    overwrites them, so the result can be spoofed — confirm the deployment
    proxy sanitises them.
    """
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for:
        for candidate in forwarded_for.split(","):
            candidate = candidate.strip()
            if candidate:
                return candidate

    real_ip = (request.headers.get("x-real-ip") or "").strip()
    if real_ip:
        return real_ip

    return request.client.host if request.client else "unknown"
| |
|
@app.get("/api/version")
async def version_info():
    """Report the version of the yt-dlp library this service is running."""
    payload = {"yt_dlp": yt_dlp_version}
    return JSONResponse(payload)
| |
|
@app.get('/')
def main():
    """Root endpoint: answer with a fixed status message."""
    status_message = "Chrunos Downloader API Is Running well on Hugging Face."
    return status_message
| |
|
@app.get("/api/info")
async def get_info(
    request: Request,
    url: str,
    quality: str = "1080",
    audio_only: bool = False
):
    """
    Resolve a video or audio URL and return a simplified JSON payload.

    SoundCloud URLs and ``audio_only`` requests use an audio-only format
    selector so the video filters cannot make extraction fail.

    Args:
        request: Incoming request, used to identify the client for rate
            limiting.
        url: Media page URL handed to yt-dlp.
        quality: Maximum video height in pixels, digits only (e.g. "720").
            Ignored for audio requests.
        audio_only: Request the best audio stream instead of video.

    Returns:
        JSONResponse with status, url, title, filename, ext, filesize and
        headers, plus X-RateLimit-* response headers.

    Raises:
        HTTPException: 400 for an invalid ``quality`` or when extraction
            fails or yields no download URL; 429 when the daily quota of the
            client is exhausted.
    """
    import asyncio
    from yt_dlp.utils import DownloadError

    no_store = {"Cache-Control": "no-store, max-age=0"}

    # Build the selector before touching the quota so that a malformed
    # request is rejected without costing the client one of its requests.
    if "soundcloud.com" in url or audio_only:
        format_selector = "bestaudio/best"
    else:
        # quality is interpolated into the selector; restricting it to digits
        # keeps arbitrary selector syntax out of the expression.
        if not (quality.isascii() and quality.isdigit()):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="'quality' must be a whole number of pixels, e.g. 720 or 1080.",
                headers=no_store
            )
        format_selector = (
            f"best[height<={quality}][vcodec^=avc][ext=mp4]/"
            f"best[height<={quality}][vcodec^=av01][ext=mp4]/"
            f"best[height<={quality}][ext=mp4]/"
            f"bestvideo[height<={quality}]+bestaudio/best"
        )

    client_ip = get_client_ip(request)
    is_allowed, remaining = check_rate_limit(client_ip)

    # Quotas are keyed on the UTC calendar date, so they reset at the next
    # UTC midnight. Unix days are exactly 86400 s long and start at 00:00 UTC.
    now = int(time.time())
    reset_at = now - now % 86400 + 86400

    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.",
            headers={
                "X-RateLimit-Limit": str(DAILY_LIMIT),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_at),
                "Cache-Control": "no-store, max-age=0"
            }
        )

    ydl_options = {
        "format": format_selector,
        "quiet": True,
        "no_warnings": True,
        "skip_download": True,
        "noplaylist": True,
        "cachedir": "/tmp/yt-dlp-cache",
        # The Python API key is "js_runtimes" and takes a mapping of runtime
        # name to its config; the CLI-style "js-runtimes" key is not a known
        # parameter and was silently ignored. "deno" is yt-dlp's default and
        # is kept so that enabling node is purely additive.
        "js_runtimes": {"deno": {}, "node": {}},
    }

    try:
        # extract_info blocks on network I/O; run it in a worker thread so
        # the event loop keeps serving other requests meanwhile.
        info = await asyncio.to_thread(_extract_media_info, url, ydl_options)
        if not info:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No media information could be extracted.",
                headers=no_store
            )

        download_url = info.get("url")
        http_headers = info.get("http_headers", {})

        # No single URL at the top level: fall back to the first of the
        # formats yt-dlp selected (presumably the video stream when video
        # and audio are delivered separately).
        if not download_url and info.get("requested_formats"):
            fmt = info["requested_formats"][0]
            download_url = fmt.get("url")
            http_headers = fmt.get("http_headers", http_headers)

        if not download_url:
            raise HTTPException(
                status_code=400,
                detail="ダウンロードURLを取得できませんでした",
                headers={"Cache-Control": "no-store, max-age=0"}
            )

        title = info.get("title") or ("audio" if audio_only else "video")
        ext = info.get("ext") or ("mp3" if audio_only else "mp4")
        filename = f"{title}.{ext}"
        filesize = info.get("filesize") or info.get("filesize_approx")

        response_data = {
            "status": "ok",
            "url": download_url,
            "title": title,
            "filename": filename,
            "ext": ext,
            "filesize": filesize,
            "headers": http_headers,
        }

        return JSONResponse(
            response_data,
            headers={
                "Cache-Control": "s-maxage=2592000, stale-while-revalidate",
                "X-RateLimit-Limit": str(DAILY_LIMIT),
                "X-RateLimit-Remaining": str(remaining),
                "X-RateLimit-Reset": str(reset_at)
            }
        )

    except HTTPException:
        # Errors raised deliberately above must reach the client unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except DownloadError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"メディアの取得に失敗: {e}",
            headers=no_store
        ) from e
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
            headers=no_store
        ) from e


def _extract_media_info(url: str, ydl_options: dict):
    """Blocking yt-dlp call: extract metadata for ``url`` without downloading."""
    with YoutubeDL(ydl_options) as ydl:
        return ydl.extract_info(url, download=False)
| |
|
| | import urllib.parse |
| |
|
@app.get("/api/playlist")
async def get_playlist_info(
    request: Request,
    url: str,
    start: int = 1,
    end: int = 50
):
    """
    Fetch one page of items from a playlist or user profile.

    At most 50 items are returned per request; a larger range is clamped to
    50 items starting at ``start``. ``next_page`` holds the URL of the
    following page when the current page came back full.

    Args:
        request: Incoming request (client identification and base URL).
        url: Playlist or profile URL handed to yt-dlp.
        start: 1-based index of the first item.
        end: 1-based index of the last item (inclusive).

    Returns:
        JSONResponse with id, title, uploader, items_returned, next_page and
        entries, plus X-RateLimit-* response headers.

    Raises:
        HTTPException: 400 for an invalid range or an extraction failure,
            404 when yt-dlp returns nothing for the URL, 429 when the daily
            quota of the client is exhausted.
    """
    import asyncio

    if start < 1:
        raise HTTPException(status_code=400, detail="'start' must be 1 or greater.")
    if end < start:
        raise HTTPException(status_code=400, detail="'end' must be greater than or equal to 'start'.")

    requested_count = end - start + 1
    if requested_count > 50:
        end = start + 49
        requested_count = 50

    client_ip = get_client_ip(request)
    is_allowed, remaining = check_rate_limit(client_ip)

    # Quotas are keyed on the UTC calendar date, so they reset at the next
    # UTC midnight. Unix days are exactly 86400 s long and start at 00:00 UTC.
    now = int(time.time())
    reset_at = now - now % 86400 + 86400

    if not is_allowed:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Daily limit of {DAILY_LIMIT} requests exceeded. Try again tomorrow.",
            headers={
                "X-RateLimit-Limit": str(DAILY_LIMIT),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_at),
                "Cache-Control": "no-store, max-age=0"
            }
        )

    ydl_options = {
        "retries": 3,
        "encoding": "utf8",
        "extract_flat": "in_playlist",
        "dump_single_json": True,
        "ignoreerrors": True,
        "cachedir": "/tmp/yt-dlp-cache",
        # The Python API key is "js_runtimes" and takes a mapping of runtime
        # name to its config; the CLI-style "js-runtimes" key is not a known
        # parameter and was silently ignored. "deno" is yt-dlp's default and
        # is kept so that enabling node is purely additive.
        "js_runtimes": {"deno": {}, "node": {}},
        "playliststart": start,
        "playlistend": end
    }

    try:
        # extract_info blocks on network I/O; run it in a worker thread so
        # the event loop keeps serving other requests meanwhile.
        response = await asyncio.to_thread(_extract_playlist_info, url, ydl_options)
        if not response:
            raise HTTPException(
                status_code=404,
                detail="Playlist or profile not found.",
                headers={"Cache-Control": "no-store, max-age=0"}
            )

        # list() so that len() also works if yt-dlp hands back a lazy iterable.
        raw_entries = list(response.get("entries") or [])
        # With ignoreerrors set, entries that failed to extract may be None.
        valid_entries = [e for e in raw_entries if e is not None]

        # A full page suggests there may be more; a short page is the last.
        next_page_url = None
        if len(raw_entries) >= requested_count:
            next_start = end + 1
            next_end = next_start + 49
            encoded_url = urllib.parse.quote(url)
            base_url = str(request.base_url).rstrip('/')
            next_page_url = f"{base_url}/api/playlist?url={encoded_url}&start={next_start}&end={next_end}"

        clean_response = {
            "id": response.get("id"),
            "title": response.get("title", "Unknown Playlist"),
            "uploader": response.get("uploader"),
            "items_returned": len(valid_entries),
            "next_page": next_page_url,
            "entries": valid_entries
        }

        return JSONResponse(
            clean_response,
            headers={
                "Cache-Control": "s-maxage=2592000, stale-while-revalidate",
                "X-RateLimit-Limit": str(DAILY_LIMIT),
                "X-RateLimit-Remaining": str(remaining),
                "X-RateLimit-Reset": str(reset_at)
            }
        )
    except HTTPException:
        # Let the 404 above through; the generic handler below would
        # otherwise turn it into a 400.
        raise
    except Exception as e:
        print(f"Error extracting playlist: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=repr(e),
            headers={"Cache-Control": "no-store, max-age=0"},
        ) from e


def _extract_playlist_info(url: str, ydl_options: dict):
    """Blocking yt-dlp call: extract flat playlist metadata for ``url``."""
    with YoutubeDL(ydl_options) as ytdl:
        return ytdl.extract_info(url, download=False)
| | |
| |
|
| | import httpx |
| |
|
# client_id sent with every SoundCloud api-v2 request. It can be overridden
# through the SOUNDCLOUD_CLIENT_ID environment variable so the credential can
# be rotated without a code change; the literal is the fallback default.
SOUNDCLOUD_CLIENT_ID = (
    os.environ.get("SOUNDCLOUD_CLIENT_ID") or "khI8ciOiYPX6UVGInQY5zA0zvTkfzuuC"
)
| |
|
@app.get("/api/list")
async def get_sound_playlist_info(request: Request, url: str, start: int = 1, end: int = 50):
    """
    Fetch one page of tracks from a SoundCloud playlist or user profile.

    The URL is resolved through the SoundCloud api-v2 ``resolve`` endpoint.
    Playlists are paginated locally over the track ids in the resolved
    document; user profiles are paginated with ``limit`` / ``offset``. At
    most 50 tracks are returned per request. This endpoint does not consume
    the daily request quota.

    Args:
        request: Incoming request, used to build the ``next_page`` URL.
        url: SoundCloud playlist or user URL.
        start: 1-based index of the first track.
        end: 1-based index of the last track (inclusive).

    Returns:
        JSONResponse with id, title, uploader, items_returned, next_page and
        entries.

    Raises:
        HTTPException: 400 for an invalid range or an unsupported resource
            kind, 404 when the URL cannot be resolved, 502 when SoundCloud
            cannot be reached or answers with an error or invalid JSON.
    """
    if start < 1:
        raise HTTPException(status_code=400, detail="'start' must be 1 or greater.")
    if end < start:
        raise HTTPException(status_code=400, detail="'end' must be greater than or equal to 'start'.")

    limit = min(end - start + 1, 50)
    offset = start - 1
    # Clamp end to the page actually served so that the next page starts
    # right after it instead of skipping the items that were cut off.
    end = start + limit - 1

    page_is_empty = False
    tracks = []

    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resolve_res = await client.get(
                "https://api-v2.soundcloud.com/resolve",
                params={"url": url, "client_id": SOUNDCLOUD_CLIENT_ID}
            )
            if resolve_res.status_code != 200:
                raise HTTPException(status_code=404, detail=f"Could not resolve URL: {resolve_res.text}")

            data = resolve_res.json()
            kind = data.get("kind")

            if kind == "playlist":
                # The resolved playlist lists its track ids; slice the
                # requested window and fetch the details for those ids only.
                all_track_ids = [t["id"] for t in data.get("tracks", [])]
                paginated_ids = all_track_ids[offset: offset + limit]

                if not paginated_ids:
                    page_is_empty = True
                else:
                    tracks_res = await client.get(
                        "https://api-v2.soundcloud.com/tracks",
                        params={
                            "ids": ",".join(str(i) for i in paginated_ids),
                            "client_id": SOUNDCLOUD_CLIENT_ID
                        }
                    )
                    if tracks_res.status_code != 200:
                        raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}")

                    tracks = tracks_res.json()

            elif kind == "user":
                tracks_res = await client.get(
                    f"https://api-v2.soundcloud.com/users/{data['id']}/tracks",
                    params={
                        "client_id": SOUNDCLOUD_CLIENT_ID,
                        "limit": limit,
                        "offset": offset
                    }
                )
                if tracks_res.status_code != 200:
                    raise HTTPException(status_code=502, detail=f"Track fetch failed: {tracks_res.text}")

                tracks = tracks_res.json().get("collection", [])

            else:
                raise HTTPException(status_code=400, detail=f"Unsupported kind: {kind}")
    except (httpx.HTTPError, ValueError) as e:
        # Transport failures (timeouts, connection errors) and undecodable
        # JSON bodies are upstream problems, not server bugs.
        raise HTTPException(status_code=502, detail=f"SoundCloud request failed: {e!r}") from e

    entries = [
        {
            "id": t.get("id"),
            "title": t.get("title"),
            "url": t.get("permalink_url"),
            "duration": t.get("duration"),
            # "user" may be present but null; guard before reading from it.
            "uploader": (t.get("user") or {}).get("username"),
        }
        for t in tracks
    ]

    next_page_url = None
    total = data.get("track_count") or data.get("likes_count")
    if not page_is_empty and total and (offset + limit) < total:
        next_start = end + 1
        next_end = next_start + 49
        encoded_url = urllib.parse.quote(url)
        base_url = str(request.base_url).rstrip('/')
        next_page_url = f"{base_url}/api/list?url={encoded_url}&start={next_start}&end={next_end}"

    return JSONResponse({
        "id": data.get("id"),
        "title": data.get("title") or data.get("username"),
        "uploader": data.get("username") or data.get("uploader"),
        "items_returned": len(entries),
        "next_page": next_page_url,
        "entries": entries
    })
| |
|
| |
|
@app.get("/api/rate-limit-status")
async def get_rate_limit_status(request: Request):
    """
    Report the daily quota usage of the calling client without consuming it.

    Returns:
        JSONResponse with daily_limit, used, remaining and reset_time.
        reset_time is the next UTC midnight (ISO 8601), the moment the
        date-keyed counters start over.
    """
    client_ip = get_client_ip(request)
    rate_limits = cleanup_old_entries(load_rate_limits())

    current_date = get_current_date()

    record = rate_limits.get(client_ip)
    if record and record.get('date') == current_date:
        used = record.get('count', 0)
    else:
        used = 0
    remaining = max(DAILY_LIMIT - used, 0)

    # Counters are keyed on the UTC date, so the quota resets at the NEXT
    # UTC midnight; midnight of the current date already lies in the past.
    # Unix days are exactly 86400 s long and start at 00:00 UTC.
    now = int(time.time())
    next_reset = datetime.fromtimestamp(now - now % 86400 + 86400, tz=timezone.utc)

    return JSONResponse({
        "daily_limit": DAILY_LIMIT,
        "used": used,
        "remaining": remaining,
        "reset_time": next_reset.strftime("%Y-%m-%dT%H:%M:%SZ")
    })
| |
|
| | |
# Start the uvicorn server when this file is executed as a script; host
# 0.0.0.0 makes it listen on all network interfaces.
# NOTE(review): 7860 is presumably the port the hosting platform expects —
# confirm.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)