Spaces:
Sleeping
Sleeping
File size: 5,166 Bytes
c76f6fa 7451451 e9456a0 c4bca54 e9456a0 623bad5 c76f6fa e9456a0 c76f6fa e9456a0 623bad5 e9456a0 1197046 e9456a0 c4bca54 623bad5 c4bca54 1197046 c4bca54 1197046 c4bca54 fad001e c4bca54 623bad5 c4bca54 1197046 fad001e 1197046 c4bca54 e9456a0 c76f6fa fad001e 7451451 5e07b31 7451451 5e07b31 7451451 c76f6fa 7451451 5e07b31 c76f6fa 5e07b31 fad001e 7451451 5e07b31 7451451 5e07b31 7451451 c76f6fa 7451451 5e07b31 e9456a0 1197046 623bad5 fad001e c76f6fa fad001e 1197046 e9456a0 1197046 fad001e c76f6fa fad001e c76f6fa 1197046 c76f6fa 1197046 c76f6fa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
import os
import shutil
import requests
import tempfile
import logging
import json
import subprocess
from datetime import timedelta
from google.cloud import storage
from config import Settings
# Module-level logger named after this module (standard logging pattern).
logger = logging.getLogger(__name__)
def download_to_temp(url):
    """Fetch *url* into a temp file and return its path.

    If *url* is already an existing local path it is returned unchanged.
    The temp file keeps the URL's extension (falling back to ".mp4").

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.Timeout: if the server stalls past the timeout.
    """
    if os.path.exists(url):
        return url
    # timeout prevents a stalled server from hanging the pipeline forever
    # (the original call had no timeout at all).
    resp = requests.get(url, stream=True, timeout=60)
    resp.raise_for_status()
    suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
        # iter_content decodes transfer encodings (gzip/deflate); copying
        # resp.raw directly would write the still-compressed bytes.
        for chunk in resp.iter_content(chunk_size=1 << 16):
            f.write(chunk)
    return f.name
def download_blob(gcs_uri, destination_file_name):
    """Download a GCS object addressed by a gs:// URI to a local file.

    Raises ValueError when the URI does not use the gs:// scheme.
    """
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Invalid GCS URI: {gcs_uri}")
    # Split "gs://bucket/path/to/blob" into bucket and object name.
    bucket_name, blob_name = gcs_uri[5:].split("/", 1)
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    blob.download_to_filename(destination_file_name)
def upload_to_gcs(local_path, destination_blob_name):
    """Upload *local_path* to the configured GCS bucket.

    Returns a signed GET URL valid for one hour, or None when no bucket
    is configured or the upload fails (best-effort: errors are logged,
    never raised).
    """
    bucket_name = Settings.GCP_BUCKET_NAME
    if not bucket_name:
        return None
    try:
        client = storage.Client()
        blob = client.bucket(bucket_name).blob(destination_blob_name)
        blob.upload_from_filename(local_path)
        signed = blob.generate_signed_url(expiration=timedelta(hours=1), method='GET')
        return signed
    except Exception as e:
        logger.error(f"GCS Upload Failed: {e}")
        return None
def get_history_from_gcs():
    """List the 20 most recent .mp4 videos in the configured bucket.

    Returns a list of {"name", "url", "created"} dicts sorted newest
    first, or [] when no bucket is configured or the listing fails
    (best-effort, matching the rest of this module).
    """
    if not Settings.GCP_BUCKET_NAME:
        return []
    try:
        blobs = list(storage.Client().bucket(Settings.GCP_BUCKET_NAME).list_blobs())
        # Filter to videos BEFORE taking the newest 20 — slicing first
        # (as the original did) let non-mp4 blobs crowd videos out of
        # the result.
        videos = [b for b in blobs if b.name.endswith(".mp4")]
        videos.sort(key=lambda b: b.time_created, reverse=True)
        return [
            {
                "name": b.name,
                "url": b.generate_signed_url(timedelta(hours=1), method='GET'),
                "created": b.time_created.isoformat(),
            }
            for b in videos[:20]
        ]
    except Exception as e:
        # Log instead of swallowing silently, but still degrade to [].
        logger.error(f"GCS history listing failed: {e}")
        return []
def save_video_bytes(bytes_data, suffix=".mp4") -> str:
    """Persist raw video bytes to a new temp file and return its path.

    The file is created with delete=False so it survives this function;
    the caller owns cleanup.
    """
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    with tmp as handle:
        handle.write(bytes_data)
    return tmp.name
