File size: 7,095 Bytes
05ae0b9
a8c659a
 
c3f17d3
4e8e87d
bcae6dd
131b652
 
e9456a0
fad001e
e059cb3
a8c659a
01bccd8
4e8e87d
e059cb3
4e8e87d
 
 
 
 
 
 
e059cb3
4e8e87d
 
 
 
 
bcae6dd
4e8e87d
 
623bad5
4c4dca8
4e8e87d
a8c659a
e059cb3
fad001e
4c4dca8
e9456a0
4c4dca8
fad001e
4e8e87d
 
62973c2
fad001e
4c4dca8
4e8e87d
 
 
e059cb3
4c4dca8
 
 
 
 
 
 
 
 
 
62973c2
bcae6dd
c91372f
4c4dca8
bcae6dd
 
4c4dca8
535b8bb
4c4dca8
 
 
 
 
 
 
 
62973c2
 
4c4dca8
62973c2
4c4dca8
 
 
 
 
62973c2
4c4dca8
 
 
fad001e
4c4dca8
fad001e
a8c659a
e059cb3
fad001e
ef27e6d
59e0951
 
 
 
 
0c79677
 
0644f3e
0c79677
4c4dca8
62973c2
0c79677
 
 
 
 
 
53a2257
0c79677
62973c2
 
70214e8
53a2257
62973c2
0c79677
 
 
 
0644f3e
0c79677
53a2257
62973c2
 
 
 
 
53a2257
62973c2
 
0c79677
 
53a2257
 
0753051
0c79677
 
4c4dca8
62973c2
0c79677
 
 
 
 
 
0753051
0c79677
 
 
 
924120a
0c79677
4c4dca8
0c79677
c76f6fa
0c79677
 
dae328a
0c79677
4c4dca8
ef27e6d
59e0951
fad001e
59e0951
4c4dca8
59e0951
 
 
 
 
 
 
 
 
 
 
 
70214e8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import os
import time
import logging
import tempfile
import hashlib
import json
from google import genai
from google.genai import types
from config import Settings
from utils import download_to_temp, download_blob, save_video_bytes, update_job_status, stitch_videos

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_file_hash(filepath):
    hash_md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def get_or_upload_file(client, filepath):
    file_hash = get_file_hash(filepath)
    try:
        for f in client.files.list(config={'page_size': 50}):
            if f.display_name == file_hash and f.state.name == "ACTIVE":
                logger.info(f"♻️ Smart Cache Hit: {file_hash}")
                return f
    except Exception:
        pass
    logger.info(f"⬆️ Uploading new file: {file_hash}")
    return client.files.upload(file=filepath, config={'display_name': file_hash})


def analyze_only(path_a, path_c, job_id=None):
    update_job_status(job_id, "analyzing", 10, "Director checking file cache...")
    client = genai.Client(api_key=Settings.GOOGLE_API_KEY)

    try:
        file_a = get_or_upload_file(client, path_a)
        file_c = get_or_upload_file(client, path_c)

        while file_a.state.name == "PROCESSING" or file_c.state.name == "PROCESSING":
            update_job_status(job_id, "analyzing", 20, "Google processing video...")
            time.sleep(2)
            file_a = client.files.get(name=file_a.name)
            file_c = client.files.get(name=file_c.name)

        prompt = """
        You are a VFX Director. Analyze Video A and Video C.
        Return a JSON object with exactly these keys:
        {
            "analysis_a": "Brief description of Video A",
            "analysis_c": "Brief description of Video C",
            "visual_prompt_b": "A surreal, seamless morphing prompt that transforms A into C."
        }
        """
        update_job_status(job_id, "analyzing", 30, "Director drafting creative morph...")

        res = client.models.generate_content(
            model="gemini-2.0-flash", 
            contents=[prompt, file_a, file_c],
            config=types.GenerateContentConfig(response_mime_type="application/json")
        )
        
        text = res.text.strip()
        if text.startswith("```json"): text = text[7:]
        elif text.startswith("```"): text = text[3:]
        if text.endswith("```"): text = text[:-3]
        text = text.strip()
        
        data = {}
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list): data = parsed[0] if len(parsed) > 0 else {}
            elif isinstance(parsed, dict): data = parsed
        except json.JSONDecodeError:
            logger.warning(f"JSON Parse Failed. Fallback to raw text.")
            pass

        return {
            "analysis_a": data.get("analysis_a", "Analysis unavailable."),
            "analysis_c": data.get("analysis_c", "Analysis unavailable."),
            "prompt": data.get("visual_prompt_b", text), 
            "status": "success"
        }

    except Exception as e:
        logger.error(f"Analysis failed: {e}")
        return {"detail": str(e), "status": "error"}


