import os
import re
import tempfile
from io import BytesIO

import chainlit as cl
import PyPDF2
import pytesseract
from fpdf import FPDF
from pdf2image import convert_from_path
from PIL import Image

# Folder where the processed (cropped / rotated) labels are written.
OUTPUT_DIR = "files"
# Folder used for the GIF -> PDF conversion output; it is wiped after every
# chat round together with OUTPUT_DIR.
UPLOAD_DIR = ".files"

PATTERN_NOT_FOUND = "Pattern not found in the text."


def save_pdf_file(file_name, page):
    """
    Write a single page as a standalone PDF inside OUTPUT_DIR.

    Parameters:
        file_name (str): Name of the PDF to create (relative to OUTPUT_DIR)
        page: The PyPDF2 page to write
    """
    # The output folder may not exist yet on a fresh checkout.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    pdf_writer = PyPDF2.PdfWriter()
    pdf_writer.add_page(page)
    with open(f"{OUTPUT_DIR}/{file_name}", "wb") as out:
        pdf_writer.write(out)
    pdf_writer.close()


def process_estafeta_pdf(file_name, page):
    """
    Crop an Estafeta label and save it.

    The crop box depends on whether the page is wider than tall.

    Parameters:
        file_name (str): Name of the PDF to create
        page: The PyPDF2 page holding the label
    """
    x = page['/MediaBox'][2]
    y = page['/MediaBox'][3]
    if x > y:
        page.mediabox.lower_left = (40, 60)
        page.mediabox.upper_right = (340, 520)
    else:
        page.mediabox.lower_left = (30, 40)
        page.mediabox.upper_right = (490, 340)
    save_pdf_file(file_name, page)


def process_estafeta_text(pdf_text):
    """
    Build the output file name from the Estafeta confirmation number.

    Parameters:
        pdf_text (str): Text extracted from the label

    Returns:
        file_name (str): "<confirmation number without dashes>.pdf"
        file_path (str): Path of the file inside OUTPUT_DIR

    Raises:
        ValueError: If no confirmation number is found in the text
    """
    match = re.search(r'CONFIRMACION (\d+-\d+\w+)', pdf_text)
    if not match:
        raise ValueError(PATTERN_NOT_FOUND)
    # Only the first 23 characters of the match are part of the number.
    extracted_text = match.group(1)[0:23].replace("-", "")
    file_name = extracted_text + ".pdf"
    file_path = f"{OUTPUT_DIR}/{file_name}"
    return file_name, file_path


def process_dhl_pdf(file_name, page):
    """
    Crop a DHL label and save it.

    Parameters:
        file_name (str): Name of the PDF to create
        page: The PyPDF2 page holding the label
    """
    page.mediabox.lower_left = (92, 20)
    page.mediabox.upper_right = (360, 560)
    save_pdf_file(file_name, page)


def process_dhl_text(pdf_text):
    """
    Build the output file name from the DHL waybill number.

    When the text contains several waybill numbers the last one is used.

    Parameters:
        pdf_text (str): Text extracted from the label

    Returns:
        file_name (str): "<waybill digits>.pdf"
        file_path (str): Path of the file inside OUTPUT_DIR

    Raises:
        ValueError: If no waybill number is found in the text
    """
    matches = re.findall(r'WAYBILL (\d+(?: \d+)*)', pdf_text)
    if not matches:
        # Fail with a clear message instead of an unbound-variable error.
        raise ValueError(PATTERN_NOT_FOUND)
    extracted_text = matches[-1].replace(" ", "")
    file_name = extracted_text + ".pdf"
    file_path = f"{OUTPUT_DIR}/{file_name}"
    return file_name, file_path


def process_ups_pdf(page):
    """
    Rotate a UPS label page by 90 degrees.

    Parameters:
        page: The PyPDF2 page to rotate
    """
    page.rotate(90)


