Spaces:
Runtime error
Runtime error
| from sqlalchemy import select | |
| from db.models import User_Meta, Metadata, Category | |
| from db.query.base_query import BaseQuery | |
| class UserMetaQuery(BaseQuery): | |
| def __init__(self, user): | |
| super().__init__(user) | |
| def get_user_meta_entries(self, db): | |
| """Fetch all user meta entries joined with metadata and category.""" | |
| join_models = [Metadata, Category] | |
| join_conditions = [ | |
| User_Meta.metadata_id == Metadata.id, | |
| Metadata.category_id == Category.id, | |
| ] | |
| filter_conditions = [User_Meta.user_id == self.user_id] | |
| result = self.get_with_joins( | |
| db, | |
| model=User_Meta, | |
| join_models=join_models, | |
| join_conditions=join_conditions, | |
| filter_conditions=filter_conditions, | |
| multiple=True | |
| ) | |
| return result | |
| def insert_user_meta_entries(self, db, metadata_ids): | |
| """Insert new user meta entries if they don't already exist.""" | |
| # Fetch existing metadata IDs for the user | |
| existing_metadata_ids = { | |
| entry.metadata_id | |
| for entry in db.query(User_Meta).filter_by(user_id=self.user.get("id")).all() | |
| } | |
| # Filter out metadata IDs that already exist | |
| new_metadata_ids = [ | |
| mid for mid in metadata_ids if mid not in existing_metadata_ids | |
| ] | |
| if not new_metadata_ids: | |
| return { | |
| "message": "No new entries to add; all specified metadata IDs already exist.", | |
| "existing_metadata_ids": existing_metadata_ids, | |
| } | |
| user_meta_entries = [ | |
| User_Meta(user_id=self.user.get("id"), metadata_id=mid) | |
| for mid in new_metadata_ids | |
| ] | |
| # Use the method from BaseQuery to insert entries | |
| self.insert_entries(db, user_meta_entries) | |
| return { | |
| "message": "User meta entries added successfully.", | |
| "metadata_ids": new_metadata_ids, # Include only new metadata IDs in the result | |
| } | |
| def update_user_meta_entries(self, db, metadata_ids): | |
| """Update user meta entries: keep, delete, or add new entries based on metadata_ids.""" | |
| filter_conditions = [User_Meta.user_id == self.user_id] | |
| # Fetch existing user meta entries | |
| existing_user_meta = self.get(db, model=User_Meta, filter_conditions=filter_conditions, multiple=True) | |
| existing_user_meta = [user_meta[0] for user_meta in existing_user_meta] | |
| existing_meta_ids = [entry.metadata_id for entry in existing_user_meta] | |
| # Convert both lists to sets once for efficiency | |
| metadata_ids_set = set(metadata_ids) | |
| existing_meta_ids_set = set(existing_meta_ids) | |
| # Find metadata to add, keep, or delete | |
| metadata_to_add = list(metadata_ids_set - existing_meta_ids_set) | |
| metadata_to_keep = list(metadata_ids_set & existing_meta_ids_set) | |
| metadata_to_delete = list(existing_meta_ids_set - metadata_ids_set) | |
| # Delete entries that are no longer in the updated metadata_ids list | |
| if metadata_to_delete: | |
| db.query(User_Meta).filter(User_Meta.user_id == self.user_id, User_Meta.metadata_id.in_(metadata_to_delete)).delete(synchronize_session=False) | |
| # Add new entries for metadata that are not in the existing user meta | |
| for meta_id in metadata_to_add: | |
| new_entry = User_Meta(user_id=self.user_id, metadata_id=meta_id) | |
| self.add(db, new_entry) | |
| db.commit() | |
| return { | |
| "status": "success", | |
| "added_meta": list(metadata_to_add), | |
| "deleted_meta": list(metadata_to_delete), | |
| "kept_meta": list(metadata_to_keep), | |
| } | |
| def delete_user_meta(self, db, metadata_id): | |
| """Delete user meta entries by metadata_id.""" | |
| filter_conditions = [User_Meta.metadata_id==metadata_id, | |
| User_Meta.user_id==self.user_id] | |
| self.delete(db, model=User_Meta, filter_conditions=filter_conditions) | |
| return {"message": f"Book user with id {metadata_id} deleted successfully."} | |
| def delete_all_user_meta(self, db): | |
| """Delete all user meta entries for a user.""" | |
| self.delete_all(db, model=User_Meta) | |