# app.py — "Update app.py" (revision dd1737f, author hamza2923, 7.38 kB).
import asyncio
import logging
import os
import subprocess
import time
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# The single application object; the route decorators further down attach to it.
app = FastAPI()

# Configure CORS: wildcard origins, methods and headers, with credentials allowed.
# NOTE(review): wildcard origins combined with credentials is very permissive —
# confirm this is intended before exposing the service publicly.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Configure logging: INFO and above, with one module-level logger shared by the handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoRequest(BaseModel):
    # Request body accepted by the POST /transcript endpoint.

    # Address of the video page whose transcript should be extracted.
    url: str
class TranscriptResponse(BaseModel):
    # Response body returned by the POST /transcript endpoint.

    # True when transcript lines were extracted, False on any failure path.
    success: bool
    # Extracted transcript lines in page order; None when extraction failed.
    transcript: list[str] | None
    # Human-readable failure description; None on success.
    error: str | None
    # Wall-clock seconds between receiving the request and building this response.
    processing_time: float
def init_driver() -> "webdriver.Chrome":
    """Create a headless Chrome WebDriver from the system-installed binaries.

    The first existing path from a fixed list of Chrome/Chromium locations is
    used as the browser binary, and ChromeDriver is expected at
    ``/usr/bin/chromedriver``.

    Returns:
        A started ``webdriver.Chrome`` instance. The caller owns it and must
        call ``quit()`` when done.

    Raises:
        RuntimeError: If no Chrome binary is found, ChromeDriver is missing,
            or the driver fails to start. When startup fails, the underlying
            error is attached as ``__cause__``.
    """
    options = Options()
    for flag in (
        "--headless=new",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
    ):
        options.add_argument(flag)

    # Try multiple possible Chrome binary locations; the first hit wins.
    possible_chrome_paths = [
        "/usr/bin/google-chrome",
        "/usr/bin/google-chrome-stable",
        "/usr/lib/chromium-browser/chrome",
        "/usr/bin/chromium",
    ]
    chrome_path = next(
        (path for path in possible_chrome_paths if os.path.exists(path)),
        None,
    )
    if not chrome_path:
        logger.error(f"No Chrome binary found in paths: {possible_chrome_paths}")
        raise RuntimeError(f"No Chrome binary found in paths: {possible_chrome_paths}")

    options.binary_location = chrome_path
    logger.info(f"Using Chrome binary: {chrome_path}")

    chromedriver_path = "/usr/bin/chromedriver"
    try:
        if not os.path.exists(chromedriver_path):
            logger.error(f"ChromeDriver not found at {chromedriver_path}")
            raise RuntimeError(f"ChromeDriver not found at {chromedriver_path}")
        service = Service(executable_path=chromedriver_path)
        driver = webdriver.Chrome(service=service, options=options)
    except Exception as e:
        logger.error(f"Driver initialization failed: {str(e)}")
        # Chain the cause so the original Selenium traceback is not lost.
        raise RuntimeError(f"Driver initialization failed: {str(e)}") from e

    logger.info("ChromeDriver initialized successfully")
    return driver
def _is_youtube_url(url: str) -> bool:
    """Return True when *url* is an http(s) URL whose host is YouTube.

    The host is parsed out of the URL instead of substring-matching the whole
    string, so a URL such as ``https://example.com/?q=youtube.com`` is rejected.
    """
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed network locations.
        return False
    if parsed.scheme not in ("http", "https"):
        return False
    return host in ("youtube.com", "youtu.be") or host.endswith(".youtube.com")


def _scrape_transcript(video_url: str) -> list[str]:
    """Load *video_url* in headless Chrome and return its transcript lines.

    This function blocks for the whole page interaction, so it is meant to be
    run in a worker thread. The driver it creates is always quit before
    returning or raising.

    Returns:
        The non-empty segment texts in page order (possibly an empty list).

    Raises:
        TimeoutException: If an expected page element does not appear in time.
        Exception: Anything raised by ``init_driver`` or by Selenium.
    """
    driver = init_driver()
    try:
        logger.info(f"Processing URL: {video_url}")
        driver.get(video_url)

        # Handle cookie consent if it appears; its absence is not an error.
        try:
            cookie_button = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]"))
            )
            cookie_button.click()
            logger.info("Accepted cookies")
        except TimeoutException:
            logger.info("No cookie consent found")

        # Click more button (JavaScript click, as in the original flow).
        more_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "expand"))
        )
        driver.execute_script("arguments[0].click();", more_button)

        # Click transcript button
        transcript_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']"))
        )
        driver.execute_script("arguments[0].click();", transcript_button)

        # Wait for transcript
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.ID, "segments-container"))
        )

        # Extract transcript
        segments = driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer")
        transcript = []
        for segment in segments:
            try:
                text = segment.find_element(By.CLASS_NAME, "segment-text").text.strip()
            except WebDriverException:
                # Skip segments whose text element is missing or went stale;
                # unlike a bare ``except`` this lets KeyboardInterrupt through.
                continue
            if text:
                transcript.append(text)
        return transcript
    finally:
        driver.quit()


