|
|
import gradio as gr |
|
|
import fitz |
|
|
import re |
|
|
import os |
|
|
import tempfile |
|
|
from typing import List, Dict, Any |
|
|
import json |
|
|
from openai import OpenAI |
|
|
import logging |
|
|
from datetime import datetime |
|
|
import time |
|
|
from dotenv import load_dotenv |
|
|
from queue import Queue |
|
|
from threading import Thread |
|
|
|
|
|
|
|
|
# Load environment variables (e.g. OPENAI_API_KEY) from a local .env file.
load_dotenv()


# Log to both a file and the console so batch runs are traceable afterwards.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('pdf_processor.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger shared by all classes/functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
class PDFExtractor:
    """Extracts the readable text content of PDF files via PyMuPDF."""

    @staticmethod
    def extract_text_from_pdf(pdf_path: str) -> str:
        """Extract all readable text from a PDF file.

        Args:
            pdf_path: Filesystem path of the PDF document.

        Returns:
            The text of every page, pages joined with newlines.

        Raises:
            Exception: Any PyMuPDF error is logged and re-raised.
        """
        try:
            # The context manager guarantees the document handle is closed
            # even if text extraction raises (the previous version leaked
            # the handle on error because close() was after the loop).
            with fitz.open(pdf_path) as doc:
                full_text = [page.get_text("text") for page in doc]
            return "\n".join(full_text)
        except Exception as e:
            logger.error(f"Erro na extração do PDF: {e}")
            raise
|
|
|
|
|
class BillDataExtractor:
    """First-pass, regex-based extraction of raw bill fields."""

    @staticmethod
    def extract_initial_data(text: str) -> Dict[str, Any]:
        """Scan *text* with a fixed table of regular expressions.

        Args:
            text: The full plain text of an energy bill.

        Returns:
            A dict mapping each field name to the list of every match
            found for it; a field that is absent maps to an empty list.
        """
        patterns = {
            'unidade_consumidora': r'Unidade Consumidora:?\s*(\d+)',
            'mes_referencia': r'Referente a:?\s*([A-Za-z]+/\d{4})',
            'medidor': r'Medidor:?\s*(\d+)',
            'energia_injetada': r'Energia Injetada:?\s*(\d+(?:[.,]\d+)?)',
            'valores': r'R\$\s*(\d+(?:[.,]\d+)?)',
            'quantidades': r'(\d+(?:[.,]\d+)?)\s*kWh',
            'geradores': r'Geradores:?\s*([\s\S]+?)(?=Unidade Consumidora|$)',
        }

        # re.findall already yields [] when nothing matches, so this
        # comprehension preserves the "empty list for missing fields" contract.
        return {field: re.findall(regex, text) for field, regex in patterns.items()}
