hamza2923's picture
Update app.py
98664d2 verified
raw
history blame
5.98 kB
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
import time
import logging
import os
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoRequest(BaseModel):
url: str
class TranscriptResponse(BaseModel):
success: bool
transcript: list[str] | None
error: str | None
processing_time: float
def init_driver():
options = Options()
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-gpu")
options.binary_location = "/usr/bin/google-chrome"
try:
# Use explicit ChromeDriver path
service = Service(executable_path="/usr/bin/chromedriver")
driver = webdriver.Chrome(service=service, options=options)
logger.info("ChromeDriver initialized successfully")
return driver
except Exception as e:
logger.error(f"Driver initialization failed: {str(e)}")
raise Exception(f"Driver initialization failed: {str(e)}")
@app.post("/transcript", response_model=TranscriptResponse)
async def get_transcript(request: VideoRequest):
start_time = time.time()
driver = None
try:
video_url = request.url
if not ("youtube.com" in video_url or "youtu.be" in video_url):
raise HTTPException(status_code=400, detail="Invalid YouTube URL")
driver = init_driver()
logger.info(f"Processing URL: {video_url}")
driver.get(video_url)
# Handle cookie consent if it appears
try:
cookie_button = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]"))
)
cookie_button.click()
logger.info("Accepted cookies")
except TimeoutException:
logger.info("No cookie consent found")
pass
# Click more button
more_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.ID, "expand"))
)
driver.execute_script("arguments[0].click();", more_button)
# Click transcript button
transcript_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']"))
)
driver.execute_script("arguments[0].click();", transcript_button)
# Wait for transcript
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.ID, "segments-container"))
)
# Extract transcript
segments = driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer")
transcript = []
for segment in segments:
try:
text = segment.find_element(By.CLASS_NAME, "segment-text").text.strip()
if text:
transcript.append(text)
except:
continue
if not transcript:
raise HTTPException(status_code=404, detail="No transcript available")
return TranscriptResponse(
success=True,
transcript=transcript,
error=None,
processing_time=time.time() - start_time
)
except TimeoutException as e:
error_msg = "Timed out waiting for page elements - the video might not have transcripts"
logger.error(error_msg)
return TranscriptResponse(
success=False,
transcript=None,
error=error_msg,
processing_time=time.time() - start_time
)
except Exception as e:
logger.error(f"Error: {str(e)}")
return TranscriptResponse(
success=False,
transcript=None,
error=str(e),
processing_time=time.time() - start_time
)
finally:
if driver:
driver.quit()
@app.get("/health")
async def health_check():
paths = {
"chrome": "/usr/bin/google-chrome",
"chromedriver": "/usr/bin/chromedriver"
}
exists = {name: os.path.exists(path) for name, path in paths.items()}
chrome_version = "Not found"
chromedriver_version = "Not found"
if exists["chrome"]:
try:
chrome_version = os.popen("/usr/bin/google-chrome --version").read().strip()
except Exception as e:
logger.error(f"Failed to get Chrome version: {str(e)}")
if exists["chromedriver"]:
try:
chromedriver_version = os.popen("/usr/bin/chromedriver --version").read().strip()
except Exception as e:
logger.error(f"Failed to get ChromeDriver version: {str(e)}")
status = "OK" if all(exists.values()) else "ERROR"
logger.info(f"Health check: Chrome={chrome_version}, ChromeDriver={chromedriver_version}, Paths={exists}")
return {
"status": status,
"paths": exists,
"chrome_version": chrome_version,
"chromedriver_version": chromedriver_version,
"working": all(exists.values())
}
@app.get("/")
async def root():
return {"message": "Welcome to YouTube Transcript API"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))