sushilideaclan01's picture
refactor: migrate user and gallery data storage from JSON files to MongoDB
c4a64b4
"""
Gallery store: persisted list of generated creatives per user in MongoDB.
"""
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any
from app.mongo import get_mongo_db
from app.r2 import delete_object as r2_delete_object
# Page size load_entries uses when the caller does not pass a limit.
DEFAULT_LIMIT = 100
# Upper bound load_entries applies to any requested page size.
MAX_LIMIT = 500
# Name of the MongoDB collection holding the gallery entry documents.
GALLERY_COLLECTION = "gallery_entries"
def _load_from_mongo(username: str) -> list[dict[str, Any]] | None:
    """Fetch every gallery document stored for ``username``.

    Returns ``None`` when ``get_mongo_db()`` yields no database handle.
    Otherwise returns a (possibly empty) list of documents, sorted by
    ``created_at`` descending, with Mongo's ``_id`` field projected out.
    """
    database = get_mongo_db()
    if database is None:
        return None
    cursor = database[GALLERY_COLLECTION].find({"username": username}, {"_id": 0})
    # Descending sort on created_at so the caller receives newest entries first.
    return list(cursor.sort("created_at", -1))
def _save_to_mongo(username: str, entries: list[dict[str, Any]]) -> bool:
    """Replace all stored gallery documents of ``username`` with ``entries``.

    Returns ``False`` when no database handle is available, ``True`` once the
    existing documents were removed and ``entries`` (if any) were inserted.
    """
    database = get_mongo_db()
    if database is None:
        return False
    collection = database[GALLERY_COLLECTION]
    # Two separate operations: wipe the user's documents, then re-insert.
    collection.delete_many({"username": username})
    if entries:
        # insert_many rejects an empty list, hence the guard.
        collection.insert_many(entries)
    return True
def load_entries(username: str, limit: int = DEFAULT_LIMIT, offset: int = 0) -> tuple[list[dict[str, Any]], int]:
    """Load gallery entries for user, newest first.

    Args:
        username: Owner of the gallery entries.
        limit: Requested page size; capped at MAX_LIMIT. Values below zero
            are treated as zero (empty page).
        offset: Number of entries to skip from the newest one. Values below
            zero are treated as zero.

    Returns:
        (page_slice, total_count), where total_count is the number of entries
        stored for the user regardless of paging. When MongoDB is unavailable
        the result is ([], 0).
    """
    entries: list[dict[str, Any]] = _load_from_mongo(username) or []
    # Clamp both values: a negative offset would slice from the END of the
    # list, and a negative limit would turn into a negative slice stop
    # (e.g. limit=-1 returned "everything but the last entry").
    start = max(0, offset)
    page_size = max(0, min(limit, MAX_LIMIT))
    return (entries[start : start + page_size], len(entries))
