scrape / app.py
d3evil4's picture
Update app.py
2796280 verified
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import asyncio
import random
import logging
from typing import List, Dict, Optional, Any
from playwright.async_api import async_playwright
import uvicorn
from pydantic import BaseModel
# Logging setup: root configuration is requested at INFO level, and each record
# is rendered as "timestamp - logger name - level - message".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-scoped logger shared by the scraper and the route handlers in this file.
logger = logging.getLogger(__name__)
# Application instance that the route decorators in this file register against.
app = FastAPI(
    description="API to fetch random videos from Kling AI gallery",
    title="Kling Video Fetcher",
)
# Response schema for /api/random-video.
# (Documented with comments rather than a class docstring so the generated
# schema is left exactly as it was.)
class VideoResponse(BaseModel):
    # Address of the captured video resource; the only required field.
    url: str
    # Preview image address; optional, defaults to None.
    thumbnail: Optional[str] = None
    # Human-readable title; optional, defaults to None.
    title: Optional[str] = None
    # Outcome marker; defaults to "success".
    status: str = "success"
# Module-level cache state. Nothing in the visible part of this file reads or
# updates either name yet.
# NOTE(review): presumably a timestamp of the last scrape - confirm before relying on it.
last_fetch_time = 0
# Recently fetched video records (one dict per video).
video_cache: List[Dict[str, Any]] = []
async def fetch_random_video_from_kling() -> Dict[str, Any]:
    """Scrape the Kling AI community gallery and return one captured video URL.

    Launches headless Chromium, records every network response whose URL
    contains ``.mp4`` or whose ``content-type`` header contains ``video``,
    scrolls the gallery to trigger more loading, then clicks up to five
    randomly ordered visible elements until at least one video URL has
    been captured.

    Returns:
        A dict with the keys ``url`` (one of the captured URLs, picked at
        random), ``thumbnail`` (always ``None``), ``title`` (the page title
        read after a successful click, or ``None`` when it is exactly
        ``"Kling AI"`` or was never read) and ``status`` (``"success"``).

    Raises:
        HTTPException: 404 when no video URL was captured at all; 500 when
            any other error occurs while driving the browser.
    """
    try:
        async with async_playwright() as p:
            # NOTE(review): '--disable-web-security' turns off the same-origin
            # policy in the scraping browser - confirm it is really required.
            browser = await p.chromium.launch(headless=True, args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu',
                '--disable-web-security',
            ])
            try:
                context = await browser.new_context(viewport={'width': 1280, 'height': 800})
                page = await context.new_page()
                video_urls: List[str] = []

                # Intercept responses to capture video URLs.
                async def on_response(response):
                    try:
                        url = response.url
                        content_type = response.headers.get("content-type", "")
                        if ".mp4" in url or "video" in content_type:
                            video_urls.append(url)
                    except Exception as e:
                        # Best effort: one bad response must not abort the scrape.
                        logger.error(f"Error processing response: {e}")

                page.on("response", on_response)
                await page.goto(
                    "https://app.klingai.com/global/community/material",
                    timeout=90000,
                    wait_until="networkidle",
                )
                await page.wait_for_timeout(5000)

                # Scroll in steps so lazily loaded gallery items get requested.
                for _ in range(6):
                    await page.evaluate("window.scrollBy(0, 800)")
                    await page.wait_for_timeout(1500)

                candidates = await page.query_selector_all(
                    "video, [class*='video'], [class*='card'], [class*='media']"
                )
                elements = [el for el in candidates if await el.is_visible()]
                random.shuffle(elements)

                page_title: Optional[str] = None
                for element in elements[:5]:
                    try:
                        await element.scroll_into_view_if_needed()
                        await page.wait_for_timeout(1000)
                        await element.click()
                        await page.wait_for_timeout(3000)
                        if video_urls:
                            page_title = await page.title()
                            break
                    except Exception as e:
                        logger.warning(f"Element interaction failed: {e}")

                # Checked after the loop (not only inside it) so URLs captured
                # during load/scroll are still used when no element is visible
                # or every click fails.
                if not video_urls:
                    raise HTTPException(status_code=404, detail="Could not find a downloadable video")

                # De-duplicate (the same file can be requested more than once)
                # and pick at random instead of always returning the first hit.
                distinct_urls = list(dict.fromkeys(video_urls))
                return {
                    "url": random.choice(distinct_urls),
                    "thumbnail": None,
                    "title": page_title if page_title != "Kling AI" else None,
                    "status": "success",
                }
            finally:
                # Release the browser explicitly on every exit path.
                await browser.close()
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above); without this
        # clause the generic handler below would rewrap it as a 500.
        raise
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/random-video", response_model=VideoResponse)
async def get_random_video():
    """
    Endpoint to get a random video from Kling AI gallery.
    Returns the video URL and metadata.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # route description.
    try:
        scraped = await fetch_random_video_from_kling()
        # Re-shape the scraper result into the response payload; a missing
        # "url" key raises here and is reported as a 500 below.
        payload = {
            "url": scraped["url"],
            "thumbnail": scraped.get("thumbnail"),
            "title": scraped.get("title"),
            "status": "success",
        }
    except HTTPException:
        # Already carries its own status code; pass it through untouched.
        raise
    except Exception as exc:
        # Anything else is logged and surfaced as an internal server error.
        logger.error(f"Error in endpoint: {exc}")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(exc)}")
    return payload
@app.get("/")
async def root():
    """Root endpoint with API information"""
    # Static self-description of the service: name, version, and a map from
    # each advertised path to a one-line summary.
    endpoint_index = {
        "/api/random-video": "Get a random video URL from Kling AI gallery",
    }
    service_info = {
        "name": "Kling Video Fetcher API",
        "version": "1.0",
        "endpoints": endpoint_index,
    }
    return service_info
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Fixed payload; no dependency (browser, network) is probed here.
    liveness = {"status": "healthy"}
    return liveness
# Script entry point: start uvicorn only when this file is executed directly.
if __name__ == "__main__":
    uvicorn.run(
        "app:app",       # import string "module:attribute" for the application
        host="0.0.0.0",  # listen on all interfaces
        port=8000,
        reload=True,     # restart on source changes
    )