Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import re | |
| import torch | |
| import fitz | |
| import gradio as gr | |
| from langchain_community.document_loaders import PyMuPDFLoader | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| MODEL_NAME = "numind/NuExtract-1.5" | |
| DEVICE = "cpu" | |
| # MODEL = AutoModelForCausalLM.from_pretrained(MODEL_NAME, | |
| # torch_dtype=torch.bfloat16, | |
| # trust_remote_code=False, | |
| # device_map="auto") | |
| MODEL = AutoModelForCausalLM.from_pretrained(MODEL_NAME, | |
| torch_dtype=torch.bfloat16, | |
| trust_remote_code=False) | |
| TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=False) | |
| MODEL.eval() | |
| MAX_INPUT_SIZE = 20_000 | |
| MAX_NEW_TOKENS = 6000 | |
| TEMPLATE_PATH = "template.json" | |
| def load_template(): | |
| with open(TEMPLATE_PATH, "r", encoding="utf-8") as file: | |
| return json.load(file) | |
| def extract_text_pymupdf(pdf_path): | |
| doc = fitz.open(pdf_path) | |
| text = "\n".join(page.get_text() for page in doc) | |
| return text | |
| def extract_text_from_pdf(pdf_file_path): | |
| if not os.path.exists(pdf_file_path): | |
| raise FileNotFoundError(f"Arquivo PDF não encontrado: {pdf_file_path}") | |
| loader = PyMuPDFLoader(pdf_file_path) | |
| data = loader.load() | |
| return "\n".join([doc.page_content.strip() for doc in data]) | |
| def clean_text(text): | |
| # Remove excessive newlines and multiple spaces | |
| text = re.sub(r'\n+', '\n', text) # Replace multiple newlines with a single newline | |
| text = re.sub(r'●', '', text) # Remove bullet points | |
| text = re.sub(r'\s+', ' ', text).strip() # Normalize spaces | |
| # Remove duplicate content | |
| seen = set() | |
| cleaned_lines = [] | |
| for line in text.split('. '): # Split by full stops to catch duplicated sentences | |
| if line.strip() not in seen: | |
| seen.add(line.strip()) | |
| cleaned_lines.append(line.strip()) | |
| return '. '.join(cleaned_lines) | |
| def split_document(document, window_size, overlap): | |
| tokens = TOKENIZER.tokenize(document) | |
| print(f"\tLength of document: {len(tokens)} tokens") | |
| chunks = [] | |
| if len(tokens) > window_size: | |
| for i in range(0, len(tokens), window_size-overlap): | |
| print(f"\t{i} to {i + len(tokens[i:i + window_size])}") | |
| chunk = TOKENIZER.convert_tokens_to_string(tokens[i:i + window_size]) | |
| chunks.append(chunk) | |
| if i + len(tokens[i:i + window_size]) >= len(tokens): | |
| break | |
| else: | |
| chunks.append(document) | |
| print(f"\tSplit into {len(chunks)} chunks") | |
| return chunks | |
| def process_and_generate(pdf_file): | |
| pdf_path = pdf_file.name | |
| extracted_text = extract_text_pymupdf(pdf_path) | |
| #extracted_text = extract_text_from_pdf(pdf_path) | |
| if not extracted_text: | |
| return "Falha ao extrair texto do PDF." | |
| template = json.dumps(load_template(), ensure_ascii=False) | |
| current = json.dumps(extracted_text, ensure_ascii=False) | |
| current = """ | |
| Estruturas Condicionais | |
| Estruturas Condicionais ou de Seleção | |
| Estruturas condicionais são utilizadas quando queremos que os nossos algoritmos executem | |
| ou não executem um trecho de código (uma linha ou várias linhas). | |
| Imagine que você juntou dinheiro durante todo o ano para comprar um novo videogame na black friday deste ano. | |
| Porém, você só o comprará se ele estiver custando R$ 1.200,00. Se ele for anunciado mais barato, exemplo, R$ 1.000,00 você comprará? | |
| E se ele for anunciado por R$ 2.000,00, você comprará? Se não for comprar, fará o quê? Expressões Relacionais | |
| Para que seja possível realizarmos uma condição, precisamos comparar dois ou mais elementos. | |
| Os operadores relacionais, como o próprio nome diz, são utilizados para | |
| relacionar o conteúdo de duas variáveis ou dois valores fixos nos nossos algoritmos. | |
| Expressões Relacionais | |
| Toda expressão relacional terá como resultado os valores verdadeiro ou falso. | |
| O valor do resultado é chamado de resultado lógico. Sintaxe estrutura condicionalCondicional SE() | |
| Comando SE chamada de simples pois o comando só se preocupa quando o resultado da condição for verdadeiro. | |
| Se for, o compilador executará o trecho de código escrito dentro do par de chaves do comando SE , caso contrário, nada será feito. | |
| Atividade Faça um algoritmo que leia a idade da pessoa e verifique se a pessoa é maior de idade. | |
| Caso a pessoa seja maior de idade apresentar a mensagem “Você é maior de idade”. | |
| Se a pessoa não for maior de idade não exibir nenhuma mensagem. | |
| Condicional composta: SE SENAO Condicional composta: SE SENAO Condicional composta: SE SENAOCondicional composta: SE SENAO | |
| Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE Condicional Múltipla: SE SENAO SE" | |
| """ | |
| print(template) | |
| print(current) | |
| pred_template = sliding_window_prediction(current, template, MODEL, TOKENIZER) | |
| return pred_template | |
| def predict_chunk(text, template, current, model, tokenizer): | |
| current = clean_json_text(current) | |
| input_llm = f"<|input|>\n### Template:\n{template}\n### Current:\n{current}\n### Text:\n{text}\n\n<|output|>" + "{" | |
| input_ids = tokenizer(input_llm, return_tensors="pt", truncation=True, max_length=MAX_INPUT_SIZE).to(DEVICE) | |
| output = tokenizer.decode(model.generate(**input_ids, max_new_tokens=MAX_NEW_TOKENS, do_sample=False)[0], skip_special_tokens=True) | |
| clear_json = clean_json_text(output.split("<|output|>")[1]) | |
| return clear_json | |
| def handle_broken_output(pred, prev): | |
| try: | |
| if all([(v in ["", []]) for v in json.loads(pred).values()]): | |
| # if empty json, return previous | |
| pred = prev | |
| except: | |
| # if broken json, return previous | |
| pred = prev | |
| return pred | |
| def clean_json_text(text): | |
| text = text.strip() | |
| text = text.replace(r"\#", "#").replace(r"\&", "&") | |
| clean_value = clean_text(text) | |
| return clean_value | |
| def sliding_window_prediction(text, template, model, tokenizer, window_size=4000, overlap=128): | |
| # split text into chunks of n tokens | |
| tokens = tokenizer.tokenize(text) | |
| chunks = split_document(text, window_size, overlap) | |
| # iterate over text chunks | |
| prev = template | |
| for i, chunk in enumerate(chunks): | |
| print(f"Processing chunk {i}...") | |
| pred = predict_chunk(chunk, template, prev, model, tokenizer) | |
| # handle broken output | |
| pred = handle_broken_output(pred, prev) | |
| # iterate | |
| prev = pred | |
| return pred | |
| interface = gr.Interface( | |
| fn=process_and_generate, | |
| inputs=gr.File(label="Upload PDF"), | |
| outputs="text", | |
| title="Gerador de Perguntas a partir de PDFs", | |
| description="Extrai informações do PDF, preenche um modelo JSON e gera perguntas sobre o conteúdo usando Mistral." | |
| ) | |
| interface.launch(debug=True, share=True) | |