# NOTE(review): removed non-code artifacts (HF Spaces page header, commit hashes,
# line-number gutter) that were captured by the scraper and made this file unparseable.
import os
import time
import logging
import tempfile
import hashlib
import json
from google import genai
from google.genai import types
from config import Settings
from utils import download_to_temp, download_blob, save_video_bytes, update_job_status, stitch_videos
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_file_hash(filepath):
    """Return the hex MD5 digest of the file at *filepath*.

    The digest is used purely as a content-addressed cache key for uploads,
    not for security.
    """
    digest = hashlib.md5()
    with open(filepath, "rb") as fh:
        # Read in fixed-size chunks so large video files never load fully into memory.
        while chunk := fh.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
def get_or_upload_file(client, filepath):
    """Upload *filepath* to the Files API, reusing a previous upload when possible.

    The file's MD5 hash is stored as its display name, so an identical file
    already uploaded (and still in ACTIVE state) is returned without
    re-uploading.

    Args:
        client: an authenticated ``genai.Client``.
        filepath: local path of the file to upload.

    Returns:
        The cached file object on a hash match, otherwise the freshly
        uploaded one.
    """
    file_hash = get_file_hash(filepath)
    try:
        for f in client.files.list(config={'page_size': 50}):
            if f.display_name == file_hash and f.state.name == "ACTIVE":
                logger.info(f"♻️ Smart Cache Hit: {file_hash}")
                return f
    except Exception as e:
        # Cache lookup is best-effort: fall through to a fresh upload, but
        # record why the listing failed instead of swallowing it silently.
        logger.warning("File cache lookup failed (%s); uploading fresh copy.", e)
    logger.info(f"⬆️ Uploading new file: {file_hash}")
    return client.files.upload(file=filepath, config={'display_name': file_hash})
def analyze_only(path_a, path_c, job_id=None):
    """Ask the Gemini 'Director' to analyze two clips and draft a morph prompt.

    Uploads (or reuses cached) files for both videos, waits for Google-side
    processing to finish, then requests a JSON response from gemini-2.0-flash
    describing both clips and a creative prompt that morphs A into C.

    Args:
        path_a: local path of the opening video ("Video A").
        path_c: local path of the closing video ("Video C").
        job_id: optional job identifier used for progress updates.

    Returns:
        On success: dict with ``analysis_a``, ``analysis_c``, ``prompt`` and
        ``status`` == "success". On failure: ``{"detail": <msg>, "status": "error"}``.
    """
    update_job_status(job_id, "analyzing", 10, "Director checking file cache...")
    client = genai.Client(api_key=Settings.GOOGLE_API_KEY)
    try:
        file_a = get_or_upload_file(client, path_a)
        file_c = get_or_upload_file(client, path_c)
        # Poll until Google finishes server-side processing of both uploads.
        while file_a.state.name == "PROCESSING" or file_c.state.name == "PROCESSING":
            update_job_status(job_id, "analyzing", 20, "Google processing video...")
            time.sleep(2)
            file_a = client.files.get(name=file_a.name)
            file_c = client.files.get(name=file_c.name)
        # Processing can end in FAILED; bail out early rather than sending a
        # broken file reference to the model.
        if file_a.state.name == "FAILED" or file_c.state.name == "FAILED":
            raise Exception("Google failed to process one of the uploaded videos.")
        prompt = """
You are a VFX Director. Analyze Video A and Video C.
Return a JSON object with exactly these keys:
{
"analysis_a": "Brief description of Video A",
"analysis_c": "Brief description of Video C",
"visual_prompt_b": "A surreal, seamless morphing prompt that transforms A into C."
}
"""
        update_job_status(job_id, "analyzing", 30, "Director drafting creative morph...")
        res = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[prompt, file_a, file_c],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        text = res.text.strip()
        # Strip markdown code fences the model sometimes wraps around its JSON.
        if text.startswith("```json"): text = text[7:]
        elif text.startswith("```"): text = text[3:]
        if text.endswith("```"): text = text[:-3]
        text = text.strip()
        data = {}
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list): data = parsed[0] if parsed else {}
            elif isinstance(parsed, dict): data = parsed
        except json.JSONDecodeError:
            # Leave data empty; the raw text becomes the prompt fallback below.
            logger.warning("JSON Parse Failed. Fallback to raw text.")
        return {
            "analysis_a": data.get("analysis_a", "Analysis unavailable."),
            "analysis_c": data.get("analysis_c", "Analysis unavailable."),
            "prompt": data.get("visual_prompt_b", text),
            "status": "success"
        }
    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return {"detail": str(e), "status": "error"}
