# videogen/drivers/polling.py
# Upload metadata: "Upload 14 files", commit 4336d0b (verified), uploaded by evaluator42.
"""
Polls the status of an asynchronous Bedrock job and fetches the output video upon completion.
Args:
bedrock_runtime: The Bedrock runtime client used to poll the job status.
s3_client: The S3 client used to download the output video.
invocation_arn (str): The ARN of the asynchronous Bedrock job to poll.
Yields:
tuple:
- local_mp4 (str or None): Path to the downloaded MP4 file if the job is completed successfully, otherwise None.
- message (str): Status or error message describing the current state or any issues encountered.
Behavior:
- Continuously polls the job status at intervals defined by POLL_INTERVAL_SECS.
- Yields status updates when the job status changes.
- On completion, attempts to download the output MP4 from S3 and yields the local file path.
- Handles and yields error messages for polling failures, download failures, or job failures.
"""
import time
from botocore.exceptions import BotoCoreError, ClientError
from config import POLL_INTERVAL_SECS
from VideoBR.utils.s3_io import download_output_mp4
def poll_and_fetch(bedrock_runtime, s3_client, invocation_arn: str):
    """Poll an asynchronous Bedrock job and fetch its output video when done.

    Args:
        bedrock_runtime: Client exposing ``get_async_invoke(invocationArn=...)``.
        s3_client: Client passed through to ``download_output_mp4``.
        invocation_arn: ARN of the asynchronous invocation to poll.

    Yields:
        ``(local_mp4, message)`` tuples. ``local_mp4`` is ``None`` for every
        update except the final success, where it is whatever
        ``download_output_mp4`` returned. ``message`` is a Markdown-formatted
        status or error string.

    The generator ends after the first polling error, after a ``Completed``
    status (whether or not the download succeeds), or after a ``Failed``
    status. Any other status keeps it polling indefinitely.
    """
    last_status = None
    while True:
        # Only the service call can raise the boto errors handled here.
        try:
            job = bedrock_runtime.get_async_invoke(invocationArn=invocation_arn)
        except (ClientError, BotoCoreError) as e:
            yield None, f"❌ Error polling job: `{e}`"
            return
        status = job.get("status", "Unknown")

        # Report a status only when it differs from the previous poll.
        if status != last_status:
            yield None, f"🛰️ Status: **{status}**"
            last_status = status

        if status == "Completed":
            # The success yield stays outside the try so that an exception
            # thrown into the generator by its consumer is not misreported
            # as a download failure.
            try:
                base_uri = job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
                local_mp4 = download_output_mp4(s3_client, base_uri)
            except Exception as e:  # boundary: surface any failure as a message
                yield None, f"❌ Completed but failed to download video: `{e}`"
                return
            yield local_mp4, f"✅ Completed. Downloaded `output.mp4` from `{base_uri}`"
            return

        if status == "Failed":
            yield None, f"❌ Job failed: {job.get('failureMessage', 'Unknown error')}"
            return

        # Never poll more often than once per second, whatever the config says.
        time.sleep(max(1.0, POLL_INTERVAL_SECS))