"""Media helpers: video thumbnails, R2 image upload, base64 encoding, score averaging."""

import base64
import logging
import math
import os
from typing import Optional
from uuid import uuid4

import boto3
import cv2

logger = logging.getLogger(__name__)
# Second historical name for the same module logger; kept so existing
# references to ``log`` keep working.
log = logger

# File extensions (lower-case, with leading dot) accepted by is_valid_image.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp")


def get_video_thumbnail_base64(video_path: str, time_sec: int = 1) -> str:
    """Return one video frame as a base64-encoded JPEG string.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        time_sec: Position, in seconds, to seek to before reading the frame.

    Returns:
        The base64 text of the JPEG-encoded frame, or ``""`` when the video
        cannot be opened, no frame can be read, encoding fails, or any
        exception is raised (exceptions are logged, never propagated).
    """
    try:
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return ""
            cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000)
            success, frame = cap.read()
            if not success:
                return ""
            encoded, buffer = cv2.imencode(".jpg", frame)
            if not encoded:
                return ""
            return base64.b64encode(buffer).decode("utf-8")
        finally:
            # Always free the capture handle, including when set()/read()
            # raise; the inner try keeps release() errors inside the outer
            # handler so the function still never raises.
            cap.release()
    except Exception:
        logger.exception("Thumbnail extraction failed")
        return ""


def upload_image_to_r2(
    image_bytes,
    folder_name: Optional[str] = "search_arb",
    app_type: str = "bulk_generation",
) -> Optional[str]:
    """Upload PNG image data to the configured bucket and return its URL.

    Connection settings are read from the environment: ``R2_ENDPOINT``,
    ``R2_ACCESS_KEY``, ``R2_SECRET_KEY``, ``R2_BUCKET_NAME`` and ``NEW_BASE``
    (the base of the returned URL).

    Args:
        image_bytes: Object body passed unchanged to ``put_object``.
        folder_name: Optional sub-folder under ``app_type``; surrounding
            slashes are stripped, and an empty result means "no sub-folder".
        app_type: Folder placed directly under the ``hug_face/`` prefix.

    Returns:
        ``<NEW_BASE>/<object key>`` on success, or ``None`` when required
        configuration is missing or the upload fails (failures are logged).
    """
    bucket = os.getenv("R2_BUCKET_NAME")
    base_url = os.getenv("NEW_BASE")
    # Validate before uploading: previously a missing NEW_BASE was only
    # discovered after put_object had already stored the object.
    if bucket is None or base_url is None:
        log.error("R2 upload skipped: R2_BUCKET_NAME and NEW_BASE must be set")
        return None

    try:
        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv("R2_ENDPOINT"),
            aws_access_key_id=os.getenv("R2_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("R2_SECRET_KEY"),
            region_name="auto",
        )

        filename = f"{uuid4().hex}.png"
        key_parts = ["hug_face", app_type.strip("/")]
        # Strip first, then test: a folder such as "/" must not leave an
        # empty path segment ("//") in the key.
        folder = folder_name.strip("/") if folder_name else ""
        if folder:
            key_parts.append(folder)
        key_parts.append(filename)
        file_key = "/".join(key_parts)

        s3.put_object(
            Bucket=bucket,
            Key=file_key,
            Body=image_bytes,
            ContentType="image/png",
        )
        return f"{base_url.rstrip('/')}/{file_key}"
    except Exception:
        # Best-effort boundary: callers receive None instead of an exception.
        log.exception("Failed to upload image to R2")
        return None


def encode_image_to_base64(image_path: str) -> str:
    """Read the file at ``image_path`` and return its bytes as base64 text."""
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def is_valid_image(file_name: str) -> bool:
    """Return True when ``file_name`` ends with a known image extension.

    The comparison is case-insensitive and looks only at the name, not at
    the file's contents.
    """
    return file_name.lower().endswith(_IMAGE_EXTENSIONS)


def _mean_effectiveness(metrics):
    """Return the mean of the ``effectiveness_score`` values, rounded to 2 places.

    Each item's score is converted to text and the part before the first
    ``/`` is parsed as a number, so ``"8/10"``, ``"7.5/10"`` and ``8`` are all
    accepted. Items without the key count as ``0``. Values that cannot be
    parsed, or are not finite, are skipped.

    Args:
        metrics: Iterable of dict-like items exposing ``.get``.

    Returns:
        The rounded mean as a float, or ``0.0`` when ``metrics`` is empty or
        no score could be parsed.
    """
    if not metrics:
        return 0.0

    scores = []
    for metric in metrics:
        raw = str(metric.get("effectiveness_score", "0/10")).split("/")[0]
        try:
            # float (not int) so decimal scores like "7.5" are not dropped.
            value = float(raw)
        except ValueError:
            logger.debug("Skipping unparseable effectiveness_score: %r", raw)
            continue
        if not math.isfinite(value):
            # "nan"/"inf" parse as floats but would poison the mean.
            logger.debug("Skipping non-finite effectiveness_score: %r", raw)
            continue
        scores.append(value)

    return round(sum(scores) / len(scores), 2) if scores else 0.0