# movzip / main.py
# samfred2's picture
# Update main.py
# 3ab2deb verified
import os
import json
import asyncio
import zipfile
import shutil
import cv2
import time
from pathlib import Path
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from huggingface_hub import HfApi, list_repo_files, hf_hub_download
# Web application object and the Jinja2 template loader (templates live in
# the "templates" directory).
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Configuration from environment variables; each falls back to the default
# given here when the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
SOURCE_REPO_ID = os.environ.get("SOURCE_REPO_ID", "factorstudios/movs")
TARGET_REPO_ID = os.environ.get("TARGET_REPO_ID", "factorstudios/movzip")

# Working directories for downloaded videos, extracted frames and finished
# zips, plus the JSON file that records which videos were already processed.
DOWNLOAD_DIR = "downloads"
FRAMES_DIR = "frames"
ZIPS_DIR = "zips"
STATE_FILE = "processing_state.json"

# Make sure the working directories exist; existing ones are left untouched.
for d in (DOWNLOAD_DIR, FRAMES_DIR, ZIPS_DIR):
    os.makedirs(d, exist_ok=True)

# Hugging Face client used for uploads.
api = HfApi(token=HF_TOKEN)
# Global status for tracking. This dict is mutated in place by the processor
# and returned as-is by the /stats endpoint.
processing_status = dict(
    is_running=False,          # True while a processing run is active
    last_processed=None,       # name of the most recently finished video
    total_videos_source=0,     # number of video files found in the source repo
    processed_count=0,         # number of videos recorded as processed
    current_action="Idle",     # short human-readable description of the current step
    logs=[],                   # recent log lines appended by add_log
)
def add_log(msg):
    """Append a timestamped log line to the status buffer and print it.

    The line has the form ``[HH:MM:SS] msg`` (local time). After appending,
    the oldest entry is dropped whenever the buffer holds more than 50 lines.
    """
    entry = "[{}] {}".format(time.strftime('%H:%M:%S'), msg)
    buffer = processing_status["logs"]
    buffer.append(entry)
    # Keep the buffer bounded: discard the oldest line once past 50 entries.
    if len(buffer) > 50:
        del buffer[0]
    print(entry)
