|
|
import json |
|
|
import os |
|
|
import re |
|
|
import gradio as gr |
|
|
import unicodedata |
|
|
import torch |
|
|
import shutil |
|
|
import tempfile |
|
|
|
|
|
|
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
|
from langchain_community.document_loaders import PyMuPDFLoader |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Hugging Face model id: NuExtract-1.5, a small LLM specialized in
# template-driven structured extraction.
MODEL_PATH = "numind/NuExtract-1.5"


tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)


# Prefer GPU when available; otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# NOTE(review): weights are loaded in float16 even when the device is CPU,
# where half-precision inference is slow/poorly supported — confirm this is
# intentional for CPU-only deployments.
model = AutoModelForCausalLM.from_pretrained(


    MODEL_PATH,


    torch_dtype=torch.float16


).to(device)


# Release cached CUDA memory left over from loading (no-op when CUDA is unused).
torch.cuda.empty_cache()


# Used both as the tokenizer truncation limit (tokens) in generate_text and as
# the chunk window (whitespace-split words) in split_document.
# NOTE(review): tokens and words are different units — confirm the shared
# constant is intentional.
MAX_INPUT_SIZE = 4000


# Word overlap between consecutive chunks so content straddling a chunk
# boundary is not lost.
OVERLAP = 128


# Path to the JSON template describing the fields to extract.
TEMPLATE_PATH = "template.json"
|
|
|
|
|
def load_template():
    """Read and parse the extraction template JSON from TEMPLATE_PATH."""
    with open(TEMPLATE_PATH, mode="r", encoding="utf-8") as handle:
        template = json.load(handle)
    return template
|
|
|
|
|
def extract_text_from_pdf(pdf_file_path):
    """Load a PDF with PyMuPDF and return all pages joined into one string.

    Raises:
        FileNotFoundError: when the path does not exist.
    """
    if not os.path.exists(pdf_file_path):
        raise FileNotFoundError(f"Arquivo PDF não encontrado: {pdf_file_path}")
    pages = PyMuPDFLoader(pdf_file_path).load()
    stripped_pages = (page.page_content.strip() for page in pages)
    return "\n".join(stripped_pages)
|
|
|
|
|
def split_document(document, window_size=MAX_INPUT_SIZE, overlap=OVERLAP):
    """Split `document` into overlapping word-window chunks.

    Returns the whole document as a single chunk when it has at most
    `window_size` words; otherwise slides a window of `window_size` words,
    advancing `window_size - overlap` words per step so chunks overlap.
    """
    words = document.split()
    # Short document: no chunking needed.
    if len(words) <= window_size:
        return [document]

    step = window_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        window = words[start:start + window_size]
        chunks.append(" ".join(window))
        # Stop as soon as a window reaches the end of the document.
        if start + len(window) >= len(words):
            break
    return chunks
|
|
|
|
|
def structure_text(text):
    """Heuristically organize raw PDF text into title, author and sections.

    Heuristics, applied line by line in order:
    - first Title-Case line longer than 5 chars becomes "titulo";
    - first line containing two consecutive Capitalized words becomes "autor";
    - ALL-CAPS lines shorter than 60 chars open a new section;
    - any other line is appended to the current section's content
      (lines before the first section header are dropped).
    """
    result = {"titulo": "", "autor": "", "conteudo": []}
    section = None
    for raw_line in text.split("\n"):
        stripped = raw_line.strip()
        if not result["titulo"] and len(stripped) > 5 and stripped.istitle():
            result["titulo"] = stripped
            continue
        if not result["autor"] and re.search(r"\b[A-Z][a-z]+ [A-Z][a-z]+", stripped):
            result["autor"] = stripped
            continue
        if len(stripped) < 60 and stripped.isupper():
            section = {"secao": stripped, "conteudo": []}
            result["conteudo"].append(section)
            continue
        if section:
            section["conteudo"].append(stripped)
    return result
|
|
|
|
|
def generate_text(prompt):
    """Run the extraction model on `prompt` and return only the generated text.

    The prompt is tokenized with truncation at MAX_INPUT_SIZE tokens and the
    model greedily generates up to 512 new tokens.

    Bug fix: `model.generate` on a decoder-only model returns the prompt
    tokens followed by the new tokens. Decoding the whole sequence prepended
    the prompt text to the result, which made downstream `json.loads` fail on
    virtually every call. Only the newly generated tokens are decoded now.
    """
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=MAX_INPUT_SIZE).to(device)

    with torch.no_grad():
        output = model.generate(
            **inputs,
            max_new_tokens=512,
            pad_token_id=tokenizer.eos_token_id
        )

    torch.cuda.empty_cache()

    # Skip the prompt portion of the returned sequence.
    prompt_length = inputs["input_ids"].shape[1]
    return tokenizer.decode(output[0][prompt_length:], skip_special_tokens=True)
