| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks |
| import os, uuid, subprocess, torch, cv2, sys, time |
| import whisper |
| from scenedetect import VideoManager, SceneManager |
| from scenedetect.detectors import ContentDetector |
| from ultralytics import YOLO |
| from diffusers import StableVideoDiffusionPipeline |
| from PIL import Image |
|
|
| |
| |
| |
|
|
# ASGI application object; the route decorators further down register on it.
app = FastAPI()

# Working directories, relative to the process working directory. Each one is
# created at import time right after it is named (no error if it exists).
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
| |
| |
| |
|
|
# Pick the compute device once at import time. Half precision is selected
# only together with CUDA; the CPU path stays in float32.
if torch.cuda.is_available():
    DEVICE, DTYPE = "cuda", torch.float16
else:
    DEVICE, DTYPE = "cpu", torch.float32
|
|
| |
| |
| |
|
|
# In-memory job registry: job id -> mutable state dict ("status", "stage",
# "progress", ...). Written by the /edit handler and the background worker,
# read by /status. It is a plain module-level dict, so its contents exist
# only for the lifetime of this process.
jobs: dict = {}
|
|
| |
| |
| |
|
|
# Models are loaded once at import time and reused by the handlers below.
whisper_model = whisper.load_model("base")
yolo = YOLO("yolov8n.pt")

# Hand the target dtype to from_pretrained so the weights are created in
# DTYPE directly instead of being loaded first and cast afterwards, then move
# the pipeline to DEVICE with the positional form of .to(), which follows the
# loading pattern shown in the diffusers documentation.
svd = StableVideoDiffusionPipeline.from_pretrained(
    "stabilityai/stable-video-diffusion-img2vid",
    torch_dtype=DTYPE,
)
svd.to(DEVICE)
|
|
| |
| |
| |
|
|
def print_bar(label: str, percent: float, width: int = 40) -> None:
    """Draw a one-line text progress bar on stdout.

    The output starts with a carriage return and has no trailing newline, so
    consecutive calls overwrite the same terminal line. The stream is flushed
    on every call.

    Args:
        label: Short tag shown in square brackets before the percentage.
        percent: Completion percentage. The number is printed as given, but
            the filled portion of the bar is clamped to the 0-100 range so
            the bar is always exactly ``width`` characters long.
        width: Number of character cells between the two ``|`` delimiters.
    """
    # Clamp only the fill: an out-of-range value would otherwise make the bar
    # shorter or longer than ``width``.
    clamped = min(max(percent, 0.0), 100.0)
    filled = int(width * clamped / 100)
    bar = "█" * filled + " " * (width - filled)
    print(f"\r[{label}] {percent:5.1f}% |{bar}|", end="", flush=True)
|
|
| |
| |
| |
|
|
@app.get("/")
def root():
    """Health-check endpoint: always answers with a fixed OK payload."""
    payload = {"status": "ok"}
    return payload
|
|
| |
| |
| |
|
|
@app.post("/captions")
async def captions(file: UploadFile = File(...)):
    """Transcribe an uploaded media file with the module-level Whisper model.

    The upload is written under ``UPLOAD_DIR`` and the saved path is passed to
    ``whisper_model.transcribe``.

    Args:
        file: Multipart upload containing the media to transcribe.

    Returns:
        A dict with the ``segments`` and ``language`` entries of the
        transcription result.
    """
    # The client-supplied filename is untrusted. Keep only its last path
    # component (treating backslashes as separators too) so it cannot point
    # outside UPLOAD_DIR, fall back to a fixed name when it is missing, and
    # prefix a random id so same-named uploads do not overwrite each other.
    client_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
    path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}_{client_name}")
    with open(path, "wb") as f:
        f.write(await file.read())

    # NOTE(review): transcribe() is a synchronous call inside an async
    # handler; presumably it blocks the event loop while it runs - consider
    # moving it to a worker thread.
    result = whisper_model.transcribe(path)

    return {
        "segments": result["segments"],
        "language": result["language"],
    }
|
|
|
|
@app.post("/scene-detect")
async def scene_detect(file: UploadFile = File(...)):
    """Detect scene boundaries in an uploaded video.

    The upload is written under ``UPLOAD_DIR`` and analysed with a
    ``ContentDetector`` (threshold 27.0) driven by a ``SceneManager``.

    Args:
        file: Multipart upload containing the video to analyse.

    Returns:
        A dict whose ``scenes`` entry is a list of ``{"start", "end"}`` dicts,
        each value obtained from the scene boundary's ``get_seconds()``.
    """
    # The client-supplied filename is untrusted. Keep only its last path
    # component (treating backslashes as separators too) so it cannot point
    # outside UPLOAD_DIR, fall back to a fixed name when it is missing, and
    # prefix a random id so same-named uploads do not overwrite each other.
    client_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
    path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}_{client_name}")
    with open(path, "wb") as f:
        f.write(await file.read())

    video_manager = VideoManager([path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=27.0))

    # Release the video source even when start/detection raises, so a failed
    # request does not leave it open.
    try:
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        scenes = scene_manager.get_scene_list()
    finally:
        video_manager.release()

    return {
        "scenes": [
            {"start": s[0].get_seconds(), "end": s[1].get_seconds()}
            for s in scenes
        ]
    }
