Spaces:
Runtime error
Runtime error
| from typing import Annotated, List, Optional | |
| from api.router.user import user_dependency | |
| from api.auth import check_user_authentication | |
| from fastapi import APIRouter, Depends | |
| from db.database import get_db | |
| from sqlalchemy.orm import Session | |
| from db.query.query_user_meta import UserMetaQuery | |
| from db.fetching import DataFetching | |
| from utils.error_handlers import handle_exception | |
| router = APIRouter(tags=["Book_Collection"]) | |
| db_dependency = Annotated[Session, Depends(get_db)] | |
| async def get_book_collection(user: user_dependency, db: db_dependency): | |
| """Fetch user's book collection.""" | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| fetching = DataFetching(user, db) | |
| book_collection = fetching.collection_fetching() | |
| return { | |
| "status": "success", | |
| "book_collection": book_collection, | |
| } | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def request_book_collection( | |
| user: user_dependency, db: db_dependency, metadata_ids: List[Optional[int]] | |
| ): | |
| """Insert book collection metadata for a user.""" | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| user_meta_query = UserMetaQuery(user) | |
| try: | |
| return user_meta_query.insert_user_meta_entries(db, metadata_ids) | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def update_book_collection( | |
| user: user_dependency, db: db_dependency, metadata_ids: List[Optional[int]] | |
| ): | |
| """Update user's book collection metadata.""" | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| user_meta_query = UserMetaQuery(user) | |
| return user_meta_query.update_user_meta_entries(db, metadata_ids) | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def delete_book_collection( | |
| user: user_dependency, db: db_dependency, metadata_id: int | |
| ): | |
| """Delete a specific book collection entry.""" | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| user_meta_query = UserMetaQuery(user) | |
| return user_meta_query.delete_user_meta( | |
| db, metadata_id=metadata_id | |
| ) | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def delete_all_book(user: user_dependency, db: db_dependency): | |
| """Delete all book collections for the authenticated user.""" | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| user_meta_query = UserMetaQuery(user) | |
| user_meta_query.delete_all_user_meta(db) | |
| return { | |
| "status": "success", | |
| "message": f"Deleted book collection for user {user.get('id')}", | |
| } | |
| except Exception as e: | |
| return handle_exception(e) | |