File size: 4,940 Bytes
2796280
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
030cf68
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import asyncio
import random
import logging
from typing import List, Dict, Optional, Any
from playwright.async_api import async_playwright
import uvicorn
from pydantic import BaseModel

# Set up logging
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

app = FastAPI(title="Kling Video Fetcher", 
             description="API to fetch random videos from Kling AI gallery")

# Model for the response
class VideoResponse(BaseModel):
    url: str
    thumbnail: Optional[str] = None
    title: Optional[str] = None
    status: str = "success"

# Cache to store recent video URLs
video_cache: List[Dict[str, Any]] = []
last_fetch_time = 0

async def fetch_random_video_from_kling() -> Dict[str, Any]:
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
            ])
            context = await browser.new_context(viewport={'width': 1280, 'height': 800})
            page = await context.new_page()

            video_urls = []

            # Intercept requests to capture video URLs
            async def on_response(response):
                try:
                    url = response.url
                    content_type = response.headers.get("content-type", "")
                    if ".mp4" in url or "video" in content_type:
                        video_urls.append(url)
                except Exception as e:
                    logger.error(f"Error processing response: {e}")

            page.on("response", on_response)

            await page.goto("https://app.klingai.com/global/community/material", timeout=90000, wait_until="networkidle")
            await page.wait_for_timeout(5000)

            for _ in range(6):
                await page.evaluate("window.scrollBy(0, 800)")
                await page.wait_for_timeout(1500)

            elements = await page.query_selector_all("video, [class*='video'], [class*='card'], [class*='media']")
            elements = [e for e in elements if await e.is_visible()]
            random.shuffle(elements)

            for element in elements[:5]:
                try:
                    await element.scroll_into_view_if_needed()
                    await page.wait_for_timeout(1000)
                    await element.click()
                    await page.wait_for_timeout(3000)

                    if video_urls:
                        video_url = video_urls[0]
                        title = await page.title()
                        return {
                            "url": video_url,
                            "thumbnail": None,
                            "title": title if title != "Kling AI" else None,
                            "status": "success"
                        }
                except Exception as e:
                    logger.warning(f"Element interaction failed: {e}")

            raise HTTPException(status_code=404, detail="Could not find a downloadable video")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/random-video", response_model=VideoResponse)
async def get_random_video():
    """
    Endpoint to get a random video from Kling AI gallery.
    Returns the video URL and metadata.
    """
    try:
        # Fetch a random video
        video_info = await fetch_random_video_from_kling()
        
        # Return the video information
        return {
            "url": video_info["url"],
            "thumbnail": video_info.get("thumbnail"),
            "title": video_info.get("title"),
            "status": "success"
        }
    except HTTPException as e:
        # Re-raise HTTP exceptions
        raise e
    except Exception as e:
        # Log and convert other exceptions to HTTPException
        logger.error(f"Error in endpoint: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

@app.get("/")
async def root():
    """Root endpoint with API information"""
    return {
        "name": "Kling Video Fetcher API",
        "version": "1.0",
        "endpoints": {
            "/api/random-video": "Get a random video URL from Kling AI gallery"
        }
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy"}

# Run the server
if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)