Brenno's picture
test 1.5 full
970a034
import json
import os
import re
import torch
import fitz
import gradio as gr
from langchain_community.document_loaders import PyMuPDFLoader
from transformers import AutoModelForCausalLM, AutoTokenizer
MODEL_NAME = "numind/NuExtract-1.5"
DEVICE = "cpu"
# MODEL = AutoModelForCausalLM.from_pretrained(MODEL_NAME,
# torch_dtype=torch.bfloat16,
# trust_remote_code=False,
# device_map="auto")
MODEL = AutoModelForCausalLM.from_pretrained(MODEL_NAME,
torch_dtype=torch.bfloat16,
trust_remote_code=False)
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=False)
MODEL.eval()
MAX_INPUT_SIZE = 20_000
MAX_NEW_TOKENS = 6000
TEMPLATE_PATH = "template.json"
def load_template():
with open(TEMPLATE_PATH, "r", encoding="utf-8") as file:
return json.load(file)
def extract_text_pymupdf(pdf_path):
doc = fitz.open(pdf_path)
text = "\n".join(page.get_text() for page in doc)
return text
def extract_text_from_pdf(pdf_file_path):
if not os.path.exists(pdf_file_path):
raise FileNotFoundError(f"Arquivo PDF não encontrado: {pdf_file_path}")
loader = PyMuPDFLoader(pdf_file_path)
data = loader.load()
return "\n".join([doc.page_content.strip() for doc in data])
def clean_text(text):
# Remove excessive newlines and multiple spaces
text = re.sub(r'\n+', '\n', text) # Replace multiple newlines with a single newline
text = re.sub(r'●', '', text) # Remove bullet points
text = re.sub(r'\s+', ' ', text).strip() # Normalize spaces
# Remove duplicate content
seen = set()
cleaned_lines = []
for line in text.split('. '): # Split by full stops to catch duplicated sentences
if line.strip() not in seen:
seen.add(line.strip())
cleaned_lines.append(line.strip())
return '. '.join(cleaned_lines)
def split_document(document, window_size, overlap):
tokens = TOKENIZER.tokenize(document)
print(f"\tLength of document: {len(tokens)} tokens")
chunks = []
if len(tokens) > window_size:
for i in range(0, len(tokens), window_size-overlap):
print(f"\t{i} to {i + len(tokens[i:i + window_size])}")
chunk = TOKENIZER.convert_tokens_to_string(tokens[i:i + window_size])
chunks.append(chunk)
if i + len(tokens[i:i + window_size]) >= len(tokens):
break
else:
chunks.append(document)
print(f"\tSplit into {len(chunks)} chunks")
return chunks
def process_and_generate(pdf_file):
pdf_path = pdf_file.name
extracted_text = extract_text_pymupdf(pdf_path)
#extracted_text = extract_text_from_pdf(pdf_path)
if not extracted_text:
return "Falha ao extrair texto do PDF."
template = json.dumps(load_template(), ensure_ascii=False)
current = json.dumps(extracted_text, ensure_ascii=False)
current = """
Estruturas Condicionais
Estruturas Condicionais ou de Seleção
Estruturas condicionais são utilizadas quando queremos que os nossos algoritmos executem
ou não executem um trecho de código (uma linha ou várias linhas).
Imagine que você juntou dinheiro durante todo o ano para comprar um novo videogame na black friday deste ano.
Porém, você só o comprará se ele estiver custando R$ 1.200,00. Se ele for anunciado mais barato, exemplo, R$ 1.000,00 você comprará?
E se ele for anunciado por R$ 2.000,00, você comprará? Se não for comprar, fará o quê? Expressões Relacionais
Para que seja possível realizarmos uma condição, precisamos comparar dois ou mais elementos.
Os operadores relacionais, como o próprio nome diz, são utilizados para
relacionar o conteúdo de duas variáveis ou dois valores fixos nos nossos algoritmos.
Expressões Relacionais
Toda expressão relacional terá como resultado os valores verdadeiro ou falso.
O valor do resultado é chamado de resultado lógico. Sintaxe estrutura condicionalCondicional SE()
Comando SE chamada de simples pois o comando só se preocupa quando o resultado da condição for verdadeiro.
Se for, o compilador executará o trecho de código escrito dentro do par de chaves do comando SE , caso contrário, nada será feito.
Atividade Faça um algoritmo que leia a idade da pessoa e verifique se a pessoa é maior de idade.
Caso a pessoa seja maior de idade apresentar a mensagem “Você é maior de idade”.
Se a pessoa não for maior de idade não exibir nenhuma mensagem.
Condicional composta: SE SENAO Condicional composta: SE SENAO Condicional composta: SE SENAOCondicional composta: SE SENAO
Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE"
"""
print(template)
print(current)
pred_template = sliding_window_prediction(current, template, MODEL, TOKENIZER)
return pred_template
def predict_chunk(text, template, current, model, tokenizer):
current = clean_json_text(current)
input_llm = f"<|input|>\n### Template:\n{template}\n### Current:\n{current}\n### Text:\n{text}\n\n<|output|>" + "{"
input_ids = tokenizer(input_llm, return_tensors="pt", truncation=True, max_length=MAX_INPUT_SIZE).to(DEVICE)
output = tokenizer.decode(model.generate(**input_ids, max_new_tokens=MAX_NEW_TOKENS, do_sample=False)[0], skip_special_tokens=True)
clear_json = clean_json_text(output.split("<|output|>")[1])
return clear_json
def handle_broken_output(pred, prev):
try:
if all([(v in ["", []]) for v in json.loads(pred).values()]):
# if empty json, return previous
pred = prev
except:
# if broken json, return previous
pred = prev
return pred
def clean_json_text(text):
text = text.strip()
text = text.replace(r"\#", "#").replace(r"\&", "&")
clean_value = clean_text(text)
return clean_value
def sliding_window_prediction(text, template, model, tokenizer, window_size=4000, overlap=128):
# split text into chunks of n tokens
tokens = tokenizer.tokenize(text)
chunks = split_document(text, window_size, overlap)
# iterate over text chunks
prev = template
for i, chunk in enumerate(chunks):
print(f"Processing chunk {i}...")
pred = predict_chunk(chunk, template, prev, model, tokenizer)
# handle broken output
pred = handle_broken_output(pred, prev)
# iterate
prev = pred
return pred
interface = gr.Interface(
fn=process_and_generate,
inputs=gr.File(label="Upload PDF"),
outputs="text",
title="Gerador de Perguntas a partir de PDFs",
description="Extrai informações do PDF, preenche um modelo JSON e gera perguntas sobre o conteúdo usando Mistral."
)
interface.launch(debug=True, share=True)