"""Worker routines that analyze two clips with Gemini and bridge them with Veo.

``analyze_only`` uploads (or reuses) two videos and asks Gemini for a JSON
description plus a morph prompt.  ``generate_only`` submits that prompt to Veo,
polls the long-running operation, saves the bridge clip and stitches
A + bridge + C together, reporting progress through ``update_job_status``.
"""

import os
import time
import logging
import tempfile
import hashlib
import json

from google import genai
from google.genai import types

from config import Settings
from utils import (
    download_to_temp,
    download_blob,
    save_video_bytes,
    update_job_status,
    stitch_videos,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Polling cadence / limits (seconds).
_FILE_POLL_INTERVAL_S = 2
_FILE_PROCESSING_TIMEOUT_S = 600
_VEO_POLL_INTERVAL_S = 20
_VEO_TIMEOUT_S = 600


def get_file_hash(filepath, chunk_size=65536):
    """Return the hex MD5 digest of the file at *filepath*.

    The file is read in binary mode in blocks of *chunk_size* bytes so large
    videos are never loaded into memory at once.  The digest is used as a
    cache key (see ``get_or_upload_file``), not for security.
    """
    digest = hashlib.md5()
    with open(filepath, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _state_name(remote_file):
    """Return ``remote_file.state.name`` or ``None`` when the state is unset."""
    return getattr(getattr(remote_file, "state", None), "name", None)


def get_or_upload_file(client, filepath):
    """Return a remote file for *filepath*, reusing an earlier upload if possible.

    Uploads are tagged with the file's MD5 as ``display_name``.  If the
    client's file listing already contains an ``ACTIVE`` file with that
    display name it is returned; otherwise the file is uploaded.  A failure
    while listing is logged and treated as a cache miss (best effort).
    """
    file_hash = get_file_hash(filepath)
    try:
        for remote in client.files.list(config={'page_size': 50}):
            if remote.display_name == file_hash and _state_name(remote) == "ACTIVE":
                logger.info(f"♻️ Smart Cache Hit: {file_hash}")
                return remote
    except Exception as e:
        # The cache is an optimisation only: record why it failed, then upload.
        logger.warning("File cache lookup failed, uploading instead: %s", e)
    logger.info(f"⬆️ Uploading new file: {file_hash}")
    return client.files.upload(file=filepath, config={'display_name': file_hash})


def _wait_for_files(client, job_id, files):
    """Poll *files* until none is ``PROCESSING`` and return the refreshed list.

    Raises ``TimeoutError`` after ``_FILE_PROCESSING_TIMEOUT_S`` seconds and
    ``RuntimeError`` if any file ends in the ``FAILED`` state.
    """
    files = list(files)
    deadline = time.monotonic() + _FILE_PROCESSING_TIMEOUT_S
    while any(_state_name(f) == "PROCESSING" for f in files):
        if time.monotonic() > deadline:
            raise TimeoutError("Timed out waiting for uploaded videos to be processed.")
        update_job_status(job_id, "analyzing", 20, "Google processing video...")
        time.sleep(_FILE_POLL_INTERVAL_S)
        files = [client.files.get(name=f.name) for f in files]
    for f in files:
        if _state_name(f) == "FAILED":
            raise RuntimeError(f"Processing failed for uploaded file {f.name}.")
    return files


def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (with or without ``json`` tag)."""
    text = text.strip()
    if text.startswith("```json"):
        text = text[7:]
    elif text.startswith("```"):
        text = text[3:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _parse_analysis(text):
    """Parse the model reply into a dict; return ``{}`` when it is unusable.

    Accepts either a JSON object or a JSON list whose first element is an
    object.
    """
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        logger.warning("JSON Parse Failed. Fallback to raw text.")
        return {}
    if isinstance(parsed, list):
        parsed = parsed[0] if parsed else {}
    return parsed if isinstance(parsed, dict) else {}


def analyze_only(path_a, path_c, job_id=None):
    """Describe videos A and C with Gemini and draft a morph prompt.

    Returns a dict with keys ``analysis_a``, ``analysis_c``, ``prompt`` and
    ``status`` (``"success"``).  When the reply is not valid JSON the raw text
    is returned as ``prompt``.  Any exception raised after the client is
    created is logged and reported as ``{"detail": <message>, "status": "error"}``.
    """
    update_job_status(job_id, "analyzing", 10, "Director checking file cache...")
    client = genai.Client(api_key=Settings.GOOGLE_API_KEY)
    try:
        file_a = get_or_upload_file(client, path_a)
        file_c = get_or_upload_file(client, path_c)
        file_a, file_c = _wait_for_files(client, job_id, [file_a, file_c])

        prompt = """
        You are a VFX Director. Analyze Video A and Video C.
        Return a JSON object with exactly these keys:
        {
            "analysis_a": "Brief description of Video A",
            "analysis_c": "Brief description of Video C",
            "visual_prompt_b": "A surreal, seamless morphing prompt that transforms A into C."
        }
        """
        update_job_status(job_id, "analyzing", 30, "Director drafting creative morph...")
        res = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[prompt, file_a, file_c],
            config=types.GenerateContentConfig(response_mime_type="application/json"),
        )
        if not res.text:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError("Model returned an empty response.")

        text = _strip_code_fence(res.text)
        data = _parse_analysis(text)
        return {
            "analysis_a": data.get("analysis_a", "Analysis unavailable."),
            "analysis_c": data.get("analysis_c", "Analysis unavailable."),
            "prompt": data.get("visual_prompt_b", text),
            "status": "success",
        }
    except Exception as e:
        logger.exception("Analysis failed: %s", e)
        return {"detail": str(e), "status": "error"}


def _poll_until_done(client, op_name):
    """Poll the Veo operation named *op_name* until done; return the final op.

    Polling errors are logged and retried.  Raises ``TimeoutError`` once
    ``_VEO_TIMEOUT_S`` seconds have elapsed.
    """
    # operations.get() expects an operation object, so wrap the bare name.
    polling_op = types.GenerateVideosOperation(name=op_name)
    deadline = time.monotonic() + _VEO_TIMEOUT_S
    while True:
        if time.monotonic() > deadline:
            raise TimeoutError("Timeout (10m).")
        try:
            refreshed_op = client.operations.get(polling_op)
        except Exception as e:
            logger.warning(f"Polling error: {e}")
        else:
            if getattr(refreshed_op, "done", False):
                logger.info("Generation Done.")
                return refreshed_op
            logger.info("Waiting for Veo...")
        time.sleep(_VEO_POLL_INTERVAL_S)


def _extract_generated_videos(result):
    """Return the ``generated_videos`` sequence of *result*, or ``[]``.

    *result* may expose the videos as an attribute or as a dict key.
    """
    if not result:
        return []
    videos = getattr(result, "generated_videos", None)
    if videos is None and isinstance(result, dict):
        videos = result.get("generated_videos")
    return videos or []


def _save_bridge_video(video):
    """Persist *video* locally and return the path of the saved clip."""
    if getattr(video, "uri", None):
        # mkstemp creates the file atomically, unlike the race-prone mktemp.
        fd, bridge_path = tempfile.mkstemp(suffix=".mp4")
        os.close(fd)
        download_blob(video.uri, bridge_path)
        return bridge_path
    return save_video_bytes(video.video_bytes)


def _enforce_terminal_state(job_id):
    """Force the job to ``error`` if its status file shows a non-terminal state."""
    try:
        status_file = f"outputs/{job_id}.json"
        if os.path.exists(status_file):
            with open(status_file, "r") as f:
                data = json.load(f)
            status = data.get("status")
            if status not in ["completed", "error"]:
                logger.warning(f"Job {job_id} left in non-terminal state ({status}). Forcing error.")
                update_job_status(job_id, "error", 0, "Job terminated unexpectedly.")
    except Exception as e:
        logger.error(f"Final safety net failed: {e}")


def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, motion):
    """Generate the bridge clip with Veo and stitch A + bridge + C.

    Progress and the final outcome are reported via ``update_job_status``;
    nothing is returned.  Every failure is caught, logged and recorded as an
    ``error`` status, and a final check forces a terminal status if the job's
    status file still shows a non-terminal one.  ``guidance`` and ``motion``
    are accepted for interface compatibility but are not used here.
    """
    try:
        update_job_status(job_id, "generating", 50, "Production started (Veo 3.1)...")
        full_prompt = f"{style} style. {prompt} Soundtrack: {audio}"
        if neg:
            full_prompt += f" --no {neg}"
        if not Settings.GCP_PROJECT_ID:
            raise RuntimeError("GCP_PROJECT_ID missing.")

        client = genai.Client(
            vertexai=True,
            project=Settings.GCP_PROJECT_ID,
            location=Settings.GCP_LOCATION,
        )

        # 1. Start the long-running generation job.
        op = client.models.generate_videos(
            model='veo-3.1-generate-preview',
            prompt=full_prompt,
            config=types.GenerateVideosConfig(number_of_videos=1),
        )

        # 2. Poll by operation name until the job reports done.
        op_name = op.name if hasattr(op, 'name') else str(op)
        logger.info(f"Polling Job ID: {op_name}")
        op = _poll_until_done(client, op_name)

        # 3. Surface a server-side failure instead of a generic "no output".
        op_error = getattr(op, "error", None)
        if op_error:
            raise RuntimeError(f"Veo generation failed: {op_error}")

        # 4. Extract the first generated video.
        res_val = op.result
        result = res_val() if callable(res_val) else res_val
        videos = _extract_generated_videos(result)
        if not videos:
            raise RuntimeError("No video output.")
        bridge_path = _save_bridge_video(videos[0].video)

        # 5. Stitch and publish the result.
        update_job_status(job_id, "stitching", 85, "Stitching...")
        final_cut = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
        merged_path = stitch_videos(path_a, bridge_path, path_c, final_cut)
        msg = "Done! (Merged)" if merged_path else "Done! (Bridge Only)"
        update_job_status(
            job_id, "completed", 100, msg,
            video_url=bridge_path, merged_video_url=merged_path,
        )
    except Exception as e:
        logger.exception("Worker crashed: %s", e)
        update_job_status(job_id, "error", 0, f"Error: {e}")
    finally:
        # Enforce a terminal state even if the status update above failed.
        _enforce_terminal_state(job_id)