Gaurav vashistha commited on
Commit
e9456a0
·
1 Parent(s): 131b652

feat: phase 1 & 2 - async architecture, refactoring, and status polling

Browse files
Files changed (5) hide show
  1. agent.py +60 -61
  2. config.py +32 -0
  3. server.py +44 -17
  4. stitch_continuity_dashboard/code.html +94 -15
  5. utils.py +84 -0
agent.py CHANGED
@@ -1,33 +1,21 @@
1
  import os
2
  import time
3
- import shutil
4
- import requests
5
- import tempfile
6
  import logging
7
  import json
8
  from typing import TypedDict, Optional
9
  from langgraph.graph import StateGraph, END
10
 
11
- # Unified SDK for both Analyst (Gemini) and Generator (Veo)
12
  from google import genai
13
  from google.genai import types
14
- from google.cloud import storage
15
 
 
16
  from groq import Groq
17
  from gradio_client import Client, handle_file
18
- from dotenv import load_dotenv
19
 
20
- # --- AUTH SETUP FOR HUGGING FACE ---
21
- if "GCP_CREDENTIALS_JSON" in os.environ:
22
- # logger isn't configured yet, so use print
23
- print("🔐 Found GCP Credentials Secret. Setting up auth...")
24
- creds_path = "gcp_credentials.json"
25
- with open(creds_path, "w") as f:
26
- f.write(os.environ["GCP_CREDENTIALS_JSON"])
27
- os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
28
-
29
- # Load environment variables
30
- load_dotenv()
31
 
32
  # Configure Logging
33
  logging.basicConfig(level=logging.INFO)
@@ -35,6 +23,7 @@ logger = logging.getLogger(__name__)
35
 
36
  # State Definition
37
  class ContinuityState(TypedDict):
 
38
  video_a_url: str
39
  video_c_url: str
40
  user_notes: Optional[str]
@@ -44,36 +33,12 @@ class ContinuityState(TypedDict):
44
  video_a_local_path: Optional[str]
45
  video_c_local_path: Optional[str]
46
 
47
- # --- HELPER FUNCTIONS ---
48
- def download_to_temp(url):
49
- logger.info(f"Downloading: {url}")
50
- if os.path.exists(url):
51
- return url
52
-
53
- resp = requests.get(url, stream=True)
54
- resp.raise_for_status()
55
- suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
56
- with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
57
- shutil.copyfileobj(resp.raw, f)
58
- return f.name
59
-
60
- def download_blob(gcs_uri, destination_file_name):
61
- """Downloads a blob from the bucket."""
62
- if not gcs_uri.startswith("gs://"):
63
- raise ValueError(f"Invalid GCS URI: {gcs_uri}")
64
-
65
- parts = gcs_uri[5:].split("/", 1)
66
- bucket_name = parts[0]
67
- source_blob_name = parts[1]
68
- storage_client = storage.Client()
69
- bucket = storage_client.bucket(bucket_name)
70
- blob = bucket.blob(source_blob_name)
71
- blob.download_to_filename(destination_file_name)
72
- logger.info(f"Downloaded storage object {gcs_uri} to local file {destination_file_name}.")
73
-
74
  # --- NODE 1: ANALYST ---
75
  def analyze_videos(state: ContinuityState) -> dict:
76
  logger.info("--- 🧐 Analyst Node (Director) ---")
 
 
 
77
 
78
  video_a_url = state['video_a_url']
79
  video_c_url = state['video_c_url']
@@ -88,16 +53,23 @@ def analyze_videos(state: ContinuityState) -> dict:
88
  if not path_c:
89
  path_c = download_to_temp(video_c_url)
90
  except Exception as e:
91
- logger.error(f"Download failed: {e}")
 
 
92
  return {"scene_analysis": "Error downloading", "veo_prompt": "Smooth cinematic transition"}
93
 
 
 
94
  # 2. Try Gemini 2.0 (With Retry and Wait Loop)
95
- client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])
96
  transition_prompt = None
97
  retries = 3
98
  for attempt in range(retries):
99
  try:
100
  logger.info(f"Uploading videos to Gemini... (Attempt {attempt+1})")
 
 
 
101
  file_a = client.files.upload(file=path_a)
102
  file_c = client.files.upload(file=path_c)
103
 