def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, motion):
    """Generate the bridge video with Veo, then stitch A + bridge + C.

    Args:
        prompt: visual prompt describing the morph (the "Video B" bridge).
        path_a: local path of the opening clip.
        path_c: local path of the closing clip.
        job_id: job identifier; progress is written to ``outputs/{job_id}.json``.
        style: style prefix prepended to the prompt.
        audio: soundtrack description appended to the prompt.
        neg: optional negative prompt, appended as ``--no <neg>``.
        guidance, motion: accepted for interface compatibility; not currently
            forwarded to the Veo config — TODO confirm intended.

    Side effects:
        Reports progress via ``update_job_status`` and always leaves the job
        in a terminal state ("completed" or "error").
    """
    try:
        update_job_status(job_id, "generating", 50, "Production started (Veo 3.1)...")
        full_prompt = f"{style} style. {prompt} Soundtrack: {audio}"
        if neg:
            full_prompt += f" --no {neg}"
        if not Settings.GCP_PROJECT_ID:
            raise Exception("GCP_PROJECT_ID missing.")
        client = genai.Client(vertexai=True, project=Settings.GCP_PROJECT_ID, location=Settings.GCP_LOCATION)
        # 1. Start Job
        op = client.models.generate_videos(
            model='veo-3.1-generate-preview',
            prompt=full_prompt,
            config=types.GenerateVideosConfig(number_of_videos=1)
        )
        # 2. Extract ID String
        op_name = op.name if hasattr(op, 'name') else str(op)
        logger.info(f"Polling Job ID: {op_name}")
        # 3. Create Valid SDK Object for Polling
        polling_op = types.GenerateVideosOperation(name=op_name)
        start_time = time.time()
        while True:
            if time.time() - start_time > 600:
                raise Exception("Timeout (10m).")
            try:
                # Refresh logic: Pass the valid types.GenerateVideosOperation object
                refreshed_op = client.operations.get(polling_op)
                # Check status
                if hasattr(refreshed_op, 'done') and refreshed_op.done:
                    logger.info("Generation Done.")
                    op = refreshed_op
                    break
            except Exception as e:
                # Transient polling errors are retried until the 10m timeout.
                logger.warning(f"Polling error: {e}")
                time.sleep(20)
                continue
            logger.info("Waiting for Veo...")
            time.sleep(20)
        # 4. Result Extraction
        res_val = op.result
        result = res_val() if callable(res_val) else res_val
        if result and (getattr(result, 'generated_videos', None) or 'generated_videos' in result):
            vid = result.generated_videos[0] if hasattr(result, 'generated_videos') else result['generated_videos'][0]
            # mkstemp instead of the deprecated, race-prone tempfile.mktemp;
            # close the fd immediately — download helpers reopen the path.
            fd, bridge_path = tempfile.mkstemp(suffix=".mp4")
            os.close(fd)
            if hasattr(vid.video, 'uri') and vid.video.uri:
                download_blob(vid.video.uri, bridge_path)
            else:
                bridge_path = save_video_bytes(vid.video.video_bytes)
            update_job_status(job_id, "stitching", 85, "Stitching...")
            # Ensure the output directory exists before stitching writes to it.
            os.makedirs("outputs", exist_ok=True)
            final_cut = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
            merged_path = stitch_videos(path_a, bridge_path, path_c, final_cut)
            msg = "Done! (Merged)" if merged_path else "Done! (Bridge Only)"
            update_job_status(job_id, "completed", 100, msg, video_url=bridge_path, merged_video_url=merged_path)
        else:
            raise Exception("No video output.")
    except Exception as e:
        logger.error(f"Worker crashed: {e}")
        update_job_status(job_id, "error", 0, f"Error: {e}")
    finally:
        # Enforce Terminal State: if the status file still shows a
        # non-terminal status, force it to "error" so the UI never hangs.
        try:
            status_file = f"outputs/{job_id}.json"
            if os.path.exists(status_file):
                with open(status_file, "r") as f:
                    data = json.load(f)
                status = data.get("status")
                if status not in ["completed", "error"]:
                    logger.warning(f"Job {job_id} left in non-terminal state ({status}). Forcing error.")
                    update_job_status(job_id, "error", 0, "Job terminated unexpectedly.")
        except Exception as e:
            logger.error(f"Final safety net failed: {e}")