# Hugging Face Space worker (Space status at scrape time: Sleeping)
| import os | |
| import time | |
| import logging | |
| import tempfile | |
| import hashlib | |
| import json | |
| from google import genai | |
| from google.genai import types | |
| from config import Settings | |
| from utils import download_to_temp, download_blob, save_video_bytes, update_job_status, stitch_videos | |
# Configure root logging once at import time; this worker logs progress and
# polling state, so INFO level is the useful default.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_file_hash(filepath):
    """Return the hex MD5 digest of the file at *filepath*.

    Reads in 4 KiB chunks so arbitrarily large videos never have to fit
    in memory. (MD5 is used purely as a cache key, not for security.)
    """
    digest = hashlib.md5()
    with open(filepath, "rb") as fh:
        while chunk := fh.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
def get_or_upload_file(client, filepath):
    """Return a Files-API handle for *filepath*, reusing a cached upload when possible.

    The file's MD5 hash is used as the remote ``display_name``, so content
    uploaded by an earlier job can be found and reused. The cache lookup is
    best-effort: if listing remote files fails (network hiccup, permissions),
    we log the failure and fall through to a fresh upload instead of aborting.
    """
    file_hash = get_file_hash(filepath)
    try:
        for f in client.files.list(config={'page_size': 50}):
            if f.display_name == file_hash and f.state.name == "ACTIVE":
                logger.info(f"♻️ Smart Cache Hit: {file_hash}")
                return f
    except Exception as e:
        # Previously swallowed silently; surface it so cache misbehavior is
        # diagnosable, then continue with the upload fallback.
        logger.warning(f"File cache lookup failed ({e}); uploading fresh copy.")
    logger.info(f"⬆️ Uploading new file: {file_hash}")
    return client.files.upload(file=filepath, config={'display_name': file_hash})
def analyze_only(path_a, path_c, job_id=None):
    """Upload both clips, have Gemini describe them, and draft a morph prompt.

    Returns a dict with keys ``analysis_a``, ``analysis_c``, ``prompt`` and
    ``status`` ("success") on success, or ``{"detail": ..., "status": "error"}``
    on any failure. Progress is reported via update_job_status.
    """
    update_job_status(job_id, "analyzing", 10, "Director checking file cache...")
    client = genai.Client(api_key=Settings.GOOGLE_API_KEY)
    try:
        file_a = get_or_upload_file(client, path_a)
        file_c = get_or_upload_file(client, path_c)
        # Wait for Google-side processing, but never forever: the original
        # loop could spin indefinitely if a file stuck in PROCESSING.
        deadline = time.time() + 300
        while file_a.state.name == "PROCESSING" or file_c.state.name == "PROCESSING":
            if time.time() > deadline:
                raise TimeoutError("Google file processing timed out (5m).")
            update_job_status(job_id, "analyzing", 20, "Google processing video...")
            time.sleep(2)
            file_a = client.files.get(name=file_a.name)
            file_c = client.files.get(name=file_c.name)
        # A file can leave PROCESSING by FAILING; detect that here instead of
        # letting generate_content error with a confusing message.
        if file_a.state.name == "FAILED" or file_c.state.name == "FAILED":
            raise RuntimeError("Google failed to process an uploaded video.")
        prompt = """
You are a VFX Director. Analyze Video A and Video C.
Return a JSON object with exactly these keys:
{
"analysis_a": "Brief description of Video A",
"analysis_c": "Brief description of Video C",
"visual_prompt_b": "A surreal, seamless morphing prompt that transforms A into C."
}
"""
        update_job_status(job_id, "analyzing", 30, "Director drafting creative morph...")
        res = client.models.generate_content(
            model="gemini-2.0-flash",
            contents=[prompt, file_a, file_c],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        # Strip markdown code fences the model sometimes wraps JSON in,
        # even when a JSON mime type was requested.
        text = res.text.strip()
        if text.startswith("```json"):
            text = text[7:]
        elif text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
        data = {}
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list):
                data = parsed[0] if len(parsed) > 0 else {}
            elif isinstance(parsed, dict):
                data = parsed
        except json.JSONDecodeError:
            # Fall back to raw text as the prompt below rather than failing.
            logger.warning(f"JSON Parse Failed. Fallback to raw text.")
        return {
            "analysis_a": data.get("analysis_a", "Analysis unavailable."),
            "analysis_c": data.get("analysis_c", "Analysis unavailable."),
            "prompt": data.get("visual_prompt_b", text),
            "status": "success"
        }
    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return {"detail": str(e), "status": "error"}
