| """ |
| Gallery store: persisted list of generated creatives per user in MongoDB. |
| """ |
|
|
| import uuid |
| from datetime import datetime, timezone, timedelta |
| from typing import Any |
|
|
| from app.mongo import get_mongo_db |
| from app.r2 import delete_object as r2_delete_object |
| DEFAULT_LIMIT = 100 |
| MAX_LIMIT = 500 |
| GALLERY_COLLECTION = "gallery_entries" |
|
|
def _load_from_mongo(username: str) -> list[dict[str, Any]] | None:
    """Fetch every gallery document stored for ``username``, newest first.

    Returns ``None`` when ``get_mongo_db()`` yields no database handle.
    Otherwise returns the matching documents as a list, with MongoDB's
    ``_id`` field projected out, sorted by ``created_at`` descending.
    """
    database = get_mongo_db()
    if database is None:
        return None
    cursor = database[GALLERY_COLLECTION].find({"username": username}, {"_id": 0})
    return list(cursor.sort("created_at", -1))
|
|
|
|
def _save_to_mongo(username: str, entries: list[dict[str, Any]]) -> bool:
    """Replace all stored gallery documents for ``username`` with ``entries``.

    Every existing document for the user is deleted first; ``entries`` is then
    inserted when it is non-empty. The delete and the insert are two separate
    collection calls.

    Returns ``False`` (writing nothing) when ``get_mongo_db()`` yields no
    database handle, ``True`` otherwise.
    """
    database = get_mongo_db()
    if database is None:
        return False
    collection = database[GALLERY_COLLECTION]
    collection.delete_many({"username": username})
    if not entries:
        # Nothing to write back: the user's gallery is simply left empty.
        return True
    collection.insert_many(entries)
    return True
|
|
|
|
def load_entries(username: str, limit: int = DEFAULT_LIMIT, offset: int = 0) -> tuple[list[dict[str, Any]], int]:
    """Load one page of gallery entries for a user, newest first.

    Args:
        username: Owner of the gallery entries.
        limit: Maximum number of entries in the page. Clamped to the range
            ``0..MAX_LIMIT``.
        offset: Number of leading (newest) entries to skip. Negative values
            are treated as ``0``.

    Returns:
        ``(page, total)`` where ``page`` is the requested slice and ``total``
        is the number of entries loaded for the user. When the underlying
        load returns nothing, the result is ``([], 0)``.
    """
    entries: list[dict[str, Any]] = _load_from_mongo(username) or []
    # Clamp both bounds: list slicing interprets a negative start/stop as
    # "count from the end", so an unclamped negative offset or limit would
    # silently return an unrelated slice instead of a page.
    start = max(offset, 0)
    page_size = max(0, min(limit, MAX_LIMIT))
    return (entries[start : start + page_size], len(entries))