|
|
|
|
|
|
|
|
|
|
|
def process_chunk(text, template, current):
    """Prompt the model with one text chunk plus the template and current
    state, and return its answer re-serialized as pretty-printed JSON.

    When the model output is not valid JSON, a JSON error object carrying the
    raw output is returned instead of raising.
    """
    prompt = f"### Template:\n{template}\n### Current:\n{current}\n### Text:\n{text}\n"
    raw_output = generate_text(prompt)

    try:
        parsed = json.loads(raw_output)
    except json.JSONDecodeError as exc:
        # Log the bad payload and degrade gracefully for the caller.
        print(f"[Erro JSON] {exc}: {raw_output}")
        return json.dumps({"erro": "Saída inválida do modelo", "output_bruto": raw_output}, indent=2, ensure_ascii=False)
    return json.dumps(parsed, indent=2, ensure_ascii=False)
|
|
|
|
|
def handle_broken_outputs(pred, prev):
    """Fall back to the previous state when the model output is unusable.

    `pred` is considered broken when it is not valid JSON, not a JSON object,
    or an object whose values are all empty ("" or []).

    Args:
        pred: candidate JSON string produced by the model.
        prev: last known-good JSON string to fall back to.

    Returns:
        `pred` when it looks usable, otherwise `prev`.
    """
    try:
        parsed = json.loads(pred)
        # Bug fix: a non-dict payload (e.g. a bare JSON list or string)
        # previously raised an uncaught AttributeError on .values();
        # treat it as broken output instead of crashing.
        if not isinstance(parsed, dict):
            return prev
        if all(value in ["", []] for value in parsed.values()):
            return prev
    except json.JSONDecodeError:
        return prev
    return pred
|
|
|
|
|
|
|
|
def send_chunk_to_model(text, template, current):
    """Send one text chunk to the local model and consolidate its output.

    Bug fix: the chunk was previously pre-wrapped here in a second
    "<|input|> ... ### Template ... <|output|>{" prompt and then handed to
    process_chunk, which builds its own "### Template:" prompt around it —
    so the template and current state appeared twice and the chunk was
    buried inside a prompt nested in a prompt. The raw chunk is now passed
    straight through; process_chunk owns the (single) prompt construction.

    Returns:
        The updated JSON state string, or `current` unchanged when the model
        produced empty/invalid output.
    """
    output_text = process_chunk(text, template, current)

    # Keep the previous state when the model returned broken output.
    return handle_broken_outputs(output_text, current)
|
|
|
|
|
|
|
|
def process_and_generate(pdf_file):
    """Gradio handler: copy the uploaded PDF to a temp file, extract its
    text, and iteratively refine the extraction JSON chunk by chunk.

    Args:
        pdf_file: Gradio file object (exposes `.name` with the upload path),
            or a falsy value when nothing was uploaded.

    Returns:
        A pretty-printed JSON string with the extracted data, or a
        human-readable error message (the UI shows either).
    """
    if not pdf_file:
        return "Nenhum arquivo enviado."

    # Work on a private copy so processing does not depend on Gradio's
    # upload lifetime; copyfile streams in chunks instead of reading the
    # whole PDF into memory.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
        pdf_path = tmp_file.name
    shutil.copyfile(pdf_file.name, pdf_path)

    try:
        extracted_text = extract_text_from_pdf(pdf_path)
        if not extracted_text:
            return "Falha ao extrair texto do PDF."

        structured_data = structure_text(extracted_text)
        template = json.dumps(load_template(), ensure_ascii=False)
        current = json.dumps(structured_data, ensure_ascii=False)
        chunks = split_document(extracted_text)

        # Each chunk refines the running "current" JSON state.
        for chunk in chunks:
            current = send_chunk_to_model(chunk, template, current)

        return json.dumps(json.loads(current), indent=2, ensure_ascii=False)
    except Exception as e:
        # Surface the failure to the UI instead of crashing the server.
        return f"Erro durante o processamento: {e}"
    finally:
        # Best-effort cleanup of the temp copy; guarded so cleanup itself
        # cannot raise when the file was never created/already removed.
        if os.path.exists(pdf_path):
            os.remove(pdf_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI: a single PDF upload wired to the extraction pipeline; the
# handler returns either a JSON string with the extracted data or an
# error message, rendered by the JSON output component.
interface = gr.Interface(


    fn=process_and_generate,


    inputs=gr.File(label="Upload PDF"),


    outputs=gr.JSON(label="Dados Extraídos"),


    title="Extração de Dados com Modelo Local",


    description="Envie um PDF para extrair e processar informações automaticamente.",


)


# Start the local web server (blocking call).
interface.launch()