Spaces:
Sleeping
Sleeping
| import PyPDF2 | |
| import chainlit as cl | |
| from io import BytesIO | |
| import re | |
| import os | |
| from pdf2image import convert_from_path | |
| from PIL import Image | |
| import pytesseract | |
| from fpdf import FPDF | |
def save_pdf_file(file_name, page):
    """
    Write a single PDF page to ``files/<file_name>``.

    Parameters:
    file_name (str): Name of the output file, created inside the ``files`` directory
    page: Page object accepted by ``PyPDF2.PdfWriter.add_page``
    """
    # Create the output directory on first use so that a missing ``files``
    # folder does not abort the whole upload with FileNotFoundError.
    os.makedirs("files", exist_ok=True)

    pdf_writer = PyPDF2.PdfWriter()
    try:
        pdf_writer.add_page(page)
        with open(f"files/{file_name}", "wb") as out:
            pdf_writer.write(out)
    finally:
        # Release the writer even when add_page()/write() raises.
        pdf_writer.close()
def process_estafeta_pdf(file_name, page):
    """
    Crop an Estafeta label to one of two fixed rectangles and save it.

    The rectangle is picked by comparing the third and fourth entries of the
    page's ``/MediaBox`` (its upper-right x and y coordinates).

    Parameters:
    file_name (str): Name of the file to write through ``save_pdf_file``
    page: PDF page whose media box is modified in place
    """
    media_box = page['/MediaBox']
    wider_than_tall = media_box[2] > media_box[3]

    if wider_than_tall:
        lower_left, upper_right = (40, 60), (340, 520)
    else:
        lower_left, upper_right = (30, 40), (490, 340)

    page.mediabox.lower_left = lower_left
    page.mediabox.upper_right = upper_right
    save_pdf_file(file_name, page)
def process_estafeta_text(pdf_text):
    """
    Build the output file name and path from an Estafeta label's text.

    The identifier is the ``<digits>-<digits><word chars>`` token that follows
    the word ``CONFIRMACION``; it is cut to its first 23 characters and the
    dashes are removed.

    Parameters:
    pdf_text (str): Text extracted from the label

    Returns:
    file_name (str): ``<identifier>.pdf``
    file_path (str): ``files/<identifier>.pdf``

    Raises:
    ValueError: If the ``CONFIRMACION`` pattern is not present in the text
    """
    match = re.search(r'CONFIRMACION (\d+-\d+\w+)', pdf_text)
    if match is None:
        # ValueError is still an Exception, so existing broad handlers keep
        # working, but callers can now tell this apart from unrelated failures.
        raise ValueError("Pattern not found in the text.")

    identifier = match.group(1)[0:23].replace("-", "")
    file_name = identifier + ".pdf"
    file_path = f"files/{file_name}"
    return file_name, file_path
def process_dhl_pdf(file_name, page):
    """
    Crop a DHL label to a fixed rectangle and save it.

    Parameters:
    file_name (str): Name of the file to write through ``save_pdf_file``
    page: PDF page whose media box is modified in place
    """
    lower_left = (92, 20)
    upper_right = (360, 560)

    page.mediabox.lower_left = lower_left
    page.mediabox.upper_right = upper_right
    save_pdf_file(file_name, page)
def process_dhl_text(pdf_text):
    """
    Build the output file name and path from a DHL label's waybill number.

    The waybill is the run of space-separated digit groups that follows the
    word ``WAYBILL``. When the text contains several such runs, the last one
    is used.

    Parameters:
    pdf_text (str): Text extracted from the label

    Returns:
    file_name (str): ``<waybill digits>.pdf``
    file_path (str): ``files/<waybill digits>.pdf``

    Raises:
    ValueError: If no ``WAYBILL <digits>`` sequence is present in the text
    """
    matches = re.findall(r'WAYBILL (\d+(?: \d+)*)', pdf_text)
    if not matches:
        # Without this guard the name below would be built from an unassigned
        # variable and fail with an unrelated UnboundLocalError.
        raise ValueError("Pattern not found in the text.")

    waybill = matches[-1].replace(" ", "")
    file_name = waybill + ".pdf"
    file_path = f"files/{file_name}"
    return file_name, file_path
