File size: 2,039 Bytes
3c2b4ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import boto3
from uuid import uuid4
import os

import cv2
import base64
import logging

logger = logging.getLogger(__name__)

def get_video_thumbnail_base64(video_path: str, time_sec: int = 1) -> str:
    try:
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000)
        success, frame = cap.read()
        cap.release()
        if not success:
            return ""
        _, buffer = cv2.imencode(".jpg", frame)
        return base64.b64encode(buffer).decode("utf-8")
    except Exception:
        logger.exception("Thumbnail extraction failed")
        return ""


log = logging.getLogger(__name__)

def upload_image_to_r2(image_bytes, folder_name="search_arb", app_type="bulk_generation"):
    try:
        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv('R2_ENDPOINT'),
            aws_access_key_id=os.getenv('R2_ACCESS_KEY'),
            aws_secret_access_key=os.getenv('R2_SECRET_KEY'),
            region_name="auto"
        )
        filename = f"{uuid4().hex}.png"
        file_key = f"hug_face/{app_type.strip('/')}/{folder_name.strip('/')}/{filename}" if folder_name else f"hug_face/{app_type.strip('/')}/{filename}"
        s3.put_object(Bucket=os.getenv('R2_BUCKET_NAME'), Key=file_key, Body=image_bytes, ContentType="image/png")
        return f"{os.getenv('NEW_BASE').rstrip('/')}/{file_key}"
    except Exception as e:
        log.error(e)
        return None


def encode_image_to_base64(image_path: str) -> str:
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")

def is_valid_image(file_name: str) -> bool:
    return file_name.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"))

def _mean_effectiveness(metrics):
    if not metrics: return 0.0
    scores = []
    for m in metrics:
        s = str(m.get("effectiveness_score", "0/10")).split("/")[0]
        try: scores.append(int(s))
        except Exception: pass
    return round(sum(scores) / len(scores), 2) if scores else 0.0