|
|
|
|
def append_entry(
    username: str,
    *,
    r2_key: str,
    concept_name: str = "",
    creative_id: int = 0,
    product_name: str | None = None,
    scene_prompt: str | None = None,
    image_model: str | None = None,
) -> dict[str, Any]:
    """Append a gallery entry in MongoDB and return the new entry.

    Args:
        username: Owner of the entry.
        r2_key: Value stored under ``"r2_key"`` for the entry.
        concept_name: Stored as-is; falsy values become ``""``.
        creative_id: Stored as-is.
        product_name: Stored as-is; ``None``/falsy values become ``""``.
        scene_prompt: Stored only when not ``None``.
        image_model: Stored only when not ``None``.

    Returns:
        The new entry as a plain dict. It carries a fresh UUID string under
        ``"id"`` and a UTC ISO-8601 timestamp under ``"created_at"``, and has
        no MongoDB ``"_id"`` key.

    Raises:
        RuntimeError: If ``get_mongo_db()`` yields no database handle.
    """
    db = get_mongo_db()
    if db is None:
        raise RuntimeError("MongoDB is not configured")

    entry: dict[str, Any] = {
        "id": str(uuid.uuid4()),
        "username": username,
        "r2_key": r2_key,
        "concept_name": concept_name or "",
        "creative_id": creative_id,
        "product_name": product_name or "",
        # Uses the module-level ``datetime``/``timezone`` names rather than a
        # function-local ``import datetime`` that would shadow them.
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    if scene_prompt is not None:
        entry["scene_prompt"] = scene_prompt
    if image_model is not None:
        entry["image_model"] = image_model

    # insert_one() adds an "_id" ObjectId to the dict it is handed. Insert a
    # copy so the returned entry stays free of "_id", matching the read paths
    # in this module that project "_id" out, and stays JSON-serialisable.
    db[GALLERY_COLLECTION].insert_one(dict(entry))
    return entry
|
|
|
|
def update_entry(
    username: str,
    entry_id: str,
    **updates: Any,
) -> bool:
    """Apply field updates to one gallery entry owned by ``username``.

    Only the fields ``r2_key``, ``concept_name``, ``creative_id``,
    ``product_name``, ``scene_prompt`` and ``image_model`` are written; any
    other keyword is silently dropped.

    Returns:
        ``True`` when an entry matched the ``username``/``entry_id`` pair, or
        when no permitted field was supplied (nothing to do). ``False`` when
        no entry matched or ``get_mongo_db()`` yields no database handle.
    """
    permitted = {"r2_key", "concept_name", "creative_id", "product_name", "scene_prompt", "image_model"}
    changes = {field: value for field, value in updates.items() if field in permitted}
    if not changes:
        # Checked before touching the database: an empty update is a no-op.
        return True
    database = get_mongo_db()
    if database is None:
        return False
    outcome = database[GALLERY_COLLECTION].update_one(
        {"username": username, "id": entry_id},
        {"$set": changes},
    )
    return outcome.matched_count > 0
|
|
|
|
def delete_entry(username: str, entry_id: str) -> bool:
    """Delete the MongoDB document matching ``username`` and ``entry_id``.

    Returns ``True`` when a document was deleted; ``False`` when nothing
    matched or ``get_mongo_db()`` yields no database handle.
    """
    database = get_mongo_db()
    if database is None:
        return False
    selector = {"username": username, "id": entry_id}
    outcome = database[GALLERY_COLLECTION].delete_one(selector)
    return outcome.deleted_count > 0
|
|
|
|
def get_entry(username: str, entry_id: str) -> dict[str, Any] | None:
    """Look up a single gallery entry by ``username`` and ``entry_id``.

    Returns the document with MongoDB's ``_id`` projected out, or ``None``
    when nothing matches or ``get_mongo_db()`` yields no database handle.
    """
    database = get_mongo_db()
    if database is None:
        return None
    selector = {"username": username, "id": entry_id}
    return database[GALLERY_COLLECTION].find_one(selector, {"_id": 0})
|
|
|
|
def _load_all_entries_for_user(username: str) -> list[dict[str, Any]]:
    """Return every gallery entry for ``username``, or ``[]`` if none load.

    Wraps ``_load_from_mongo`` so callers always receive a list, including
    when that helper returns ``None`` or an empty result.
    """
    loaded = _load_from_mongo(username)
    if not loaded:
        return []
    return loaded
|
|
|
|
def delete_entries_older_than_days(
    days: int = 60,
    username: str | None = None,
) -> dict[str, Any]:
    """
    Delete gallery entries (and their R2 images) older than the given number of days.

    Only documents whose ``created_at`` parses to a timestamp strictly before
    ``now - days`` are removed; entries with a missing or unparseable
    ``created_at`` are kept. Expired documents are deleted individually by
    their MongoDB ``_id``, so entries added while the purge is running are
    never touched.

    Args:
        days: Age threshold in days.
        username: If given, only that user's entries are examined. If None,
            every distinct ``username`` in the gallery collection is processed.

    Returns:
        {"deleted": count, "users_processed": count, "errors": []}.
        ``deleted`` is the number of documents MongoDB reports as removed,
        ``users_processed`` counts users that had at least one entry, and
        ``errors`` lists R2 objects whose deletion reported failure. All
        counts are zero when ``get_mongo_db()`` yields no database handle.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    deleted = 0
    users_processed = 0
    errors: list[str] = []

    def parse_created_at(entry: dict[str, Any]) -> datetime | None:
        """Parse ``created_at`` into an aware datetime, or None if unusable."""
        raw = entry.get("created_at")
        if not isinstance(raw, str) or not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            # A naive timestamp cannot be compared with the aware cutoff
            # (TypeError). NOTE(review): naive values are assumed to be UTC,
            # matching what append_entry writes -- confirm for legacy data.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    db = get_mongo_db()
    if db is None:
        return {"deleted": deleted, "users_processed": users_processed, "errors": errors}
    coll = db[GALLERY_COLLECTION]

    usernames = [username] if username is not None else coll.distinct("username")

    for uname in usernames:
        # Only the fields needed to decide on and perform the purge are fetched.
        docs = list(
            coll.find({"username": uname}, {"_id": 1, "created_at": 1, "r2_key": 1})
        )
        if not docs:
            continue
        users_processed += 1

        expired_ids: list[Any] = []
        for doc in docs:
            created = parse_created_at(doc)
            if created is None or created >= cutoff:
                continue
            r2_key = (doc.get("r2_key") or "").strip()
            # A failed R2 delete is recorded, but the entry is still purged.
            if r2_key and not r2_delete_object(r2_key):
                errors.append(f"R2 delete failed: {r2_key}")
            expired_ids.append(doc["_id"])

        if expired_ids:
            # Targeted delete instead of "delete everything, re-insert the
            # keepers": nothing is lost if the operation is interrupted or if
            # new entries arrive concurrently.
            result = coll.delete_many({"_id": {"$in": expired_ids}})
            deleted += result.deleted_count

    return {"deleted": deleted, "users_processed": users_processed, "errors": errors}
|
|