Continuity / utils.py
Gaurav vashistha
Implement Fault Tolerance: UI polling timeout and backend FFmpeg detection
5e07b31
import os
import shutil
import requests
import tempfile
import logging
import json
import subprocess
from datetime import timedelta
from google.cloud import storage
from config import Settings
logger = logging.getLogger(__name__)
def download_to_temp(url):
if os.path.exists(url): return url
resp = requests.get(url, stream=True); resp.raise_for_status()
suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
shutil.copyfileobj(resp.raw, f)
return f.name
def download_blob(gcs_uri, destination_file_name):
if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}")
parts = gcs_uri[5:].split("/", 1)
storage.Client().bucket(parts[0]).blob(parts[1]).download_to_filename(destination_file_name)
def upload_to_gcs(local_path, destination_blob_name):
if not Settings.GCP_BUCKET_NAME: return None
try:
blob = storage.Client().bucket(Settings.GCP_BUCKET_NAME).blob(destination_blob_name)
blob.upload_from_filename(local_path)
return blob.generate_signed_url(expiration=timedelta(hours=1), method='GET')
except Exception as e:
logger.error(f"GCS Upload Failed: {e}")
return None
def get_history_from_gcs():
if not Settings.GCP_BUCKET_NAME: return []
try:
blobs = list(storage.Client().bucket(Settings.GCP_BUCKET_NAME).list_blobs())
blobs.sort(key=lambda b: b.time_created, reverse=True)
return [{"name": b.name, "url": b.generate_signed_url(timedelta(hours=1), method='GET'), "created": b.time_created.isoformat()} for b in blobs[:20] if b.name.endswith(".mp4")]
except Exception:
return []
def save_video_bytes(bytes_data, suffix=".mp4") -> str:
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
f.write(bytes_data)
return f.name
def normalize_video(input_path):
"""Helper to normalize video. Returns None if ffmpeg missing."""
if not shutil.which("ffmpeg"): return None
output_path = input_path.replace(".mp4", "_norm.mp4")
cmd = [
"ffmpeg", "-y", "-i", input_path,
"-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p",
"-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
"-an",
output_path
]
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30)
return output_path
def stitch_videos(path_a, path_b, path_c, output_path):
""" Attempts to stitch videos. RETURNS: output_path if successful, NONE if ffmpeg is missing/fails. """
# 1. CHECK IF FFMPEG EXISTS
if not shutil.which("ffmpeg"):
logger.warning("⚠️ FFmpeg not found. Skipping stitch.")
return None
logger.info(f"🧵 Stitching: {path_a} + {path_b} + {path_c}")
try:
norm_a = normalize_video(path_a)
norm_b = normalize_video(path_b)
norm_c = normalize_video(path_c)
if not all([norm_a, norm_b, norm_c]):
raise Exception("Normalization failed")
list_file = "concat_list.txt"
with open(list_file, "w") as f:
f.write(f"file '{norm_a}'\n")
f.write(f"file '{norm_b}'\n")
f.write(f"file '{norm_c}'\n")
cmd = [
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file,
"-c", "copy", output_path
]
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30)
# Cleanup
for p in [norm_a, norm_b, norm_c, list_file]:
if os.path.exists(p): os.remove(p)
return output_path
except Exception as e:
logger.error(f"Stitch Logic Failed: {e}")
return None # Return None so the pipeline continues without crashing
def update_job_status(job_id, status, progress, log=None, video_url=None, merged_video_url=None):
if not job_id: return
os.makedirs("outputs", exist_ok=True)
final_url = video_url
final_merged_url = merged_video_url
if video_url and os.path.exists(video_url) and status == "completed":
final_filename = f"{job_id}_bridge.mp4"
dest = os.path.join("outputs", final_filename)
if os.path.abspath(video_url) != os.path.abspath(dest): shutil.move(video_url, dest)
final_url = f"/outputs/{final_filename}"
if Settings.GCP_BUCKET_NAME: upload_to_gcs(dest, final_filename)
if merged_video_url and os.path.exists(merged_video_url) and status == "completed":
merged_filename = f"{job_id}_merged.mp4"
merged_dest = os.path.join("outputs", merged_filename)
if os.path.abspath(merged_video_url) != os.path.abspath(merged_dest): shutil.move(merged_video_url, merged_dest)
final_merged_url = f"/outputs/{merged_filename}"
with open(f"outputs/{job_id}.json", "w") as f:
json.dump({
"status": status, "progress": progress, "log": log,
"video_url": final_url, "merged_video_url": final_merged_url
}, f)