Spaces:
Runtime error
Runtime error
| from db.query.base_query import BaseQuery | |
| from db.models import Metadata, Category, Session_Publisher | |
| from fastapi.responses import JSONResponse | |
class BookQuery(BaseQuery):
    """Query helper for book records stored in the ``Metadata`` table.

    Every database operation is delegated to helpers inherited from
    ``BaseQuery`` (``add``, ``get``, ``get_with_joins``, ``update``,
    ``update_entries`` and ``delete``); this class only assembles the
    models, columns and conditions those helpers need.
    """

    def __init__(self, user):
        """Create the helper for ``user``; the value is forwarded to ``BaseQuery``."""
        super().__init__(user)

    def add_book(self, db, title, author, category_id, year, publisher):
        """Create a ``Metadata`` row from the given fields and persist it.

        Args:
            db: Database session handed to ``BaseQuery.add``.
            title: Book title.
            author: Book author.
            category_id: Value stored in ``Metadata.category_id``.
            year: Publication year.
            publisher: Publisher name.

        Returns:
            Whatever ``BaseQuery.add`` returns. The result used to be
            discarded; it is propagated so a failure reported by the base
            helper is not silently lost (presumably a ``JSONResponse`` on
            error, as ``update_entries`` does -- confirm against BaseQuery).
        """
        new_book = Metadata(
            title=title,
            author=author,
            category_id=category_id,
            year=year,
            publisher=publisher,
        )
        return self.add(db, new_book)

    def get_book(self, db):
        """Fetch the selected book columns together with their category.

        ``Metadata`` is joined to ``Category`` on
        ``Metadata.category_id == Category.id`` and the query is issued with
        ``multiple=True``.

        Args:
            db: Database session handed to ``BaseQuery.get_with_joins``.

        Returns:
            The result of ``BaseQuery.get_with_joins`` for the columns, in
            order: metadata id, title, author, category name, category id,
            year, publisher, thumbnail.
        """
        metadata_columns = [
            Metadata.id,
            Metadata.title,
            Metadata.author,
            Category.category,  # presumably the category's display name -- confirm
            Category.id,
            Metadata.year,
            Metadata.publisher,
            Metadata.thumbnail,
        ]
        return self.get_with_joins(
            db,
            model=Metadata,
            columns=metadata_columns,
            join_models=[Category],
            join_conditions=[Metadata.category_id == Category.id],
            multiple=True,
        )

    def update_metadata_entry(
        self, db, metadata_id, title, author, category_id, year, publisher
    ):
        """Update one ``Metadata`` row and return the refreshed record.

        Args:
            db: Database session; used by ``update_entries`` and for the
                follow-up read.
            metadata_id: Primary key of the row to update.
            title: New title.
            author: New author.
            category_id: New category id.
            year: New publication year.
            publisher: New publisher.

        Returns:
            The ``JSONResponse`` produced by ``update_entries`` when it
            returns one, a 404 ``JSONResponse`` when no row with
            ``metadata_id`` is found afterwards, otherwise the ``Metadata``
            instance re-read from the database.
        """
        update_data = {
            "title": title,
            "author": author,
            "category_id": category_id,
            "year": year,
            "publisher": publisher,
        }
        update_response = self.update_entries(
            db,
            model=Metadata,
            update_data=update_data,
            filter_conditions=[Metadata.id == metadata_id],
        )
        # The base helper hands back a JSONResponse as its error result;
        # pass it straight through to the caller.
        if isinstance(update_response, JSONResponse):
            return update_response

        # Re-read the row so the caller receives the stored values.
        updated_metadata = (
            db.query(Metadata).filter(Metadata.id == metadata_id).first()
        )
        if not updated_metadata:
            return JSONResponse(status_code=404, content="Metadata not found")
        return updated_metadata

    def update_book(self, db, book_id, title, author):
        """Update only the title and author of the book with ``book_id``.

        Args:
            db: Database session handed to ``BaseQuery.update``.
            book_id: Primary key of the ``Metadata`` row.
            title: New title.
            author: New author.

        Returns:
            Whatever ``BaseQuery.update`` returns (previously discarded).
        """
        return self.update(
            db,
            model=Metadata,
            id=book_id,
            update_data={"title": title, "author": author},
        )

    def delete_book(self, db, book_id):
        """Delete the ``Metadata`` row identified by ``book_id``.

        Returns:
            Whatever ``BaseQuery.delete`` returns (previously discarded).
        """
        return self.delete(db, Metadata, book_id)

    def get_books(self, db):
        """Return the result of ``BaseQuery.get`` on ``Metadata`` with ``multiple=True``."""
        return self.get(db, model=Metadata, multiple=True)

    def get_metadata_books(self, db, metadata_id):
        """Return the result of ``BaseQuery.get`` on ``Metadata`` for ``id=metadata_id``."""
        return self.get(db, Metadata, id=metadata_id)

    def get_title_from_session(self, db, metadata_id, session_id):
        """Look up a book title in the context of one of the user's sessions.

        Selects ``Metadata.title`` filtered by
        ``Session_Publisher.user_id == self.user_id`` and
        ``Session_Publisher.id == session_id``.

        Args:
            db: Database session handed to ``BaseQuery.get_with_joins``.
            metadata_id: Metadata id used in the join condition.
            session_id: Id of the ``Session_Publisher`` row to match.

        Returns:
            The result of ``BaseQuery.get_with_joins`` with ``multiple=True``.
        """
        # NOTE(review): ``Session_Publisher`` is both the base model and the
        # joined model, and the join condition compares ``Metadata.id`` with a
        # constant instead of relating the two tables. Whether this yields the
        # intended query depends on how ``get_with_joins`` builds its FROM
        # clause -- confirm against BaseQuery and the Session_Publisher schema
        # before changing. Arguments are intentionally left as they were.
        filter_conditions = [
            Session_Publisher.user_id == self.user_id,
            Session_Publisher.id == session_id,
        ]
        return self.get_with_joins(
            db,
            model=Session_Publisher,
            columns=[Metadata.title],
            join_models=[Session_Publisher],
            join_conditions=[Metadata.id == metadata_id],
            filter_conditions=filter_conditions,
            multiple=True,
        )