Spaces:
Sleeping
Sleeping
| # ------------------------------------------------------------------- | |
| # Pimcore | |
| # | |
| # This source file is available under two different licenses: | |
| # - GNU General Public License version 3 (GPLv3) | |
| # - Pimcore Commercial License (PCL) | |
| # Full copyright and license information is available in | |
| # LICENSE.md which is distributed with this source code. | |
| # | |
| # @copyright Copyright (c) Pimcore GmbH (http://www.pimcore.org) | |
| # @license http://www.pimcore.org/license GPLv3 and PCL | |
| # ------------------------------------------------------------------- | |
| import os | |
| import torch | |
| from .training_status import Status | |
| from .environment_variable_checker import EnvironmentVariableChecker | |
| from .training_manager import TrainingManager | |
| from .image_classification.image_classification_trainer import ImageClassificationTrainer | |
| from .image_classification.image_classification_parameters import ImageClassificationParameters, map_image_classification_training_parameters, ImageClassificationTrainingParameters | |
| from .text_classification.text_classification_trainer import TextClassificationTrainer | |
| from .text_classification.text_classification_parameters import TextClassificationParameters, map_text_classification_training_parameters, TextClassificationTrainingParameters | |
| from fastapi import FastAPI, Depends, HTTPException, UploadFile, Form, File, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel | |
| from typing import Annotated | |
| import logging | |
| import os | |
| from pathlib import Path | |
| import tempfile | |
# FastAPI application object; title, description and version are the metadata
# handed to the FastAPI constructor.
app = FastAPI(
    title="Pimcore Fine-Tuning Service",
    description="This service allows you to fine-tune image and text classification models and upload them to hugging face hub.",
    version="1.0.0"
)

# Environment validation runs at import time, i.e. before any handler below
# can be called.
# NOTE(review): presumably this raises when a required variable is missing -
# confirm in EnvironmentVariableChecker.
environmentVariableChecker = EnvironmentVariableChecker()
environmentVariableChecker.validate_environment_variables()

# Log line layout: timestamp, level name left-aligned to 8 characters, message.
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# One module-level training manager, used by the status, stop and training
# handlers defined further down.
classification_trainer: TrainingManager = TrainingManager()
class ResponseModel(BaseModel):
    """ Default response model for endpoints. """
    # Human-readable description of the outcome.
    message: str
    # Outcome flag; True unless the caller sets it otherwise.
    success: bool = True
