Data2 / app.py
Fred808's picture
Update app.py
1e49a32 verified
raw
history blame
4.86 kB
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pathlib import Path
import os
import threading
import requests
from huggingface_hub import HfApi
import random
import time
# ASGI application object; the route handlers in this module register on it.
app = FastAPI()

# NOTE(review): MAX_VIDEOS is not referenced by the code visible in this file.
MAX_VIDEOS = 5000

# Absolute locations of the two storage folders, relative to the working
# directory at import time: raw downloads and the local dataset copy.
DOWNLOAD_DIR, DATASET_DIR = (
    Path(folder).resolve() for folder in ("downloaded", "dataset")
)

# Create both folders (and any missing parents) up front so the handlers and
# the background downloader can assume they exist.
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(DATASET_DIR, exist_ok=True)
# Video URLs handed to the background downloader at startup.
DOWNLOAD_URLS = [
    "https://youtu.be/ULCkj_Q5NCc?si=P5fVfGeL9dc47tju",
    "https://youtu.be/WJkI0cds4m4?si=4GlB22ly6RV32q48",
]

# Browser User-Agent strings.
# NOTE(review): not referenced by the code visible in this file.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
]

COOKIES_FILE = Path("youtube.com_cookies.txt").resolve()  # Place your exported cookies file here

RAPIDAPI_HOST = "yt-api.p.rapidapi.com"

# Secrets must come from the environment, never from source control. The
# previously hard-coded fallback key has been removed; it should be treated
# as compromised and rotated. The value is None when the variable is unset.
RAPIDAPI_KEY = os.environ.get("RAPIDAPI_KEY")

# Optional proxy, e.g. http://user:pass@host:port, read from the PROXY
# environment variable (None when unset). The old code passed the proxy
# credentials themselves as the variable *name*, so this was always None.
PROXY = os.environ.get("PROXY")

# Hugging Face dataset upload is enabled only when both of these are set.
HF_DATASET_REPO_ID = os.environ.get("HF_DATASET_REPO_ID")
HF_TOKEN = os.environ.get("HF_TOKEN")
def _video_id_from_url(url):
    """Return a filesystem-safe identifier for ``url``.

    Uses the ``v=`` query value or the path segment after ``youtu.be/`` when
    it consists only of letters, digits, ``_`` and ``-``; otherwise falls back
    to the MD5 hex digest of the whole URL so the result is always usable as
    a file name.
    """
    import hashlib
    import re

    if "v=" in url:
        candidate = url.split("v=")[1].split("&")[0]
    elif "youtu.be/" in url:
        candidate = url.split("youtu.be/")[1].split("?")[0]
    else:
        candidate = ""
    # Anything containing separators, dots, etc. must not reach the file name.
    if re.fullmatch(r"[A-Za-z0-9_-]+", candidate):
        return candidate
    return hashlib.md5(url.encode()).hexdigest()


