from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, WebDriverException import time import logging import os app = FastAPI() # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VideoRequest(BaseModel): url: str class TranscriptResponse(BaseModel): success: bool transcript: list[str] | None error: str | None processing_time: float def init_driver(): options = Options() options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.binary_location = "/usr/bin/google-chrome" try: # Use explicit ChromeDriver path service = Service(executable_path="/usr/bin/chromedriver") driver = webdriver.Chrome(service=service, options=options) logger.info("ChromeDriver initialized successfully") return driver except Exception as e: logger.error(f"Driver initialization failed: {str(e)}") raise Exception(f"Driver initialization failed: {str(e)}") @app.post("/transcript", response_model=TranscriptResponse) async def get_transcript(request: VideoRequest): start_time = time.time() driver = None try: video_url = request.url if not ("youtube.com" in video_url or "youtu.be" in video_url): raise HTTPException(status_code=400, detail="Invalid YouTube URL") driver = init_driver() logger.info(f"Processing URL: {video_url}") driver.get(video_url) # Handle cookie consent if it appears try: cookie_button = WebDriverWait(driver, 5).until( EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]")) ) cookie_button.click() logger.info("Accepted cookies") except TimeoutException: logger.info("No cookie consent found") pass # Click more button more_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.ID, "expand")) ) driver.execute_script("arguments[0].click();", more_button) # Click transcript button transcript_button = WebDriverWait(driver, 10).until( EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']")) ) driver.execute_script("arguments[0].click();", transcript_button) # Wait for transcript WebDriverWait(driver, 15).until( EC.presence_of_element_located((By.ID, "segments-container")) ) # Extract transcript segments = driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer") transcript = [] for segment in segments: try: text = segment.find_element(By.CLASS_NAME, "segment-text").text.strip() if text: transcript.append(text) except: continue if not transcript: raise HTTPException(status_code=404, detail="No transcript available") return TranscriptResponse( success=True, transcript=transcript, error=None, processing_time=time.time() - start_time ) except TimeoutException as e: error_msg = "Timed out waiting for page elements - the video might not have transcripts" logger.error(error_msg) return TranscriptResponse( success=False, transcript=None, error=error_msg, processing_time=time.time() - start_time ) except Exception as e: logger.error(f"Error: {str(e)}") return TranscriptResponse( success=False, transcript=None, error=str(e), processing_time=time.time() - start_time ) finally: if driver: driver.quit() @app.get("/health") async def health_check(): paths = { "chrome": "/usr/bin/google-chrome", "chromedriver": "/usr/bin/chromedriver" } exists = {name: os.path.exists(path) for name, path in paths.items()} chrome_version = "Not found" chromedriver_version = "Not found" if exists["chrome"]: try: chrome_version = os.popen("/usr/bin/google-chrome --version").read().strip() except Exception as e: logger.error(f"Failed to get Chrome version: {str(e)}") if exists["chromedriver"]: try: chromedriver_version = os.popen("/usr/bin/chromedriver --version").read().strip() except Exception as e: logger.error(f"Failed to get ChromeDriver version: {str(e)}") status = "OK" if all(exists.values()) else "ERROR" logger.info(f"Health check: Chrome={chrome_version}, ChromeDriver={chromedriver_version}, Paths={exists}") return { "status": status, "paths": exists, "chrome_version": chrome_version, "chromedriver_version": chromedriver_version, "working": all(exists.values()) } @app.get("/") async def root(): return {"message": "Welcome to YouTube Transcript API"} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))