"""FastAPI service that converts videos from a Hugging Face dataset repo into
zipped JPEG frame sequences and uploads the zips to another dataset repo.

Routes:
    GET  /       renders ``templates/index.html``.
    GET  /stats  returns the in-memory ``processing_status`` dict.
    POST /start  schedules :func:`run_processor` as a background task.

Videos already listed in ``STATE_FILE`` are skipped on later runs.
"""

import asyncio
import json
import math
import os
import shutil
import time
import zipfile
from pathlib import Path

import cv2
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, hf_hub_download, list_repo_files

app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Configuration from environment variables
HF_TOKEN = os.getenv("HF_TOKEN", "")
SOURCE_REPO_ID = os.getenv("SOURCE_REPO_ID", "factorstudios/movs")
TARGET_REPO_ID = os.getenv("TARGET_REPO_ID", "factorstudios/movzip")

DOWNLOAD_DIR = "downloads"
FRAMES_DIR = "frames"
ZIPS_DIR = "zips"
STATE_FILE = "processing_state.json"

# Source files whose lower-cased name ends with one of these are treated as videos.
VIDEO_EXTENSIONS = ('.mp4', '.mkv', '.avi', '.mov', '.webm')
# Number of most recent log lines kept in processing_status["logs"].
MAX_LOG_LINES = 50
# Fallback used when OpenCV reports no usable FPS for a video.
DEFAULT_VIDEO_FPS = 30

for d in [DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR]:
    os.makedirs(d, exist_ok=True)

api = HfApi(token=HF_TOKEN)

# Global status for tracking; returned verbatim by GET /stats.
processing_status = {
    "is_running": False,
    "last_processed": None,
    "total_videos_source": 0,
    "processed_count": 0,
    "current_action": "Idle",
    "logs": []
}


def _empty_state():
    """Return a fresh processing state with no processed videos."""
    return {"processed_files": []}


def add_log(msg):
    """Append a timestamped line to ``processing_status["logs"]`` and print it.

    Only the newest ``MAX_LOG_LINES`` entries are retained so /stats stays small.
    """
    log_msg = f"[{time.strftime('%H:%M:%S')}] {msg}"
    logs = processing_status["logs"]
    logs.append(log_msg)
    if len(logs) > MAX_LOG_LINES:
        del logs[:-MAX_LOG_LINES]
    print(log_msg)


def load_state():
    """Return the persisted state dict (``{"processed_files": [...]}``).

    A missing file yields an empty state. An unreadable, non-JSON or
    wrongly-shaped file also yields an empty state, but a warning is logged
    instead of the problem being silently swallowed.
    """
    try:
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except FileNotFoundError:
        return _empty_state()
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        add_log(f"⚠️ Could not read {STATE_FILE} ({e}); starting with an empty state")
        return _empty_state()
    if not isinstance(state, dict) or not isinstance(state.get("processed_files"), list):
        add_log(f"⚠️ Unexpected content in {STATE_FILE}; starting with an empty state")
        return _empty_state()
    return state


def save_state(state):
    """Persist ``state`` to ``STATE_FILE`` atomically.

    The JSON is written to a sibling temp file and swapped in with
    ``os.replace``. A crash mid-write therefore cannot leave a truncated file,
    which ``load_state`` would treat as "nothing processed yet".
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, STATE_FILE)


def extract_frames(video_path, output_dir, fps=10):
    """Decode ``video_path`` and save about ``fps`` frames per second as JPEGs.

    Frames are written to ``output_dir`` (created if needed) as
    ``000001.jpg``, ``000002.jpg``, ... at JPEG quality 90.

    Args:
        video_path: Video file to read (``str`` or ``Path``).
        output_dir: Destination directory for the JPEG files.
        fps: Target sampling rate. Every ``round(video_fps / fps)``-th decoded
            frame is kept, and never fewer than every frame.

    Returns:
        Number of frames written; 0 if the video cannot be opened.

    Raises:
        OSError: If a frame cannot be written to disk.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Some containers report 0 or NaN; round() would raise on NaN/inf.
        if not (math.isfinite(video_fps) and video_fps > 0):
            video_fps = DEFAULT_VIDEO_FPS
        frame_interval = max(1, int(round(video_fps / fps)))
        jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                out_path = os.path.join(output_dir, f"{saved_count + 1:06d}.jpg")
                # imwrite reports failure by returning False rather than raising.
                if not cv2.imwrite(out_path, frame, jpeg_params):
                    raise OSError(f"Failed to write frame {out_path}")
                saved_count += 1
            frame_idx += 1
        return saved_count
    finally:
        cap.release()


