import base64
import logging
import os
import shutil
import tempfile
import zipfile
from datetime import datetime
from io import BytesIO
from typing import Any, Dict, List, Optional
from uuid import uuid4

import boto3
import cv2
import requests
from dotenv import load_dotenv

logger = logging.getLogger(__name__)
load_dotenv()

# ---------------------------
# R2 Config
# ---------------------------
R2_ACCESS_KEY = os.getenv("R2_ACCESS_KEY")
R2_SECRET_KEY = os.getenv("R2_SECRET_KEY")
R2_BUCKET_NAME = os.getenv("R2_BUCKET_NAME")
R2_ENDPOINT = os.getenv("R2_ENDPOINT")
# Base of the public URLs returned by the upload helpers; the trailing slash
# is stripped so "{NEW_BASE}/{key}" never contains a double slash.
NEW_BASE = os.getenv("NEW_BASE", "").rstrip("/")

if not all([R2_ACCESS_KEY, R2_SECRET_KEY, R2_BUCKET_NAME, R2_ENDPOINT, NEW_BASE]):
    logger.warning("Some R2 env vars are missing. Uploads may fail.")

# S3-compatible client pointed at the configured R2 endpoint.
r2 = boto3.client(
    "s3",
    endpoint_url=R2_ENDPOINT,
    aws_access_key_id=R2_ACCESS_KEY,
    aws_secret_access_key=R2_SECRET_KEY,
)


# ---------------------------
# R2 Upload Helpers
# ---------------------------
def upload_to_r2(file_path: str, key: str, content_type: str = "application/octet-stream") -> str:
    """Upload a file from disk to R2 and return its public URL.

    Args:
        file_path: Path of the local file to upload.
        key: Object key to store the file under in ``R2_BUCKET_NAME``.
        content_type: MIME type recorded on the uploaded object.

    Returns:
        ``"{NEW_BASE}/{key}"``.

    Raises:
        Whatever ``r2.upload_file`` raises; errors are not caught here.
    """
    r2.upload_file(
        file_path,
        R2_BUCKET_NAME,
        key,
        ExtraArgs={"ContentType": content_type},
    )
    return f"{NEW_BASE}/{key}"


def upload_image_to_r2(
    image_bytes: bytes,
    folder_name: str = "default",
    app_type: str = "image_uploads",
    ext: str = "png",
    content_type: str = "image/png",
) -> Optional[str]:
    """Upload raw image bytes to R2 and return the public URL.

    The object key is ``"{app_type}/{folder_name}/{random hex}.{ext}"``, with
    surrounding slashes stripped from both path parts and a leading dot
    stripped from ``ext``.

    Args:
        image_bytes: Encoded image payload.
        folder_name: Second path segment of the object key.
        app_type: First path segment of the object key.
        ext: File extension, with or without a leading dot.
        content_type: MIME type recorded on the uploaded object.

    Returns:
        The public URL, or ``None`` when the upload fails (the error is logged).
    """
    try:
        filename = f"{uuid4().hex}.{ext.lstrip('.')}"
        file_key = f"{app_type.strip('/')}/{folder_name.strip('/')}/{filename}"
        r2.put_object(
            Bucket=R2_BUCKET_NAME,
            Key=file_key,
            Body=image_bytes,
            ContentType=content_type,
        )
        return f"{NEW_BASE}/{file_key}"
    except Exception as e:
        logger.error(f"upload_image_to_r2 failed: {e}")
        return None


def upload_pil_image_to_r2(
    pil_img,
    folder_name: str = "default",
    app_type: str = "image_uploads",
    format: str = "PNG",
) -> Optional[str]:
    """Serialize a PIL image to bytes, upload it to R2 and return the public URL.

    Args:
        pil_img: Image object exposing ``save(buffer, format=..., optimize=...)``.
        folder_name: Forwarded to ``upload_image_to_r2``.
        app_type: Forwarded to ``upload_image_to_r2``.
        format: Image format name; its lowercase form is used as the file
            extension and to build the ``image/...`` content type.

    Returns:
        The public URL, or ``None`` when encoding or uploading fails.
    """
    try:
        ext = format.lower()
        # NOTE(review): Pillow appears to register its JPEG encoder only under
        # "JPEG", so the "jpg" alias is mapped before saving; otherwise the
        # "jpg" content-type branch below could never be reached.
        pil_format = "JPEG" if ext == "jpg" else format

        buf = BytesIO()
        pil_img.save(buf, format=pil_format, optimize=True)
        img_bytes = buf.getvalue()

        content_type = f"image/{ext if ext != 'jpg' else 'jpeg'}"
        return upload_image_to_r2(
            img_bytes,
            folder_name=folder_name,
            app_type=app_type,
            ext=ext,
            content_type=content_type,
        )
    except Exception as e:
        logger.error(f"upload_pil_image_to_r2 failed: {e}")
        return None


