import os import shutil import requests import tempfile import logging import json import subprocess from datetime import timedelta from google.cloud import storage from config import Settings logger = logging.getLogger(__name__) def download_to_temp(url): if os.path.exists(url): return url resp = requests.get(url, stream=True); resp.raise_for_status() suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4" with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f: shutil.copyfileobj(resp.raw, f) return f.name def download_blob(gcs_uri, destination_file_name): if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}") parts = gcs_uri[5:].split("/", 1) storage.Client().bucket(parts[0]).blob(parts[1]).download_to_filename(destination_file_name) def upload_to_gcs(local_path, destination_blob_name): if not Settings.GCP_BUCKET_NAME: return None try: blob = storage.Client().bucket(Settings.GCP_BUCKET_NAME).blob(destination_blob_name) blob.upload_from_filename(local_path) return blob.generate_signed_url(expiration=timedelta(hours=1), method='GET') except Exception as e: logger.error(f"GCS Upload Failed: {e}") return None def get_history_from_gcs(): if not Settings.GCP_BUCKET_NAME: return [] try: blobs = list(storage.Client().bucket(Settings.GCP_BUCKET_NAME).list_blobs()) blobs.sort(key=lambda b: b.time_created, reverse=True) return [{"name": b.name, "url": b.generate_signed_url(timedelta(hours=1), method='GET'), "created": b.time_created.isoformat()} for b in blobs[:20] if b.name.endswith(".mp4")] except Exception: return [] def save_video_bytes(bytes_data, suffix=".mp4") -> str: with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f: f.write(bytes_data) return f.name def normalize_video(input_path): """Helper to normalize video. Returns None if ffmpeg missing.""" if not shutil.which("ffmpeg"): return None output_path = input_path.replace(".mp4", "_norm.mp4") cmd = [ "ffmpeg", "-y", "-i", input_path, "-vf", "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28", "-an", output_path ] subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30) return output_path def stitch_videos(path_a, path_b, path_c, output_path): """ Attempts to stitch videos. RETURNS: output_path if successful, NONE if ffmpeg is missing/fails. """ # 1. CHECK IF FFMPEG EXISTS if not shutil.which("ffmpeg"): logger.warning("⚠️ FFmpeg not found. Skipping stitch.") return None logger.info(f"🧵 Stitching: {path_a} + {path_b} + {path_c}") try: norm_a = normalize_video(path_a) norm_b = normalize_video(path_b) norm_c = normalize_video(path_c) if not all([norm_a, norm_b, norm_c]): raise Exception("Normalization failed") list_file = "concat_list.txt" with open(list_file, "w") as f: f.write(f"file '{norm_a}'\n") f.write(f"file '{norm_b}'\n") f.write(f"file '{norm_c}'\n") cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", output_path ] subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30) # Cleanup for p in [norm_a, norm_b, norm_c, list_file]: if os.path.exists(p): os.remove(p) return output_path except Exception as e: logger.error(f"Stitch Logic Failed: {e}") return None # Return None so the pipeline continues without crashing def update_job_status(job_id, status, progress, log=None, video_url=None, merged_video_url=None): if not job_id: return os.makedirs("outputs", exist_ok=True) final_url = video_url final_merged_url = merged_video_url if video_url and os.path.exists(video_url) and status == "completed": final_filename = f"{job_id}_bridge.mp4" dest = os.path.join("outputs", final_filename) if os.path.abspath(video_url) != os.path.abspath(dest): shutil.move(video_url, dest) final_url = f"/outputs/{final_filename}" if Settings.GCP_BUCKET_NAME: upload_to_gcs(dest, final_filename) if merged_video_url and os.path.exists(merged_video_url) and status == "completed": merged_filename = f"{job_id}_merged.mp4" merged_dest = os.path.join("outputs", merged_filename) if os.path.abspath(merged_video_url) != os.path.abspath(merged_dest): shutil.move(merged_video_url, merged_dest) final_merged_url = f"/outputs/{merged_filename}" with open(f"outputs/{job_id}.json", "w") as f: json.dump({ "status": status, "progress": progress, "log": log, "video_url": final_url, "merged_video_url": final_merged_url }, f)