def _process_pdf_from_image(file_name, page):
    """
    OCR a label that has no extractable text and process it accordingly.

    Parameters:
        file_name (str): Fallback name used when no tracking number is found
        page: The PyPDF2 page holding the label

    Returns:
        file_name (str): The name of the resulting file
        file_path (str): The path of the resulting file
        transport_company (str): "fedex" or "ups", as detected by OCR
    """
    page.rotate(90)
    save_pdf_file(file_name, page)
    file_path = f"{OUTPUT_DIR}/{file_name}"
    png_path = f"{file_path}.png"
    image_path = f"{file_path}-resized.pdf"

    # Render the saved page to a PNG so it can be OCR'd.
    rendered_pages = convert_from_path(file_path)
    try:
        rendered_pages[0].save(png_path, "PNG")
    finally:
        for rendered in rendered_pages:
            rendered.close()

    try:
        with Image.open(png_path) as loaded_image:
            extracted_text = pytesseract.image_to_string(loaded_image)
            # "Fed2x" is kept because that is what the original check looked
            # for (presumably an OCR misread of the logo); the correct
            # spelling is accepted as well.
            is_fedex = re.search(r"Fed[e2]x", extracted_text, re.IGNORECASE) is not None
            if not is_fedex:
                loaded_image.resize((400, 500)).save(image_path, "PDF")
    finally:
        # Remove the PNG only once the image handle has been released.
        if os.path.exists(png_path):
            os.remove(png_path)

    if is_fedex:
        file_name, file_path = process_fedex_text(extracted_text)
        process_fedex_pdf(file_name, page)
        return file_name, file_path, "fedex"

    file_path = image_path
    match = re.search(r'TRACKING #:\s+([A-Z\d\s]+)', extracted_text)
    tracking = ""
    if match:
        tracking = match.group(1).replace(" ", "").replace("BILLING", "").replace("\n", "")
    if tracking:
        file_name = tracking + ".pdf"
    else:
        # Best effort: keep the uploaded file name when OCR finds no number.
        print(PATTERN_NOT_FOUND)
    return file_name, file_path, "ups"


def process_pdf_from_image(file_name, page):
    """
    OCR a label that has no extractable text and process it accordingly.

    Parameters:
        file_name (str): Fallback name used when no tracking number is found
        page: The PyPDF2 page holding the label

    Returns:
        file_name (str): The name of the resulting file
        file_path (str): The path of the resulting file
    """
    file_name, file_path, _ = _process_pdf_from_image(file_name, page)
    return file_name, file_path


def process_coppel_pdf(file_name, page):
    """
    Crop a Coppel label and save it.

    Parameters:
        file_name (str): Name of the PDF to create
        page: The PyPDF2 page holding the label
    """
    page.mediabox.lower_left = (0, 150)
    page.mediabox.upper_right = (290, 520)
    save_pdf_file(file_name, page)


def process_coppel_text(pdf_text):
    """
    Build the output file name from the Coppel "TN:" number.

    Parameters:
        pdf_text (str): Text extracted from the label

    Returns:
        file_name (str): "<TN value>.pdf"
        file_path (str): Path of the file inside OUTPUT_DIR

    Raises:
        ValueError: If no "TN:" value is found in the text
    """
    match = re.search(r'TN: (\w+)', pdf_text)
    if not match:
        # Fail with a clear message instead of an unbound-variable error.
        raise ValueError(PATTERN_NOT_FOUND)
    file_name = match.group(1) + ".pdf"
    file_path = f"{OUTPUT_DIR}/{file_name}"
    return file_name, file_path


def process_fedex_pdf(file_name, page):
    """
    Rotate a FedEx label by -90 degrees, crop it and save it.

    Parameters:
        file_name (str): Name of the PDF to create
        page: The PyPDF2 page holding the label
    """
    page.rotate(-90)
    page.mediabox.lower_left = (0, 0)
    page.mediabox.upper_right = (500, 650)
    save_pdf_file(file_name, page)


def process_fedex_text(pdf_text):
    """
    Return the fixed file name and path used for FedEx labels.

    Parameters:
        pdf_text (str): Text extracted from the label (currently unused)

    Returns:
        file_name (str): Always "fedex.pdf"
        file_path (str): Always "files/fedex.pdf"
    """
    return "fedex.pdf", "files/fedex.pdf"


def process_pdf_file(file):
    """
    This function processes the PDF file and returns the file name, file path and transport company

    Parameters:
        file (File): The PDF file to process

    Returns:
        file_name (str): The name of the file
        file_path (str): The path of the file
        transport_company (str): The transport company

    Raises:
        ValueError: If the carrier is recognised but its tracking pattern is missing
    """
    with open(file.path, "rb") as pdf_stream:
        pdf = PyPDF2.PdfReader(pdf_stream)
        page = pdf.pages[0]
        pdf_text = page.extract_text()
        # The upload name comes from the user; keep only its final component
        # so it cannot point outside OUTPUT_DIR.
        file_name = os.path.basename(file.name)

        if re.search("estafeta", pdf_text, re.IGNORECASE):
            transport_company = "estafeta"
            file_name, file_path = process_estafeta_text(pdf_text)
            process_estafeta_pdf(file_name, page)
        elif re.search("dhl", pdf_text, re.IGNORECASE):
            transport_company = "dhl"
            file_name, file_path = process_dhl_text(pdf_text)
            process_dhl_pdf(file_name, page)
        elif re.search("coppel", pdf_text, re.IGNORECASE):
            transport_company = "coppel"
            file_name, file_path = process_coppel_text(pdf_text)
            process_coppel_pdf(file_name, page)
        else:
            # No known carrier in the text layer: fall back to OCR, which
            # tells FedEx and UPS labels apart.
            file_name, file_path, transport_company = _process_pdf_from_image(file_name, page)

    return file_name, file_path, transport_company


