Gaurav vashistha committed on
Commit
c76f6fa
·
1 Parent(s): 0dc4810

Fix stitching hangs: add timeout, normalization and soft failure

Browse files
Files changed (2) hide show
  1. agent.py +16 -18
  2. utils.py +46 -36
agent.py CHANGED
@@ -100,10 +100,9 @@ def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, m
100
  config=types.GenerateVideosConfig(number_of_videos=1)
101
  )
102
 
103
- # Wait for Veo generation
104
- while not op.done:
105
- time.sleep(5)
106
-
107
  if op.result and op.result.generated_videos:
108
  vid = op.result.generated_videos[0]
109
  bridge_path = None
@@ -114,34 +113,33 @@ def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, m
114
  bridge_path = save_video_bytes(vid.video.video_bytes)
115
 
116
  if bridge_path:
117
- # STITCHING PHASE
118
- update_job_status(job_id, "stitching", 80, "Stitching Director's Cut (A+B+C)...", video_url=bridge_path)
119
  final_cut_path = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
120
 
121
  try:
122
- # This is the dangerous part where FFmpeg runs
123
  final_output = stitch_videos(path_a, bridge_path, path_c, final_cut_path)
 
124
  update_job_status(job_id, "completed", 100, "Done!", video_url=bridge_path, merged_video_url=final_output)
125
  except Exception as e:
126
- # If stitch fails, log it but don't fail the whole job
127
- logger.error(f"Stitch error: {e}")
128
- update_job_status(job_id, "completed", 100, "Stitch failed, showing bridge only.", video_url=bridge_path)
 
129
  return
130
  else:
131
  raise Exception("Veo returned no videos.")
132
  else:
133
- raise Exception("GCP_PROJECT_ID is not set.")
134
  except Exception as e:
135
- logger.error(f"Generation Fatal Error: {e}")
136
  update_job_status(job_id, "error", 0, f"Error: {e}")
137
  job_failed = True
138
  finally:
139
- # DEAD MAN'S SWITCH: Ensure job never hangs in 'processing'
140
  if not job_failed:
141
  try:
142
  with open(f"outputs/{job_id}.json", "r") as f:
143
- status = json.load(f).get("status")
144
- if status not in ["completed", "error"]:
145
- update_job_status(job_id, "error", 0, "Job terminated unexpectedly (Zombie Process).")
146
- except Exception:
147
- pass
 
100
  config=types.GenerateVideosConfig(number_of_videos=1)
101
  )
102
 
103
+ # Robust polling
104
+ while not op.done: time.sleep(5)
105
+
 
106
  if op.result and op.result.generated_videos:
107
  vid = op.result.generated_videos[0]
108
  bridge_path = None
 
113
  bridge_path = save_video_bytes(vid.video.video_bytes)
114
 
115
  if bridge_path:
116
+ # --- STITCHING TRY/EXCEPT BLOCK ---
117
+ update_job_status(job_id, "stitching", 80, "Stitching Director's Cut...", video_url=bridge_path)
118
  final_cut_path = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
119
 
120
  try:
121
+ # Attempt Stitching with Timeout
122
  final_output = stitch_videos(path_a, bridge_path, path_c, final_cut_path)
123
+ # SUCCESS: Return Both
124
  update_job_status(job_id, "completed", 100, "Done!", video_url=bridge_path, merged_video_url=final_output)
125
  except Exception as e:
126
+ # FAILURE: Return Bridge Only (Don't crash the job!)
127
+ logging.error(f"Stitch failed, continuing with bridge only. Error: {e}")
128
+ update_job_status(job_id, "completed", 100, "Stitch failed (Bridge Saved).", video_url=bridge_path)
129
+ # ----------------------------------
130
  return
131
  else:
132
  raise Exception("Veo returned no videos.")
133
  else:
134
+ raise Exception("GCP_PROJECT_ID not set.")
135
  except Exception as e:
136
+ logging.error(f"Gen Fatal: {e}")
137
  update_job_status(job_id, "error", 0, f"Error: {e}")
138
  job_failed = True
