Spaces:
Build error
Build error
| import io | |
| import json | |
| import os | |
| import shutil | |
| import time | |
| from collections import Counter | |
| from pathlib import Path | |
| import fitz | |
| import numpy as np | |
| import pandas as pd | |
| import plotly.express as px | |
| import streamlit as st | |
| import torch | |
| import torch.nn.functional as F | |
| from easyocr import Reader | |
| from PIL import Image | |
| from tqdm import tqdm | |
| from transformers import (LayoutLMv3FeatureExtractor, | |
| LayoutLMv3ForSequenceClassification, | |
| LayoutLMv3Processor, LayoutLMv3TokenizerFast) | |
# Device used for the LayoutLMv3 model: the first CUDA GPU when torch reports
# one, otherwise the CPU. Set DEVICE to "cpu" to force CPU inference.
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

# Base checkpoint that provides the tokenizer.
MICROSOFT_MODEL_NAME = "microsoft/layoutlmv3-base"
# Backward-compatible alias: the constant was originally published with a typo.
MICROSOFT_HODEL_NAME = MICROSOFT_MODEL_NAME

# Fine-tuned sequence-classification checkpoint loaded by create_model().
MODEL_NAME = "arthur-lima/layoutlmv3-triagem-documentos"
def create_bounding_box(bbox_data, width_scale: float = 1, height_scale: float = 1):
    """Return the axis-aligned box enclosing a sequence of (x, y) points.

    Args:
        bbox_data: Iterable of two-element points (x, y).
        width_scale: Factor applied to the x coordinates. Defaults to 1
            (no scaling).
        height_scale: Factor applied to the y coordinates. Defaults to 1
            (no scaling).

    Returns:
        ``[left, top, right, bottom]`` with every value truncated to ``int``
        after scaling.

    Raises:
        ValueError: If ``bbox_data`` is empty.
    """
    points = [(x, y) for x, y in bbox_data]
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    return [
        int(min(xs) * width_scale),
        int(min(ys) * height_scale),
        int(max(xs) * width_scale),
        int(max(ys) * height_scale),
    ]
def create_ocr_reader():
    """Build the EasyOCR reader for Portuguese and English, requesting GPU use."""
    languages = ["pt", "en"]
    ocr_reader = Reader(languages, gpu=True)
    return ocr_reader
def create_processor():
    """Assemble the LayoutLMv3 processor from its two components.

    The feature extractor is created with ``apply_ocr=False`` and the
    tokenizer is loaded from the checkpoint named by MICROSOFT_HODEL_NAME.
    """
    return LayoutLMv3Processor(
        LayoutLMv3FeatureExtractor(apply_ocr=False),
        LayoutLMv3TokenizerFast.from_pretrained(MICROSOFT_HODEL_NAME),
    )
def create_model(revision="main"):
    """Load the classifier checkpoint MODEL_NAME at ``revision``.

    The model is switched to evaluation mode and moved to DEVICE before
    being returned.
    """
    classifier = LayoutLMv3ForSequenceClassification.from_pretrained(
        MODEL_NAME,
        revision=revision,
    )
    classifier = classifier.eval()
    return classifier.to(DEVICE)
def pdf2jpg(src: Path, dest_path: Path = None, dpi=100, limit=None):
    """Render the pages of a PDF file to JPG images.

    One image is written per page, named ``<stem>-<page number>.jpg`` with
    zero-based page numbers.

    Args:
        src: Path of the PDF to convert.
        dest_path: Where to write the images. ``None`` writes next to ``src``
            using the stem of ``src``; a path without a suffix is treated as
            a folder and the stem of ``src`` is used; a path with a suffix
            contributes its parent folder and its stem.
        dpi: Rendering resolution. PDF user space is 72 dpi, so the zoom
            factor is ``dpi / 72``.
        limit: Maximum number of pages to render, counted from the first
            page. ``None`` renders every page.

    Returns:
        ``True`` when the conversion completed, ``False`` when any error
        occurred (the error is printed, not raised).
    """
    # Resolve the destination prefix (folder + file stem, no page suffix).
    if dest_path is None:
        # No destination given.
        dest = src.parent / src.stem
    elif dest_path.suffix == "":
        # Only a folder was given.
        dest = dest_path / src.stem
    else:
        # A path including a file name was given.
        dest = dest_path.parent / dest_path.stem

    zoom = dpi / 72  # zoom factor, standard: 72 dpi
    magnify = fitz.Matrix(zoom, zoom)  # same magnification in x and y

    try:
        # The context manager closes the document even when rendering fails.
        with fitz.open(src) as doc:
            for page in doc:
                if limit is not None and page.number >= limit:
                    break
                pix = page.get_pixmap(matrix=magnify)  # render page to an image
                dest_final_filename = Path(str(dest) + f"-{page.number}.jpg")
                pix.save(dest_final_filename)
        return True
    except Exception as e:
        # Deliberate best-effort boundary: report the failure and signal it
        # through the return value instead of raising.
        print(f"\nProblemas na conversão para JPG do arquivo PDF {src}: " + str(e))
        return False