def normalize_video(input_path):
    """Re-encode *input_path* to 1920x1080, 24fps, yuv420p H.264, no audio.

    Returns the path of the normalized copy ("<stem>_norm.mp4"), or None
    if ffmpeg is not on PATH.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
        subprocess.TimeoutExpired: if encoding exceeds the timeout.
    """
    if not shutil.which("ffmpeg"):
        return None
    # splitext instead of str.replace: the original
    # input_path.replace(".mp4", "_norm.mp4") rewrote EVERY ".mp4" in the
    # path and, for inputs without a .mp4 suffix, produced
    # output_path == input_path — making ffmpeg overwrite its own input.
    root, _ext = os.path.splitext(input_path)
    output_path = f"{root}_norm.mp4"
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        # Letterbox to 1080p while preserving aspect ratio, then force a
        # common fps/pixel format so clips can later be concat-copied.
        "-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p",
        "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
        "-an",  # strip audio
        output_path
    ]
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30)
    return output_path
def stitch_videos(path_a, path_b, path_c, output_path):
    """Normalize three clips and concatenate them into *output_path*.

    Returns:
        output_path on success, None if ffmpeg is missing or any step
        fails (the pipeline is expected to continue without the stitch).
    """
    # 1. CHECK IF FFMPEG EXISTS
    if not shutil.which("ffmpeg"):
        logger.warning("⚠️ FFmpeg not found. Skipping stitch.")
        return None
    logger.info(f"🧵 Stitching: {path_a} + {path_b} + {path_c}")
    temp_paths = []
    try:
        norm_a = normalize_video(path_a)
        norm_b = normalize_video(path_b)
        norm_c = normalize_video(path_c)
        temp_paths = [p for p in (norm_a, norm_b, norm_c) if p]
        if not all([norm_a, norm_b, norm_c]):
            raise Exception("Normalization failed")
        # Unique temp file for the concat manifest: the original used a
        # fixed "concat_list.txt" in the CWD, which collides when two
        # jobs stitch concurrently.
        fd, list_file = tempfile.mkstemp(suffix=".txt", text=True)
        temp_paths.append(list_file)
        with os.fdopen(fd, "w") as f:
            for p in (norm_a, norm_b, norm_c):
                f.write(f"file '{p}'\n")
        cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file,
            "-c", "copy", output_path  # stream copy: clips were normalized to match
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30)
        return output_path
    except Exception as e:
        logger.error(f"Stitch Logic Failed: {e}")
        return None  # Return None so the pipeline continues without crashing
    finally:
        # Cleanup runs on failure too — the original leaked the
        # normalized intermediates whenever the concat step raised.
        for p in temp_paths:
            if os.path.exists(p):
                os.remove(p)
def update_job_status(job_id, status, progress, log=None, video_url=None, merged_video_url=None):
    """Write job state to outputs/<job_id>.json, publishing finished videos.

    On status == "completed", local video files referenced by *video_url*
    / *merged_video_url* are moved into outputs/ and the URLs rewritten
    to their served "/outputs/..." paths; the bridge video is also pushed
    to GCS when a bucket is configured. No-op when job_id is falsy.
    """
    if not job_id:
        return
    os.makedirs("outputs", exist_ok=True)
    final_url = video_url
    final_merged_url = merged_video_url
    if video_url and os.path.exists(video_url) and status == "completed":
        final_filename = f"{job_id}_bridge.mp4"
        dest = os.path.join("outputs", final_filename)
        # Guard against moving a file onto itself when it already lives
        # in outputs/.
        if os.path.abspath(video_url) != os.path.abspath(dest):
            shutil.move(video_url, dest)
        final_url = f"/outputs/{final_filename}"
        if Settings.GCP_BUCKET_NAME:
            upload_to_gcs(dest, final_filename)
    if merged_video_url and os.path.exists(merged_video_url) and status == "completed":
        merged_filename = f"{job_id}_merged.mp4"
        merged_dest = os.path.join("outputs", merged_filename)
        if os.path.abspath(merged_video_url) != os.path.abspath(merged_dest):
            shutil.move(merged_video_url, merged_dest)
        final_merged_url = f"/outputs/{merged_filename}"
    payload = {
        "status": status, "progress": progress, "log": log,
        "video_url": final_url, "merged_video_url": final_merged_url,
    }
    # Write-then-rename so a poller reading the status file can never
    # observe a truncated/partial JSON document (os.replace is atomic).
    tmp_path = f"outputs/{job_id}.json.tmp"
    with open(tmp_path, "w") as f:
        json.dump(payload, f)
    os.replace(tmp_path, f"outputs/{job_id}.json")
|