| import os |
| import json |
| import asyncio |
| import zipfile |
| import shutil |
| import cv2 |
| import time |
| from pathlib import Path |
| from fastapi import FastAPI, Request, BackgroundTasks |
| from fastapi.responses import HTMLResponse |
| from fastapi.templating import Jinja2Templates |
| from huggingface_hub import HfApi, list_repo_files, hf_hub_download |
|
|
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# --- Configuration (overridable through environment variables) ---
# Hugging Face access token. An unset or empty variable becomes None so that
# huggingface_hub applies its default token lookup (e.g. a cached login token)
# instead of being handed an explicit empty-string credential.
HF_TOKEN = os.getenv("HF_TOKEN") or None
SOURCE_REPO_ID = os.getenv("SOURCE_REPO_ID", "factorstudios/movs")    # dataset holding the input videos
TARGET_REPO_ID = os.getenv("TARGET_REPO_ID", "factorstudios/movzip")  # dataset receiving the frame zips

# Local working paths, relative to the current working directory.
DOWNLOAD_DIR = "downloads"            # downloaded source videos
FRAMES_DIR = "frames"                 # per-video folders of extracted frames
ZIPS_DIR = "zips"                     # zipped frame folders awaiting upload
STATE_FILE = "processing_state.json"  # JSON record of already-processed source files

for d in (DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR):
    os.makedirs(d, exist_ok=True)

api = HfApi(token=HF_TOKEN)
|
|
| |
# Live progress snapshot; returned as-is by the /stats endpoint and updated
# by run_processor / add_log while a run is in progress.
processing_status = dict(
    is_running=False,        # True while run_processor is active
    last_processed=None,     # most recently completed source file
    total_videos_source=0,   # number of video files found in SOURCE_REPO_ID
    processed_count=0,       # number of entries recorded in the state file
    current_action="Idle",   # human-readable description of the current step
    logs=[],                 # rolling list of recent timestamped log lines
)
|
|
def add_log(msg):
    """Record *msg* with an ``[HH:MM:SS]`` prefix in the status log and print it.

    The log kept in ``processing_status["logs"]`` is capped at 50 entries;
    when it grows past that, the oldest entry is dropped.
    """
    entry = "[{}] {}".format(time.strftime('%H:%M:%S'), msg)
    history = processing_status["logs"]
    history.append(entry)
    if len(history) > 50:
        del history[0]
    print(entry)
|
|
def load_state(path=None):
    """Load the persisted processing state.

    Args:
        path: State-file location; defaults to ``STATE_FILE``.

    Returns:
        A dict that always contains a ``"processed_files"`` list. A missing
        file, a file that is not valid UTF-8 JSON, or JSON that is not an
        object all yield a fresh ``{"processed_files": []}``. An object that
        lacks a usable ``"processed_files"`` list gets an empty one added.
    """
    if path is None:
        path = STATE_FILE
    if not os.path.exists(path):
        return {"processed_files": []}
    with open(path, "r", encoding="utf-8") as f:
        try:
            state = json.load(f)
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError;
            # other errors (and KeyboardInterrupt) are no longer swallowed.
            print(f"Warning: ignoring unreadable state file {path}: {e}")
            return {"processed_files": []}
    if not isinstance(state, dict):
        print(f"Warning: ignoring malformed state file {path}")
        return {"processed_files": []}
    if not isinstance(state.get("processed_files"), list):
        # Callers index state["processed_files"] directly; guarantee it exists.
        state["processed_files"] = []
    return state
