|
|
import os, cv2, base64, logging, zipfile, requests, tempfile, shutil |
|
|
import boto3 |
|
|
from uuid import uuid4 |
|
|
from typing import Any, Dict, List, Optional |
|
|
from datetime import datetime |
|
|
from dotenv import load_dotenv |
|
|
from io import BytesIO |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Cloudflare R2 (S3-compatible) configuration, read from the environment ---
# All credentials/endpoints come from .env (loaded above via load_dotenv()).
R2_ACCESS_KEY = os.getenv("R2_ACCESS_KEY")


R2_SECRET_KEY = os.getenv("R2_SECRET_KEY")


R2_BUCKET_NAME = os.getenv("R2_BUCKET_NAME")


R2_ENDPOINT = os.getenv("R2_ENDPOINT")


# Public base URL prepended to object keys to build shareable links.
# Trailing slash is stripped so keys can always be joined with a single "/".
NEW_BASE = os.getenv("NEW_BASE", "").rstrip("/")




# Warn (but do not fail) at import time when configuration is incomplete;
# individual upload calls will surface the actual errors.
if not all([R2_ACCESS_KEY, R2_SECRET_KEY, R2_BUCKET_NAME, R2_ENDPOINT, NEW_BASE]):


    logger.warning("Some R2 env vars are missing. Uploads may fail.")




# Module-level S3 client pointed at the R2 endpoint; shared by all helpers below.
r2 = boto3.client(


    "s3",


    endpoint_url=R2_ENDPOINT,


    aws_access_key_id=R2_ACCESS_KEY,


    aws_secret_access_key=R2_SECRET_KEY,


)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_to_r2(file_path: str, key: str, content_type: str = "application/octet-stream") -> str:
    """
    Upload a file from disk to the R2 bucket and return its public URL.

    Args:
        file_path: Path of the file on local disk.
        key: Object key the file is stored under in the bucket.
        content_type: MIME type recorded on the uploaded object.

    Returns:
        Public URL of the object (NEW_BASE joined with the key).
    """
    extra_args = {"ContentType": content_type}
    r2.upload_file(file_path, R2_BUCKET_NAME, key, ExtraArgs=extra_args)
    return "/".join((NEW_BASE, key))
|
|
|
|
|
|
|
|
def upload_image_to_r2(
    image_bytes: bytes,
    folder_name: str = "default",
    app_type: str = "image_uploads",
    ext: str = "png",
    content_type: str = "image/png",
) -> Optional[str]:
    """
    Upload raw image bytes to R2 and return the public URL.

    A random (uuid4) filename with the given extension is generated and the
    object is stored under "<app_type>/<folder_name>/<filename>".

    Args:
        image_bytes: Encoded image payload to store.
        folder_name: Sub-folder component of the object key.
        app_type: Top-level component of the object key.
        ext: File extension for the generated filename (leading dot optional).
        content_type: MIME type recorded on the object.

    Returns:
        Public URL string on success, or None on any failure (logged, not raised).
    """
    try:
        filename = f"{uuid4().hex}.{ext.lstrip('.')}"
        # Bug fix: the generated filename was previously dropped from the key
        # (a literal placeholder was stored instead), so every upload
        # collided on the same object key.
        file_key = f"{app_type.strip('/')}/{folder_name.strip('/')}/{filename}"

        r2.put_object(
            Bucket=R2_BUCKET_NAME,
            Key=file_key,
            Body=image_bytes,
            ContentType=content_type
        )
        return f"{NEW_BASE}/{file_key}"
    except Exception as e:
        logger.error(f"upload_image_to_r2 failed: {e}")
        return None