def classifyPDF(
    pdfpath: Path, model, processor, reader: Reader = None, dpi=100
) -> tuple:
    """Classify a PDF by classifying each page and taking the majority vote.

    The PDF is rendered page by page into a scratch folder (``temp``, or
    ``temp_classification`` when ``temp`` already exists). For every page the
    OCR words and their bounding boxes are dumped to a ``.json`` file next to
    the image, and the page is classified with ``predict``. The scratch
    folder is not removed here.

    Args:
        pdfpath: Path of the PDF to classify.
        model: Classifier passed through to ``predict``.
        processor: Processor passed through to ``predict``.
        reader: EasyOCR reader. When ``None`` a Portuguese/English reader is
            created.
        dpi: Resolution used when rendering the pages.

    Returns:
        ``(predicted_class, probabilities)`` where ``predicted_class`` is the
        most common per-page class id and ``probabilities`` is a NumPy array
        holding the per-page probabilities averaged over the pages and then
        scaled to unit Euclidean norm.

    Raises:
        RuntimeError: If no page image could be produced from the PDF.
    """
    # Scratch folder for the rendered pages.
    tmp = Path("temp")
    if tmp.exists():
        tmp = Path("temp_classification")
    shutil.rmtree(tmp, ignore_errors=True)
    os.mkdir(tmp)

    pdf2jpg(pdfpath, tmp / Path(pdfpath.name).with_suffix(".jpg"), dpi)

    # Only the page images count as pages; the JSON side-files written below
    # share the folder and must not inflate the page count.
    page_images = sorted(tmp.glob("*.jpg"))
    if not page_images:
        raise RuntimeError(
            f"Nenhuma página pôde ser convertida para JPG a partir de {pdfpath}"
        )

    if reader is None:
        reader = Reader(["pt", "en"])

    votes = []
    probability_sum = None
    # Show a progress bar only when there is more than one page.
    for image_path in tqdm(page_images, disable=len(page_images) == 1):
        image_bytes = image_path.read_bytes()

        # Dump the raw OCR words and boxes (unscaled) next to the page image.
        # NOTE(review): predict() runs OCR on the same bytes again.
        ocr_result = reader.readtext(image_bytes, batch_size=1)
        ocr_page = [
            {"word": word, "bounding_box": create_bounding_box(bbox, 1, 1)}
            for bbox, word, _ in ocr_result
        ]
        with image_path.with_suffix(".json").open("w") as f:
            json.dump(ocr_page, f)

        # Classify the page; the image handle is released right afterwards.
        with Image.open(image_path) as image:
            page_class, page_probabilities = predict(
                image, image_bytes, reader, processor, model
            )

        votes.append(page_class)
        page_probabilities = np.array(page_probabilities)
        if probability_sum is None:
            probability_sum = page_probabilities
        else:
            probability_sum = probability_sum + page_probabilities

    # The document class is the most common page class.
    predicted_class = Counter(votes).most_common(1)[0][0]
    probabilities = probability_sum / len(page_images)

    # Scale the averaged vector to unit Euclidean (L2) norm.
    probabilities = probabilities / np.sqrt(np.sum(probabilities**2))
    return predicted_class, probabilities
def predict(
    image: Image.Image,
    image_bytes: bytes,
    reader: Reader,
    processor: LayoutLMv3Processor,
    model: LayoutLMv3ForSequenceClassification,
):
    """Classify a single page image.

    OCR is run on ``image_bytes``; each detected word is paired with its
    bounding box rescaled so that the page spans 1000 units in each
    direction. Words, boxes and the image are encoded by ``processor``
    (padded/truncated to 512 tokens) and fed to ``model`` on DEVICE.

    Returns:
        ``(class_id, probabilities)``: the index of the largest logit as a
        Python int, and the softmax of the logits as a flat list of floats.
    """
    detections = reader.readtext(image_bytes)

    page_width, page_height = image.size
    x_scale = 1000 / page_width
    y_scale = 1000 / page_height

    words = [text for _, text, _ in detections]
    boxes = [
        create_bounding_box(quad, x_scale, y_scale) for quad, _, _ in detections
    ]

    encoding = processor(
        image,
        words,
        boxes=boxes,
        max_length=512,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    )

    input_names = ("input_ids", "attention_mask", "bbox", "pixel_values")
    with torch.inference_mode():
        model_inputs = {name: encoding[name].to(DEVICE) for name in input_names}
        output = model(**model_inputs)

    logits = output.logits
    best_index = logits.argmax()
    # Turn the logits back into probabilities.
    probabilities = F.softmax(logits, dim=-1).flatten().tolist()
    return best_index.detach().item(), probabilities