# ---------------------------
# Media helpers (unchanged)
# ---------------------------
def get_video_thumbnail_base64(video_path: str, time_sec: int = 1) -> str:
    """Return one video frame as a base64-encoded JPEG string.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        time_sec: Position, in seconds, to seek to before reading the frame.

    Returns:
        The base64 text of the JPEG-encoded frame, or ``""`` when the frame
        cannot be read or encoded, or when any exception occurs (logged).
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        cap.set(cv2.CAP_PROP_POS_MSEC, time_sec * 1000)
        success, frame = cap.read()
        if not success:
            return ""

        encoded, buffer = cv2.imencode(".jpg", frame)
        if not encoded:
            return ""
        return base64.b64encode(buffer).decode("utf-8")
    except Exception:
        logger.exception("Thumbnail extraction failed")
        return ""
    finally:
        # Release in `finally` so the capture handle is freed even when
        # seeking, reading or encoding raises.
        if cap is not None:
            cap.release()


def encode_image_to_base64(image_path: str) -> str:
    """Read the file at ``image_path`` and return its bytes as base64 text.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


def is_valid_image(file_name: str) -> bool:
    """Return True when ``file_name`` ends with a known image extension.

    The check is case-insensitive and looks at the name only, not the content.
    """
    return file_name.lower().endswith((".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"))


# ---------------------------
# Stats / labels (unchanged)
# ---------------------------
def _mean_effectiveness(metrics):
    """Average the integer part of each metric's ``effectiveness_score``.

    Scores are read as ``"N/10"``-style strings (a missing score counts as
    ``"0/10"``); entries whose numerator is not an integer are ignored.

    Returns:
        The mean rounded to two decimals, or ``0.0`` when ``metrics`` is empty
        or no score could be parsed.
    """
    if not metrics:
        return 0.0

    scores = []
    for m in metrics:
        s = str(m.get("effectiveness_score", "0/10")).split("/")[0]
        try:
            scores.append(int(s))
        except ValueError:
            # Non-integer numerators (e.g. "n/a") are skipped on purpose.
            continue
    return round(sum(scores) / len(scores), 2) if scores else 0.0


def _coerce_dt(val: Any) -> Optional[datetime]:
    """Convert ``val`` to a ``datetime``, or return ``None`` if it cannot be parsed.

    ``datetime`` instances are returned unchanged; anything else is converted
    with ``str()`` and parsed as an ISO-8601 string.
    """
    if isinstance(val, datetime):
        return val

    text = str(val)
    # `fromisoformat` only accepts a trailing "Z" on Python 3.11+; rewrite it
    # as an explicit UTC offset so parsing behaves the same on older versions.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def _label_for_item(doc: Dict[str, Any]) -> str:
    """Build a ``"<time> · <category> · <model>"`` label for ``doc``.

    The time comes from ``created_at`` (``"Unknown time"`` if unparseable), the
    category from ``category``, and the model from ``analyzer_model`` falling
    back to ``model_name``; missing values are shown as an em dash.
    """
    ts = _coerce_dt(doc.get("created_at"))
    ts_s = ts.strftime("%Y-%m-%d %H:%M") if ts else "Unknown time"
    cat = doc.get("category") or "—"
    model = doc.get("analyzer_model") or doc.get("model_name") or "—"
    return f"{ts_s} · {cat} · {model}"


# ---------------------------
# ZIP gallery (unchanged)
--------------------------- def _zip_gallery_images(gallery_items): if not gallery_items: return None image_urls = [item["url"] if isinstance(item, dict) else item for item in gallery_items] temp_dir = tempfile.mkdtemp() try: zip_path = tempfile.NamedTemporaryFile(delete=False, suffix=".zip").name for i, url in enumerate(image_urls): try: ext = url.split("?")[0].split(".")[-1] ext = ext if ext and len(ext) <= 5 else "png" file_path = os.path.join(temp_dir, f"image_{i}.{ext}") if url.startswith(("http://", "https://")): resp = requests.get(url, timeout=15) resp.raise_for_status() with open(file_path, "wb") as f: f.write(resp.content) elif os.path.exists(url): shutil.copy(url, file_path) except Exception as e: logger.error(f"Error processing image {url}: {e}") with zipfile.ZipFile(zip_path, "w") as zipf: for file_name in os.listdir(temp_dir): zipf.write(os.path.join(temp_dir, file_name), arcname=file_name) return zip_path except Exception as e: logger.critical(f"Failed to create zip: {e}") return None finally: shutil.rmtree(temp_dir, ignore_errors=True)