139
  finally:
 
140
  if not job_failed:
141
  try:
142
  with open(f"outputs/{job_id}.json", "r") as f:
143
+ if json.load(f).get("status") not in ["completed", "error"]:
144
+ update_job_status(job_id, "error", 0, "Job terminated unexpectedly.")
145
+ except: pass
 
 
utils.py CHANGED
@@ -1,17 +1,22 @@
1
- import os, shutil, requests, tempfile, logging, json, subprocess
2
- from datetime import timedelta
 
 
 
 
 
 
3
  from google.cloud import storage
4
  from config import Settings
5
  logger = logging.getLogger(__name__)
6
 
7
  def download_to_temp(url):
8
  if os.path.exists(url): return url
9
- resp = requests.get(url, stream=True)
10
- resp.raise_for_status()
11
  suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
12
  with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
13
  shutil.copyfileobj(resp.raw, f)
14
- return f.name
15
 
16
  def download_blob(gcs_uri, destination_file_name):
17
  if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}")
@@ -40,52 +45,57 @@ def get_history_from_gcs():
40
  def save_video_bytes(bytes_data, suffix=".mp4") -> str:
41
  with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
42
  f.write(bytes_data)
43
- return f.name
44
 
45
  def stitch_videos(path_a, path_b, path_c, output_path):
46
- logger.info("🧵 Stitching: A=%s + B=%s + C=%s -> %s", path_a, path_b, path_c, output_path)
47
- cmd = [
48
- "ffmpeg", "-y", "-i", path_a, "-i", path_b, "-i", path_c,
49
- "-filter_complex",
50
- "[0:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v0];[1:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v1];[2:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v2];[v0][0:a][v1][1:a][v2][2:a]concat=n=3:v=1:a=1[v][a]",
51
- "-map", "[v]", "-map", "[a]", "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "192k", output_path
52
- ]
53
- cmd_robust = [
54
- "ffmpeg", "-y", "-i", path_a, "-i", path_b, "-i", path_c,
55
- "-filter_complex",
56
- f"[0:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v0];[1:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v1];[2:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v2];[v0][v1][v2]concat=n=3:v=1:a=0[v]",
57
- "-map", "[v]", "-c:v", "libx264", "-preset", "fast", output_path
58
- ]
 
 
 
59
  try:
60
- subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
61
- except subprocess.CalledProcessError:
62
- try:
63
- subprocess.run(cmd_robust, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
64
- except subprocess.CalledProcessError as e2:
65
- raise e2
66
- return output_path
 
 
67
 
68
  def update_job_status(job_id, status, progress, log=None, video_url=None, merged_video_url=None):
69
  if not job_id: return
70
  os.makedirs("outputs", exist_ok=True)
 
71
  final_url = video_url
72
  final_merged_url = merged_video_url
73
-
74
  if video_url and os.path.exists(video_url) and status == "completed":
75
  final_filename = f"{job_id}_bridge.mp4"
76
  dest = os.path.join("outputs", final_filename)
77
- if os.path.abspath(video_url) != os.path.abspath(dest):
78
- shutil.move(video_url, dest)
79
  final_url = f"/outputs/{final_filename}"
80
- if Settings.GCP_BUCKET_NAME:
81
- upload_to_gcs(dest, final_filename)
82
-
83
  if merged_video_url and os.path.exists(merged_video_url) and status == "completed":
84
  merged_filename = f"{job_id}_merged.mp4"
85
  merged_dest = os.path.join("outputs", merged_filename)
86
- if os.path.abspath(merged_video_url) != os.path.abspath(merged_dest):
87
- shutil.move(merged_video_url, merged_dest)
88
  final_merged_url = f"/outputs/{merged_filename}"
89
-
90
  with open(f"outputs/{job_id}.json", "w") as f:
91
- json.dump({"status": status, "progress": progress, "log": log, "video_url": final_url, "merged_video_url": final_merged_url}, f)
 
 
 
 
1
+ import os
2
+ import shutil
3
+ import requests
4
+ import tempfile
5
+ import logging
6
+ import json
7
+ import subprocess
8
+ from datetime import datetime, timedelta
9
  from google.cloud import storage
10
  from config import Settings
11
  logger = logging.getLogger(__name__)
12
 
13
  def download_to_temp(url):
14
  if os.path.exists(url): return url
15
+ resp = requests.get(url, stream=True); resp.raise_for_status()
 
16
  suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
17
  with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
18
  shutil.copyfileobj(resp.raw, f)
19
+ return f.name
20
 
21
  def download_blob(gcs_uri, destination_file_name):
22
  if not gcs_uri.startswith("gs://"): raise ValueError(f"Invalid GCS URI: {gcs_uri}")
 
45
  def save_video_bytes(bytes_data, suffix=".mp4") -> str:
46
  with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
47
  f.write(bytes_data)
48
+ return f.name
49
 
50
  def stitch_videos(path_a, path_b, path_c, output_path):
51
+ """ Robust Stitching: Normalizes resolution, fps, and audio to prevent FFmpeg hangs. """
52
+ logger.info(f"🧵 Stitching: {path_a} + {path_b} + {path_c} -> {output_path}")
53
+
54
+ # Complex Filter:
55
+ # 1. Scale all to 1080p, force 24fps, reset SAR (aspect ratio markers).
56
+ # 2. Drop audio entirely (concat with a=0, plus -an on the output) for safety; only the video streams are mapped.
57
+ # 3. Concat everything.
58
+ filter_complex = (
59
+ "[0:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v0];"
60
+ "[1:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v1];"
61
+ "[2:v]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=24,format=yuv420p[v2];"
62
+ "[v0][v1][v2]concat=n=3:v=1:a=0[v]"
63
+ )
64
+ cmd = [ "ffmpeg", "-y", "-i", path_a, "-i", path_b, "-i", path_c, "-filter_complex", filter_complex, "-map", "[v]", "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-an", # Remove audio to prevent codec crashes (simplest fix for stability)
65
+ output_path ]
66
+
67
  try:
68
+ # TIMEOUT ADDED: Prevents infinite hanging
69
+ subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=60)
70
+ return output_path
71
+ except subprocess.TimeoutExpired:
72
+ logger.error("❌ FFmpeg timed out after 60s.")
73
+ raise Exception("Stitching timed out.")
74
+ except subprocess.CalledProcessError as e:
75
+ logger.error(f"❌ FFmpeg Error: {e.stderr.decode()}")
76
+ raise Exception(f"Stitching failed: {e.stderr.decode()[:100]}")
77
 
78
  def update_job_status(job_id, status, progress, log=None, video_url=None, merged_video_url=None):
79
  if not job_id: return
80
  os.makedirs("outputs", exist_ok=True)
81
+
82
  final_url = video_url
83
  final_merged_url = merged_video_url
84
+ # Move Raw Bridge
85
  if video_url and os.path.exists(video_url) and status == "completed":
86
  final_filename = f"{job_id}_bridge.mp4"
87
  dest = os.path.join("outputs", final_filename)
88
+ if os.path.abspath(video_url) != os.path.abspath(dest): shutil.move(video_url, dest)
 
89
  final_url = f"/outputs/{final_filename}"
90
+ if Settings.GCP_BUCKET_NAME: upload_to_gcs(dest, final_filename)
91
+ # Move Merged Video (if it exists)
 
92
  if merged_video_url and os.path.exists(merged_video_url) and status == "completed":
93
  merged_filename = f"{job_id}_merged.mp4"
94
  merged_dest = os.path.join("outputs", merged_filename)
95
+ if os.path.abspath(merged_video_url) != os.path.abspath(merged_dest): shutil.move(merged_video_url, merged_dest)
 
96
  final_merged_url = f"/outputs/{merged_filename}"
 
97
  with open(f"outputs/{job_id}.json", "w") as f:
98
+ json.dump({
99
+ "status": status, "progress": progress, "log": log,
100
+ "video_url": final_url, "merged_video_url": final_merged_url
101
+ }, f)