File size: 4,402 Bytes
566327c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import asyncio
import logging
import os
import time
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# ASGI application instance; the route decorators in this module register on it.
app = FastAPI()

# Configure CORS.
# Any origin may call this API. Credentials are disabled because a wildcard
# origin cannot be combined with credentialed requests (the FastAPI CORS
# documentation requires explicit origins/methods/headers when
# allow_credentials is True), and none of the endpoints here read cookies
# or authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Request body accepted by the POST /transcript endpoint.
class VideoRequest(BaseModel):
    # Address of the video page to open; the /transcript handler checks that
    # it refers to YouTube before loading it in the browser.
    url: str

# Response body produced by the POST /transcript endpoint, used for both the
# success case and the error cases the handler converts into a payload.
class TranscriptResponse(BaseModel):
    # True when transcript text was extracted, False when the handler
    # caught an error.
    success: bool
    # Non-empty transcript segment texts; None when success is False.
    transcript: list[str] | None
    # Error description when success is False; None otherwise.
    error: str | None
    # Elapsed time in seconds between the start of the handler and the
    # construction of this response.
    processing_time: float

def init_driver():
    """Create a headless Chrome WebDriver.

    The Chrome binary is taken from the ``CHROME_BIN`` environment variable
    when it is set. Otherwise ``/usr/bin/google-chrome`` (the path used on
    Hugging Face Spaces) is used if it exists. When neither applies, no
    explicit binary is configured and Selenium's own browser lookup is used,
    so the function also works on machines with Chrome installed elsewhere.

    Returns:
        A new ``webdriver.Chrome`` instance. The caller is responsible for
        calling ``quit()`` on it.
    """
    default_binary = "/usr/bin/google-chrome"

    options = Options()
    for flag in (
        "--headless",
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--log-level=3",
    ):
        options.add_argument(flag)
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option('useAutomationExtension', False)

    # An explicit CHROME_BIN always wins; the default path is only applied
    # when it is actually present so a missing file does not break startup.
    chrome_binary = os.getenv("CHROME_BIN")
    if not chrome_binary and os.path.exists(default_binary):
        chrome_binary = default_binary
    if chrome_binary:
        options.binary_location = chrome_binary

    return webdriver.Chrome(options=options)

# Registrable domains accepted by the /transcript endpoint.
_YOUTUBE_DOMAINS = ("youtube.com", "youtu.be")


def _is_youtube_url(url: str) -> bool:
    """Return True if *url* is an http(s) URL whose host is a YouTube domain."""
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. unbalanced brackets).
        return False
    if parsed.scheme not in ("http", "https"):
        return False
    return any(
        host == domain or host.endswith("." + domain)
        for domain in _YOUTUBE_DOMAINS
    )


def _scrape_transcript(video_url: str) -> list[str]:
    """Open *video_url* in headless Chrome and return its transcript lines.

    Blocking: drives Selenium synchronously, so call it from a worker thread.

    Raises:
        TimeoutException: an expected page element did not appear in time.
    """
    driver = init_driver()
    try:
        logger.info(f"Processing URL: {video_url}")
        driver.get(video_url)

        # Handle cookie consent if it appears; its absence is not an error.
        try:
            cookie_button = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, "//*[contains(text(), 'Accept all')]"))
            )
            cookie_button.click()
            logger.info("Accepted cookies")
        except TimeoutException:
            pass

        # Expand the description, which is where the transcript button lives.
        more_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.ID, "expand"))
        )
        driver.execute_script("arguments[0].click();", more_button)

        # Open the transcript panel.
        transcript_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "button[aria-label='Show transcript']"))
        )
        driver.execute_script("arguments[0].click();", transcript_button)

        # Wait for the transcript container to be present in the DOM.
        WebDriverWait(driver, 15).until(
            EC.presence_of_element_located((By.ID, "segments-container"))
        )

        # Read each segment's text once (every lookup is a WebDriver
        # round-trip) and keep only the non-empty ones.
        segments = driver.find_elements(By.CSS_SELECTOR, "div.ytd-transcript-segment-renderer")
        return [
            text
            for segment in segments
            if (text := segment.find_element(By.CLASS_NAME, "segment-text").text.strip())
        ]
    finally:
        driver.quit()


@app.post("/transcript", response_model=TranscriptResponse)
async def get_transcript(request: VideoRequest):
    """Scrape and return the transcript of a YouTube video.

    Args:
        request: Body containing the video ``url``.

    Returns:
        A ``TranscriptResponse``. Scraping failures (timeouts, browser
        errors) are reported in the payload with ``success=False``.

    Raises:
        HTTPException: 400 if the URL is not an http(s) YouTube URL,
            404 if the page yielded no transcript text.
    """
    start_time = time.perf_counter()

    # Validate outside the try block so the 400 reaches the client as an
    # HTTP error instead of being swallowed by the generic handler below.
    video_url = request.url
    if not _is_youtube_url(video_url):
        raise HTTPException(status_code=400, detail="Invalid YouTube URL")

    try:
        # Selenium is blocking; run it in a worker thread so the event loop
        # stays free to serve other requests.
        transcript = await asyncio.to_thread(_scrape_transcript, video_url)
    except TimeoutException as e:
        logger.error(f"Timeout: {str(e)}")
        return TranscriptResponse(
            success=False,
            transcript=None,
            error="Timed out waiting for page elements",
            processing_time=time.perf_counter() - start_time
        )
    except Exception as e:
        logger.exception(f"Error: {str(e)}")
        return TranscriptResponse(
            success=False,
            transcript=None,
            error=str(e),
            processing_time=time.perf_counter() - start_time
        )

    if not transcript:
        raise HTTPException(status_code=404, detail="No transcript available")

    return TranscriptResponse(
        success=True,
        transcript=transcript,
        error=None,
        processing_time=time.perf_counter() - start_time
    )

@app.get("/")
async def health_check():
    """Report that the service is up."""
    payload = {
        "status": "OK",
        "message": "YouTube Transcript API is running",
    }
    return payload

if __name__ == "__main__":
    import uvicorn

    # Serve on all interfaces; the port comes from the PORT environment
    # variable, defaulting to 7860 when it is unset.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)