Spaces:
Sleeping
Sleeping
| """ | |
| Polls the status of an asynchronous Bedrock job and fetches the output video upon completion. | |
| Args: | |
| bedrock_runtime: The Bedrock runtime client used to poll the job status. | |
| s3_client: The S3 client used to download the output video. | |
| invocation_arn (str): The ARN of the asynchronous Bedrock job to poll. | |
| Yields: | |
| tuple: | |
| - local_mp4 (str or None): Path to the downloaded MP4 file if the job is completed successfully, otherwise None. | |
| - message (str): Status or error message describing the current state or any issues encountered. | |
| Behavior: | |
| - Continuously polls the job status, sleeping POLL_INTERVAL_SECS between polls (never less than 1 second). | |
| - Yields status updates when the job status changes. | |
| - On completion, attempts to download the output MP4 from S3 and yields the local file path. | |
| - Handles and yields error messages for polling failures, download failures, or job failures. | |
| """ | |
| import time | |
| from botocore.exceptions import BotoCoreError, ClientError | |
| from config import POLL_INTERVAL_SECS | |
| from VideoBR.utils.s3_io import download_output_mp4 | |
def poll_and_fetch(bedrock_runtime, s3_client, invocation_arn: str):
    """Poll an asynchronous Bedrock job and fetch its output video when done.

    This is a generator: each yielded item is a ``(local_mp4, message)``
    tuple. ``local_mp4`` is ``None`` for every item except the final one of a
    successful run.

    Args:
        bedrock_runtime: Client exposing ``get_async_invoke(invocationArn=...)``.
        s3_client: Client passed through to ``download_output_mp4``.
        invocation_arn: ARN of the asynchronous job to poll.

    Yields:
        tuple: ``(None, message)`` whenever the reported status differs from
        the previously reported one, then exactly one terminal item:

        - ``(local_mp4, message)`` when the status is ``"Completed"`` and the
          download succeeds (``local_mp4`` is whatever
          ``download_output_mp4`` returns, presumably a local file path).
        - ``(None, message)`` when polling raises ``ClientError`` or
          ``BotoCoreError``, when the download step raises, or when the
          status is ``"Failed"``.

    The generator stops after the terminal item. Between polls it sleeps for
    ``POLL_INTERVAL_SECS`` seconds, with a floor of one second.
    """
    # NOTE(review): the leading glyphs in the message strings below look
    # mis-encoded (probably emoji originally) -- confirm the file encoding.
    last_status = None
    while True:
        try:
            job = bedrock_runtime.get_async_invoke(invocationArn=invocation_arn)
            status = job.get("status", "Unknown")
        except (ClientError, BotoCoreError) as e:
            yield None, f"β Error polling job: `{e}`"
            return

        # Only report a status when it changes, so the consumer is not
        # flooded with identical updates on every poll.
        if status != last_status:
            yield None, f"π°οΈ Status: **{status}**"
            last_status = status

        if status == "Completed":
            # Keep only the fallible lookup/download inside ``try``. The
            # success yield lives in ``else`` so that an exception thrown
            # into the generator at that point is not mistaken for a
            # download failure.
            try:
                base_uri = job["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
                local_mp4 = download_output_mp4(s3_client, base_uri)
            except Exception as e:
                yield None, f"β Completed but failed to download video: `{e}`"
            else:
                yield local_mp4, f"β Completed. Downloaded `output.mp4` from `{base_uri}`"
            return

        if status == "Failed":
            yield None, f"β Job failed: {job.get('failureMessage', 'Unknown error')}"
            return

        # Floor the delay at one second so a tiny or zero interval cannot
        # turn this loop into a busy poll.
        time.sleep(max(1.0, POLL_INTERVAL_SECS))