| """ | |
| # Main module for the ScanUDoc application, containing various endpoints for text processing and benchmarking. | |
| # Author: Rodolfo Torres | |
| # Email: rodolfo.torres@outlook.com | |
| # LinkedIn: https://www.linkedin.com/in/rodolfo-torres-p | |
| # This module includes endpoints for text processing, benchmarking of different pipelines, and handling file uploads. | |
| # The code is licensed under the GPL-3.0 license, which is a widely used open-source license, ensuring that any derivative work is also open source. | |
| # It grants users the freedom to use, modify, and distribute the software, as well as any modifications or extensions made to it. | |
| # However, any modified versions of the software must also be licensed under GPL-3.0. | |
| # For more details, please refer to the full text of the GPL-3.0 license at https://www.gnu.org/licenses/gpl-3.0.html. | |
| """ | |
import torch

# Attempt to import the Intel Extension for PyTorch module.
# Set the 'ipex_enabled' flag accordingly to indicate whether the import succeeded.
try:
    import intel_extension_for_pytorch as ipex
    ipex_enabled = True
except ImportError:
    # If the import fails, set 'ipex_enabled' to False.
    ipex_enabled = False

import time
import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from typing import Optional
from transformers import (
    pipeline,
    AutoModelForMultipleChoice,
    AutoTokenizer,
    AutoModelForQuestionAnswering,
    AutoModelForSequenceClassification,
)
from pydantic import BaseModel
from io import BytesIO
import PyPDF2
from newspaper import Article

try:
    # Check whether any XPU (accelerator device) is available to PyTorch.
    has_xpu = torch.xpu.device_count() > 0
except (AttributeError, RuntimeError):
    # If the device check fails (e.g. no XPU support in this build), assume no XPU.
    has_xpu = False
def get_qa_pipeline(optimize=True):
    """
    Creates a question-answering pipeline using a pre-trained model and tokenizer. Optionally applies Intel PyTorch Extension optimizations.

    Parameters:
    - optimize (bool): A flag indicating whether to apply Intel PyTorch Extension optimizations. Default is True.

    Returns:
    - qa_pipeline: A pipeline for question answering using the specified model and tokenizer.
    """
    # Define the model checkpoint for the question-answering pipeline.
    model_checkpoint = "roaltopo/scan-u-doc_question-answer"

    # Initialize the tokenizer and the model for question answering from the specified checkpoint.
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
    model = AutoModelForQuestionAnswering.from_pretrained(model_checkpoint)
    model.eval()

    # Select the device based on XPU availability.
    device = 'xpu' if has_xpu else None

    if ipex_enabled and optimize:
        # Apply Intel PyTorch Extension optimizations when both 'ipex_enabled' and 'optimize' are True.
        model = ipex.optimize(model, weights_prepack=False)
        model = torch.compile(model, backend="ipex")

    # Build the pipeline under 'torch.no_grad()'; pipelines also disable gradient
    # tracking internally at inference time.
    with torch.no_grad():
        # Create a question-answering pipeline with the specified model and tokenizer,
        # using 'torch.bfloat16' and the device determined above.
        qa_pipeline = pipeline("question-answering", model=model, tokenizer=tokenizer, torch_dtype=torch.bfloat16, device=device)

    return qa_pipeline
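# Illustrative usage (not part of the app flow): a Hugging Face
# 'question-answering' pipeline returns a dict with 'score', 'start', 'end',
# and 'answer' keys. The score shown below is made up; the keys are the
# pipeline's standard output format.
#
#   qa = get_qa_pipeline(optimize=False)
#   qa(question="Who wrote the report?",
#      context="The report was written by Ada Lovelace in 1843.")
#   # -> {'score': 0.98, 'start': 26, 'end': 38, 'answer': 'Ada Lovelace'}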
def get_bool_q_pipeline(optimize=True):
    """
    Creates a text-classification pipeline for boolean questions using a pre-trained model and tokenizer.
    Optionally applies Intel PyTorch Extension optimizations.

    Parameters:
    - optimize (bool): A flag indicating whether to apply Intel PyTorch Extension optimizations. Default is True.

    Returns:
    - bool_q_pipeline: A text-classification pipeline for boolean questions using the specified model and tokenizer.
    """
    # Define the model checkpoint for the boolean-question pipeline.
    model_checkpoint = "roaltopo/scan-u-doc_bool-question"

    # Initialize the tokenizer and the model for text classification from the specified checkpoint.
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
    model = AutoModelForSequenceClassification.from_pretrained(model_checkpoint)
    model.eval()

    # Select the device based on XPU availability.
    device = 'xpu' if has_xpu else None

    if ipex_enabled and optimize:
        # Apply Intel PyTorch Extension optimizations when both 'ipex_enabled' and 'optimize' are True.
        model = ipex.optimize(model, weights_prepack=False)
        model = torch.compile(model, backend="ipex")

    # Build the pipeline under 'torch.no_grad()'; pipelines also disable gradient
    # tracking internally at inference time.
    with torch.no_grad():
        # Create a text-classification pipeline for boolean questions with the specified
        # model and tokenizer, using 'torch.bfloat16' and the device determined above.
        bool_q_pipeline = pipeline("text-classification", model=model, tokenizer=tokenizer, torch_dtype=torch.bfloat16, device=device)

    return bool_q_pipeline
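# Illustrative usage: a 'text-classification' pipeline returns a list with one
# dict per input. Given how the result is consumed in answer_question() below
# (is_bool_inference[0]['label'] == 'YES'), this model's labels are 'YES'/'NO'.
# The score shown is made up.
#
#   clf = get_bool_q_pipeline(optimize=False)
#   clf("Is the office open on Sundays?")
#   # -> [{'label': 'YES', 'score': 0.99}]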
def get_bool_a_model(optimize=True):
    """
    Retrieves the pre-trained model and tokenizer for answering boolean questions.
    Optionally applies Intel PyTorch Extension optimizations.

    Parameters:
    - optimize (bool): A flag indicating whether to apply Intel PyTorch Extension optimizations. Default is True.

    Returns:
    - model: The pre-trained model for answering boolean questions.
    - tokenizer: The tokenizer corresponding to the model.
    """
    # Define the model checkpoint for the boolean-answer model.
    model_checkpoint = "roaltopo/scan-u-doc_bool-answer"

    # Initialize the model and the tokenizer for multiple-choice answers from the specified checkpoint.
    model = AutoModelForMultipleChoice.from_pretrained(model_checkpoint)
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

    if has_xpu:
        # If an XPU is available, move the model to the XPU device.
        model = model.to("xpu")
    model.eval()

    if ipex_enabled and optimize:
        # Apply Intel PyTorch Extension optimizations when both 'ipex_enabled' and 'optimize' are True.
        model = ipex.optimize(model, weights_prepack=False)
        model = torch.compile(model, backend="ipex")

    return model, tokenizer
# Initialize the question-answering pipeline using the 'get_qa_pipeline' function.
qa_pipeline = get_qa_pipeline()

# Initialize the text-classification pipeline for boolean questions using the 'get_bool_q_pipeline' function.
bool_q_pipeline = get_bool_q_pipeline()

# Retrieve the model and tokenizer for answering boolean questions using the 'get_bool_a_model' function.
bool_a_model, bool_a_tokenizer = get_bool_a_model()

# Initialize the FastAPI application.
app = FastAPI()

# In-memory dictionary for storing information during runtime.
text_storage = {}
class TextInfo(BaseModel):
    """
    A Pydantic model representing information related to text data.

    Attributes:
    - text (str): Optional. The text data to be processed.
    - pdf (bytes): Optional. The PDF data to be processed.
    - html_url (str): Optional. The URL pointing to the HTML content to be processed.
    """
    text: Optional[str] = None
    pdf: Optional[bytes] = None
    html_url: Optional[str] = None
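# Illustrative JSON payloads that validate against TextInfo (the store_text
# endpoint below expects either 'text' or 'html_url' to be set):
#   {"text": "Plain text to analyze ..."}
#   {"html_url": "https://example.com/some-article"}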
class QuestionInfo(BaseModel):
    """
    A Pydantic model representing information related to a specific question.

    Attributes:
    - question (str): The question to be answered or classified.
    - allow_bool (bool): Optional. Flag indicating whether to allow boolean question types. Default is False.
    """
    question: str
    allow_bool: Optional[bool] = False
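# Illustrative JSON payload that validates against QuestionInfo:
#   {"question": "Does the document mention a deadline?", "allow_bool": true}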
def predict_boolean_answer(text, question, model=bool_a_model, tokenizer=bool_a_tokenizer):
    """
    Predicts a boolean answer for the given text and question using the specified model and tokenizer.

    Parameters:
    - text (str): The text data for context.
    - question (str): The question to be answered.
    - model: The pre-trained model for answering boolean questions. Default is 'bool_a_model'.
    - tokenizer: The tokenizer corresponding to the model. Default is 'bool_a_tokenizer'.

    Returns:
    - dict: A dictionary containing the predicted boolean answer.
    """
    # Mapping for converting predicted labels to human-readable answers.
    id2label = {0: "No", 1: "Yes"}

    text += '\n'
    question += '\n'

    # Tokenize the two candidate (context, question + answer) pairs for the multiple-choice model.
    inputs = tokenizer([[text, question + 'no'], [text, question + 'yes']], return_tensors="pt", padding=True)
    labels = torch.tensor(0).unsqueeze(0)

    if has_xpu:
        # If an XPU is available, move the inputs and labels to the XPU device.
        inputs = inputs.to("xpu")
        labels = labels.to("xpu")

    # Run the forward pass without gradient tracking; only the logits are needed.
    with torch.no_grad():
        outputs = model(**{k: v.unsqueeze(0) for k, v in inputs.items()}, labels=labels)
    logits = outputs.logits

    # Return the predicted boolean answer in dictionary format.
    return {'answer': id2label[int(logits.argmax().item())]}
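# Illustrative usage: the function frames "no"/"yes" as the two choices of a
# multiple-choice model and picks the higher-scoring one.
#
#   predict_boolean_answer("The office closes at 5pm.", "Is the office open at 6pm?")
#   # -> {'answer': 'No'} or {'answer': 'Yes'}, depending on the model's logits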
def get_qa_score(question, context, optimize, num_times, warmup_rounds):
    """
    Calculates the average inference time for the question-answering pipeline.

    Parameters:
    - question (str): The question to be answered.
    - context (str): The context for the question.
    - optimize (bool): A flag indicating whether to apply optimizations to the pipeline.
    - num_times (int): The number of times the inference is run to calculate the average time.
    - warmup_rounds (int): The number of initial rounds to be ignored when calculating the average time.

    Returns:
    - pipeline_inference_time: The average inference time for the question-answering pipeline.
    """
    # Reuse the optimized module-level pipeline, or build an unoptimized one for comparison.
    # (Named 'qa_pipe' to avoid shadowing the imported 'pipeline' function.)
    if optimize:
        qa_pipe = qa_pipeline
    else:
        qa_pipe = get_qa_pipeline(optimize=False)

    with torch.no_grad():
        latency_list = []
        for i in range(num_times):
            time_start = time.time()
            answer = qa_pipe(question=question, context=context)
            if i >= warmup_rounds:
                latency_list.append(time.time() - time_start)
        pipeline_inference_time = np.mean(latency_list)

    return pipeline_inference_time
def get_bool_q_score(question, optimize, num_times, warmup_rounds):
    """
    Calculates the average inference time for the text-classification pipeline for boolean questions.

    Parameters:
    - question (str): The question to be classified.
    - optimize (bool): A flag indicating whether to apply optimizations to the pipeline.
    - num_times (int): The number of times the inference is run to calculate the average time.
    - warmup_rounds (int): The number of initial rounds to be ignored when calculating the average time.

    Returns:
    - pipeline_inference_time: The average inference time for the text-classification pipeline for boolean questions.
    """
    # Reuse the optimized module-level pipeline, or build an unoptimized one for comparison.
    if optimize:
        bool_q_pipe = bool_q_pipeline
    else:
        bool_q_pipe = get_bool_q_pipeline(optimize=False)

    with torch.no_grad():
        latency_list = []
        for i in range(num_times):
            time_start = time.time()
            answer = bool_q_pipe(question)
            if i >= warmup_rounds:
                latency_list.append(time.time() - time_start)
        pipeline_inference_time = np.mean(latency_list)

    return pipeline_inference_time
def get_bool_a_score(text, question, optimize, num_times, warmup_rounds):
    """
    Calculates the average inference time for answering boolean questions.

    Parameters:
    - text (str): The text data for context.
    - question (str): The question to be answered.
    - optimize (bool): A flag indicating whether to apply optimizations to the pipeline.
    - num_times (int): The number of times the inference is run to calculate the average time.
    - warmup_rounds (int): The number of initial rounds to be ignored when calculating the average time.

    Returns:
    - pipeline_inference_time: The average inference time for answering boolean questions.
    """
    if not optimize:
        model, tokenizer = get_bool_a_model(optimize=optimize)
    else:
        model = bool_a_model
        tokenizer = bool_a_tokenizer

    with torch.no_grad():
        latency_list = []
        for i in range(num_times):
            time_start = time.time()
            answer = predict_boolean_answer(text, question, model=model, tokenizer=tokenizer)
            if i >= warmup_rounds:
                latency_list.append(time.time() - time_start)
        pipeline_inference_time = np.mean(latency_list)

    return pipeline_inference_time
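# Illustrative arithmetic for the three timing helpers above: with num_times=10
# and warmup_rounds=2, the first two runs are executed but not recorded, so the
# reported mean covers the last 8 runs. Note that if warmup_rounds >= num_times,
# latency_list stays empty and np.mean([]) returns nan (with a RuntimeWarning).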
@app.post("/store_text/{uuid}")  # NOTE: route decorator restored; the exact path is an assumption.
async def store_text(uuid: str, text_info: TextInfo):
    """
    Stores text data in the in-memory dictionary using the provided UUID.

    Parameters:
    - uuid (str): The unique identifier for the stored text data.
    - text_info (TextInfo): A Pydantic model containing information related to the text data.

    Returns:
    - dict: A dictionary indicating the success of the storing operation.
    """
    try:
        url = text_info.html_url.strip() if text_info.html_url else None
        if url:
            article = Article(url)
            article.download()
            article.parse()
            text = f'{article.title}\n{article.text}'
        elif text_info.text:
            text = text_info.text
        else:
            raise HTTPException(status_code=400, detail="Invalid Option: 'html_url' or 'text' required in text_info.")

        # Store the information in the in-memory dictionary.
        text_storage[uuid] = {
            'text': text,
            'url': text_info.html_url
        }
        return {'success': True}
    except HTTPException:
        # Re-raise HTTP errors as-is instead of masking them as 500s.
        raise
    except Exception as e:
        error_message = f"Error: {str(e)}"
        print(error_message)
        raise HTTPException(status_code=500, detail="Internal Server Error: An unexpected error occurred.")
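# Illustrative request (route path and local port are assumptions, see the
# decorator above):
#   curl -X POST http://localhost:8000/store_text/123e4567 \
#        -H "Content-Type: application/json" \
#        -d '{"html_url": "https://example.com/some-article"}'
#   # -> {"success": true}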
@app.post("/upload_file/{uuid}")  # NOTE: route decorator restored; the exact path is an assumption.
async def upload_file(uuid: str, file: UploadFile = File(...)):
    """
    Uploads a file and extracts text content to be stored in the in-memory dictionary using the provided UUID.

    Parameters:
    - uuid (str): The unique identifier for the stored text data.
    - file (UploadFile): The file to be uploaded.

    Returns:
    - JSONResponse: A JSON response indicating the success or failure of the file upload and text extraction process.
    """
    try:
        file_extension = file.filename.split('.')[-1].lower()
        if file_extension == 'pdf':
            content = await file.read()
            stream = BytesIO(content)
            reader = PyPDF2.PdfReader(stream)
            extracted_text = ''
            for page in reader.pages:
                tmp = page.extract_text()
                # Normalize the extracted text: join wrapped lines, collapse
                # double spaces, and start a new line after each sentence.
                tmp = tmp.replace('\n', ' ')
                tmp = tmp.replace('  ', ' ')
                tmp = tmp.replace('. ', '.\n')
                extracted_text += tmp
                # Cap the stored text at 4000 characters.
                if len(extracted_text) > 4000:
                    extracted_text = extracted_text[:4000]
                    break
        elif file_extension == 'txt':
            content = await file.read()
            extracted_text = content.decode('utf-8')
        else:
            raise ValueError("Unsupported file format.")

        text_storage[uuid] = {
            'text': extracted_text,
        }
        return JSONResponse(content={'success': True})
    except Exception as e:
        return JSONResponse(content={"message": f"Error while uploading the file: {e}"}, status_code=500)
@app.post("/answer_question/{uuid}")  # NOTE: route decorator restored; the exact path is an assumption.
async def answer_question(uuid: str, question_info: QuestionInfo):
    """
    Answers a given question based on the stored text corresponding to the provided UUID.

    Parameters:
    - uuid (str): The unique identifier for the stored text data.
    - question_info (QuestionInfo): A Pydantic model containing information related to the question.

    Returns:
    - dict: A dictionary containing the answer to the question.
    """
    bool_activate = question_info.allow_bool
    question = question_info.question

    # Verify that text with this ID exists in the dictionary.
    if uuid not in text_storage:
        return {'error': 'Text not found'}

    # Extract a span-based answer from the stored text.
    answer = qa_pipeline(question=question, context=text_storage[uuid]['text'])

    if bool_activate:
        # If boolean answers are allowed, check whether the question is a yes/no
        # question and, if so, reduce the extracted span to a Yes/No answer.
        is_bool_inference = bool_q_pipeline(question)
        if is_bool_inference[0]['label'] == 'YES':
            answer = predict_boolean_answer(answer['answer'], question)

    return answer
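# Illustrative end-to-end flow (route paths assumed): after storing text under
# uuid '123e4567', posting {"question": "Is a deadline mentioned?", "allow_bool": true}
# first runs the span-extraction pipeline; because the classifier tags the
# question as boolean ('YES'), the extracted span is then reduced to
# {'answer': 'Yes'} or {'answer': 'No'} instead of being returned as a span.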
@app.get("/benchmark")  # NOTE: route decorator restored; the exact path and method are assumptions.
async def benchmark(question: str, context: str, num_times: int, warmup_rounds: int):
    """
    Conducts benchmarking for the different pipeline components based on the specified parameters.

    Parameters:
    - question (str): The question to be used for benchmarking.
    - context (str): The context for the question.
    - num_times (int): The number of times the inference is run to calculate the average time.
    - warmup_rounds (int): The number of initial rounds to be ignored when calculating the average time.

    Returns:
    - dict: A dictionary containing the benchmarking results for the question-answering and boolean pipelines.
    """
    # Report each component's timings as a dict rather than a set: sets are
    # unordered and would collapse the two timings if they happened to be equal.
    # (The 'original'/'optimized' key names are chosen here for clarity.)
    qa = {
        'original': get_qa_score(question, context, False, num_times, warmup_rounds),
        'optimized': get_qa_score(question, context, True, num_times, warmup_rounds),
    }
    bool_q = {
        'original': get_bool_q_score(question, False, num_times, warmup_rounds),
        'optimized': get_bool_q_score(question, True, num_times, warmup_rounds),
    }
    answer = qa_pipeline(question=question, context=context)
    bool_a = {
        'original': get_bool_a_score(answer['answer'], question, False, num_times, warmup_rounds),
        'optimized': get_bool_a_score(answer['answer'], question, True, num_times, warmup_rounds),
    }
    return {'has_xpu': has_xpu, 'ipex_enabled': ipex_enabled, 'qa': qa, 'bool_q': bool_q, 'bool_a': bool_a, 'answer': answer['answer']}
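# Illustrative response shape (timing values are made up, in seconds):
#   {"has_xpu": false, "ipex_enabled": true,
#    "qa": {"original": 0.41, "optimized": 0.29},
#    "bool_q": {"original": 0.05, "optimized": 0.04},
#    "bool_a": {"original": 0.12, "optimized": 0.09},
#    "answer": "Ada Lovelace"}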
# Serve the static front-end; with html=True, the mount itself serves index.html at '/'.
app.mount("/", StaticFiles(directory="static", html=True), name="static")

@app.get("/")  # NOTE: route decorator restored; since the mount above is registered first, it handles '/' in practice.
def index() -> FileResponse:
    """
    Returns the index.html file as the main landing page.

    Returns:
    - FileResponse: The index.html file as the main landing page.
    """
    return FileResponse(path="/app/static/index.html", media_type="text/html")
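# To run the app locally (illustrative; the module name and port are
# assumptions -- Hugging Face Spaces conventionally exposes port 7860):
#   uvicorn main:app --host 0.0.0.0 --port 7860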