def generate_only(prompt, path_a, path_c, job_id, style, audio, neg, guidance, motion):
    try:
        update_job_status(job_id, "generating", 50, "Production started (Veo 3.1)...")
        full_prompt = f"{style} style. {prompt} Soundtrack: {audio}"
        if neg:
            full_prompt += f" --no {neg}"

        if not Settings.GCP_PROJECT_ID:
            raise Exception("GCP_PROJECT_ID missing.")
            
        client = genai.Client(vertexai=True, project=Settings.GCP_PROJECT_ID, location=Settings.GCP_LOCATION)
        
        # 1. Start Job
        op = client.models.generate_videos(
            model='veo-3.1-generate-preview', 
            prompt=full_prompt, 
            config=types.GenerateVideosConfig(number_of_videos=1)
        )
        
        # 2. Extract ID String
        op_name = op.name if hasattr(op, 'name') else str(op)
        logger.info(f"Polling Job ID: {op_name}")
        
        # 3. Create Valid SDK Object for Polling
        polling_op = types.GenerateVideosOperation(name=op_name)

        start_time = time.time()
        while True:
            if time.time() - start_time > 600:
                raise Exception("Timeout (10m).")
            
            try:
                # Refresh logic: Pass the valid types.GenerateVideosOperation object
                refreshed_op = client.operations.get(polling_op)
                
                # Check status
                if hasattr(refreshed_op, 'done') and refreshed_op.done:
                    logger.info("Generation Done.")
                    op = refreshed_op 
                    break
                    
            except Exception as e:
                logger.warning(f"Polling error: {e}")
                time.sleep(20)
                continue
            
            logger.info("Waiting for Veo...")
            time.sleep(20)

        # 4. Result Extraction
        res_val = op.result
        result = res_val() if callable(res_val) else res_val
        
        if result and (getattr(result, 'generated_videos', None) or 'generated_videos' in result):
            vid = result.generated_videos[0] if hasattr(result, 'generated_videos') else result['generated_videos'][0]
            bridge_path = tempfile.mktemp(suffix=".mp4")
            
            if hasattr(vid.video, 'uri') and vid.video.uri:
                download_blob(vid.video.uri, bridge_path)
            else:
                bridge_path = save_video_bytes(vid.video.video_bytes)
            
            update_job_status(job_id, "stitching", 85, "Stitching...")
            final_cut = os.path.join("outputs", f"{job_id}_merged_temp.mp4")
            merged_path = stitch_videos(path_a, bridge_path, path_c, final_cut)
            
            msg = "Done! (Merged)" if merged_path else "Done! (Bridge Only)"
            update_job_status(job_id, "completed", 100, msg, video_url=bridge_path, merged_video_url=merged_path)
        else:
            raise Exception("No video output.")

    except Exception as e:
        logger.error(f"Worker crashed: {e}")
        update_job_status(job_id, "error", 0, f"Error: {e}")

    finally:
        # Enforce Terminal State
        try:
            status_file = f"outputs/{job_id}.json"
            if os.path.exists(status_file):
                with open(status_file, "r") as f:
                    data = json.load(f)

                status = data.get("status")
                if status not in ["completed", "error"]:
                    logger.warning(f"Job {job_id} left in non-terminal state ({status}). Forcing error.")
                    update_job_status(job_id, "error", 0, "Job terminated unexpectedly.")
        except Exception as e:
            logger.error(f"Final safety net failed: {e}")