def _stream_response_to_file(resp, out_path):
    """Write the body of ``resp`` to ``out_path`` without exposing partial files."""
    partial_path = out_path.with_name(out_path.name + ".part")
    try:
        with open(partial_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        # Atomic rename: the final name only ever refers to a complete file.
        partial_path.replace(out_path)
    finally:
        # Only still present if streaming or the rename failed.
        if partial_path.exists():
            partial_path.unlink()


def batch_download_via_api(download_urls, download_dir=DOWNLOAD_DIR, timeout=(15, 900)):
    """
    Download each video using the public API endpoint and save to download_dir.
    Also copy to dataset dir and upload to HuggingFace if configured.
    No zipping, just raw mp4s.

    Args:
        download_urls: Iterable of video URLs; each is POSTed to the API on
            its own.
        download_dir: Directory that receives ``<video_id>.mp4``.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post`` so a stalled server cannot block the worker
            forever.

    Failures are best-effort: any error for one URL is printed and the loop
    moves on to the next URL.
    """
    import shutil

    api_url = "https://fred808-data1.hf.space/video/download"
    for url in download_urls:
        try:
            out_path = Path(download_dir) / f"{_video_id_from_url(url)}.mp4"
            # The context manager releases the streamed connection on every path.
            with requests.post(
                api_url, json={"urls": [url]}, stream=True, timeout=timeout
            ) as resp:
                if resp.status_code != 200:
                    print(f"Failed to download {url}: {resp.status_code} {resp.text}")
                    continue
                _stream_response_to_file(resp, out_path)
            print(f"Downloaded {url} to {out_path}")
            # Copy to dataset dir and upload to HF if configured
            if HF_DATASET_REPO_ID and HF_TOKEN:
                # upload_to_hf_dataset also copies the file into DATASET_DIR.
                upload_to_hf_dataset(out_path, HF_DATASET_REPO_ID, HF_TOKEN)
            else:
                shutil.copy2(out_path, DATASET_DIR / out_path.name)
        except Exception as e:
            print(f"Error downloading {url}: {e}")
@app.on_event("startup")
def startup_event():
    """Start the batch download of DOWNLOAD_URLS in a daemon thread at app startup."""
    # Daemon thread: it does not keep the process alive on shutdown.
    worker = threading.Thread(
        target=batch_download_via_api,
        kwargs={"download_urls": DOWNLOAD_URLS},
        daemon=True,
    )
    worker.start()
@app.get("/files")
def list_files():
    """Return the names of the regular files directly inside DOWNLOAD_DIR."""
    names = []
    for entry in DOWNLOAD_DIR.glob("*"):
        # Skip sub-directories; only plain files are listed.
        if entry.is_file():
            names.append(entry.name)
    return {"files": names}
@app.get("/download/{filename}")
def download_file(filename: str):
    """Serve one file from DOWNLOAD_DIR.

    Args:
        filename: Bare name of a file inside DOWNLOAD_DIR, taken from the
            request path.

    Returns:
        A FileResponse for the file, offered under the same name.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such regular file exists in DOWNLOAD_DIR.
    """
    # ``filename`` is client-controlled. Accept only a bare name so separators,
    # drive letters or absolute paths cannot point outside DOWNLOAD_DIR.
    if Path(filename).name != filename:
        raise HTTPException(status_code=404, detail="File not found")
    file_path = DOWNLOAD_DIR / filename
    # is_file() is False for missing paths and for directories (e.g. "..").
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(str(file_path), filename=filename)
@app.get("/")
def root():
    """Return a usage hint plus the names of the files currently in DOWNLOAD_DIR."""
    payload = {"message": "Use /download/{filename} to download a file."}
    payload["available_files"] = [
        entry.name for entry in DOWNLOAD_DIR.glob("*") if entry.is_file()
    ]
    return payload
def upload_to_hf_dataset(local_path, repo_id, token):
    """Copy ``local_path`` into DATASET_DIR, then upload it to a Hugging Face dataset repo.

    Args:
        local_path: Path object of the file to publish; its ``name`` is used
            both for the local copy and as the path inside the repo.
        repo_id: Target dataset repository id.
        token: Access token passed to the upload call.

    Any exception from the copy or the upload is caught and printed; nothing
    is raised to the caller and nothing is returned.
    """
    import shutil

    hub = HfApi()
    file_name = local_path.name
    try:
        # Local copy first, so the file is also available from the dataset dir.
        shutil.copy2(local_path, DATASET_DIR / file_name)
        upload_args = {
            "path_or_fileobj": str(local_path),
            "path_in_repo": file_name,
            "repo_id": repo_id,
            "repo_type": "dataset",
            "token": token,
        }
        hub.upload_file(**upload_args)
        print(f"Uploaded {file_name} to {repo_id} and copied to dataset dir")
    except Exception as exc:
        print(f"Failed to upload {file_name} to {repo_id}: {exc}")
@app.get("/dataset/{filename}")
def download_dataset_file(filename: str):
    """Serve one file from DATASET_DIR.

    Args:
        filename: Bare name of a file inside DATASET_DIR, taken from the
            request path.

    Returns:
        A FileResponse for the file, offered under the same name.

    Raises:
        HTTPException: 404 when ``filename`` is not a bare file name or no
            such regular file exists in DATASET_DIR.
    """
    # ``filename`` is client-controlled. Accept only a bare name so separators,
    # drive letters or absolute paths cannot point outside DATASET_DIR.
    if Path(filename).name != filename:
        raise HTTPException(status_code=404, detail="File not found in dataset")
    file_path = DATASET_DIR / filename
    # is_file() is False for missing paths and for directories (e.g. "..").
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found in dataset")
    return FileResponse(str(file_path), filename=filename)
@app.get("/dataset")
def list_dataset_files():
    """Return the names of the regular files directly inside DATASET_DIR."""
    regular_files = (entry for entry in DATASET_DIR.glob("*") if entry.is_file())
    return {"dataset_files": [entry.name for entry in regular_files]}