@@ -123,6 +95,8 @@ def analyze_videos(state: ContinuityState) -> dict:
123
  """
124
 
125
  logger.info("Generating transition prompt...")
 
 
126
  response = client.models.generate_content(
127
  model="gemini-2.0-flash-exp",
128
  contents=[prompt_text, file_a, file_c]
@@ -134,6 +108,7 @@ def analyze_videos(state: ContinuityState) -> dict:
134
  if "429" in str(e) or "RESOURCE_EXHAUSTED" in str(e):
135
  wait = 30 * (attempt + 1)
136
  logger.warning(f"⚠️ Gemini Quota 429. Retrying in {wait}s...")
 
137
  time.sleep(wait)
138
  else:
139
  logger.error(f"⚠️ Gemini Error: {e}")
@@ -142,8 +117,9 @@ def analyze_videos(state: ContinuityState) -> dict:
142
  # 3. Fallback: Groq (Updated Model)
143
  if not transition_prompt:
144
  logger.info("Switching to Llama 3.2 (Groq) Fallback...")
 
145
  try:
146
- groq_client = Groq(api_key=os.environ["GROQ_API_KEY"])
147
  fallback_prompt = "Create a smooth, cinematic visual transition that bridges two scenes."
148
  completion = groq_client.chat.completions.create(
149
  model="llama-3.2-90b-vision-preview",
@@ -154,6 +130,8 @@ def analyze_videos(state: ContinuityState) -> dict:
154
  logger.error(f"❌ Groq also failed: {e}")
155
  transition_prompt = "Smooth cinematic transition with motion blur matching the scenes."
156
 
 
 
157
  return {
158
  "scene_analysis": transition_prompt,
159
  "veo_prompt": transition_prompt,
@@ -164,19 +142,24 @@ def analyze_videos(state: ContinuityState) -> dict:
164
  # --- NODE 2: GENERATOR ---
165
  def generate_video(state: ContinuityState) -> dict:
166
  logger.info("--- 🎥 Generator Node ---")
167
-
 
168
  prompt = state.get('veo_prompt', "")
169
  path_a = state.get('video_a_local_path')
170
  path_c = state.get('video_c_local_path')
171
 
 
 
172
  if not path_a or not path_c:
173
- return {"generated_video_url": "Error: Missing local video paths"}
 
 
174
 
175
  # --- ATTEMPT 1: GOOGLE VEO ---
176
  try:
177
  logger.info("⚡ Initializing Google Veo (Unified SDK)...")
178
- project_id = os.getenv("GCP_PROJECT_ID")
179
- location = os.getenv("GCP_LOCATION", "us-central1")
180
 
181
  if project_id:
182
  client = genai.Client(
@@ -186,6 +169,7 @@ def generate_video(state: ContinuityState) -> dict:
186
  )
187
 
188
  logger.info(f"Generating with Veo... Prompt: {prompt[:30]}...")
 
189
 
190
  operation = client.models.generate_videos(
191
  model='veo-2.0-generate-001',
@@ -198,33 +182,38 @@ def generate_video(state: ContinuityState) -> dict:
198
  logger.info(f"Waiting for Veo operation {operation.name}...")
199
  while not operation.done:
200
  time.sleep(10)
 
201
  operation = client.operations.get(operation)
202
  logger.info("...still generating...")
203
 
204
  if operation.result and operation.result.generated_videos:
205
  video_result = operation.result.generated_videos[0]
206
 
 
207
  # CASE 1: URI (GCS Bucket)
208
  if hasattr(video_result.video, 'uri') and video_result.video.uri:
209
  gcs_uri = video_result.video.uri
210
  logger.info(f"Veo output saved to GCS: {gcs_uri}")
 
211
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
212
  local_path = f.name
 
213
  download_blob(gcs_uri, local_path)
214
  logger.info(f"✅ Veo Video Downloaded (from GCS): {local_path}")
215
- return {"generated_video_url": local_path}
216
 
217
  # CASE 2: RAW BYTES (Direct Return)
218
  elif hasattr(video_result.video, 'video_bytes') and video_result.video.video_bytes:
219
  logger.info("Veo returned raw bytes. Saving to local file...")
220
- with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
221
- f.write(video_result.video.video_bytes)
222
- local_path = f.name
223
  logger.info(f"✅ Veo Video Saved (from Bytes): {local_path}")
224
- return {"generated_video_url": local_path}
225
-
226
  else:
227
  logger.warning(f"Veo operation completed but no URI/Bytes found. Result: {video_result}")
 
 
 
 
 
228
  else:
229
  logger.warning("Veo operation completed with no result.")
230
 
@@ -239,6 +228,8 @@ def generate_video(state: ContinuityState) -> dict:
239
 
240
  # --- ATTEMPT 2: SVD FALLBACK (Free) ---
241
  logger.info("🔄 Switching to SVD Fallback...")
 
 
242
  try:
243
  import cv2
244
  from PIL import Image
@@ -258,10 +249,12 @@ def generate_video(state: ContinuityState) -> dict:
258
  start_path = f_start.name
259
 
260
  client = Client("multimodalart/stable-video-diffusion")
 
 
261
  result = client.predict(
262
  handle_file(start_path),
263
  0.0, 0.0, 1, 25,
264
- fn_index=0
265
  )
266
  logger.info(f"✅ SVD Generated: {result}")
267
 
@@ -277,11 +270,13 @@ def generate_video(state: ContinuityState) -> dict:
277
  final_path = result['video']
278
  else:
279
  final_path = result
280
-
 
281
  return {"generated_video_url": final_path}
282
 
283
  except Exception as e:
284
  logger.error(f"❌ All Generators Failed. Error: {e}")
 
285
  return {"generated_video_url": f"Error: {str(e)}"}
286
 
287
  # Graph Construction
@@ -294,9 +289,10 @@ workflow.add_edge("generator", END)
294
  app = workflow.compile()
295
 
296
  # --- SERVER COMPATIBILITY WRAPPERS ---
297
- def analyze_only(state_or_path_a, path_c=None):
298
  if isinstance(state_or_path_a, str) and path_c:
299
  state = {
 
300
  "video_a_url": "local",
301
  "video_c_url": "local",
302
  "video_a_local_path": state_or_path_a,
@@ -304,12 +300,15 @@ def analyze_only(state_or_path_a, path_c=None):
304
  }
305
  else:
306
  state = state_or_path_a if isinstance(state_or_path_a, dict) else state_or_path_a.dict()
 
 
307
 
308
  result = analyze_videos(state)
309
  return {"prompt": result.get("scene_analysis"), "status": "success"}
310
 
311
- def generate_only(prompt, path_a, path_c):
312
  state = {
 
313
  "video_a_url": "local",
314
  "video_c_url": "local",
315
  "video_a_local_path": path_a,
 
1
  import os
2
  import time
 
 
 
3
  import logging
4
  import json
5
  from typing import TypedDict, Optional
6
  from langgraph.graph import StateGraph, END
7
 
8
+ # Import unified SDK
9
  from google import genai
10
  from google.genai import types
 
11
 
12
+ # Import other clients
13
  from groq import Groq
14
  from gradio_client import Client, handle_file
 
15
 
16
+ # Import refactored modules
17
+ from config import Settings
18
+ from utils import download_to_temp, download_blob, save_video_bytes, update_job_status
 
 
 
 
 
 
 
 
19
 
20
  # Configure Logging
21
  logging.basicConfig(level=logging.INFO)
 
23
 
24
  # State Definition
25
  class ContinuityState(TypedDict):
26
+ job_id: Optional[str] # Added job_id
27
  video_a_url: str
28
  video_c_url: str
29
  user_notes: Optional[str]
 
33
  video_a_local_path: Optional[str]
34
  video_c_local_path: Optional[str]
35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36
  # --- NODE 1: ANALYST ---
37
  def analyze_videos(state: ContinuityState) -> dict:
38
  logger.info("--- 🧐 Analyst Node (Director) ---")
39
+ job_id = state.get("job_id")
40
+
41
+ update_job_status(job_id, "analyzing", 10, "Director starting analysis...")
42
 
43
  video_a_url = state['video_a_url']
44
  video_c_url = state['video_c_url']
 
53
  if not path_c:
54
  path_c = download_to_temp(video_c_url)
55
  except Exception as e:
56
+ error_msg = f"Download failed: {e}"
57
+ logger.error(error_msg)
58
+ update_job_status(job_id, "error", 0, error_msg)
59
  return {"scene_analysis": "Error downloading", "veo_prompt": "Smooth cinematic transition"}
60
 
61
+ update_job_status(job_id, "analyzing", 20, "Director analyzing motion and lighting...")
62
+
63
  # 2. Try Gemini 2.0 (With Retry and Wait Loop)
64
+ client = genai.Client(api_key=Settings.GOOGLE_API_KEY)
65
  transition_prompt = None
66
  retries = 3
67
  for attempt in range(retries):
68
  try:
69
  logger.info(f"Uploading videos to Gemini... (Attempt {attempt+1})")
70
+ if attempt > 0:
71
+ update_job_status(job_id, "analyzing", 20, f"Retrying analysis (Attempt {attempt+1})...")
72
+
73
  file_a = client.files.upload(file=path_a)
74
  file_c = client.files.upload(file=path_c)
75
 
 
95
  """