def process_ups_pdf(page):
    """
    Rotate the given PDF page by 90 degrees.

    Parameters:
    page: PDF page object exposing ``rotate``
    """
    quarter_turn = 90
    page.rotate(quarter_turn)
def process_pdf_from_image(file_name, page):
    """
    Identify a label by OCR when its PDF text did not name a known carrier.

    The page is rotated 90 degrees, saved under ``files/``, rendered to a
    temporary PNG and run through Tesseract. If the recognised text matches
    the FedEx marker, the FedEx handlers produce the result. Otherwise the
    rendered image is resized to 400x500, stored as
    ``files/<file_name>-resized.pdf`` and named after the tracking number
    found in the text.

    Parameters:
    file_name (str): Name used for the intermediate files; also the fallback
        result name when no tracking number is recognised
    page: PDF page to rotate, save and render

    Returns:
    file_name (str): Name chosen for the label
    file_path (str): Path of the PDF produced for the label
    """
    page.rotate(90)
    save_pdf_file(file_name, page)
    file_path = f"files/{file_name}"
    png_path = f"{file_path}.png"
    resized_path = f"{file_path}-resized.pdf"

    try:
        rendered = convert_from_path(file_path)[0]
        try:
            rendered.save(png_path, "PNG")
        finally:
            rendered.close()

        # The context manager closes the re-opened PNG on every path, before
        # the file is deleted below.
        with Image.open(png_path) as loaded_image:
            extracted_text = pytesseract.image_to_string(loaded_image)
            # NOTE(review): the marker is spelled "Fed2x", presumably to match
            # how OCR reads the FedEx logo - confirm before changing it.
            is_fedex = re.search("Fed2x", extracted_text, re.IGNORECASE) is not None
            if not is_fedex:
                loaded_image.resize((400, 500)).save(resized_path, "PDF")
    finally:
        # The PNG is only a scratch file; remove it even when OCR fails.
        if os.path.exists(png_path):
            os.remove(png_path)

    if is_fedex:
        file_name, file_path = process_fedex_text(extracted_text)
        process_fedex_pdf(file_name, page)
        return file_name, file_path

    match = re.search(r'TRACKING #:\s+([A-Z\d\s]+)', extracted_text)
    if match:
        # The pattern can capture any whitespace (tabs, carriage returns,
        # form feeds), so strip all of it rather than only spaces/newlines.
        tracking_number = "".join(match.group(1).split()).replace("BILLING", "")
        file_name = tracking_number + ".pdf"
    else:
        # Best effort: keep the incoming file name when nothing is recognised.
        print("Pattern not found in the text.")
    return file_name, resized_path
def process_coppel_pdf(file_name, page):
    """
    Crop a Coppel label to a fixed rectangle and save it.

    Parameters:
    file_name (str): Name of the file to write through ``save_pdf_file``
    page: PDF page whose media box is modified in place
    """
    lower_left = (0, 150)
    upper_right = (290, 520)

    page.mediabox.lower_left = lower_left
    page.mediabox.upper_right = upper_right
    save_pdf_file(file_name, page)
def process_coppel_text(pdf_text):
    """
    Build the output file name and path from a Coppel label's text.

    The identifier is the word that follows ``TN: `` in the text.

    Parameters:
    pdf_text (str): Text extracted from the label

    Returns:
    file_name (str): ``<identifier>.pdf``
    file_path (str): ``files/<identifier>.pdf``

    Raises:
    ValueError: If no ``TN: <identifier>`` sequence is present in the text
    """
    match = re.search(r'TN: (\w+)', pdf_text)
    if match is None:
        # Fail with a clear message instead of printing and then crashing on
        # an unassigned variable.
        raise ValueError("Pattern not found in the text.")

    file_name = match.group(1) + ".pdf"
    file_path = f"files/{file_name}"
    return file_name, file_path
