| from pytubefix import YouTube |
| from pytubefix.cli import on_progress |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| import logging as log |
| import subprocess |
| import json |
| from typing import Tuple |
| import random |
|
|
| |
# Root logger configuration: emit INFO and above, with each record prefixed
# by its timestamp and level name.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
log.basicConfig(format=_LOG_FORMAT, level=log.INFO)
|
|
| |
# Application instance with a fully permissive CORS policy: any origin,
# any method, any header, credentials allowed.
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged by the FastAPI/Starlette CORS documentation — confirm whether
# credentials are actually needed, or list the allowed origins explicitly.
app = FastAPI()
_cors_settings = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
|
|
@app.get("/")
def index():
    """Root endpoint: return a fixed greeting payload."""
    greeting = {"message": "Hello, World!"}
    return greeting
|
|
|
|
| |
def cmd(command, check=True, shell=True, capture_output=True, text=True):
    """Run an external command and return its completed-process result.

    Args:
        command: The command to run; passed unchanged to ``subprocess.run``.
        check: When true, a non-zero exit status is treated as a failure.
        shell: Whether the command is executed through the shell.
        capture_output: Whether stdout and stderr are captured on the result.
        text: Whether captured output is decoded to ``str``.

    Returns:
        The ``subprocess.CompletedProcess`` produced by ``subprocess.run``.

    Raises:
        HTTPException: Status 500 when ``check`` is true and the command
            exits with a non-zero status.
    """
    log.info(f"Executing command: {command}")
    try:
        result = subprocess.run(
            command,
            check=check,
            shell=shell,
            capture_output=capture_output,
            text=text,
        )
    except subprocess.CalledProcessError as error:
        log.error(f"Command failed: {error}")
        # The captured stderr is otherwise discarded, which makes a failing
        # command impossible to diagnose from the logs.
        if error.stderr:
            log.error(f"Command stderr: {error.stderr}")
        # Chain the cause so the original failure stays in the traceback.
        raise HTTPException(
            status_code=500, detail="Token generation failed."
        ) from error

    # With check=False a non-zero exit reaches this point; do not report it
    # as a success.
    if result.returncode == 0:
        log.info("Command executed successfully.")
    else:
        log.warning(f"Command exited with status {result.returncode}")
    return result
|
|
|
|
| |
def generate_youtube_token() -> dict:
    """Run ``node test.js`` and return its standard output decoded as JSON.

    Returns:
        The value obtained by JSON-decoding the script's stdout.

    Raises:
        HTTPException: Status 500 when the script produces no output or the
            output is not valid JSON.
    """
    completed = cmd("node test.js")
    raw_output = completed.stdout if completed else None
    if not raw_output:
        log.error("No output received from test.js")
        raise HTTPException(status_code=500, detail="Invalid token response")

    try:
        token_data = json.loads(raw_output)
    except json.JSONDecodeError as decode_error:
        log.error(f"Failed to decode token JSON: {decode_error}")
        raise HTTPException(status_code=500, detail="Invalid JSON from token generator")

    log.info(f"Token data received: {token_data}")
    return token_data
|
|
|
|
| |
def po_token_verifier() -> Tuple[str, str]:
    """Generate a fresh token and return it as ``(visitorData, poToken)``.

    Returns:
        A 2-tuple holding the ``visitorData`` and ``poToken`` entries of the
        generated token object, in that order.

    Raises:
        HTTPException: Status 500 when the token object lacks either key or
            is not a mapping (in addition to whatever
            ``generate_youtube_token`` itself raises).
    """
    token_object = generate_youtube_token()
    try:
        return token_object["visitorData"], token_object["poToken"]
    except (KeyError, TypeError) as error:
        # A malformed payload would otherwise escape as a bare KeyError or
        # TypeError; report it the same way the other token failures are.
        log.error(f"Token response is missing expected fields: {error!r}")
        raise HTTPException(
            status_code=500, detail="Invalid token response"
        ) from error
|
|
|
|
| |
@app.get("/api/video/{id}")
def video_id(id: str):
    """Look up a YouTube video by ID and return its metadata and stream URL.

    The request is routed through a proxy picked at random from a fixed list
    and uses a PO token obtained through ``po_token_verifier``.

    Args:
        id: The YouTube video ID taken from the request path.

    Returns:
        A dict with the keys ``title``, ``url`` (of the highest-resolution
        stream), ``thumbnail`` and ``description``.

    Raises:
        HTTPException: Status 404 when no suitable stream is found; status
            500 for any other failure while fetching the video.
    """
    url = f"https://www.youtube.com/watch?v={id}"
    log.info(f"Fetching YouTube video for ID: {id}")

    # NOTE(review): proxy credentials are hard-coded in source; they should
    # be moved to configuration / a secret store and rotated.
    # Entry format: "ip:port:user:password".
    proxy_list = [
        "198.23.239.134:6540:widxbsal:cnu4fy74afh7",
        "207.244.217.165:6712:widxbsal:cnu4fy74afh7",
        "107.172.163.27:6543:widxbsal:cnu4fy74afh7",
        "23.94.138.75:6349:widxbsal:cnu4fy74afh7",
        "216.10.27.159:6837:widxbsal:cnu4fy74afh7",
        "136.0.207.84:6661:widxbsal:cnu4fy74afh7",
        "64.64.118.149:6732:widxbsal:cnu4fy74afh7",
        "142.147.128.93:6593:widxbsal:cnu4fy74afh7",
        "104.239.105.125:6655:widxbsal:cnu4fy74afh7",
        "173.0.9.70:5653:widxbsal:cnu4fy74afh7",
    ]

    proxy = random.choice(proxy_list)

    # Limit the split so a password containing ':' stays in one piece.
    ip, port, user, password = proxy.split(":", 3)

    # The same HTTP proxy endpoint serves both schemes.
    proxy_url = f"http://{user}:{password}@{ip}:{port}"
    proxies = {"http": proxy_url, "https": proxy_url}

    try:
        yt = YouTube(
            url,
            use_po_token=True,
            # pytubefix expects a callable returning (visitorData, poToken),
            # so pass the function itself rather than its result.
            po_token_verifier=po_token_verifier,
            proxies=proxies,
        )
        ys = yt.streams.get_highest_resolution()

        if not ys:
            log.warning("No suitable video stream found.")
            raise HTTPException(status_code=404, detail="No suitable video stream found")

        log.info(f"Video fetched: {yt.title}")
        return {
            "title": yt.title,
            "url": ys.url,
            "thumbnail": yt.thumbnail_url,
            "description": yt.description,
        }

    except HTTPException:
        # Deliberate HTTP errors (such as the 404 above) must keep their
        # status code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        log.error(f"Failed to fetch video info: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e