def gif2pdf(file):
    """
    Convert a GIF into a PDF (one page per frame) rotated by 90 degrees.

    Parameters:
        file (File): The uploaded GIF file

    Returns:
        file_name (str): The upload name without its extension
        rotated_pdf_path (str): The path of the rotated PDF inside UPLOAD_DIR
    """
    # Strip directories and only the final extension from the upload name.
    file_name = os.path.splitext(os.path.basename(file.name))[0]
    output_pdf_path = f"{UPLOAD_DIR}/{file_name}.pdf"
    rotated_pdf_path = f"{UPLOAD_DIR}/{file_name}_rotated.pdf"
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    pdf = FPDF(unit="pt")
    with Image.open(file.path) as gif, tempfile.TemporaryDirectory() as tmp_dir:
        for frame in range(getattr(gif, "n_frames", 1)):
            gif.seek(frame)
            # Convert to RGB because the frame is stored as JPEG below.
            rgb_frame = gif.convert("RGB")
            width, height = rgb_frame.size
            # One page per frame, sized like the frame (1 pixel = 1 point).
            pdf.add_page(format=(width, height))
            # A distinct file per frame: reusing one name risks FPDF serving
            # a previously loaded image for every page.
            frame_path = os.path.join(tmp_dir, f"frame_{frame}.jpg")
            rgb_frame.save(frame_path)
            pdf.image(frame_path, x=0, y=0, w=width, h=height)
        pdf.output(output_pdf_path)

    # Rotate the PDF pages by 90 degrees
    with open(output_pdf_path, "rb") as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        writer = PyPDF2.PdfWriter()
        for page in reader.pages:
            page.rotate(90)
            writer.add_page(page)
        with open(rotated_pdf_path, "wb") as rotated_pdf_file:
            writer.write(rotated_pdf_file)

    os.remove(output_pdf_path)
    return file_name, rotated_pdf_path


def remove_all_files_in_folder(folder_path):
    """
    Delete every regular file directly inside a folder.

    Sub-directories are left untouched and a missing folder is ignored.

    Parameters:
        folder_path (str): The folder to empty
    """
    if not os.path.isdir(folder_path):
        return
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        # Check if it's a file (not a directory) before removing
        if os.path.isfile(file_path):
            os.remove(file_path)


async def process_chat():
    """
    Run one chat round: ask for labels, process them and send the results.

    PDFs are cropped/renamed per carrier, GIFs are converted to PDF. Mixing
    both kinds in one upload is rejected. Working folders are emptied at the
    end of the round, whether it succeeded or not.
    """
    await cl.Message(content="Hola! Sube las guías en formato PDF.\nPuedes subir hasta 20 guías a la vez").send()
    files = None

    # Wait for the user to upload a file; a timeout yields None, so ask again.
    while files is None:
        files = await cl.AskFileMessage(
            content="Sube las guías aquí!",
            accept=["application/pdf", "image/gif"],
            max_size_mb=5,
            max_files=20,
            timeout=30,
        ).send()

    try:
        uploaded_types = {item.type for item in files}
        if "application/pdf" in uploaded_types and "image/gif" in uploaded_types:
            await cl.Message(content="Solo ingresa PDFs o Gifs").send()
            return

        elements = []
        summary_lines = []
        gif_notice = "Archivos convertidos en PDF"
        for file in files:
            if file.type == 'image/gif':
                file_name, file_path = gif2pdf(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                if gif_notice not in summary_lines:
                    summary_lines.append(gif_notice)
            else:
                file_name, file_path, transport_company = process_pdf_file(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                file_name = file_name.replace(".pdf", "")
                # One line per label so every uploaded file is reported.
                summary_lines.append(f"Guía de {transport_company.upper()}: **{file_name}**")

        await cl.Message(content="\n".join(summary_lines), elements=elements).send()
    except Exception as e:
        # Top-level boundary: report the failure to the user.
        await cl.Message(content=f"Error: {e}").send()
    finally:
        # Always clean up, including after errors and the early return above.
        for folder in (UPLOAD_DIR, OUTPUT_DIR):
            try:
                remove_all_files_in_folder(folder)
            except OSError as e:
                await cl.Message(content=f"Error: {e}").send()


@cl.on_chat_start
async def start_chat():
    """Start a processing round as soon as the chat opens."""
    await process_chat()


@cl.on_message
async def main(message: str):
    """Start a new processing round whenever the user sends a message."""
    await process_chat()