""" Polls the status of an asynchronous Bedrock job and fetches the output video upon completion. Args: bedrock_runtime: The Bedrock runtime client used to poll the job status. s3_client: The S3 client used to download the output video. invocation_arn (str): The ARN of the asynchronous Bedrock job to poll. Yields: tuple: - local_mp4 (str or None): Path to the downloaded MP4 file if the job is completed successfully, otherwise None. - message (str): Status or error message describing the current state or any issues encountered. Behavior: - Continuously polls the job status at intervals defined by POLL_INTERVAL_SECS. - Yields status updates when the job status changes. - On completion, attempts to download the output MP4 from S3 and yields the local file path. - Handles and yields error messages for polling failures, download failures, or job failures. """ import time from botocore.exceptions import BotoCoreError, ClientError from config import POLL_INTERVAL_SECS from VideoBR.utils.s3_io import download_output_mp4 def poll_and_fetch(bedrock_runtime, s3_client, invocation_arn: str): last_status = None while True: try: job = bedrock_runtime.get_async_invoke(invocationArn=invocation_arn) status = job.get("status", "Unknown") except (ClientError, BotoCoreError) as e: yield None, f"❌ Error polling job: `{e}`" return if status != last_status: yield None, f"🛰️ Status: **{status}**" last_status = status if status == "Completed": try: base_uri = job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"] local_mp4 = download_output_mp4(s3_client, base_uri) yield local_mp4, f"✅ Completed. Downloaded `output.mp4` from `{base_uri}`" return except Exception as e: yield None, f"❌ Completed but failed to download video: `{e}`" return if status == "Failed": yield None, f"❌ Job failed: {job.get('failureMessage', 'Unknown error')}" return time.sleep(max(1.0, POLL_INTERVAL_SECS))