|
|
|
|
|
|
|
|
def upload_pil_image_to_r2(
    pil_img,
    folder_name: str = "default",
    app_type: str = "image_uploads",
    format: str = "PNG",
) -> Optional[str]:
    """
    Serialize a PIL image to bytes, upload it to R2, and return the public URL.

    Args:
        pil_img: PIL.Image instance to upload.
        folder_name: Sub-folder component of the object key.
        app_type: Top-level component of the object key.
        format: PIL save format (e.g. "PNG", "JPEG"); also drives the
            extension and MIME type.

    Returns:
        Public URL string on success, or None on failure (logged, not raised).
    """
    try:
        buffer = BytesIO()
        pil_img.save(buffer, format=format, optimize=True)

        extension = format.lower()
        # "jpg" is not a registered MIME subtype; map it to "jpeg".
        mime_subtype = "jpeg" if extension == "jpg" else extension

        return upload_image_to_r2(
            buffer.getvalue(),
            folder_name=folder_name,
            app_type=app_type,
            ext=extension,
            content_type=f"image/{mime_subtype}",
        )
    except Exception as e:
        logger.error(f"upload_pil_image_to_r2 failed: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_video_thumbnail_base64(video_path: str, time_sec: int = 1) -> str:
    """
    Grab one frame from a video and return it as a base64-encoded JPEG string.

    Args:
        video_path: Path to the video file on disk.
        time_sec: Timestamp (seconds) of the frame to capture.

    Returns:
        Base64-encoded JPEG as a UTF-8 string, or "" on any failure
        (file unreadable, seek past end of video, encode failure).
    """
    try:
        cap = cv2.VideoCapture(video_path)
        try:
            # Fail fast if the file could not be opened as a video.
            if not cap.isOpened():
                return ""
            cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000)
            success, frame = cap.read()
        finally:
            # Release the capture even if set()/read() raises (previously the
            # handle leaked on that path).
            cap.release()
        if not success:
            return ""
        # imencode reports failure via its first return value; check it
        # instead of encoding a bogus buffer.
        ok, buffer = cv2.imencode(".jpg", frame)
        if not ok:
            return ""
        return base64.b64encode(buffer).decode("utf-8")
    except Exception:
        logger.exception("Thumbnail extraction failed")
        return ""
|
|
|
|
|
|
|
|
def encode_image_to_base64(image_path: str) -> str:
    """Read the file at *image_path* and return its contents base64-encoded."""
    with open(image_path, "rb") as fh:
        raw = fh.read()
    return base64.b64encode(raw).decode("utf-8")
|
|
|
|
|
|
|
|
def is_valid_image(file_name: str) -> bool:
    """Return True when *file_name* ends in a recognized raster-image extension."""
    allowed = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp")
    lowered = file_name.lower()
    return any(lowered.endswith(ext) for ext in allowed)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _mean_effectiveness(metrics): |
|
|
if not metrics: |
|
|
return 0.0 |
|
|
scores = [] |
|
|
for m in metrics: |
|
|
s = str(m.get("effectiveness_score", "0/10")).split("/")[0] |
|
|
try: |
|
|
scores.append(int(s)) |
|
|
except Exception: |
|
|
pass |
|
|
return round(sum(scores) / len(scores), 2) if scores else 0.0 |
|
|
|
|
|
|
|
|
def _coerce_dt(val: Any) -> Optional[datetime]: |
|
|
if isinstance(val, datetime): |
|
|
return val |
|
|
try: |
|
|
return datetime.fromisoformat(str(val)) |
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
|
|
|
def _label_for_item(doc: Dict[str, Any]) -> str:
    """
    Build a short display label for a document: "<timestamp> · <category> · <model>".

    Falls back to "Unknown time" when "created_at" cannot be parsed and to
    an em dash when category/model fields are missing or empty.
    """
    created = _coerce_dt(doc.get("created_at"))
    when = created.strftime("%Y-%m-%d %H:%M") if created else "Unknown time"
    category = doc.get("category") or "—"
    model_name = doc.get("analyzer_model") or doc.get("model_name") or "—"
    return f"{when} · {category} · {model_name}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _zip_gallery_images(gallery_items): |
|
|
if not gallery_items: |
|
|
return None |
|
|
|
|
|
image_urls = [item["url"] if isinstance(item, dict) else item for item in gallery_items] |
|
|
temp_dir = tempfile.mkdtemp() |
|
|
|
|
|
try: |
|
|
zip_path = tempfile.NamedTemporaryFile(delete=False, suffix=".zip").name |
|
|
|
|
|
for i, url in enumerate(image_urls): |
|
|
try: |
|
|
ext = url.split("?")[0].split(".")[-1] |
|
|
ext = ext if ext and len(ext) <= 5 else "png" |
|
|
file_path = os.path.join(temp_dir, f"image_{i}.{ext}") |
|
|
|
|
|
if url.startswith(("http://", "https://")): |
|
|
resp = requests.get(url, timeout=15) |
|
|
resp.raise_for_status() |
|
|
with open(file_path, "wb") as f: |
|
|
f.write(resp.content) |
|
|
elif os.path.exists(url): |
|
|
shutil.copy(url, file_path) |
|
|
except Exception as e: |
|
|
logger.error(f"Error processing image {url}: {e}") |
|
|
|
|
|
with zipfile.ZipFile(zip_path, "w") as zipf: |
|
|
for file_name in os.listdir(temp_dir): |
|
|
zipf.write(os.path.join(temp_dir, file_name), arcname=file_name) |
|
|
|
|
|
return zip_path |
|
|
|
|
|
except Exception as e: |
|
|
logger.critical(f"Failed to create zip: {e}") |
|
|
return None |
|
|
|
|
|
finally: |
|
|
shutil.rmtree(temp_dir, ignore_errors=True) |
|
|
|