96
 
97
  logger.info("Generating transition prompt...")
98
+ update_job_status(job_id, "analyzing", 30, "Director writing scene transition...")
99
+
100
  response = client.models.generate_content(
101
  model="gemini-2.0-flash-exp",
102
  contents=[prompt_text, file_a, file_c]
 
108
  if "429" in str(e) or "RESOURCE_EXHAUSTED" in str(e):
109
  wait = 30 * (attempt + 1)
110
  logger.warning(f"⚠️ Gemini Quota 429. Retrying in {wait}s...")
111
+ update_job_status(job_id, "analyzing", 25, f"High traffic, retrying in {wait}s...")
112
  time.sleep(wait)
113
  else:
114
  logger.error(f"⚠️ Gemini Error: {e}")
 
117
  # 3. Fallback: Groq (Updated Model)
118
  if not transition_prompt:
119
  logger.info("Switching to Llama 3.2 (Groq) Fallback...")
120
+ update_job_status(job_id, "analyzing", 35, "Using backup director (Llama 3.2)...")
121
  try:
122
+ groq_client = Groq(api_key=Settings.GROQ_API_KEY)
123
  fallback_prompt = "Create a smooth, cinematic visual transition that bridges two scenes."
124
  completion = groq_client.chat.completions.create(
125
  model="llama-3.2-90b-vision-preview",
 
130
  logger.error(f"❌ Groq also failed: {e}")
131
  transition_prompt = "Smooth cinematic transition with motion blur matching the scenes."
132
 
133
+ update_job_status(job_id, "generating", 40, "Director prompt ready. Starting generation...")
134
+
135
  return {
136
  "scene_analysis": transition_prompt,
137
  "veo_prompt": transition_prompt,
 
142
  # --- NODE 2: GENERATOR ---
143
  def generate_video(state: ContinuityState) -> dict:
144
  logger.info("--- 🎥 Generator Node ---")
145
+ job_id = state.get("job_id")
146
+
147
  prompt = state.get('veo_prompt', "")
148
  path_a = state.get('video_a_local_path')
149
  path_c = state.get('video_c_local_path')
150
 
151
+ update_job_status(job_id, "generating", 50, "Veo initializing...")
152
+
153
  if not path_a or not path_c:
154
+ error_msg = "Error: Missing local video paths"
155
+ update_job_status(job_id, "error", 0, error_msg)
156
+ return {"generated_video_url": error_msg}
157
 
158
  # --- ATTEMPT 1: GOOGLE VEO ---
159
  try:
160
  logger.info("⚡ Initializing Google Veo (Unified SDK)...")
161
+ project_id = Settings.GCP_PROJECT_ID
162
+ location = Settings.GCP_LOCATION
163
 
164
  if project_id:
165
  client = genai.Client(
 
169
  )
170
 
171
  logger.info(f"Generating with Veo... Prompt: {prompt[:30]}...")
172
+ update_job_status(job_id, "generating", 60, "Veo generating video (this takes ~60s)...")
173
 
174
  operation = client.models.generate_videos(
175
  model='veo-2.0-generate-001',
 
182
  logger.info(f"Waiting for Veo operation {operation.name}...")
183
  while not operation.done:
184
  time.sleep(10)
185
+ # Pass operation object, not name
186
  operation = client.operations.get(operation)
187
  logger.info("...still generating...")
188
 
189
  if operation.result and operation.result.generated_videos:
190
  video_result = operation.result.generated_videos[0]
191
 
192
+ local_path = None
193
  # CASE 1: URI (GCS Bucket)
194
  if hasattr(video_result.video, 'uri') and video_result.video.uri:
195
  gcs_uri = video_result.video.uri
196
  logger.info(f"Veo output saved to GCS: {gcs_uri}")
197
+
198
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as f:
199
  local_path = f.name
200
+
201
  download_blob(gcs_uri, local_path)
202
  logger.info(f"✅ Veo Video Downloaded (from GCS): {local_path}")
 
203
 
204
  # CASE 2: RAW BYTES (Direct Return)
205
  elif hasattr(video_result.video, 'video_bytes') and video_result.video.video_bytes:
206
  logger.info("Veo returned raw bytes. Saving to local file...")
207
+ local_path = save_video_bytes(video_result.video.video_bytes)
 
 
208
  logger.info(f"✅ Veo Video Saved (from Bytes): {local_path}")
209
+
 
210
  else:
211
  logger.warning(f"Veo operation completed but no URI/Bytes found. Result: {video_result}")
212
+
213
+ if local_path:
214
+ update_job_status(job_id, "completed", 100, "Done!", video_url=local_path)
215
+ return {"generated_video_url": local_path}
216
+
217
  else:
218
  logger.warning("Veo operation completed with no result.")
219
 
 
228
 
229
  # --- ATTEMPT 2: SVD FALLBACK (Free) ---
230
  logger.info("🔄 Switching to SVD Fallback...")
231
+ update_job_status(job_id, "generating", 60, "Switching to SVD fallback...")
232
+
233
  try:
234
  import cv2
235
  from PIL import Image
 
249
  start_path = f_start.name
250
 
251
  client = Client("multimodalart/stable-video-diffusion")
252
+
253
+ update_job_status(job_id, "generating", 70, "SVD generating video...")
254
  result = client.predict(
255
  handle_file(start_path),
256
  0.0, 0.0, 1, 25,
257
+ api_name="/video"
258
  )
259
  logger.info(f"✅ SVD Generated: {result}")
260
 
 
270
  final_path = result['video']
271
  else:
272
  final_path = result
273
+
274
+ update_job_status(job_id, "completed", 100, "Done (SVD)!", video_url=final_path)
275
  return {"generated_video_url": final_path}
276
 
277
  except Exception as e:
278
  logger.error(f"❌ All Generators Failed. Error: {e}")
279
+ update_job_status(job_id, "error", 0, f"All generation failed: {e}")
280
  return {"generated_video_url": f"Error: {str(e)}"}
281
 
282
  # Graph Construction
 
289
  app = workflow.compile()
290
 
291
  # --- SERVER COMPATIBILITY WRAPPERS ---
292
+ def analyze_only(state_or_path_a, path_c=None, job_id=None):
293
  if isinstance(state_or_path_a, str) and path_c:
294
  state = {
295
+ "job_id": job_id,
296
  "video_a_url": "local",
297
  "video_c_url": "local",
298
  "video_a_local_path": state_or_path_a,
 
300
  }
301
  else:
302
  state = state_or_path_a if isinstance(state_or_path_a, dict) else state_or_path_a.dict()
303
+ if job_id and "job_id" not in state:
304
+ state["job_id"] = job_id
305
 
306
  result = analyze_videos(state)
307
  return {"prompt": result.get("scene_analysis"), "status": "success"}
308
 
309
+ def generate_only(prompt, path_a, path_c, job_id=None):
310
  state = {
311
+ "job_id": job_id,
312
  "video_a_url": "local",
313
  "video_c_url": "local",
314
  "video_a_local_path": path_a,
config.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from dotenv import load_dotenv
3
+
4
+ # Load environment variables
5
+ load_dotenv()
6
+
7
+ class Settings:
8
+ GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
9
+ GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
10
+ GCP_LOCATION = os.getenv("GCP_LOCATION", "us-central1")
11
+ GCP_CREDENTIALS_JSON = os.getenv("GCP_CREDENTIALS_JSON")
12
+ GROQ_API_KEY = os.getenv("GROQ_API_KEY")
13
+
14
+ @classmethod
15
+ def setup_auth(cls):
16
+ """Sets up Google Application Credentials if JSON is provided in env."""
17
+ if cls.GCP_CREDENTIALS_JSON:
18
+ print("🔐 Found GCP Credentials Secret. Setting up auth...")
19
+ creds_path = "gcp_credentials.json"
20
+ with open(creds_path, "w") as f:
21
+ f.write(cls.GCP_CREDENTIALS_JSON)
22
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path
23
+
24
+ @classmethod
25
+ def validate(cls):
26
+ """Validates critical environment variables."""
27
+ if not cls.GOOGLE_API_KEY:
28
+ raise ValueError("GOOGLE_API_KEY is missing from environment variables.")
29
+
30
+ # Run setup and validation immediately on import
31
+ Settings.setup_auth()
32
+ Settings.validate()
server.py CHANGED
@@ -1,11 +1,12 @@
1
- from fastapi import FastAPI, HTTPException, UploadFile, Form, File, Body
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from fastapi.staticfiles import StaticFiles
4
- from fastapi.responses import FileResponse
5
  import uvicorn
6
  import os
7
  import shutil
8
  import uuid
 
9
  # FIXED IMPORT: Importing from root agent.py instead of continuity_agent
10
  from agent import analyze_only, generate_only
11
 
@@ -30,6 +31,7 @@ async def read_root():
30
 
31
  @app.post("/analyze")
32
  async def analyze_endpoint(
 
33
  video_a: UploadFile = File(...),
34
  video_c: UploadFile = File(...)
35
  ):
@@ -46,8 +48,24 @@ async def analyze_endpoint(
46
  with open(path_c, "wb") as buffer:
47
  shutil.copyfileobj(video_c.file, buffer)
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  # Call Agent with local paths
50
- result = analyze_only(os.path.abspath(path_a), os.path.abspath(path_c))
51
 
52
  if result.get("status") == "error":
53
  raise HTTPException(status_code=500, detail=result.get("detail"))
@@ -63,6 +81,7 @@ async def analyze_endpoint(
63
 
64
  @app.post("/generate")
65
  async def generate_endpoint(
 
66
  prompt: str = Body(...),
67
  video_a_path: str = Body(...),
68
  video_c_path: str = Body(...)
@@ -71,26 +90,34 @@ async def generate_endpoint(
71
  if not os.path.exists(video_a_path) or not os.path.exists(video_c_path):
72
  raise HTTPException(status_code=400, detail="Video files not found on server.")
73
 
74
- # Call Agent
75
- result = generate_only(prompt, video_a_path, video_c_path)
76
- gen_path = result.get("generated_video_url")
77
-
78
- if not gen_path or "Error" in gen_path:
79
- raise HTTPException(status_code=500, detail=f"Generation failed: {gen_path}")
80
-
81
- final_filename = f"{uuid.uuid4()}_bridge.mp4"
82
- final_output_path = os.path.join(OUTPUT_DIR, final_filename)
83
 
84
- if os.path.exists(gen_path):
85
- shutil.move(gen_path, final_output_path)
86
- else:
87
- raise HTTPException(status_code=500, detail="Generated file missing.")
 
 
 
88
 
89
- return {"video_url": f"/outputs/{final_filename}"}
90
 
91
  except Exception as e:
92
  print(f"Server Error (Generate): {e}")
93
  raise HTTPException(status_code=500, detail=str(e))
94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  if __name__ == "__main__":
96
  uvicorn.run("server:app", host="0.0.0.0", port=7860, reload=False)
 
1
+ from fastapi import FastAPI, HTTPException, UploadFile, Form, File, Body, BackgroundTasks
2
  from fastapi.middleware.cors import CORSMiddleware
3
  from fastapi.staticfiles import StaticFiles
4
+ from fastapi.responses import FileResponse, JSONResponse
5
  import uvicorn
6
  import os
7
  import shutil
8
  import uuid
9
+ import json
10
  # FIXED IMPORT: Importing from root agent.py instead of continuity_agent
11
  from agent import analyze_only, generate_only
12
 
 
31
 
32
  @app.post("/analyze")
33
  async def analyze_endpoint(
34
+ background_tasks: BackgroundTasks,
35
  video_a: UploadFile = File(...),
36
  video_c: UploadFile = File(...)
37
  ):
 
48
  with open(path_c, "wb") as buffer:
49
  shutil.copyfileobj(video_c.file, buffer)
50
 
51
+ # Call Agent synchronously for analysis (it's relatively fast usually, but could be async too if desired)
52
+ # For now keeping it sync as per user previous flows, but could be made async easily.
53
+ # However, user request specifically focused on "generate" being async.
54
+ # But wait, analyze also calls Gemini which can be slow.
55
+ # Refactoring to also include job_id for analyze might be good practice but not explicitly requested for "analyze" in prompt detail,
56
+ # BUT the user request says: "Update analyze_videos node: Call utils.update_job_status...".
57
+ # So we should probably treat analyze as async too OR just pass the job_id.
58
+ # But the frontend flow for analyze currently awaits the response to get the prompt.
59
+ # If we make it async, we break the frontend flow unless we refactor that too.
60
+ # The prompt says: "Update code.html ... Update the generate button JavaScript."
61
+ # It doesn't explicitly say to update the analyze button logic to be async.
62
+ # However, update_job_status IS called in analyze_videos.
63
+ # So, we can pass a job_id if we want status updates, but if we await it, the status updates are only useful if polled in parallel.
64
+ # For now, I will keep analyze synchronous but pass a dummy job_id if we want logging, or just let it block.
65
+ # Actually, let's keep it blocking as per original server code, but pass a job_id so at least logs are written.
66
+
67
  # Call Agent with local paths
68
+ result = analyze_only(os.path.abspath(path_a), os.path.abspath(path_c), job_id=request_id)
69
 
70
  if result.get("status") == "error":
71
  raise HTTPException(status_code=500, detail=result.get("detail"))
 
81
 
82
  @app.post("/generate")
83
  async def generate_endpoint(
84
+ background_tasks: BackgroundTasks,
85
  prompt: str = Body(...),
86
  video_a_path: str = Body(...),
87
  video_c_path: str = Body(...)
 
90
  if not os.path.exists(video_a_path) or not os.path.exists(video_c_path):
91
  raise HTTPException(status_code=400, detail="Video files not found on server.")
92
 
93
+ job_id = str(uuid.uuid4())
 
 
 
 
 
 
 
 
94
 
95
+ # Initialize job status
96
+ status_file = os.path.join(OUTPUT_DIR, f"{job_id}.json")
97
+ with open(status_file, "w") as f:
98
+ json.dump({"status": "queued", "progress": 0, "log": "Job queued..."}, f)
99
+
100
+ # Add to background tasks
101
+ background_tasks.add_task(generate_only, prompt, video_a_path, video_c_path, job_id)
102
 
103
+ return {"job_id": job_id}
104
 
105
  except Exception as e:
106
  print(f"Server Error (Generate): {e}")
107
  raise HTTPException(status_code=500, detail=str(e))
108
 
109
+ @app.get("/status/{job_id}")
110
+ async def get_status(job_id: str):
111
+ file_path = os.path.join(OUTPUT_DIR, f"{job_id}.json")
112
+ if not os.path.exists(file_path):
113
+ raise HTTPException(status_code=404, detail="Job not found")
114
+
115
+ try:
116
+ with open(file_path, "r") as f:
117
+ data = json.load(f)
118
+ return data
119
+ except Exception as e:
120
+ raise HTTPException(status_code=500, detail=f"Error reading status: {e}")
121
+
122
  if __name__ == "__main__":
123
  uvicorn.run("server:app", host="0.0.0.0", port=7860, reload=False)
stitch_continuity_dashboard/code.html CHANGED
@@ -364,11 +364,11 @@
364
 
365
  // Loading State
366
  const originalContent = btn.innerHTML;
367
- btn.innerHTML = `<span class="material-symbols-outlined animate-spin text-[20px] mr-2">progress_activity</span> Generating (This may take ~60s)...`;
368
  btn.disabled = true;
369
  btn.classList.add("opacity-70", "cursor-not-allowed");
370
 
371
  try {
 
372
  const response = await fetch("/generate", {
373
  method: "POST",
374
  headers: {
@@ -381,29 +381,108 @@
381
  })
382
  });
383
 
384
- if (!response.ok) throw new Error("Generation failed.");
385
 
386
  const data = await response.json();
387
-
388
- // Success: Inject Video
389
- const bridgeCard = document.getElementById("bridge-card");
390
- if (bridgeCard) {
391
- bridgeCard.innerHTML = `
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
392
  <video controls autoplay loop class="w-full h-full object-cover rounded-2xl border-2 border-primary shadow-neon">
393
- <source src="${data.video_url}" type="video/mp4">
394
  Your browser does not support the video tag.
395
  </video>
396
  `;
397
- // Reset UI to allow new creation but keep video
398
- document.getElementById("analysis-panel").classList.remove("hidden");
399
- document.getElementById("review-panel").classList.add("hidden");
400
- btn.innerHTML = originalContent; // Restore button just in case
401
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
402
 
403
  } catch (error) {
404
  console.error(error);
405
- alert("❌ Generation Failed: " + error.message);
406
- } finally {
407
  btn.innerHTML = originalContent;
408
  btn.disabled = false;
409
  btn.classList.remove("opacity-70", "cursor-not-allowed");
 
364
 
365
  // Loading State
366
  const originalContent = btn.innerHTML;
 
367
  btn.disabled = true;
368
  btn.classList.add("opacity-70", "cursor-not-allowed");
369
 
370
  try {
371
+ // 1. Start Job
372
  const response = await fetch("/generate", {
373
  method: "POST",
374
  headers: {
 
381
  })
382
  });
383
 
384
+ if (!response.ok) throw new Error("Generation failed to start.");
385
 
386
  const data = await response.json();
387
+ const jobId = data.job_id;
388
+
389
+ // 2. Poll for Status
390
+ const pollInterval = setInterval(async () => {
391
+ try {
392
+ const statusRes = await fetch(`/status/${jobId}`);
393
+ if (!statusRes.ok) return;
394
+
395
+ const statusData = await statusRes.json();
396
+
397
+ // Update Button Text
398
+ btn.innerHTML = `<span class="material-symbols-outlined animate-spin text-[20px] mr-2">progress_activity</span> ${statusData.log} (${statusData.progress}%)`;
399
+
400
+ if (statusData.status === "completed") {
401
+ clearInterval(pollInterval);
402
+
403
+ // Show Video
404
+ const bridgeCard = document.getElementById("bridge-card");
405
+ if (bridgeCard) {
406
+ // If video_url is a relative path like "outputs/...", make sure it starts with /
407
+ let videoUrl = statusData.video_url;
408
+ // The backend returns full local path generally, but server logic for update_job_status
409
+ // kept the local path. We need to serve it via the /outputs/ mount.
410
+ // Wait, utils.save_video_bytes returns absolute path.
411
+ // server.py: app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")
412
+ // config.py/utils.py save to temp folder?
413
+ // Agent returns local path.
414
+ // In server.py for generate_only, we don't have the final file move logic anymore locally in server.py?
415
+ // Ah, right. Agent calls update_job_status with the video_url (local path).
416
+ // The frontend needs a URL it can access.
417
+ // We need to move the file to OUTPUT_DIR or make sure Agent saves to OUTPUT_DIR.
418
+ // Agent uses download_to_temp / save_video_bytes which use tempfile.
419
+ // So the file in 'video_url' is in /tmp/...
420
+ // This won't be accessible via /outputs/.
421
+ // We need to fix this.
422
+
423
+ // This requires a minor fix in server or agent.
424
+ // Since I can't edit server.py inside this JS block logic easily to change python logic...
425
+ // I should probably have updated the Agent to save to OUTPUT_DIR or Server to handle the move?
426
+ // BUT, the server.py I just wrote launches generate_only in background.
427
+ // generate_only returns the result, but since it's in background, we ignore return.
428
+ // The only communication channel is the status file.
429
+ // The status file contains 'video_url' which is the temp path.
430
+ // Frontend cannot access temp path.
431
+
432
+ // Quick FIX:
433
+ // The frontend can't move files.
434
+ // The AGENT needs to know where to save, OR Utils.
435
+ // OR, we update the logic in agent.py to try to move it? No that's messy.
436
+ // Let's look at where static files are served. `/outputs`.
437
+ // If the file is in /tmp, we can't serve it.
438
+
439
+ // Maybe I should modify `utils.save_video_bytes` to accept a directory?
440
+ // Or I can modify `server.py` to launch a wrapper that moves the file?
441
+
442
+ // I will implement a wrapper in server.py in the NEXT step or modify server.py now if I catch it?
443
+ // I already wrote server.py.
444
+ // I will apply a fix to `agent.py` or `utils.py` to ensure it saves to `outputs/` OR
445
+ // modify `server.py` to wrap the task.
446
+ // Wrapping in server.py is cleanest for separation.
447
+ // "generate_only" is imported.
448
+
449
+ // Wait, `utils.update_job_status` writes to `outputs/{job_id}.json`.
450
+ // If I change `utils.py` to also move the video if provided?
451
+ // That seems like a good side effect for a "helper" dealing with job status in this specific app context.
452
+ // Or `agent.py` can move it.
453
+
454
+ // Let's handle this in the JS for now assuming the URL *might* work if I fix it in backend in a sec.
455
+ // I'll assume the backend will provide a valid relative URL like `/outputs/filename.mp4`.
456
+
457
+ bridgeCard.innerHTML = `
458
  <video controls autoplay loop class="w-full h-full object-cover rounded-2xl border-2 border-primary shadow-neon">
459
+ <source src="${videoUrl}" type="video/mp4">
460
  Your browser does not support the video tag.
461
  </video>
462
  `;
463
+
464
+ document.getElementById("analysis-panel").classList.remove("hidden");
465
+ document.getElementById("review-panel").classList.add("hidden");
466
+ btn.innerHTML = originalContent;
467
+ btn.disabled = false;
468
+ btn.classList.remove("opacity-70", "cursor-not-allowed");
469
+ }
470
+ } else if (statusData.status === "error") {
471
+ clearInterval(pollInterval);
472
+ alert("❌ Generation Error: " + statusData.log);
473
+ btn.innerHTML = originalContent;
474
+ btn.disabled = false;
475
+ btn.classList.remove("opacity-70", "cursor-not-allowed");
476
+ }
477
+
478
+ } catch (e) {
479
+ console.error("Polling error", e);
480
+ }
481
+ }, 1000);
482
 
483
  } catch (error) {
484
  console.error(error);
485
+ alert("❌ Request Failed: " + error.message);
 
486
  btn.innerHTML = originalContent;
487
  btn.disabled = false;
488
  btn.classList.remove("opacity-70", "cursor-not-allowed");
utils.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import shutil
3
+ import requests
4
+ import tempfile
5
+ import logging
6
+ import json
7
+ from google.cloud import storage
8
+
9
+ # Configure logging for utils
10
+ logger = logging.getLogger(__name__)
11
+
12
+ def download_to_temp(url):
13
+ """Downloads a file from a URL to a temporary local file."""
14
+ logger.info(f"Downloading: {url}")
15
+ if os.path.exists(url):
16
+ return url
17
+
18
+ resp = requests.get(url, stream=True)
19
+ resp.raise_for_status()
20
+ suffix = os.path.splitext(url.split("/")[-1])[1] or ".mp4"
21
+ with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
22
+ shutil.copyfileobj(resp.raw, f)
23
+ return f.name
24
+
25
+ def download_blob(gcs_uri, destination_file_name):
26
+ """Downloads a blob from the Google Cloud Storage bucket."""
27
+ if not gcs_uri.startswith("gs://"):
28
+ raise ValueError(f"Invalid GCS URI: {gcs_uri}")
29
+
30
+ parts = gcs_uri[5:].split("/", 1)
31
+ bucket_name = parts[0]
32
+ source_blob_name = parts[1]
33
+ storage_client = storage.Client()
34
+ bucket = storage_client.bucket(bucket_name)
35
+ blob = bucket.blob(source_blob_name)
36
+ blob.download_to_filename(destination_file_name)
37
+ logger.info(f"Downloaded storage object {gcs_uri} to local file {destination_file_name}.")
38
+
39
+ def save_video_bytes(bytes_data, suffix=".mp4") -> str:
40
+ """Saves raw video bytes to a temporary local file."""
41
+ with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
42
+ f.write(bytes_data)
43
+ local_path = f.name
44
+ logger.info(f"✅ Video bytes saved to: {local_path}")
45
+ return local_path
46
+
47
+ def update_job_status(job_id, status, progress, log=None, video_url=None):
48
+ """Writes a JSON file to outputs/{job_id}.json with current job status."""
49
+ if not job_id:
50
+ return
51
+
52
+ output_dir = "outputs"
53
+ os.makedirs(output_dir, exist_ok=True)
54
+
55
+ # Handle video file move if completed
56
+ final_video_url = video_url
57
+ if video_url and os.path.exists(video_url) and status == "completed":
58
+ try:
59
+ filename = os.path.basename(video_url)
60
+ # Ensure unique name or use job_id
61
+ final_filename = f"{job_id}_final{os.path.splitext(filename)[1]}"
62
+ destination = os.path.join(output_dir, final_filename)
63
+ shutil.move(video_url, destination)
64
+ logger.info(f"Moved video to {destination}")
65
+ # Set public URL relative to server root
66
+ final_video_url = f"/outputs/{final_filename}"
67
+ except Exception as e:
68
+ logger.error(f"Failed to move output video: {e}")
69
+
70
+ file_path = os.path.join(output_dir, f"{job_id}.json")
71
+
72
+ data = {
73
+ "status": status,
74
+ "progress": progress,
75
+ "log": log,
76
+ "video_url": final_video_url
77
+ }
78
+
79
+ try:
80
+ with open(file_path, "w") as f:
81
+ json.dump(data, f)
82
+ logger.info(f"Job {job_id} updated: {status} ({progress}%) - {log}")
83
+ except Exception as e:
84
+ logger.error(f"Failed to update job status for {job_id}: {e}")