import json
import os
import re
import gradio as gr
import unicodedata
import torch
import shutil
import tempfile
from transformers import AutoModelForCausalLM, AutoTokenizer
from langchain_community.document_loaders import PyMuPDFLoader
# Model configuration: load the NuExtract-1.5 tokenizer and weights once at import time.
MODEL_PATH = "numind/NuExtract-1.5"
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
torch_dtype=torch.float16  # FP16 halves VRAM usage versus FP32
).to(device)
torch.cuda.empty_cache()
# Constants.
# NOTE(review): MAX_INPUT_SIZE is used as a *token* cap in generate_text but as a
# *word-count* window in split_document — different units; confirm this is intended.
MAX_INPUT_SIZE = 4000
OVERLAP = 128
TEMPLATE_PATH = "template.json"
def load_template():
    """Read the JSON extraction template from TEMPLATE_PATH and return it parsed."""
    with open(TEMPLATE_PATH, encoding="utf-8") as handle:
        return json.load(handle)
def extract_text_from_pdf(pdf_file_path):
    """Load a PDF via PyMuPDF and return the stripped text of all pages joined by newlines.

    Raises FileNotFoundError when *pdf_file_path* does not exist.
    """
    if not os.path.exists(pdf_file_path):
        raise FileNotFoundError(f"Arquivo PDF não encontrado: {pdf_file_path}")
    documents = PyMuPDFLoader(pdf_file_path).load()
    pages = [doc.page_content.strip() for doc in documents]
    return "\n".join(pages)
def split_document(document, window_size=MAX_INPUT_SIZE, overlap=OVERLAP):
    """Split *document* into word-based windows of at most *window_size* words.

    Consecutive windows share *overlap* words. A document that fits inside one
    window is returned unchanged as a single-element list.
    """
    words = document.split()
    if len(words) <= window_size:
        return [document]
    chunks = []
    step = window_size - overlap
    for start in range(0, len(words), step):
        window = words[start:start + window_size]
        chunks.append(" ".join(window))
        # Stop once the current window reaches the end of the document.
        if start + len(window) >= len(words):
            break
    return chunks
def structure_text(text):
    """Heuristically organize raw text into a title, an author, and sections.

    Returns a dict with keys "titulo", "autor" and "conteudo" (a list of
    {"secao": ..., "conteudo": [...]} entries). The title is the first
    title-cased line longer than 5 characters; the author is the first line
    matching a "Firstname Lastname" pattern; section headers are short
    all-uppercase lines. Lines before the first section header are dropped.
    """
    result = {"titulo": "", "autor": "", "conteudo": []}
    section = None
    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        if not result["titulo"] and len(stripped) > 5 and stripped.istitle():
            result["titulo"] = stripped
        elif not result["autor"] and re.search(r"\b[A-Z][a-z]+ [A-Z][a-z]+", stripped):
            result["autor"] = stripped
        elif len(stripped) < 60 and stripped.isupper():
            section = {"secao": stripped, "conteudo": []}
            result["conteudo"].append(section)
        elif section is not None:
            section["conteudo"].append(stripped)
    return result
def generate_text(prompt):
    """Tokenize *prompt*, generate up to 512 new tokens with the local model,
    and return the full decoded sequence (the prompt echo is included)."""
    encoded = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=MAX_INPUT_SIZE,
    ).to(device)
    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=512,  # cap the response length
            pad_token_id=tokenizer.eos_token_id,  # avoid errors when no pad token is set
        )
    torch.cuda.empty_cache()  # release VRAM held by the generation pass
    return tokenizer.decode(generated[0], skip_special_tokens=True)
def process_chunk(text, template, current):
    """Run one fully-formatted NuExtract prompt through the model and parse its JSON answer.

    *text* is already the complete prompt built by send_chunk_to_model
    (Template/Current/Text sections plus the "<|output|>{" suffix). The
    original re-wrapped it in a second Template/Current/Text envelope,
    nesting the prompt inside itself, and then tried to json-parse the full
    decoded sequence — which still echoes the prompt and therefore never
    parsed. Fix: send the prompt as-is and parse only the text after the
    final "<|output|>" marker (the "{" appended to the prompt makes the
    answer start as valid JSON).

    *template* and *current* are kept in the signature for caller compatibility.
    Returns pretty-printed JSON, or an error payload when parsing fails.
    """
    raw_output = generate_text(text)
    # The decoded sequence echoes the prompt; the model's answer begins after
    # the last "<|output|>" marker, continuing the "{" already in the prompt.
    output_text = "{" + raw_output.split("<|output|>")[-1].lstrip().removeprefix("{")
    try:
        parsed_output = json.loads(output_text)
        return json.dumps(parsed_output, indent=2, ensure_ascii=False)
    except json.JSONDecodeError as e:
        print(f"[Erro JSON] {e}: {output_text}")  # Log do erro para depuração
        return json.dumps({"erro": "Saída inválida do modelo", "output_bruto": output_text}, indent=2, ensure_ascii=False)
def handle_broken_outputs(pred, prev):
    """Fall back to *prev* when *pred* is empty or not valid JSON.

    A prediction counts as empty when every top-level value is "" or [].
    """
    try:
        values = json.loads(pred).values()
    except json.JSONDecodeError:
        return prev  # invalid JSON: keep the previous state
    if all(v in ("", []) for v in values):
        return prev  # all-empty output: keep the previous state
    return pred
def send_chunk_to_model(text, template, current):
    """Build the NuExtract prompt for one chunk, run it, and keep the previous
    state when the model's output is empty or malformed."""
    prompt = (
        "<|input|>\n"
        f"### Template:\n{template}\n"
        f"### Current:\n{current}\n"
        f"### Text:\n{text}\n\n"
        "<|output|>{"  # trailing "{" forces the answer to start as JSON
    )
    prediction = process_chunk(prompt, template, current)
    return handle_broken_outputs(prediction, current)
def process_and_generate(pdf_file):
    """Gradio handler: copy the uploaded PDF to a temp file, extract and
    structure its text, then refine the extraction JSON chunk by chunk.

    Returns the final pretty-printed JSON string, or an error message string.
    The original read the whole upload into memory to copy it; shutil.copyfile
    (shutil is already imported at the top of this file) streams it instead.
    """
    if not pdf_file:
        return "Nenhum arquivo enviado."
    # Work on a private copy so the upload can be cleaned up independently.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
        pdf_path = tmp_file.name
    shutil.copyfile(pdf_file.name, pdf_path)  # streamed copy, constant memory
    try:
        extracted_text = extract_text_from_pdf(pdf_path)
        if not extracted_text:
            return "Falha ao extrair texto do PDF."
        structured_data = structure_text(extracted_text)
        template = json.dumps(load_template(), ensure_ascii=False)
        current = json.dumps(structured_data, ensure_ascii=False)
        # Each chunk refines "current", carrying extraction state across windows.
        for chunk in split_document(extracted_text):
            current = send_chunk_to_model(chunk, template, current)
        return json.dumps(json.loads(current), indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Erro durante o processamento: {e}"
    finally:
        os.remove(pdf_path)  # always remove the temporary copy
# Gradio UI: one PDF upload in, extracted JSON out.
interface = gr.Interface(
fn=process_and_generate,
inputs=gr.File(label="Upload PDF"),
outputs=gr.JSON(label="Dados Extraídos"),
title="Extração de Dados com Modelo Local",
description="Envie um PDF para extrair e processar informações automaticamente.",
)
# NOTE(review): process_and_generate returns plain strings on error, but the
# output component is gr.JSON — error messages will render as a bare JSON string.
interface.launch()