from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse import asyncio import random import logging from typing import List, Dict, Optional, Any from playwright.async_api import async_playwright import uvicorn from pydantic import BaseModel # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = FastAPI(title="Kling Video Fetcher", description="API to fetch random videos from Kling AI gallery") # Model for the response class VideoResponse(BaseModel): url: str thumbnail: Optional[str] = None title: Optional[str] = None status: str = "success" # Cache to store recent video URLs video_cache: List[Dict[str, Any]] = [] last_fetch_time = 0 async def fetch_random_video_from_kling() -> Dict[str, Any]: try: async with async_playwright() as p: browser = await p.chromium.launch(headless=True, args=[ '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-accelerated-2d-canvas', '--no-first-run', '--no-zygote', '--disable-gpu', '--disable-web-security', ]) context = await browser.new_context(viewport={'width': 1280, 'height': 800}) page = await context.new_page() video_urls = [] # Intercept requests to capture video URLs async def on_response(response): try: url = response.url content_type = response.headers.get("content-type", "") if ".mp4" in url or "video" in content_type: video_urls.append(url) except Exception as e: logger.error(f"Error processing response: {e}") page.on("response", on_response) await page.goto("https://app.klingai.com/global/community/material", timeout=90000, wait_until="networkidle") await page.wait_for_timeout(5000) for _ in range(6): await page.evaluate("window.scrollBy(0, 800)") await page.wait_for_timeout(1500) elements = await page.query_selector_all("video, [class*='video'], [class*='card'], [class*='media']") elements = [e for e in elements if await e.is_visible()] random.shuffle(elements) for element in elements[:5]: try: await element.scroll_into_view_if_needed() await page.wait_for_timeout(1000) await element.click() await page.wait_for_timeout(3000) if video_urls: video_url = video_urls[0] title = await page.title() return { "url": video_url, "thumbnail": None, "title": title if title != "Kling AI" else None, "status": "success" } except Exception as e: logger.warning(f"Element interaction failed: {e}") raise HTTPException(status_code=404, detail="Could not find a downloadable video") except Exception as e: logger.error(f"Unexpected error: {e}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/random-video", response_model=VideoResponse) async def get_random_video(): """ Endpoint to get a random video from Kling AI gallery. Returns the video URL and metadata. """ try: # Fetch a random video video_info = await fetch_random_video_from_kling() # Return the video information return { "url": video_info["url"], "thumbnail": video_info.get("thumbnail"), "title": video_info.get("title"), "status": "success" } except HTTPException as e: # Re-raise HTTP exceptions raise e except Exception as e: # Log and convert other exceptions to HTTPException logger.error(f"Error in endpoint: {e}") raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") @app.get("/") async def root(): """Root endpoint with API information""" return { "name": "Kling Video Fetcher API", "version": "1.0", "endpoints": { "/api/random-video": "Get a random video URL from Kling AI gallery" } } @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy"} # Run the server if __name__ == "__main__": uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)