from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException import time import logging import os import shutil from pathlib import Path app = FastAPI() # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VideoRequest(BaseModel): url: str class TranscriptResponse(BaseModel): success: bool transcript: list[str] | None error: str | None processing_time: float def init_driver(): options = Options() options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") # Try multiple possible Chrome binary locations possible_chrome_paths = [ "/usr/bin/google-chrome", "/usr/bin/google-chrome-stable", "/usr/lib/chromium-browser/chrome", "/usr/bin/chromium" ] chrome_path = None for path in possible_chrome_paths: if os.path.exists(path): chrome_path = path break if not chrome_path: logger.error(f"No Chrome binary found in paths: {possible_chrome_paths}") raise Exception(f"No Chrome binary found in paths: {possible_chrome_paths}") options.binary_location = chrome_path logger.info(f"Using Chrome binary: {chrome_path}") try: chromedriver_path = "/usr/bin/chromedriver" if not os.path.exists(chromedriver_path): logger.error(f"ChromeDriver not found at {chromedriver_path}") raise Exception(f"ChromeDriver not found at {chromedriver_path}") service = Service(executable_path=chromedriver_path) driver = webdriver.Chrome(service=service, options=options) logger.info("ChromeDriver initialized successfully") return driver except Exception as e: logger.error(f"Driver initialization failed: {str(e)}") raise Exception(f"Driver initialization failed: {str(e)}") @app.post("/transcript", response_model=TranscriptResponse) async def get_transcript(request: VideoRequest): start_time = time.time() driver = None try: video_url = request.url if not ("youtube.com" in video_url or "youtu.be" in video_url): raise HTTPException(status_code=400, detail="Invalid YouTube URL") driver = init_driver() logger.info(f"Processing URL: {video_url}") driver.get(video_url) # Handle cookie consent if it appears try: cookie_button = WebDriverWait(driver, 5).until( EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]")) ) cookie_button.click() logger.info("Accepted cookies") except TimeoutException: logger.info("No cookie consent found") pass # Click more button more_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.ID, "expand")) ) driver.execute_script("arguments[0].click();", more_button) # Click transcript button transcript_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']")) ) driver.execute_script("arguments[0].click();", transcript_button) # Wait for transcript WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.ID, "segments-container")) ) # Extract transcript segments = driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer") transcript = [] for segment in segments: try: text = segment.find_element(By.CLASS_NAME, "segment-text").text.strip() if text: transcript.append(text) except: continue if not transcript: raise HTTPException(status_code=404, detail="No transcript available") return TranscriptResponse( success=True, transcript=transcript, error=None, processing_time=time.time() - start_time ) except TimeoutException as e: error_msg = "Timed out waiting for page elements - the video might not have transcripts" logger.error(error_msg) return TranscriptResponse( success=False, transcript=None, error=error_msg, processing_time=time.time() - start_time ) except Exception as e: logger.error(f"Error: {str(e)}") return TranscriptResponse( success=False, transcript=None, error=str(e), processing_time=time.time() - start_time ) finally: if driver: driver.quit() app = FastAPI() @app.get("/health") def health_check(): chrome_path = shutil.which("google-chrome") chromedriver_path = shutil.which("chromedriver") return { "ChromePath": chrome_path, "ChromeDriverPath": chromedriver_path, "ChromeExists": Path(chrome_path or "").exists(), "ChromeDriverExists": Path(chromedriver_path or "").exists() } @app.get("/") async def root(): return {"message": "Welcome to YouTube Transcript API"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))