| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.responses import JSONResponse |
| import asyncio |
| import random |
| import logging |
| from typing import List, Dict, Optional, Any |
| from playwright.async_api import async_playwright |
| import uvicorn |
| from pydantic import BaseModel |
|
|
| |
# Root logging configuration: timestamp, logger name, level, then the message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by the scraper and the endpoints in this file.
logger = logging.getLogger(__name__)
|
|
# ASGI application object; the route decorators in this file register on it.
app = FastAPI(
    title="Kling Video Fetcher",
    description="API to fetch random videos from Kling AI gallery",
)
|
|
| |
class VideoResponse(BaseModel):
    """Response schema for the ``/api/random-video`` endpoint."""

    # Direct URL of the captured video resource (required).
    url: str
    # Optional preview image URL; defaults to None when not supplied.
    thumbnail: Optional[str] = None
    # Optional human-readable title; defaults to None when not supplied.
    title: Optional[str] = None
    # Outcome marker; defaults to "success".
    status: str = "success"
|
|
| |
# Module-level state. Neither name is read or written in the visible part of
# this file -- presumably reserved for caching fetched videos; confirm before
# removing.
video_cache: List[Dict[str, Any]] = list()
last_fetch_time = 0
|
|
async def _find_video_on_page(page) -> Optional[Dict[str, Any]]:
    """Drive the gallery page and return info for a captured video, if any.

    Registers a response listener, loads the community gallery, scrolls to
    trigger lazy loading, then clicks up to five randomly chosen visible
    elements. After each click, the first video URL seen so far is returned.

    Args:
        page: A Playwright async ``Page`` object.

    Returns:
        A dict with ``url``, ``thumbnail``, ``title`` and ``status`` keys, or
        ``None`` when no video response was observed.
    """
    captured_urls: List[str] = []

    def on_response(response) -> None:
        # Called for every network response; it must never raise back into
        # Playwright's event dispatch, hence the broad except.
        try:
            url = response.url
            content_type = response.headers.get("content-type", "")
            if ".mp4" in url or "video" in content_type:
                captured_urls.append(url)
        except Exception as e:
            logger.error(f"Error processing response: {e}")

    page.on("response", on_response)

    await page.goto(
        "https://app.klingai.com/global/community/material",
        timeout=90000,
        wait_until="networkidle",
    )
    await page.wait_for_timeout(5000)

    # Scroll in steps so lazily loaded gallery items get a chance to appear.
    for _ in range(6):
        await page.evaluate("window.scrollBy(0, 800)")
        await page.wait_for_timeout(1500)

    elements = await page.query_selector_all(
        "video, [class*='video'], [class*='card'], [class*='media']"
    )
    elements = [e for e in elements if await e.is_visible()]
    random.shuffle(elements)

    for element in elements[:5]:
        try:
            await element.scroll_into_view_if_needed()
            await page.wait_for_timeout(1000)
            await element.click()
            await page.wait_for_timeout(3000)

            if captured_urls:
                # NOTE(review): this is the earliest captured URL, which may
                # stem from the initial page load rather than from the click
                # -- confirm that is the intended notion of "random".
                title = await page.title()
                return {
                    "url": captured_urls[0],
                    "thumbnail": None,
                    "title": title if title != "Kling AI" else None,
                    "status": "success",
                }
        except Exception as e:
            # Best effort: a failed interaction just moves on to the next element.
            logger.warning(f"Element interaction failed: {e}")

    return None


async def fetch_random_video_from_kling() -> Dict[str, Any]:
    """Scrape the Kling AI community gallery for a video URL.

    Launches headless Chromium through Playwright, lets the page-level helper
    look for a video response, and always closes the browser afterwards.

    Returns:
        A dict with keys ``url``, ``thumbnail`` (always ``None``), ``title``
        (the page title, or ``None`` when it equals "Kling AI") and
        ``status`` ("success").

    Raises:
        HTTPException: 404 when no video URL could be captured; 500 for any
            other failure (navigation timeout, browser launch error, ...).
    """
    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True, args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
            ])
            try:
                context = await browser.new_context(
                    viewport={'width': 1280, 'height': 800}
                )
                page = await context.new_page()
                video_info = await _find_video_on_page(page)
            finally:
                # Release the Chromium process even when scraping fails.
                await browser.close()

        if video_info is None:
            raise HTTPException(
                status_code=404, detail="Could not find a downloadable video"
            )
        return video_info
    except HTTPException:
        # Let the deliberate 404 through; the blanket handler below would
        # otherwise rewrap it as a 500.
        raise
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
@app.get("/api/random-video", response_model=VideoResponse)
async def get_random_video():
    """
    Endpoint to get a random video from Kling AI gallery.
    Returns the video URL and metadata.
    """
    try:
        fetched = await fetch_random_video_from_kling()

        # Re-shape the scraper result into the VideoResponse fields; a missing
        # "url" key falls through to the generic 500 handler below.
        payload = {
            "url": fetched["url"],
            "thumbnail": fetched.get("thumbnail"),
            "title": fetched.get("title"),
            "status": "success",
        }
        return payload
    except HTTPException:
        # Status codes chosen by the scraper are passed through unchanged.
        raise
    except Exception as e:
        logger.error(f"Error in endpoint: {e}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
|
@app.get("/")
async def root():
    """Root endpoint with API information"""
    # Static service descriptor: name, version and a map of path -> summary.
    endpoints = {
        "/api/random-video": "Get a random video URL from Kling AI gallery",
    }
    return {
        "name": "Kling Video Fetcher API",
        "version": "1.0",
        "endpoints": endpoints,
    }
|
|
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Constant payload; no dependency (browser, network) is probed here.
    health = {"status": "healthy"}
    return health
|
|
| |
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000, with
    # auto-reload enabled. The "app:app" import string presumably means this
    # file is saved as app.py -- confirm.
    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("app:app", **server_options)