def append_entry(
    username: str,
    *,
    r2_key: str,
    concept_name: str = "",
    creative_id: int = 0,
    product_name: str | None = None,
    scene_prompt: str | None = None,
    image_model: str | None = None,
) -> dict[str, Any]:
    """Append a gallery entry in MongoDB. Returns the new entry.

    Args:
        username: Owner of the entry.
        r2_key: Object key of the generated image in R2.
        concept_name: Concept label; falsy values are stored as "".
        creative_id: Numeric id of the creative.
        product_name: Product label; ``None``/empty is stored as "".
        scene_prompt: Stored only when not ``None``.
        image_model: Stored only when not ``None``.

    Returns:
        The entry as a plain dict (fresh UUID ``id``, UTC ISO-8601
        ``created_at``) without Mongo's ``_id`` field, matching what the
        read functions in this module return.

    Raises:
        RuntimeError: If MongoDB is not configured.
    """
    entry: dict[str, Any] = {
        "id": str(uuid.uuid4()),
        "username": username,
        "r2_key": r2_key,
        "concept_name": concept_name or "",
        "creative_id": creative_id,
        "product_name": product_name or "",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    if scene_prompt is not None:
        entry["scene_prompt"] = scene_prompt
    if image_model is not None:
        entry["image_model"] = image_model
    db = get_mongo_db()
    if db is None:
        raise RuntimeError("MongoDB is not configured")
    # insert_one() adds an ObjectId "_id" to the dict it is given. Insert a
    # copy so the returned entry stays JSON-serializable and has the same
    # shape as entries read back with the {"_id": 0} projection.
    db[GALLERY_COLLECTION].insert_one(dict(entry))
    return entry
def update_entry(
    username: str,
    entry_id: str,
    **updates: Any,
) -> bool:
    """Apply field updates to one gallery entry of ``username``.

    Only whitelisted fields are written; any other keyword is ignored.
    Returns ``True`` when nothing is left to write or when a matching entry
    was found, ``False`` when no entry matched or MongoDB is unavailable.
    """
    allowed = {"r2_key", "concept_name", "creative_id", "product_name", "scene_prompt", "image_model"}
    fields = {name: value for name, value in updates.items() if name in allowed}
    if not fields:
        # Nothing writable was supplied: report success without touching the DB.
        return True
    database = get_mongo_db()
    if database is None:
        return False
    outcome = database[GALLERY_COLLECTION].update_one(
        {"username": username, "id": entry_id},
        {"$set": fields},
    )
    return outcome.matched_count > 0
def delete_entry(username: str, entry_id: str) -> bool:
    """Delete the entry ``entry_id`` owned by ``username`` from MongoDB.

    Returns ``True`` only when a document was actually removed; ``False``
    when nothing matched or MongoDB is unavailable.
    """
    database = get_mongo_db()
    if database is None:
        return False
    selector = {"username": username, "id": entry_id}
    outcome = database[GALLERY_COLLECTION].delete_one(selector)
    return outcome.deleted_count > 0
def get_entry(username: str, entry_id: str) -> dict[str, Any] | None:
    """Look up one entry of ``username`` by its id.

    Returns the document without Mongo's ``_id`` field, or ``None`` when no
    entry matches or MongoDB is unavailable.
    """
    database = get_mongo_db()
    if database is None:
        return None
    selector = {"username": username, "id": entry_id}
    # Project out _id so the result has the same shape as the list readers.
    return database[GALLERY_COLLECTION].find_one(selector, {"_id": 0})
def _load_all_entries_for_user(username: str) -> list[dict[str, Any]]:
    """Return every gallery entry of ``username``; an empty list if none or no DB."""
    loaded = _load_from_mongo(username)
    # Both None (no database) and an empty result collapse to a fresh list.
    return loaded if loaded else []
def delete_entries_older_than_days(
    days: int = 60,
    username: str | None = None,
) -> dict[str, Any]:
    """
    Delete gallery entries (and their R2 images) older than the given number of days.

    If username is None, runs for every username present in the gallery
    collection; otherwise only for that user. Entries whose ``created_at`` is
    missing, not a string, or not parseable as ISO-8601 are always kept.
    Timestamps without a UTC offset are interpreted as UTC.

    Returns {"deleted": count, "users_processed": count, "errors": []} where
    "deleted" is the number of documents removed from MongoDB,
    "users_processed" is the number of users that had at least one stored
    entry, and "errors" lists R2 deletions that failed and users whose
    cleanup was skipped because MongoDB was unavailable.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    deleted = 0
    users_processed = 0
    errors: list[str] = []

    def parse_created_at(entry: dict[str, Any]) -> datetime | None:
        raw = entry.get("created_at")
        if not isinstance(raw, str) or not raw:
            return None
        try:
            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except ValueError:
            return None
        if parsed.tzinfo is None:
            # A naive value cannot be compared with the aware cutoff
            # (TypeError); treat offset-less timestamps as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def is_expired(entry: dict[str, Any]) -> bool:
        created = parse_created_at(entry)
        return created is not None and created < cutoff

    if username is not None:
        usernames = [username]
    else:
        db = get_mongo_db()
        usernames = db[GALLERY_COLLECTION].distinct("username") if db is not None else []

    for uname in usernames:
        entries = _load_all_entries_for_user(uname)
        if not entries:
            continue
        users_processed += 1

        expired = [entry for entry in entries if is_expired(entry)]
        if not expired:
            continue

        # Check the database before touching R2 so images are not removed
        # while their gallery documents would have to stay behind.
        db = get_mongo_db()
        if db is None:
            errors.append(f"Save failed for {uname}: MongoDB unavailable")
            continue

        for entry in expired:
            r2_key = (entry.get("r2_key") or "").strip()
            if r2_key and not r2_delete_object(r2_key):
                errors.append(f"R2 delete failed: {r2_key}")

        # Remove exactly the expired documents, matched on the created_at
        # strings just classified as expired. Rewriting the whole gallery
        # (delete all + re-insert survivors) would drop entries appended
        # while the cleanup runs and lose everything if the re-insert failed.
        expired_stamps = sorted({entry["created_at"] for entry in expired})
        result = db[GALLERY_COLLECTION].delete_many(
            {"username": uname, "created_at": {"$in": expired_stamps}}
        )
        deleted += result.deleted_count

    return {"deleted": deleted, "users_processed": users_processed, "errors": errors}