def process_fedex_pdf(file_name, page):
    """
    Rotate a FedEx label by -90 degrees, crop it to a fixed rectangle and save it.

    Parameters:
    file_name (str): Name of the file to write through ``save_pdf_file``
    page: PDF page that is rotated and whose media box is modified in place
    """
    lower_left = (0, 0)
    upper_right = (500, 650)

    page.rotate(-90)
    page.mediabox.lower_left = lower_left
    page.mediabox.upper_right = upper_right
    save_pdf_file(file_name, page)
def process_fedex_text(pdf_text):
    """
    Return the fixed file name and path used for FedEx labels.

    Parameters:
    pdf_text (str): Text extracted from the label; not used

    Returns:
    file_name (str): Always ``fedex.pdf``
    file_path (str): Always ``files/fedex.pdf``
    """
    label_name = "fedex.pdf"
    label_path = "files/fedex.pdf"
    return label_name, label_path
def process_pdf_file(file):
    """
    Process the first page of an uploaded PDF label.

    The carrier is detected by a case-insensitive search of the page text for
    "estafeta", "dhl" and "coppel", in that order. When none is found, the
    page is handed to the OCR-based handler.

    Parameters:
    file (File): The PDF file to process; its ``path`` and ``name`` are read

    Returns:
    file_name (str): The name of the file
    file_path (str): The path of the file
    transport_company (str): The transport company
    """
    # (keyword searched in the text, name/path builder, crop-and-save handler)
    carriers = (
        ("estafeta", process_estafeta_text, process_estafeta_pdf),
        ("dhl", process_dhl_text, process_dhl_pdf),
        ("coppel", process_coppel_text, process_coppel_pdf),
    )

    # All work happens while the stream is open, as in the original layout.
    with open(file.path, "rb") as pdf_stream:
        reader = PyPDF2.PdfReader(pdf_stream)
        first_page = reader.pages[0]
        page_text = first_page.extract_text()

        for carrier, build_name, crop_and_save in carriers:
            if re.search(carrier, page_text, re.IGNORECASE):
                file_name, file_path = build_name(page_text)
                crop_and_save(file_name, first_page)
                return file_name, file_path, carrier

        # NOTE(review): the OCR handler also recognises FedEx labels, yet the
        # carrier reported here is always "ups" - confirm whether intended.
        file_name, file_path = process_pdf_from_image(file.name, first_page)
        return file_name, file_path, "ups"
def gif2pdf(file):
    """
    Convert an uploaded GIF into a PDF with one page per frame, rotated 90 degrees.

    Each frame is converted to RGB and placed on a page of the same size as
    the frame (pixels are used directly as points). The intermediate PDF is
    deleted once the rotated copy has been written.

    Parameters:
    file (File): The GIF file to convert; its ``path`` and ``name`` are read

    Returns:
    file_name (str): The upload's name without its extension
    rotated_pdf_path (str): ``.files/<file_name>_rotated.pdf``
    """
    import tempfile  # local import: only this function needs it

    # splitext keeps dotted names intact ("a.b.gif" -> "a.b"), whereas
    # splitting on the first dot would cut them down to "a".
    file_name = os.path.splitext(file.name)[0]
    os.makedirs(".files", exist_ok=True)
    output_pdf_path = f".files/{file_name}.pdf"
    rotated_pdf_path = f".files/{file_name}_rotated.pdf"

    pdf = FPDF(unit="pt")
    with Image.open(file.path) as gif, tempfile.TemporaryDirectory() as frame_dir:
        for frame in range(gif.n_frames):
            gif.seek(frame)
            # Convert to RGB because the frame is stored as JPEG below.
            rgb_image = gif.convert("RGB")
            width, height = rgb_image.size
            pdf.add_page(format=(width, height))

            # FPDF caches embedded images by file name, so each frame needs
            # its own file; reusing one name would repeat the first frame.
            frame_path = os.path.join(frame_dir, f"frame_{frame}.jpg")
            rgb_image.save(frame_path)
            pdf.image(frame_path, x=0, y=0, w=width, h=height)

        # Write the PDF while the frame files still exist; the temporary
        # directory and everything in it is removed on leaving this block.
        pdf.output(output_pdf_path)

    # Rotate every page by 90 degrees into the final file.
    with open(output_pdf_path, "rb") as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        writer = PyPDF2.PdfWriter()
        for page in reader.pages:
            page.rotate(90)
            writer.add_page(page)
        with open(rotated_pdf_path, "wb") as rotated_pdf_file:
            writer.write(rotated_pdf_file)

    os.remove(output_pdf_path)
    return file_name, rotated_pdf_path