def load_state(path=None):
    """Load the persisted processing state from disk.

    Args:
        path: Optional path of the JSON state file. When omitted, the
            module-level ``STATE_FILE`` is used.

    Returns:
        A dict containing a ``"processed_files"`` list. A fresh
        ``{"processed_files": []}`` is returned when the file does not
        exist, cannot be read, is not valid JSON, or does not have the
        expected shape (callers index ``state["processed_files"]`` directly,
        so a malformed file must not be handed back as-is).
    """
    state_path = STATE_FILE if path is None else path
    loaded = None
    try:
        with open(state_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been persisted yet.
        pass
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Warning: ignoring unreadable state file {state_path!r}: {exc}")

    if isinstance(loaded, dict) and isinstance(loaded.get("processed_files"), list):
        return loaded
    return {"processed_files": []}
def save_state(state, path=None):
    """Persist *state* as indented JSON, replacing the state file atomically.

    The data is first written to a sibling ``<path>.tmp`` file and then moved
    over the target with ``os.replace``. If serialization or writing fails,
    the previously saved state file is left untouched instead of being
    truncated.

    Args:
        state: JSON-serializable state object.
        path: Optional path of the state file. When omitted, the
            module-level ``STATE_FILE`` is used.

    Raises:
        TypeError, ValueError: If *state* cannot be serialized to JSON.
        OSError: If the file cannot be written or replaced.
    """
    state_path = STATE_FILE if path is None else path
    tmp_path = f"{state_path}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, state_path)
    except BaseException:
        # Do not leave a half-written temp file behind; the old state stays.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
def extract_frames(video_path, output_dir, fps=10):
    """Sample frames from a video and save them as numbered JPEG files.

    Every ``round(video_fps / fps)``-th frame (at least every frame) is
    written to *output_dir* as ``000001.jpg``, ``000002.jpg``, ... at JPEG
    quality 90.

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory that receives the JPEG files; created if needed.
        fps: Target number of saved frames per second of video. Must be
            positive.

    Returns:
        The number of frames actually written, or 0 if the video could not
        be opened.

    Raises:
        ValueError: If *fps* is not positive.
    """
    # Validate before touching the filesystem or opening the capture.
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")

    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return 0

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 when the reported rate is unusable (0, negative,
        # NaN or infinite); the chained comparison is False for all of those.
        if not 0 < video_fps < float("inf"):
            video_fps = 30
        frame_interval = max(1, int(round(video_fps / fps)))
        jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 90]

        frame_idx = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % frame_interval == 0:
                target = os.path.join(output_dir, f"{saved_count + 1:06d}.jpg")
                # Count a frame only when imwrite reports success, so the
                # return value matches the files present on disk.
                if cv2.imwrite(target, frame, jpeg_params):
                    saved_count += 1
            frame_idx += 1
        return saved_count
    finally:
        # Release the capture even when reading or writing raises.
        cap.release()
def zip_folder(folder_path, zip_path):
    """Compress the contents of *folder_path* into a deflate zip at *zip_path*.

    Files are stored under their path relative to *folder_path*, so files in
    the top level keep their bare name while files in subfolders keep their
    subfolder prefix (same-named files in different subfolders no longer
    collide). Entries are added in sorted order so the archive layout is
    deterministic. If *zip_path* itself lies inside *folder_path*, the
    archive being written is skipped.

    Args:
        folder_path: Directory whose files are archived.
        zip_path: Destination path of the zip file (overwritten if present).
    """
    zip_abs = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()
            for file in sorted(files):
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == zip_abs:
                    continue
                zipf.write(full_path, arcname=os.path.relpath(full_path, folder_path))
def _process_video(video_file):
    """Download one video, extract its frames, zip them and upload the zip.

    This function blocks (network, OpenCV, compression) and is meant to be
    run in a worker thread. The downloaded video, the frame folder and the
    zip are removed afterwards whether or not processing succeeded.

    Returns:
        True when a zip was uploaded, False when no frames could be
        extracted from the video.
    """
    video_name = Path(video_file).stem
    video_frames_dir = os.path.join(FRAMES_DIR, video_name)
    zip_filename = f"{video_name}_frames.zip"
    # Defined up front so the cleanup below never sees an unbound or stale path.
    zip_path = os.path.join(ZIPS_DIR, zip_filename)
    local_video_path = None
    try:
        add_log(f"Downloading {video_file}...")
        local_video_path = hf_hub_download(
            repo_id=SOURCE_REPO_ID,
            filename=video_file,
            repo_type="dataset",
            local_dir=DOWNLOAD_DIR,
            token=HF_TOKEN
        )

        add_log(f"Extracting frames for {video_name}...")
        frame_count = extract_frames(local_video_path, video_frames_dir)
        if frame_count <= 0:
            add_log(f"⚠️ No frames extracted from {video_file}, skipping")
            return False

        add_log(f"Zipping {frame_count} frames...")
        zip_folder(video_frames_dir, zip_path)

        add_log(f"Uploading to {TARGET_REPO_ID}...")
        api.upload_file(
            path_or_fileobj=zip_path,
            path_in_repo=zip_filename,
            repo_id=TARGET_REPO_ID,
            repo_type="dataset"
        )
        return True
    finally:
        # Cleanup
        if os.path.exists(video_frames_dir):
            shutil.rmtree(video_frames_dir, ignore_errors=True)
        for leftover in (local_video_path, zip_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)


async def run_processor():
    """Process every not-yet-processed video of the source repository.

    Lists the video files in ``SOURCE_REPO_ID``, and for each one that is not
    recorded in the persisted state downloads it, extracts frames, zips them
    and uploads the zip to ``TARGET_REPO_ID``. Progress is reflected in
    ``processing_status`` and successful videos are persisted via
    ``save_state`` after each upload. Videos that yield no frames are logged
    and left unrecorded.

    The blocking steps run in the default thread-pool executor so the event
    loop (and therefore the HTTP endpoints) stays responsive while a run is
    in progress. Any exception aborts the run, is logged, and sets
    ``current_action`` to ``"Error"``; ``is_running`` is always reset.
    A call made while another run is active returns immediately.
    """
    if processing_status["is_running"]:
        return
    processing_status["is_running"] = True
    try:
        loop = asyncio.get_running_loop()
        # Loaded inside the try so a failure here cannot leave is_running stuck.
        state = load_state()

        add_log("Checking source repository...")
        files = await loop.run_in_executor(
            None,
            lambda: list_repo_files(repo_id=SOURCE_REPO_ID, repo_type="dataset", token=HF_TOKEN),
        )
        video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.webm')
        videos = [f for f in files if f.lower().endswith(video_extensions)]
        processing_status["total_videos_source"] = len(videos)
        processing_status["processed_count"] = len(state["processed_files"])

        for video_file in videos:
            if video_file in state["processed_files"]:
                continue
            processing_status["current_action"] = f"Processing {video_file}"

            uploaded = await loop.run_in_executor(None, _process_video, video_file)
            if not uploaded:
                continue

            state["processed_files"].append(video_file)
            save_state(state)
            processing_status["processed_count"] = len(state["processed_files"])
            processing_status["last_processed"] = video_file
            add_log(f"✅ Finished {video_file}")

        processing_status["current_action"] = "Completed"
        add_log("🎉 All available videos processed!")
    except Exception as e:
        add_log(f"❌ Error: {str(e)}")
        processing_status["current_action"] = "Error"
    finally:
        processing_status["is_running"] = False
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template for the root URL."""
    page = templates.TemplateResponse(request, "index.html")
    return page
@app.get("/stats")
async def get_stats():
    """Return the module-level ``processing_status`` dict as the response body."""
    status = processing_status
    return status
@app.post("/start")
async def start_processor(background_tasks: BackgroundTasks):
    """Schedule ``run_processor`` as a background task unless a run is active.

    Returns a JSON message stating whether the processor was started or was
    already running.
    """
    # Guard clause: refuse to queue a second run while one is in progress.
    if processing_status["is_running"]:
        return {"message": "Processor already running"}

    background_tasks.add_task(run_processor)
    return {"message": "Processor started"}
if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly as a script.
    import uvicorn

    # Listen on all interfaces, port 7860.
    uvicorn.run(app=app, host="0.0.0.0", port=7860)