| # =========================================== | |
| # Security Check | |
| # =========================================== | |
security = HTTPBearer()


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Verify the bearer token provided by the user.

    Args:
        credentials: Bearer credentials resolved by the ``security`` dependency.

    Returns:
        A dict holding the accepted token under the ``"token"`` key.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            supplied token does not match the configured one.
    """
    # Imported locally so this block does not depend on the file's import list.
    import secrets

    expected = environmentVariableChecker.get_authentication_token()
    supplied = credentials.credentials

    # Constant-time comparison: a plain `!=` returns as soon as the first
    # differing character is found, which leaks information about the secret
    # through response timing. Both sides are encoded to bytes because
    # compare_digest rejects str values containing non-ASCII characters.
    # A missing or non-string configured token never authenticates.
    token_matches = isinstance(expected, str) and secrets.compare_digest(
        supplied.encode("utf-8"), expected.encode("utf-8")
    )
    if not token_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"token": supplied}
| # =========================================== | |
| # Training Status Endpoints | |
| # =========================================== | |
async def get_task_status(token_data: dict = Depends(verify_token)):
    """ Report the state of the currently running training (if any). """
    # Take one snapshot from the manager and read every field from it.
    snapshot = classification_trainer.get_task_status()
    project = snapshot.get_project_name()
    progress = snapshot.get_progress()
    task = snapshot.get_task()
    # The status getter returns an object whose `.value` is what gets reported.
    state = snapshot.get_status().value
    return dict(project=project, progress=progress, task=task, status=state)
async def stop_task(token_data: dict = Depends(verify_token)):
    """ Stop the currently running training (if any).

    Returns:
        A ResponseModel whose message names the project that was stopped.

    Raises:
        HTTPException: 500 carrying the error text when reading the status or
            stopping the task fails.
    """
    try:
        # Read the status first so the project name is captured before the
        # stop request is issued.
        task_status = classification_trainer.get_task_status()
        classification_trainer.stop_task()
        return ResponseModel(message=f"Training stopped for `{ task_status.get_project_name() }`")
    except Exception as e:
        # Keep the traceback in the server log; the client only gets the text.
        logger.exception("Stopping the training task failed")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
async def gpu_check():
    """ Report whether torch can see a CUDA device. """
    cuda_ready = torch.cuda.is_available()
    # The console line and the value in the response use different wording in
    # the negative case; both are kept exactly as they are.
    print("GPU is available" if cuda_ready else "GPU is not available")
    gpu = 'GPU is available' if cuda_ready else 'GPU not available'
    return {'success': True, 'gpu': gpu}
| # =========================================== | |
| # Fine-Tuning Image Classification | |
| # =========================================== | |
async def image_classification(
    training_params: Annotated[ImageClassificationTrainingParameters, Depends(map_image_classification_training_parameters)],
    training_data_zip: Annotated[UploadFile, File(description="The ZIP file containing the training data, with a folder per class which contains images belonging to that class.")],
    token_data: dict = Depends(verify_token),
    project_name: str = Form(description="The name of the project. Will also be used as name of resulting model that will be created after fine tuning and as the name of the repository at huggingface."),
    source_model_name: str = Form('google/vit-base-patch16-224-in21k', description="The source model to be used as basis for fine tuning."),
):
    """
    Start fine tuning an image classification model with the provided data.

    Raises:
        HTTPException: 405 when a training is in progress or being cancelled,
            422 when the upload is not a `.zip` file, 500 when storing the
            upload or starting the training fails.
    """
    # Refuse to start while another training is running or being cancelled.
    # NOTE(review): 405 is kept for client compatibility; 409 Conflict would be
    # the more conventional status for this situation.
    state = classification_trainer.get_task_status().get_status()
    if state in (Status.IN_PROGRESS, Status.CANCELLING):
        raise HTTPException(status_code=405, detail="Training is already in progress.")

    # The filename can be absent on an upload; treat that like a wrong
    # extension instead of failing with an AttributeError. The extension check
    # ignores case so `DATA.ZIP` is accepted as well.
    upload_name = training_data_zip.filename or ""
    if not upload_name.lower().endswith(".zip"):
        raise HTTPException(status_code=422, detail="Uploaded file is not a zip file.")

    try:
        # Fixed working directory below the system temp dir; the trainer
        # receives both the directory and the path of the stored archive.
        tmp_path = os.path.join(tempfile.gettempdir(), 'training_data')
        Path(tmp_path).mkdir(parents=True, exist_ok=True)
        zip_path = os.path.join(tmp_path, 'image_classification_data.zip')

        # Copy the upload in 1 MiB chunks so a large archive is never held in
        # memory as a whole.
        with open(zip_path, 'wb') as temp_file:
            while chunk := await training_data_zip.read(1024 * 1024):
                temp_file.write(chunk)

        # prepare parameters
        parameters = ImageClassificationParameters(
            training_files_path=tmp_path,
            training_zip_file_path=zip_path,
            project_name=project_name,
            source_model_name=source_model_name,
            training_parameters=training_params
        )

        # start training
        await classification_trainer.start_training(ImageClassificationTrainer(), parameters)
        return ResponseModel(message="Training started.")
    except HTTPException:
        # Let deliberate HTTP errors through instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log; the client only gets the text.
        logger.exception("Starting image classification training failed")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e
| # =========================================== | |
| # Fine-Tuning Text Classification | |
| # =========================================== | |
async def text_classificaiton(
    training_params: Annotated[TextClassificationTrainingParameters, Depends(map_text_classification_training_parameters)],
    training_data_csv: Annotated[UploadFile, File(description="The CSV file containing the training data, necessary columns `value` (text data) and `target` (classification).")],
    token_data: dict = Depends(verify_token),
    project_name: str = Form(description="The name of the project. Will also be used as name of resulting model that will be created after fine tuning and as the name of the repository at huggingface."),
    training_csv_limiter: str = Form(';', description="The delimiter used in the CSV file."),
    source_model_name: str = Form('distilbert/distilbert-base-uncased'),
):
    """Start fine tuning a text classification model with the provided data.

    Raises:
        HTTPException: 405 when a training is in progress or being cancelled,
            422 when the upload is not a `.csv` file, 500 when storing the
            upload or starting the training fails.
    """
    # Refuse to start while another training is running or being cancelled.
    # NOTE(review): 405 is kept for client compatibility; 409 Conflict would be
    # the more conventional status for this situation.
    state = classification_trainer.get_task_status().get_status()
    if state in (Status.IN_PROGRESS, Status.CANCELLING):
        raise HTTPException(status_code=405, detail="Training is already in progress")

    # The filename can be absent on an upload; treat that like a wrong
    # extension instead of failing with an AttributeError. The extension check
    # ignores case so `DATA.CSV` is accepted as well.
    upload_name = training_data_csv.filename or ""
    if not upload_name.lower().endswith(".csv"):
        raise HTTPException(status_code=422, detail="Uploaded file is not a csv file.")

    try:
        # Fixed working directory below the system temp dir; the trainer
        # receives the path of the stored CSV file.
        tmp_path = os.path.join(tempfile.gettempdir(), 'training_data')
        Path(tmp_path).mkdir(parents=True, exist_ok=True)
        csv_path = os.path.join(tmp_path, 'data.csv')

        # Copy the upload in 1 MiB chunks so a large file is never held in
        # memory as a whole.
        with open(csv_path, 'wb') as temp_file:
            while chunk := await training_data_csv.read(1024 * 1024):
                temp_file.write(chunk)

        # prepare parameters
        parameters = TextClassificationParameters(
            training_csv_file_path=csv_path,
            training_csv_limiter=training_csv_limiter,
            project_name=project_name,
            source_model_name=source_model_name,
            training_parameters=training_params
        )

        # start training
        await classification_trainer.start_training(TextClassificationTrainer(), parameters)
        return ResponseModel(message="Training started.")
    except HTTPException:
        # Let deliberate HTTP errors through instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Keep the traceback in the server log; the client only gets the text.
        logger.exception("Starting text classification training failed")
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") from e