def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, motion):
    """Run the Veo generation pipeline: build the prompt, poll the job, stitch output.

    Parameters mirror the UI controls; ``guidance`` and ``motion`` are not
    consumed by the Veo config here but are kept for interface stability.
    Never raises: all errors are reported through the job status file, and a
    finally-block forces the job into a terminal state ("completed"/"error").
    """
    try:
        update_job_status(job_id, "generating", 50, "Production started (Veo 3.1)...")
        full_prompt = f"{style} style. {prompt} Soundtrack: {audio}"
        if neg:
            full_prompt += f" --no {neg}"
        if not Settings.GCP_PROJECT_ID:
            raise Exception("GCP_PROJECT_ID missing.")
        client = genai.Client(vertexai=True, project=Settings.GCP_PROJECT_ID, location=Settings.GCP_LOCATION)
        # 1. Start the long-running generation job.
        op = client.models.generate_videos(
            model='veo-3.1-generate-preview',
            prompt=full_prompt,
            config=types.GenerateVideosConfig(number_of_videos=1)
        )
        # 2. Extract the operation name; some SDK versions return a bare string.
        op_name = op.name if hasattr(op, 'name') else str(op)
        logger.info(f"Polling Job ID: {op_name}")
        # 3. Wrap the name in the typed operation object the poller accepts.
        polling_op = types.GenerateVideosOperation(name=op_name)
        start_time = time.time()
        while True:
            if time.time() - start_time > 600:
                raise Exception("Timeout (10m).")
            try:
                refreshed_op = client.operations.get(polling_op)
                if hasattr(refreshed_op, 'done') and refreshed_op.done:
                    logger.info("Generation Done.")
                    op = refreshed_op
                    break
            except Exception as e:
                # Transient polling failures are retried until the timeout fires.
                logger.warning(f"Polling error: {e}")
                time.sleep(20)
                continue
            logger.info("Waiting for Veo...")
            time.sleep(20)
        # 4. Result extraction — ``result`` is a property or a callable
        # depending on SDK version, so handle both.
        res_val = op.result
        result = res_val() if callable(res_val) else res_val
        if result and (getattr(result, 'generated_videos', None) or 'generated_videos' in result):
            vid = result.generated_videos[0] if hasattr(result, 'generated_videos') else result['generated_videos'][0]
            # mkstemp instead of the deprecated, race-prone tempfile.mktemp:
            # the file is created atomically; close the fd since download_blob
            # opens the path itself.
            fd, bridge_path = tempfile.mkstemp(suffix=".mp4")
            os.close(fd)
            if hasattr(vid.video, 'uri') and vid.video.uri:
                download_blob(vid.video.uri, bridge_path)
            else:
                bridge_path = save_video_bytes(vid.video.video_bytes)
            update_job_status(job_id, "stitching", 85, "Stitching...")
            final_cut = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
            merged_path = stitch_videos(path_a, bridge_path, path_c, final_cut)
            msg = "Done! (Merged)" if merged_path else "Done! (Bridge Only)"
            update_job_status(job_id, "completed", 100, msg, video_url=bridge_path, merged_video_url=merged_path)
        else:
            raise Exception("No video output.")
    except Exception as e:
        logger.error(f"Worker crashed: {e}")
        update_job_status(job_id, "error", 0, f"Error: {e}")
    finally:
        # Enforce a terminal state so the UI never hangs on a stale status.
        try:
            status_file = f"outputs/{job_id}.json"
            if os.path.exists(status_file):
                with open(status_file, "r") as f:
                    data = json.load(f)
                status = data.get("status")
                if status not in ["completed", "error"]:
                    logger.warning(f"Job {job_id} left in non-terminal state ({status}). Forcing error.")
                    update_job_status(job_id, "error", 0, "Job terminated unexpectedly.")
        except Exception as e:
            logger.error(f"Final safety net failed: {e}")