# OCR reader, LayoutLMv3 processor and classifier used by the page below.
# NOTE(review): these are rebuilt on every run of this script; consider
# Streamlit's resource caching if reloads turn out to be slow.
reader = create_ocr_reader()
processor = create_processor()
model = create_model(revision="e34c270")

# Logo
c1, c2, c3 = st.columns([2.7, 5, 1])
c2.image("resources/previsa_cinza.png", width=250)

# Upload boxes, split over two columns.
_accepted_types = ["jpg", "pdf"]
_left_labels = (
    "Upload: Notas Fiscais de Entrada",
    "Upload: Notas Fiscais de Saída",
    "Upload: Notas Fiscais de Retenção",
    "Upload: Notas Fiscais de Serviços",
)
_right_labels = (
    "Upload: Documentos Aluguel",
    "Upload: Documentos Contábeis",
    "Upload: Documentos Tributos",
    "Upload: Documentos MEI",
    "Upload: Extrato Bancário",
)

col1, col2 = st.columns(2)
_uploads = []
with col1:
    for _label in _left_labels:
        _uploads.append(st.file_uploader(_label, _accepted_types))
with col2:
    for _label in _right_labels:
        _uploads.append(st.file_uploader(_label, _accepted_types))

# Every box must be able to trigger processing: keep the first box that
# actually holds a file instead of only the last widget created.
uploaded_file = next((_file for _file in _uploads if _file is not None), None)
def plot_confianca(probabilities, model):
    """Render a bar chart of the confidence for each document type.

    The bar labels are the values of ``model.config.id2label`` and the bar
    heights are ``probabilities``; the chart is drawn with Plotly inside a
    Streamlit spinner and stretched to the container width.
    """
    # Draw the confidence chart.
    with st.spinner("Criando gráficos de confiança..."):
        labels = list(model.config.id2label.values())
        chart_data = pd.DataFrame(
            {"Tipo Documento": labels, "Confiança": probabilities}
        )
        chart = px.bar(chart_data, x="Tipo Documento", y="Confiança")
        chart.update_layout({'plot_bgcolor': '#FFFFFF'})
        chart.update_traces(marker_color='#fcaf17')
        st.plotly_chart(chart, use_container_width=True)
# Processing: classify the uploaded file and show the result.
if uploaded_file is not None:
    c1, c2, c3 = st.columns([2.4, 5, 1])
    try:
        # Try to decode the upload as a PDF first.
        shutil.rmtree("temp", ignore_errors=True)
        # exist_ok: the best-effort rmtree above may leave the folder behind.
        os.makedirs("temp", exist_ok=True)

        # The context manager releases the document before classification
        # and before the temporary folder is removed.
        with fitz.Document(stream=uploaded_file.getvalue()) as doc:
            pdfPath = Path("temp/temp.pdf")
            doc.save(pdfPath)

            # Show the first page only.
            for page in doc:
                pix = page.get_pixmap()
                pix.save("temp/icon-page-1.jpg")
                c2.image("temp/icon-page-1.jpg", "Página do documento", width=300)
                break

        # Run the prediction.
        with st.spinner("Fazendo previsão..."):
            predicted_class, probabilities = classifyPDF(pdfPath, model, processor, reader)
        print(probabilities)
    except fitz.FileDataError:
        # Not a readable document: treat the upload as an image instead.
        # The exception is referenced from the package top level rather than
        # through the private ``fitz.fitz`` submodule path.
        image_bytes = uploaded_file.getvalue()
        bytes_data = io.BytesIO(image_bytes)
        image = Image.open(bytes_data)

        # Show the image.
        c2.image(image, "Página do documento", width=300)

        # Run the prediction.
        with st.spinner("Fazendo previsão..."):
            predicted_class, probabilities = predict(
                image, image_bytes, reader, processor, model
            )
    finally:
        # Remove the temporary folders if they still exist.
        if os.path.exists("temp"):
            shutil.rmtree("temp", ignore_errors=True)
        if os.path.exists("temp_classification"):
            shutil.rmtree("temp_classification", ignore_errors=True)

    # Show the result on the page.
    predicted_label = model.config.id2label[predicted_class]
    st.markdown(f"Tipo do documento previsto: **{predicted_label}**")
    plot_confianca(probabilities, model)