def remove_all_files_in_folder(folder_path):
    """
    Delete every regular file located directly inside ``folder_path``.

    Sub-directories and their contents are left untouched. A folder that does
    not exist is treated as already empty.

    Parameters:
    folder_path (str): Folder to clean
    """
    try:
        entries = os.listdir(folder_path)
    except FileNotFoundError:
        # Nothing to clean; do not turn a cleanup step into a failure.
        return

    for filename in entries:
        file_path = os.path.join(folder_path, filename)
        # Only plain files are removed; directories are skipped.
        if os.path.isfile(file_path):
            os.remove(file_path)
async def process_chat():
    """
    Ask the user for shipping labels, process them and send back the results.

    PDFs are cropped and renamed per carrier; GIFs are converted to PDF. An
    upload mixing both types is rejected. One message is sent with every
    produced file attached and one summary line per processed PDF. Working
    files in ``.files`` and ``files`` are removed afterwards, whether or not
    processing succeeded.
    """
    await cl.Message(content="Hola! Sube las guías en formato PDF.\nPuedes subir hasta 20 guías a la vez").send()

    files = None
    # Keep prompting for as long as the prompt comes back with None
    # (presumably after the 30-second timeout - confirm against Chainlit docs).
    while files is None:
        files = await cl.AskFileMessage(
            content="Sube las guías aquí!",
            accept=["application/pdf", "image/gif"],
            max_size_mb=5,
            max_files=20,
            timeout=30,
        ).send()

    try:
        uploaded_types = {item.type for item in files}
        if uploaded_types.issuperset({"application/pdf", "image/gif"}):
            await cl.Message(content="Solo ingresa PDFs o Gifs").send()
            return

        gif_notice = "Archivos convertidos en PDF"
        elements = []
        summary_lines = []
        for file in files:
            if file.type == 'image/gif':
                file_name, file_path = gif2pdf(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                # One notice is enough however many GIFs were converted.
                if gif_notice not in summary_lines:
                    summary_lines.append(gif_notice)
            else:
                file_name, file_path, transport_company = process_pdf_file(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                label = file_name.replace(".pdf", "")
                # Collect a line per label so that every processed file is
                # reported, not just the last one.
                summary_lines.append(f"Guía de {transport_company.upper()}: **{label}**")

        await cl.Message(content="\n".join(summary_lines), elements=elements).send()
    except Exception as e:
        # Top-level boundary: report any processing failure to the user.
        await cl.Message(content=f"Error: {e}").send()
    finally:
        # Clean the working folders on every exit path so that failed runs do
        # not leave files behind for the next upload.
        for folder in (".files", "files"):
            try:
                remove_all_files_in_folder(folder)
            except OSError as cleanup_error:
                await cl.Message(content=f"Error: {cleanup_error}").send()
async def start_chat():
    """
    Run the label-upload conversation.

    NOTE(review): no Chainlit decorator is visible on this function;
    presumably it is meant to be registered as the chat-start hook - confirm.
    """
    conversation = process_chat()
    await conversation
async def main(message: str):
    """
    Run the label-upload conversation again; ``message`` itself is not used.

    NOTE(review): no Chainlit decorator is visible on this function;
    presumably it is meant to be registered as the message hook - confirm.

    Parameters:
    message (str): Incoming chat message; ignored
    """
    conversation = process_chat()
    await conversation