@app.post("/transcript", response_model=TranscriptResponse)
async def get_transcript(request: VideoRequest):
    """Return the transcript of a YouTube video.

    Responds with HTTP 400 when the URL is not a YouTube URL and HTTP 404 when
    the page yields no transcript text. Scraping failures are reported in the
    response body with ``success`` set to false.
    """
    start_time = time.time()
    video_url = request.url

    # Validated outside the try block so the generic handler below cannot turn
    # the 400 into a regular response body.
    if not _is_youtube_url(video_url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL")

    try:
        # Selenium is blocking; run it in a worker thread so the event loop
        # keeps serving other requests while the page is scraped.
        transcript = await asyncio.to_thread(_scrape_transcript, video_url)
    except TimeoutException:
        error_msg = "Timed out waiting for page elements - the video might not have transcripts"
        logger.error(error_msg)
        return TranscriptResponse(
            success=False,
            transcript=None,
            error=error_msg,
            processing_time=time.time() - start_time,
        )
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        return TranscriptResponse(
            success=False,
            transcript=None,
            error=str(e),
            processing_time=time.time() - start_time,
        )

    # Raised after the try block for the same reason as the 400 above.
    if not transcript:
        raise HTTPException(status_code=404, detail="No transcript available")

    return TranscriptResponse(
        success=True,
        transcript=transcript,
        error=None,
        processing_time=time.time() - start_time,
    )
def _read_version(path: str) -> str:
    """Run ``<path> --version`` and return its stripped standard output.

    The binary is executed directly (no shell) with a 10 second timeout.

    Raises:
        OSError: If the binary cannot be executed.
        subprocess.SubprocessError: If the command times out.
    """
    result = subprocess.run(
        [path, "--version"],
        capture_output=True,
        text=True,
        timeout=10,
        check=False,
    )
    return result.stdout.strip()


@app.get("/health")
async def health_check():
    """Report whether the Chrome and ChromeDriver binaries are installed.

    The response lists the paths checked, the paths found, the version string
    of each binary ("Not found" when unavailable), and an overall status.
    """
    # Same locations that init_driver probes.
    paths = {
        "chrome": [
            "/usr/bin/google-chrome",
            "/usr/bin/google-chrome-stable",
            "/usr/lib/chromium-browser/chrome",
            "/usr/bin/chromium",
        ],
        "chromedriver": ["/usr/bin/chromedriver"],
    }
    found_paths = {
        "chrome": [path for path in paths["chrome"] if os.path.exists(path)],
        "chromedriver": [path for path in paths["chromedriver"] if os.path.exists(path)],
    }
    exists = {
        "chrome": bool(found_paths["chrome"]),
        "chromedriver": bool(found_paths["chromedriver"]),
    }

    chrome_version = "Not found"
    chromedriver_version = "Not found"

    # Only probe binaries that exist, and keep "Not found" unless a probe
    # actually prints a version string.
    for path in found_paths["chrome"]:
        try:
            version = _read_version(path)
        except (OSError, subprocess.SubprocessError) as e:
            logger.error(f"Failed to get Chrome version for {path}: {str(e)}")
            continue
        if version:
            chrome_version = version
            break

    for path in found_paths["chromedriver"]:
        try:
            version = _read_version(path)
        except (OSError, subprocess.SubprocessError) as e:
            logger.error(f"Failed to get ChromeDriver version: {str(e)}")
            continue
        if version:
            chromedriver_version = version
            break

    working = all(exists.values())
    status = "OK" if working else "ERROR"
    logger.info(f"Health check: Chrome={chrome_version}, ChromeDriver={chromedriver_version}, Paths={found_paths}, Exists={exists}")
    return {
        "status": status,
        "paths_checked": paths,
        "paths_found": found_paths,
        "chrome_version": chrome_version,
        "chromedriver_version": chromedriver_version,
        "working": working,
    }
@app.get("/")
async def root():
    # Landing route: always answers with the same fixed greeting payload.
    greeting = {"message": "Welcome to YouTube Transcript API"}
    return greeting
if __name__ == "__main__":
    # Script entry point: serve the app on every interface. The PORT
    # environment variable overrides the default port 7860.
    import uvicorn

    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)