|
|
def save_state(state, path=None):
    """Persist *state* as 2-space-indented JSON, replacing the file atomically.

    The JSON is first written and fsynced to ``<path>.tmp``, then moved over
    the target with ``os.replace``. A crash mid-write therefore leaves the
    previous state intact rather than a truncated file, which load_state
    would treat as empty (causing every video to be reprocessed).

    Args:
        state: JSON-serializable state dict.
        path: Destination; defaults to ``STATE_FILE``.

    Raises:
        TypeError: if *state* is not JSON-serializable (the target is untouched).
        OSError: on filesystem errors.
    """
    if path is None:
        path = STATE_FILE
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Best-effort removal of the partial temp file; the original error is re-raised.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
def extract_frames(video_path, output_dir, fps=10):
    """Sample frames from a video at roughly *fps* frames/second and save them as JPEGs.

    Frames are written to *output_dir* (created if needed) as ``000001.jpg``,
    ``000002.jpg``, ... at JPEG quality 90. The sampling step is the source
    frame rate divided by *fps*, rounded, and at least 1, so every frame is
    kept when the source is slower than *fps*. If the container reports no
    usable frame rate (0, negative, NaN or infinite), 30 fps is assumed.

    Args:
        video_path: Path to the input video (str or path-like).
        output_dir: Directory that receives the JPEG files.
        fps: Target sampling rate; must be positive.

    Returns:
        The number of frames written; 0 if the video cannot be opened.

    Raises:
        ValueError: if *fps* is not positive.
        OSError: if a frame cannot be written to disk.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # The chained comparison is False for 0, negatives, NaN and inf.
        if not 0 < video_fps < float("inf"):
            video_fps = 30
        frame_interval = max(1, int(round(video_fps / fps)))
        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                saved_count += 1
                out_path = os.path.join(output_dir, f"{saved_count:06d}.jpg")
                # imwrite reports failure via its return value rather than raising.
                if not cv2.imwrite(out_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]):
                    raise OSError(f"Failed to write frame {out_path}")
            frame_idx += 1
        return saved_count
    finally:
        # Always release the capture, even if reading or writing raised.
        cap.release()
|
|
def zip_folder(folder_path, zip_path):
    """Write every file under *folder_path* into a new deflate-compressed ZIP at *zip_path*.

    Archive names are paths relative to *folder_path*, so a flat folder yields
    plain file names while nested files keep their sub-directory instead of
    colliding with same-named files elsewhere. Entries are added in sorted
    order, making the archive layout deterministic; for zero-padded frame
    names that is frame order. An existing file at *zip_path* is overwritten.
    """
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            dirs.sort()  # sorted in place so os.walk also descends in a stable order
            for file in sorted(files):
                full_path = os.path.join(root, file)
                zipf.write(full_path, arcname=os.path.relpath(full_path, folder_path))
|
|
async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the event loop's default executor and await its result.

    Keeps the event loop free while downloads, frame extraction, zipping and
    uploads run, so endpoints such as /stats stay responsive.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))


def _remove_paths(frames_dir, *files):
    """Delete *frames_dir* (if it is a directory) and each existing file in *files*; None entries are skipped."""
    if frames_dir and os.path.isdir(frames_dir):
        shutil.rmtree(frames_dir)
    for path in files:
        if path and os.path.exists(path):
            os.remove(path)


async def _process_video(video_file):
    """Download one source video, extract its frames, zip them and upload the zip.

    Returns True once the zip has been uploaded, or False when no frames could
    be extracted. The downloaded video, the frames folder and the zip for this
    video are removed whether processing succeeds or fails.
    """
    video_name = Path(video_file).stem
    if video_name in ("", ".", ".."):
        # Such a stem would make the frames dir resolve to FRAMES_DIR itself or
        # its parent, which the cleanup below would then delete.
        raise ValueError(f"Unsafe video name derived from {video_file!r}")
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)
    # NOTE(review): only the stem is used, so e.g. 'a/clip.mp4' and 'b/clip.mov'
    # map to the same archive name in TARGET_REPO_ID and overwrite each other.
    zip_filename = f"{video_name}_frames.zip"
    zip_path = os.path.join(ZIPS_DIR, zip_filename)
    local_video_path = None
    try:
        # Start from an empty frames folder so leftovers from an interrupted
        # run cannot end up in this archive.
        await _run_blocking(_remove_paths, video_frames_dir)

        add_log(f"Downloading {video_file}...")
        local_video_path = await _run_blocking(
            hf_hub_download,
            repo_id=SOURCE_REPO_ID,
            filename=video_file,
            repo_type="dataset",
            local_dir=DOWNLOAD_DIR,
            token=HF_TOKEN,
        )

        add_log(f"Extracting frames for {video_name}...")
        frame_count = await _run_blocking(extract_frames, local_video_path, video_frames_dir)
        if frame_count <= 0:
            add_log(f"⚠️ No frames extracted from {video_file}; skipping")
            return False

        add_log(f"Zipping {frame_count} frames...")
        await _run_blocking(zip_folder, video_frames_dir, zip_path)

        add_log(f"Uploading to {TARGET_REPO_ID}...")
        await _run_blocking(
            api.upload_file,
            path_or_fileobj=zip_path,
            path_in_repo=zip_filename,
            repo_id=TARGET_REPO_ID,
            repo_type="dataset",
        )
        return True
    finally:
        try:
            await _run_blocking(_remove_paths, video_frames_dir, local_video_path, zip_path)
        except OSError as e:
            # A cleanup problem must not mask the real outcome of this video.
            add_log(f"⚠️ Cleanup failed for {video_file}: {e}")


async def run_processor():
    """Convert every not-yet-processed source video into an uploaded frame archive.

    Lists the video files in SOURCE_REPO_ID and skips those recorded in the
    state file. Each remaining video is handed to _process_video (download,
    extract, zip, upload) and, on success, recorded in the state file.
    Progress is reported through ``processing_status`` and add_log. A failure
    on one video is logged and the run continues with the next one. Failed
    videos are not recorded, so they are retried on the next run. If a run is
    already active, the call returns immediately.
    """
    if processing_status["is_running"]:
        return

    # Set before the first await so any other scheduled run sees it and exits.
    processing_status["is_running"] = True
    failed_count = 0
    try:
        # Inside the try so a state-file error cannot leave is_running stuck at True.
        state = load_state()
        processed_files = state.setdefault("processed_files", [])
        done = set(processed_files)  # O(1) membership checks

        add_log("Checking source repository...")
        files = await _run_blocking(
            list_repo_files, repo_id=SOURCE_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.webm')
        videos = [f for f in files if f.lower().endswith(video_extensions)]

        processing_status["total_videos_source"] = len(videos)
        processing_status["processed_count"] = len(processed_files)

        for video_file in videos:
            if video_file in done:
                continue

            processing_status["current_action"] = f"Processing {video_file}"
            try:
                uploaded = await _process_video(video_file)
            except Exception as e:
                add_log(f"❌ Error processing {video_file}: {e}")
                failed_count += 1
                continue
            if not uploaded:
                failed_count += 1
                continue

            processed_files.append(video_file)
            done.add(video_file)
            save_state(state)
            processing_status["processed_count"] = len(processed_files)
            processing_status["last_processed"] = video_file
            add_log(f"✅ Finished {video_file}")

        if failed_count:
            processing_status["current_action"] = "Completed with errors"
            add_log(f"⚠️ Run finished; {failed_count} video(s) failed and will be retried next run")
        else:
            processing_status["current_action"] = "Completed"
            add_log("🎉 All available videos processed!")

    except Exception as e:
        add_log(f"❌ Error: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["is_running"] = False
|
|
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render ``index.html`` from the templates directory for the root URL."""
    return templates.TemplateResponse(request=request, name="index.html")
|
|
@app.get("/stats")
async def get_stats():
    """Return the live ``processing_status`` dict; FastAPI serializes it to JSON."""
    snapshot = processing_status
    return snapshot
|
|
@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks):
    """Schedule run_processor as a background task unless a run is already active."""
    if processing_status["is_running"]:
        return {"message": "Processor already running"}
    background_tasks.add_task(run_processor)
    return {"message": "Processor started"}
|
|
if __name__ == "__main__":
    # Direct-run entry point: serve the app with uvicorn on all interfaces, port 7860.
    from uvicorn import run as serve

    serve(app, port=7860, host="0.0.0.0")