def zip_folder(folder_path, zip_path):
    """Write every file under ``folder_path`` into a new deflated zip at ``zip_path``.

    Entries are stored under their bare file name (directory structure is
    flattened), which matches the flat frame folders produced by
    :func:`extract_frames`. Files are added in sorted order, so the archive
    lists frames sequentially.
    """
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in sorted(files):
                zipf.write(os.path.join(root, file), arcname=file)


def _remove_local_paths(*paths):
    """Best-effort removal of local files/directories; ``None`` entries are skipped.

    Failures are logged instead of raised, so cleanup never masks the error
    (or success) of the step that preceded it.
    """
    for path in paths:
        if path is None:
            continue
        try:
            if os.path.isdir(path):
                shutil.rmtree(path)
            elif os.path.exists(path):
                os.remove(path)
        except OSError as e:
            add_log(f"⚠️ Could not remove {path}: {e}")


async def _process_video(video_file, state):
    """Download, convert, upload and clean up one source video.

    ``video_file`` is appended to ``state["processed_files"]`` and the state is
    saved only after the upload succeeds. Local artifacts are removed even when
    a step raises; the exception is then propagated to the caller.
    """
    processing_status["current_action"] = f"Processing {video_file}"
    video_name = Path(video_file).stem
    # NOTE(review): videos sharing a stem (different folders or extensions)
    # map to the same zip name and would overwrite each other in TARGET_REPO_ID.
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)
    zip_filename = f"{video_name}_frames.zip"
    zip_path = os.path.join(ZIPS_DIR, zip_filename)
    local_video_path = None
    try:
        add_log(f"Downloading {video_file}...")
        local_video_path = await asyncio.to_thread(
            hf_hub_download,
            repo_id=SOURCE_REPO_ID,
            filename=video_file,
            repo_type="dataset",
            local_dir=DOWNLOAD_DIR,
            token=HF_TOKEN,
        )

        # Start from an empty directory so frames left behind by an
        # interrupted earlier run are not zipped with this run's frames.
        shutil.rmtree(video_frames_dir, ignore_errors=True)
        add_log(f"Extracting frames for {video_name}...")
        frame_count = await asyncio.to_thread(extract_frames, local_video_path, video_frames_dir)
        if frame_count <= 0:
            add_log(f"⚠️ No frames extracted from {video_file}; it will be retried next run")
            return

        add_log(f"Zipping {frame_count} frames...")
        await asyncio.to_thread(zip_folder, video_frames_dir, zip_path)

        add_log(f"Uploading to {TARGET_REPO_ID}...")
        await asyncio.to_thread(
            api.upload_file,
            path_or_fileobj=zip_path,
            path_in_repo=zip_filename,
            repo_id=TARGET_REPO_ID,
            repo_type="dataset",
        )

        state["processed_files"].append(video_file)
        save_state(state)
        processing_status["processed_count"] = len(state["processed_files"])
        processing_status["last_processed"] = video_file
        add_log(f"✅ Finished {video_file}")
    finally:
        _remove_local_paths(video_frames_dir, local_video_path, zip_path)


async def run_processor():
    """Process every video in SOURCE_REPO_ID that is not yet in STATE_FILE.

    Blocking network, disk and CPU work runs in worker threads via
    ``asyncio.to_thread``, so the event loop (and therefore /stats) stays
    responsive during a run. All status updates happen on the event-loop
    thread.

    Only one run is active at a time: a call made while ``is_running`` is set
    returns immediately. The first error aborts the run and is reported via
    ``processing_status``.
    """
    # The check-and-set happens before the first await, so two coroutines on
    # the same event loop cannot both pass it.
    if processing_status["is_running"]:
        return
    processing_status["is_running"] = True
    try:
        # Loaded inside the try so a failure here still resets is_running.
        state = load_state()

        add_log("Checking source repository...")
        files = await asyncio.to_thread(
            list_repo_files, repo_id=SOURCE_REPO_ID, repo_type="dataset", token=HF_TOKEN
        )
        videos = [f for f in files if f.lower().endswith(VIDEO_EXTENSIONS)]

        processing_status["total_videos_source"] = len(videos)
        processing_status["processed_count"] = len(state["processed_files"])

        already_processed = set(state["processed_files"])
        for video_file in videos:
            if video_file in already_processed:
                continue
            await _process_video(video_file, state)

        processing_status["current_action"] = "Completed"
        add_log("🎉 All available videos processed!")
    except Exception as e:
        # Top-level boundary for the background task: record and stop the run.
        add_log(f"❌ Error: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["is_running"] = False


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the dashboard page."""
    return templates.TemplateResponse(request, "index.html")


@app.get("/stats")
async def get_stats():
    """Return the current processing status."""
    return processing_status


@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks):
    """Schedule a processing run unless one is already in progress."""
    if not processing_status["is_running"]:
        background_tasks.add_task(run_processor)
        return {"message": "Processor started"}
    return {"message": "Processor already running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)