|
|
|
|
|
class AIProcessor:
    """Extracts structured bill items from raw text via the OpenAI API."""

    def __init__(self):
        # API key is read from the environment (.env loaded at module import).
        self.client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
        # Seconds between calls; currently unused, reserved for rate limiting.
        self.rate_limit_delay = 1

    def process_with_ai(self, text: str, initial_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Ask the model to structure the bill text into line items.

        Args:
            text: Full plain text extracted from the bill PDF.
            initial_data: Regex-extracted hints passed along as context.

        Returns:
            The list under the model's ``"items"`` key, or an empty list
            when no JSON object (or no ``"items"`` key) is found.

        Raises:
            Exception: API or JSON-decoding errors are logged and re-raised.
        """
        try:
            prompt = f"""Analise esta fatura de energia e retorne um JSON com os itens encontrados.
Texto da fatura: {text}

Dados pré-extraídos: {json.dumps(initial_data)}

Retorne apenas o JSON no seguinte formato:
{{
    "items": [
        {{
            "unidade_consumidora": "string",
            "mes_referencia": "string",
            "item_fatura": "string",
            "quantidade": number,
            "valor": number,
            "energia_injetada": number,
            "medidor": "string",
            "unidade": "string",
            "tarifa_unitaria": number
        }}
    ]
}}"""

            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "Você é um especialista em análise de faturas de energia. Extraia todos os Itens da fatura de energia. Use o seguinte formato: {\"items\": [...]}"},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=4000,
                temperature=0.1
            )

            content = response.choices[0].message.content.strip()
            # Tolerate prose around the JSON: grab the outermost {...} span.
            json_match = re.search(r'\{[\s\S]*\}', content)

            if json_match:
                # .get avoids a KeyError when the model returns valid JSON
                # that lacks the "items" key (the old code crashed here).
                return json.loads(json_match.group(0)).get('items', [])
            return []

        except Exception as e:
            logger.error(f"Erro no processamento com IA: {e}")
            raise
|
|
|
|
|
def process_pdf(pdf_file) -> tuple:
    """Run the full extraction pipeline for a single uploaded PDF.

    Args:
        pdf_file: Either a filesystem path string (Gradio ``type="filepath"``)
            or a file-like object exposing a ``.name`` attribute; ``None``
            when nothing was uploaded.

    Returns:
        A ``(status_message, items, json_path)`` tuple; ``items`` and
        ``json_path`` are ``None`` on missing input or failure.
    """
    try:
        if pdf_file is None:
            return "Nenhum arquivo foi enviado.", None, None

        # Gradio may hand over a plain path string (type="filepath") or a
        # tempfile wrapper; the old code assumed .name and crashed on str.
        pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name

        extracted_text = PDFExtractor.extract_text_from_pdf(pdf_path)

        initial_data = BillDataExtractor.extract_initial_data(extracted_text)

        ai_processor = AIProcessor()
        processed_items = ai_processor.process_with_ai(extracted_text, initial_data)

        # Persist the structured result so the UI can offer it for download.
        # (delete=False: Gradio needs the file to survive this function.)
        with tempfile.NamedTemporaryFile(delete=False, suffix='.json', mode='w', encoding='utf-8') as json_file:
            json.dump(processed_items, json_file, indent=2, ensure_ascii=False)
            json_path = json_file.name

        return (
            f"Processamento concluído! {len(processed_items)} itens encontrados.",
            processed_items,
            json_path
        )

    except Exception as e:
        logger.error(f"Erro durante o processamento: {e}")
        return (
            f"Erro durante o processamento: {str(e)}",
            None,
            None
        )
|
|
|
|
|
def process_queue(queue: Queue, status_output, json_output, download_output):
    """Drain *queue*, process every queued PDF, then publish the results.

    Each entry is handed to ``process_pdf``; the accumulated statuses,
    JSON payloads and file paths are written to the ``value`` attribute
    of the three Gradio components once the queue is empty.
    """
    statuses, payloads, paths = [], [], []
    while not queue.empty():
        entry = queue.get()
        status, data, path = process_pdf(entry)
        statuses.append(status)
        payloads.append(data)
        paths.append(path)
        queue.task_done()

    status_output.value = "\n".join(statuses)
    json_output.value = payloads
    download_output.value = paths
|
|
|
|
|
def create_interface():
    """Build and return the Gradio Blocks UI for batch invoice processing."""
    with gr.Blocks(title="Processador de Faturas de Energia") as interface:
        gr.Markdown("# Processador de Faturas de Energia")
        gr.Markdown("Faça upload de arquivos PDF de fatura de energia para processamento.")

        with gr.Row():
            file_input = gr.File(
                label="Upload dos PDFs",
                file_types=[".pdf"],
                file_count="multiple",
                type="filepath"
            )

        with gr.Row():
            process_button = gr.Button("Processar Faturas")

        with gr.Row():
            status_output = gr.Textbox(label="Status")

        with gr.Row():
            json_output = gr.JSON(label="Resultados")

        with gr.Row():
            download_output = gr.File(label="Download JSON")

        def process_files(files):
            """Click handler: stage uploads in a queue and process them all."""
            queue = Queue()
            for file in files:
                queue.put(file)

            # Reset component state before the run.
            # NOTE(review): mutating .value after construction does not
            # refresh the rendered UI in recent Gradio versions — the tuple
            # returned below is what actually updates the outputs; confirm.
            status_output.value = ""
            json_output.value = []
            download_output.value = []

            # The worker is joined immediately, so this is effectively
            # synchronous despite the extra thread.
            thread = Thread(target=process_queue, args=(queue, status_output, json_output, download_output))
            thread.start()
            thread.join()

            return status_output.value, json_output.value, download_output.value

        process_button.click(
            fn=process_files,
            inputs=[file_input],
            outputs=[status_output, json_output, download_output]
        )

    return interface
|
|
|
|
|
if __name__ == "__main__":
    # Fail fast when the OpenAI key is missing — every run requires it.
    if not os.getenv('OPENAI_API_KEY'):
        print("Error: OPENAI_API_KEY not found in environment variables")
        print("Please set your OpenAI API key in the .env file")
        # raise SystemExit instead of the site-injected exit(), which is
        # not guaranteed to exist when Python runs without the site module.
        raise SystemExit(1)

    print("Starting Gradio interface...")
    interface = create_interface()
    interface.launch(share=False)
|
|
|