Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import cv2 | |
| import numpy as np | |
| import requests | |
| import tempfile | |
| import os | |
| from typing import List | |
| import logging | |
| import urllib3 | |
| import vimeo | |
| # Suppress only the single InsecureRequestWarning | |
| urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
| app = FastAPI() | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class VideoURLs(BaseModel): | |
| urls: List[str] | |
| MAX_VIDEOS = 1010 | |
| # Vimeo API credentials | |
| VIMEO_ACCESS_TOKEN = "35b24d71f540bfa24e61d488cf34e457" # Replace with your Vimeo token | |
| VIMEO_UPLOAD_URL = "https://api.vimeo.com/me/videos" | |
| def download_video(url): | |
| try: | |
| response = requests.get(url, verify=False, timeout=30) | |
| response.raise_for_status() | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file: | |
| tmp_file.write(response.content) | |
| return tmp_file.name | |
| except requests.RequestException as e: | |
| logger.error(f"Failed to download video from {url}: {str(e)}") | |
| raise Exception(f"Failed to download video from {url}: {str(e)}") | |
| def combine_videos(urls): | |
| if not urls: | |
| return None | |
| if len(urls) > MAX_VIDEOS: | |
| raise HTTPException(status_code=400, detail=f"Maximum of {MAX_VIDEOS} videos allowed") | |
| temp_files = [] | |
| for url in urls: | |
| if url.strip(): | |
| # Check if it's a newline character | |
| try: | |
| temp_files.append(download_video(url.strip())) | |
| except Exception as e: | |
| logger.error(f"Error downloading video: {str(e)}") | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| if not temp_files: | |
| raise HTTPException(status_code=400, detail="No valid videos to combine") | |
| try: | |
| # Trim video to 400 milliseconds for newline characters | |
| captures = [cv2.VideoCapture(file) for file in temp_files] | |
| fps = captures[0].get(cv2.CAP_PROP_FPS) | |
| frame_size = (int(captures[0].get(cv2.CAP_PROP_FRAME_WIDTH)), | |
| int(captures[0].get(cv2.CAP_PROP_FRAME_HEIGHT))) | |
| output_path = 'combined_video.mp4' | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_path, fourcc, fps, frame_size) | |
| for cap in captures: | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| out.write(frame) | |
| for cap in captures: | |
| cap.release() | |
| out.release() | |
| for file in temp_files: | |
| os.remove(file) | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error combining videos: {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Error combining videos: {str(e)}") | |
| def upload_to_vimeo(video_path): | |
| try: | |
| # Step 1: Request an upload link (initialize the upload) | |
| headers = { | |
| 'Authorization': f'Bearer {VIMEO_ACCESS_TOKEN}', | |
| 'Content-Type': 'application/json', | |
| 'Accept': 'application/vnd.vimeo.*+json;version=3.4' | |
| } | |
| # Video metadata and upload initialization | |
| upload_data = { | |
| 'upload': { | |
| 'approach': 'tus', # Use the 'tus' approach for large file uploads | |
| 'size': os.path.getsize(video_path) | |
| }, | |
| 'name': 'Uploaded Video', | |
| 'description': 'This video was uploaded using the Vimeo Upload API.' | |
| } | |
| # Send request to create an upload ticket | |
| response = requests.post(VIMEO_UPLOAD_URL, json=upload_data, headers=headers) | |
| response.raise_for_status() | |
| vimeo_data = response.json() | |
| # Extract the upload link | |
| upload_link = vimeo_data['upload']['upload_link'] | |
| video_uri = vimeo_data['uri'] # Used to get the video link after upload | |
| # Step 2: Upload the video file using the provided upload link | |
| tus_headers = { | |
| 'Tus-Resumable': '1.0.0', | |
| 'Upload-Offset': '0', | |
| 'Content-Type': 'application/offset+octet-stream', | |
| 'Authorization': f'Bearer {VIMEO_ACCESS_TOKEN}' | |
| } | |
| with open(video_path, 'rb') as video_file: | |
| tus_response = requests.patch(upload_link, headers=tus_headers, data=video_file) | |
| tus_response.raise_for_status() | |
| # Step 3: Confirm the upload and retrieve the Vimeo video link | |
| video_response = requests.get(f"https://api.vimeo.com{video_uri}?fields=link", headers=headers) | |
| video_response.raise_for_status() | |
| video_link = video_response.json()['link'] | |
| print(f"Video uploaded successfully: {video_link}") | |
| return video_link | |
| except requests.RequestException as e: | |
| print(f"Error uploading video to Vimeo: {str(e)}") | |
| raise | |
| async def process_urls(video_urls: VideoURLs): | |
| try: | |
| combined_video_path = combine_videos(video_urls.urls) | |
| if combined_video_path: | |
| vimeo_url = upload_to_vimeo(combined_video_path) | |
| os.remove(combined_video_path) # Clean up the local file | |
| return {"vimeo_url": vimeo_url} | |
| else: | |
| raise HTTPException(status_code=400, detail="Failed to combine videos") | |
| except HTTPException as he: | |
| logger.error(f"HTTP Exception: {str(he)}") | |
| raise he | |
| except Exception as e: | |
| logger.error(f"Unexpected error: {str(e)}") | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=8080) |