# File size: 4,940 Bytes (2796280 030cf68)
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import asyncio
import random
import logging
from typing import List, Dict, Optional, Any
from playwright.async_api import async_playwright
import uvicorn
from pydantic import BaseModel
# Set up logging: configure the root logger at import time and create the
# module-level logger used throughout this file.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# FastAPI application object; the route decorators in this file register
# their handlers on it.
app = FastAPI(
    title="Kling Video Fetcher",
    description="API to fetch random videos from Kling AI gallery",
)
# Model for the response returned by the /api/random-video endpoint.
class VideoResponse(BaseModel):
    # Address of the captured video (required).
    url: str

    # Optional metadata; both default to None when not available.
    thumbnail: Optional[str] = None
    title: Optional[str] = None

    # Outcome marker; defaults to "success".
    status: str = "success"
# Cache to store recent video URLs.
# NOTE(review): neither name is read or written anywhere in the visible code;
# `last_fetch_time` is presumably a timestamp of the last scrape -- confirm
# before relying on either.
last_fetch_time = 0
video_cache: List[Dict[str, Any]] = []
async def fetch_random_video_from_kling() -> Dict[str, Any]:
    """Scrape the Kling AI community gallery and return one captured video URL.

    Launches headless Chromium through Playwright, opens the community
    material page, scrolls to trigger lazy loading, then clicks up to five
    randomly ordered visible elements. While the page is open, every network
    response whose URL contains ``.mp4`` or whose ``content-type`` header
    contains ``video`` is recorded. As soon as at least one URL has been
    recorded after a click, the first recorded URL is returned.

    Returns:
        A dict with the keys ``url``, ``thumbnail`` (always ``None``),
        ``title`` (the page title, or ``None`` when the title is exactly
        ``"Kling AI"``) and ``status`` (``"success"``).

    Raises:
        HTTPException: with status 404 when no video URL was captured after
            trying the selected elements; with status 500 (detail set to the
            error text) for any other failure such as a launch or navigation
            error.
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
            ])
            try:
                context = await browser.new_context(viewport={'width': 1280, 'height': 800})
                page = await context.new_page()
                video_urls = []

                # Intercept responses to capture video URLs
                async def on_response(response):
                    try:
                        url = response.url
                        content_type = response.headers.get("content-type", "")
                        if ".mp4" in url or "video" in content_type:
                            video_urls.append(url)
                    except Exception as e:
                        # Best effort: one bad response must not abort the scrape.
                        logger.error(f"Error processing response: {e}")

                page.on("response", on_response)
                await page.goto(
                    "https://app.klingai.com/global/community/material",
                    timeout=90000,
                    wait_until="networkidle",
                )
                await page.wait_for_timeout(5000)

                # Scroll down in steps so lazily loaded gallery items get rendered.
                for _ in range(6):
                    await page.evaluate("window.scrollBy(0, 800)")
                    await page.wait_for_timeout(1500)

                elements = await page.query_selector_all(
                    "video, [class*='video'], [class*='card'], [class*='media']"
                )
                elements = [e for e in elements if await e.is_visible()]
                random.shuffle(elements)

                for element in elements[:5]:
                    try:
                        await element.scroll_into_view_if_needed()
                        await page.wait_for_timeout(1000)
                        await element.click()
                        await page.wait_for_timeout(3000)
                        if video_urls:
                            # NOTE(review): this is the first URL captured since
                            # the page opened, which is not necessarily the one
                            # triggered by this click.
                            video_url = video_urls[0]
                            title = await page.title()
                            return {
                                "url": video_url,
                                "thumbnail": None,
                                "title": title if title != "Kling AI" else None,
                                "status": "success"
                            }
                    except Exception as e:
                        # A failed click on one element is not fatal; try the next.
                        logger.warning(f"Element interaction failed: {e}")

                raise HTTPException(status_code=404, detail="Could not find a downloadable video")
            finally:
                # Release the Chromium process on every exit path (return,
                # 404, or unexpected error).
                await browser.close()
    except HTTPException:
        # The 404 above is a deliberate result; without this clause the
        # generic handler below would rewrite it into a 500.
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/random-video", response_model=VideoResponse)
async def get_random_video():
    """
    Endpoint to get a random video from Kling AI gallery.
    Returns the video URL and metadata.
    """
    # Delegates the scraping to fetch_random_video_from_kling() and reshapes
    # its result into the VideoResponse fields.
    try:
        fetched = await fetch_random_video_from_kling()
        return dict(
            url=fetched["url"],
            thumbnail=fetched.get("thumbnail"),
            title=fetched.get("title"),
            status="success",
        )
    except HTTPException:
        # HTTP errors raised by the fetcher already carry the right status.
        raise
    except Exception as e:
        # Anything else is logged and reported to the client as a 500.
        logger.error(f"Error in endpoint: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
@app.get("/")
async def root():
    """Root endpoint with API information"""
    # Map of each advertised route to a short description of it.
    endpoints = {
        "/api/random-video": "Get a random video URL from Kling AI gallery",
    }
    info = {
        "name": "Kling Video Fetcher API",
        "version": "1.0",
        "endpoints": endpoints,
    }
    return info
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Static payload: no dependency (browser, network) is probed here.
    return dict(status="healthy")
# Run the server when this file is executed directly as a script.
if __name__ == "__main__":
    # NOTE(review): the "app:app" import string assumes this module is saved
    # as app.py -- confirm against the deployed file name.
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)