# hamza2923's picture
# Create app.py
# 566327c verified
import asyncio
import logging
import os
import time
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
app = FastAPI()

# Configure CORS.
# Any origin may call the API, and none of the handlers in this file read
# cookies or auth headers, so credentialed requests are not needed.
# Credentials are therefore disabled: FastAPI's CORS documentation states that
# wildcard origins/methods/headers must not be combined with
# allow_credentials=True (they would have to be listed explicitly instead).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configure logging: INFO and above, with a module-level logger shared by the
# handlers below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoRequest(BaseModel):
    """Request body accepted by the transcript endpoint."""

    # Address of the video page; the handler validates it as a YouTube URL
    # before loading it in the browser.
    url: str
class TranscriptResponse(BaseModel):
    """Response body returned by the transcript endpoint.

    ``transcript`` and ``error`` carry an explicit ``None`` default. Without
    it a nullable annotation is still a *required* field under pydantic v2,
    so every construction site would have to pass both even though only one
    of them is ever meaningful.
    """

    # True when transcript lines were extracted, False when scraping failed.
    success: bool
    # One entry per non-empty transcript segment; None on failure.
    transcript: list[str] | None = None
    # Failure description; None on success.
    error: str | None = None
    # Seconds elapsed between receiving the request and building the response.
    processing_time: float
def init_driver():
    """Create a headless Chrome WebDriver.

    The Chrome binary path is taken from the ``CHROME_BIN`` environment
    variable and falls back to ``/usr/bin/google-chrome`` (the path this file
    uses for Hugging Face Spaces). The path is only pinned when that file
    actually exists; otherwise browser discovery is left to Selenium instead
    of failing on a hard-coded location that is absent on this machine.

    Returns:
        A started ``webdriver.Chrome`` instance. The caller owns it and is
        responsible for calling ``quit()``.
    """
    options = Options()
    options.add_argument("--headless")
    options.add_argument("--disable-gpu")
    # NOTE(review): --no-sandbox / --disable-dev-shm-usage are the usual flags
    # for running Chrome inside containers; confirm they are still required
    # in the deployment environment.
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--log-level=3")
    # Drop the automation switch/extension Chrome adds for WebDriver sessions.
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)

    # For Hugging Face Spaces (default), overridable through CHROME_BIN.
    chrome_binary = os.getenv("CHROME_BIN", "/usr/bin/google-chrome")
    if os.path.isfile(chrome_binary):
        options.binary_location = chrome_binary

    return webdriver.Chrome(options=options)
# Hosts accepted by the transcript endpoint; their subdomains (www., m., ...)
# are accepted as well.
_YOUTUBE_HOSTS = ("youtube.com", "youtu.be")


def _is_youtube_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose host belongs to YouTube."""
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed network locations.
        return False
    if parsed.scheme not in ("http", "https"):
        return False
    return any(host == known or host.endswith("." + known) for known in _YOUTUBE_HOSTS)


def _scrape_transcript(video_url: str) -> list[str]:
    """Open *video_url* in headless Chrome and return its transcript lines.

    Every Selenium call here blocks on the browser, so this function is meant
    to run in a worker thread, not directly on the event loop.

    Returns:
        The stripped, non-empty text of each transcript segment, in the order
        the page lists them. Empty when the panel contains no text.

    Raises:
        TimeoutException: an expected page element did not appear in time.
    """
    driver = init_driver()
    try:
        driver.get(video_url)

        # Handle cookie consent if it appears. The dialog is optional, so a
        # timeout here just means there was nothing to dismiss.
        try:
            cookie_button = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]"))
            )
            cookie_button.click()
            logger.info("Accepted cookies")
        except TimeoutException:
            logger.debug("No cookie consent dialog to dismiss")

        # Click the "more" button (element id "expand").
        # NOTE(review): clicks go through JavaScript rather than
        # WebElement.click(), presumably to avoid overlays intercepting a
        # native click — confirm before changing.
        more_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "expand"))
        )
        driver.execute_script("arguments[0].click();", more_button)

        # Click the transcript button.
        transcript_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']"))
        )
        driver.execute_script("arguments[0].click();", transcript_button)

        # Wait for the transcript container to be attached to the page.
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.ID, "segments-container"))
        )

        # Extract the transcript. Each segment's text is read once (every
        # read is a round-trip to the browser), and find_elements is used so
        # a matched div without a text child is skipped instead of raising
        # and discarding the whole transcript.
        lines = []
        for segment in driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer"):
            text_nodes = segment.find_elements(By.CLASS_NAME, "segment-text")
            if not text_nodes:
                continue
            text = text_nodes[0].text.strip()
            if text:
                lines.append(text)
        return lines
    finally:
        # Always shut the browser down so failed requests do not leak Chrome
        # processes.
        driver.quit()


@app.post("/transcript", response_model=TranscriptResponse)
async def get_transcript(request: VideoRequest):
    """Scrape and return the transcript of a YouTube video.

    Args:
        request: Body carrying the video ``url``.

    Returns:
        ``TranscriptResponse`` with ``success=True`` and the transcript lines,
        or ``success=False`` and an ``error`` message when scraping timed out
        or failed unexpectedly.

    Raises:
        HTTPException: 400 when ``url`` is not an http(s) YouTube URL, 404
            when the page exposes no transcript text. Both are raised outside
            the scraping ``try`` so they reach the client as real HTTP errors
            instead of being converted into a 200 response by the generic
            exception handler.
    """
    start_time = time.perf_counter()
    video_url = request.url

    # Validate the parsed host, not a substring: the URL is handed to a real
    # browser, so "anything containing youtube.com" must not be enough.
    if not _is_youtube_url(video_url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL")

    logger.info("Processing URL: %s", video_url)
    try:
        # Selenium is blocking; run it in a worker thread so the event loop
        # keeps serving other requests (including the health check).
        transcript = await asyncio.to_thread(_scrape_transcript, video_url)
    except TimeoutException as e:
        logger.error("Timeout: %s", e)
        return TranscriptResponse(
            success=False,
            transcript=None,
            error="Timed out waiting for page elements",
            processing_time=time.perf_counter() - start_time,
        )
    except Exception as e:
        # Top-level boundary: report the failure to the client and keep the
        # traceback in the logs.
        logger.exception("Error: %s", e)
        return TranscriptResponse(
            success=False,
            transcript=None,
            error=str(e),
            processing_time=time.perf_counter() - start_time,
        )

    if not transcript:
        raise HTTPException(status_code=404, detail="No transcript available")

    return TranscriptResponse(
        success=True,
        transcript=transcript,
        error=None,
        processing_time=time.perf_counter() - start_time,
    )
@app.get("/")
async def health_check():
    """Report that the service is up; takes no input and returns a fixed payload."""
    payload = {
        "status": "OK",
        "message": "YouTube Transcript API is running",
    }
    return payload
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    # uvicorn is imported here so that merely importing this module does not
    # require it.
    import uvicorn

    # PORT overrides the listening port; 7860 is used when it is unset.
    listen_port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)