|
|
|
|
@app.post("/smart-crop")
async def smart_crop(file: UploadFile = File(...), aspect: str = Form("9:16")):
    """Suggest a crop box from a detection on the video's first frame.

    The upload is written under ``UPLOAD_DIR``, one frame is read from it
    with OpenCV, and the module-level YOLO model is run on that frame.

    Args:
        file: Multipart upload containing the video.
        aspect: Requested aspect ratio; it is only echoed back in the
            response and does not influence the returned box.

    Returns:
        ``{"crop_box": [...], "aspect": aspect}`` where ``crop_box`` is the
        ``xyxy`` row of the first box in the detection result, or an
        ``{"error": ...}`` dict when no frame could be read or nothing was
        detected.
    """
    # The client-supplied filename is untrusted. Keep only its last path
    # component (treating backslashes as separators too) so it cannot point
    # outside UPLOAD_DIR, fall back to a fixed name when it is missing, and
    # prefix a random id so same-named uploads do not overwrite each other.
    client_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
    path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}_{client_name}")
    with open(path, "wb") as f:
        f.write(await file.read())

    # Release the capture handle even if reading the frame raises.
    cap = cv2.VideoCapture(path)
    try:
        ret, frame = cap.read()
    finally:
        cap.release()

    if not ret:
        return {"error": "Failed to read video frame"}

    results = yolo(frame)
    boxes = results[0].boxes

    if boxes is None or len(boxes) == 0:
        return {"error": "No subject detected"}

    # NOTE(review): takes whichever box comes first in the result; confirm
    # that this ordering matches the intended "main subject".
    box = boxes.xyxy[0].cpu().numpy()

    return {"crop_box": box.tolist(), "aspect": aspect}
|
|
| |
| |
| |
|
|
def run_edit_job(job_id: str, video_path: str, frame_path: str):
    """Background worker for ``/edit``: extract one frame and run SVD on it.

    Progress and results are published by mutating ``jobs[job_id]``. Any
    exception is caught and recorded on the job as ``status="error"`` plus an
    ``error`` message, so this function does not raise on failure.

    Args:
        job_id: Key of an existing entry in ``jobs``.
        video_path: Path of the uploaded video passed to ffmpeg as input.
        frame_path: Path where ffmpeg writes the extracted frame image.
    """
    try:
        jobs[job_id].update({
            "stage": "extracting_frame",
            "progress": 0,
        })

        # Grab a single frame, scaled to fit inside 512x512 while keeping the
        # aspect ratio; check=True turns a non-zero ffmpeg exit into an
        # exception handled below.
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", video_path,
                "-vf", "scale=512:512:force_original_aspect_ratio=decrease",
                "-frames:v", "1",
                "-update", "1",
                frame_path,
            ],
            check=True,
        )

        # convert() returns an independent image, so the file handle can be
        # closed as soon as the conversion is done.
        with Image.open(frame_path) as raw_frame:
            img = raw_frame.convert("RGB")

        num_steps = 25
        jobs[job_id]["stage"] = "diffusion"

        def _report_step(pipeline, step, timestep, callback_kwargs):
            # Called by the pipeline after each denoising step, so the
            # published progress tracks the real diffusion loop rather than a
            # timer. The pipeline expects the kwargs dict to be returned.
            percent = ((step + 1) / num_steps) * 100
            jobs[job_id]["progress"] = round(percent, 1)
            print_bar("SVD", percent)
            return callback_kwargs

        with torch.no_grad():
            output = svd(
                image=img,
                num_frames=8,
                decode_chunk_size=4,
                num_inference_steps=num_steps,
                callback_on_step_end=_report_step,
            )

        # Finish the carriage-return progress line.
        print()

        # output.frames holds one list of frames per generated video; count
        # the frames of the single video, not the number of videos.
        jobs[job_id].update({
            "status": "done",
            "stage": "completed",
            "frames": len(output.frames[0]),
            "progress": 100,
        })

    except Exception as e:
        # Top-level boundary of the background task: surface the failure via
        # the job record, since there is no caller to propagate it to.
        jobs[job_id]["status"] = "error"
        jobs[job_id]["error"] = str(e)
|
|
| |
| |
| |
|
|
@app.get("/status/{job_id}")
def job_status(job_id: str):
    """Return the stored state dict for ``job_id``.

    Unknown ids produce ``{"status": "not_found"}`` instead of an error.
    """
    try:
        return jobs[job_id]
    except KeyError:
        return {"status": "not_found"}
|
|
| |
| |
| |
|
|
@app.post("/edit")
async def edit_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    prompt: str = Form(...)
):
    """Accept a video upload and queue a background edit job for it.

    The upload is stored as ``<job_id>.mp4`` under ``UPLOAD_DIR``, a job
    record is created in ``jobs``, and ``run_edit_job`` is scheduled through
    ``background_tasks``. The prompt is only stored on the job record; it is
    not passed to the worker.

    Returns:
        ``{"job_id": <id>, "status": "running"}``; poll ``/status/{job_id}``
        for progress.
    """
    job_id = uuid.uuid4().hex

    # Both artifact names derive from the generated job id, not from the
    # client-supplied filename.
    video_path = os.path.join(UPLOAD_DIR, f"{job_id}.mp4")
    frame_path = os.path.join(OUTPUT_DIR, f"{job_id}.png")

    with open(video_path, "wb") as video_file:
        video_file.write(await file.read())

    initial_state = {
        "status": "running",
        "stage": "queued",
        "progress": 0,
        "prompt_received_but_unused": prompt,
    }
    jobs[job_id] = initial_state

    background_tasks.add_task(run_edit_job, job_id, video_path, frame_path)

    return {"job_id": job_id, "status": initial_state["status"]}
|
|