Spaces:
Runtime error
Runtime error
| import logging | |
| from typing import Annotated, Optional | |
| from api.function import data_ingestion | |
| from api.router.user import user_dependency | |
| from api.auth import check_user_authentication, check_admin_authentication | |
| from fastapi import Form, APIRouter, File, UploadFile, Depends | |
| from fastapi.responses import JSONResponse | |
| from db.models import Metadata | |
| from db.database import get_db | |
| from db.query.query_book import BookQuery | |
| from db.query.query_category import CategoryQuery | |
| from db.fetching import DataFetching | |
| from config import MYSQL_CONFIG | |
| from utils.error_handlers import handle_exception | |
| from script.vector_db import IndexManager | |
| from service.dto import MetadataResponse | |
| from service.aws_loader import Loader | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.future import select | |
| router = APIRouter(tags=["Book"]) | |
| index_manager = IndexManager() | |
| db_dependency = Annotated[Session, Depends(get_db)] | |
| async def get_metadata(user: user_dependency, db: db_dependency): | |
| auth_response = check_user_authentication(user) | |
| if auth_response: | |
| print(auth_response) | |
| return auth_response | |
| try: | |
| # Join Metadata with Category to get the category name | |
| fetching = DataFetching(user, db) | |
| metadata_fetching = fetching.metadata_fetching() | |
| # Transform results into MetadataResponse model with optional thumbnail handling | |
| return metadata_fetching | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def upload_file( | |
| user: user_dependency, | |
| db: db_dependency, | |
| title: str = Form(...), | |
| author: str = Form(...), | |
| category_id: int = Form(...), | |
| year: int = Form(...), | |
| publisher: str = Form(...), | |
| file: UploadFile = File(...), | |
| lang: str = Form(None), | |
| thumbnail: Optional[UploadFile] = File(None), | |
| ): | |
| auth_response = check_admin_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| # Restrict `lang` to only "id" or "en" | |
| lang = lang if lang in {"id", "en"} else "en" | |
| # Query the category based on category_id | |
| category_query = CategoryQuery(user) | |
| category = category_query.get_category(db, category_id) | |
| try: | |
| reference = { | |
| "title": title, | |
| "author": author, | |
| "category": category, | |
| "year": year, | |
| "publisher": publisher, | |
| } | |
| # Process the file and handle data ingestion | |
| response = await data_ingestion(reference, file, lang) | |
| if isinstance(response, JSONResponse): | |
| return response # Return the error response directly | |
| if thumbnail: | |
| file_name = f"{reference['title']}" | |
| aws_loader = Loader() | |
| ekstensi_file = file.filename.split(".")[-1].lower() | |
| aws_loader.upload_image_to_s3(file=thumbnail, custom_name=f"{file_name}.{ekstensi_file}") | |
| # Create a new Metadata object | |
| book_query = BookQuery(user) | |
| book_query.add_book(db, title, author, category_id, year, publisher) | |
| logging.info("Database Inserted") | |
| return { | |
| "filename": file.filename, | |
| "response": response, | |
| "info": "upload file successfully", | |
| } | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def update_metadata( | |
| user: user_dependency, | |
| db: db_dependency, | |
| metadata_id: int, | |
| title: str = Form(...), | |
| author: str = Form(...), | |
| category_id: int = Form(...), | |
| year: int = Form(...), | |
| publisher: str = Form(...), | |
| thumbnail: Optional[UploadFile] = File(None), | |
| ): | |
| auth_response = check_admin_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| # fetch current metadata | |
| book_query = BookQuery(user) | |
| current_metadata = book_query.get_metadata_books(db, metadata_id) | |
| # Fetch current and new categories | |
| category_query = CategoryQuery(user) | |
| current_category = category_query.get_current_category(db, metadata_id) | |
| new_category = category_query.get_category(db, category_id) | |
| # Prepare the references | |
| current_reference = { | |
| "title": current_metadata.title, | |
| "author": current_metadata.author, | |
| "category": current_category, | |
| "year": current_metadata.year, | |
| "publisher": current_metadata.publisher, | |
| } | |
| new_reference = { | |
| "title": title, | |
| "author": author, | |
| "category": new_category, | |
| "year": year, | |
| "publisher": publisher, | |
| } | |
| # Update vector database | |
| index_manager.update_vector_database(current_reference, new_reference) | |
| # Update existing metadata entry | |
| metadata = db.query(Metadata).filter(Metadata.id == metadata_id).first() | |
| print(metadata) | |
| if not metadata: | |
| return JSONResponse(status_code=404, content="Metadata not found") | |
| updated_metadata = book_query.update_metadata_entry( | |
| db, metadata_id, title, author, category_id, year, publisher | |
| ) | |
| print(updated_metadata) | |
| updated_category = category_query.get_category(db, updated_metadata.category_id) | |
| print(updated_category) | |
| return MetadataResponse( | |
| id=metadata_id, | |
| title=updated_metadata.title, | |
| author=updated_metadata.author, | |
| category=updated_category, # Assuming category relationship is available | |
| category_id=updated_metadata.category_id, | |
| year=updated_metadata.year, | |
| publisher=updated_metadata.publisher, | |
| thumbnail=( | |
| updated_metadata.thumbnail | |
| if updated_metadata.thumbnail is not None | |
| else None | |
| ), | |
| ) | |
| except Exception as e: | |
| return handle_exception(e) | |
| async def delete_metadata(user: user_dependency, db: db_dependency, metadata_id: int): | |
| auth_response = check_admin_authentication(user) | |
| if auth_response: | |
| return auth_response | |
| try: | |
| # Check if metadata exists before deletion | |
| metadata = db.query(Metadata).filter(Metadata.id == metadata_id).first() | |
| if not metadata: | |
| return JSONResponse(status_code=404, content="Metadata not found") | |
| # Delete from the vector database and the database | |
| index_manager.delete_vector_database(metadata.title) | |
| db.delete(metadata) | |
| db.commit() | |
| return {"Status": "delete successfully"} | |
| except Exception as e: | |
| return handle_exception(e) | |