# AuxiliaryTools / app.py
# Castri1
# Refactor file upload timeout to 30 seconds
# f6719e9
import PyPDF2
import chainlit as cl
from io import BytesIO
import re
import os
from pdf2image import convert_from_path
from PIL import Image
import pytesseract
from fpdf import FPDF
def save_pdf_file(file_name, page):
    """
    Write a single page as a one-page PDF under the ``files`` folder.

    Parameters:
        file_name (str): Name of the output file inside ``files/``
        page: The page object handed to ``PyPDF2.PdfWriter.add_page``

    Returns:
        None. The PDF is written to disk; nothing is handed back.
    """
    output_path = f"files/{file_name}"
    single_page_writer = PyPDF2.PdfWriter()
    single_page_writer.add_page(page)
    with open(output_path, "wb") as destination:
        single_page_writer.write(destination)
        single_page_writer.close()
def process_estafeta_pdf(file_name, page):
    """
    Crop an Estafeta label page to a fixed window and save it.

    The crop window is picked by comparing the third and fourth entries of
    the page's ``/MediaBox``: one window is used when the third is larger
    than the fourth, another otherwise.

    Parameters:
        file_name (str): Name of the output file inside ``files/``
        page: The PDF page to crop (modified in place)
    """
    media_box = page['/MediaBox']
    right_edge, top_edge = media_box[2], media_box[3]
    if right_edge > top_edge:
        lower_left, upper_right = (40, 60), (340, 520)
    else:
        lower_left, upper_right = (30, 40), (490, 340)
    page.mediabox.lower_left = lower_left
    page.mediabox.upper_right = upper_right
    # Both orientations end up written to disk.
    save_pdf_file(file_name, page)
def process_estafeta_text(pdf_text):
    """
    Build the output file name for an Estafeta label from its text.

    Looks for ``CONFIRMACION <digits>-<digits><word chars>``, keeps the first
    23 characters of the captured code and strips the dashes.

    Parameters:
        pdf_text (str): Text extracted from the label

    Returns:
        file_name (str): ``<code>.pdf``
        file_path (str): ``files/<code>.pdf``

    Raises:
        Exception: If the confirmation pattern is not present in the text
    """
    found = re.search(r'CONFIRMACION (\d+-\d+\w+)', pdf_text)
    if not found:
        raise Exception("Pattern not found in the text.")
    confirmation_code = found.group(1)[0:23].replace("-", "")
    file_name = confirmation_code + ".pdf"
    return file_name, f"files/{file_name}"
def process_dhl_pdf(file_name, page):
    """
    Crop a DHL label page to its fixed window and save it.

    Parameters:
        file_name (str): Name of the output file inside ``files/``
        page: The PDF page to crop (modified in place)
    """
    crop_corners = (
        ("lower_left", (92, 20)),
        ("upper_right", (360, 560)),
    )
    for corner, point in crop_corners:
        setattr(page.mediabox, corner, point)
    save_pdf_file(file_name, page)
def process_dhl_text(pdf_text):
    """
    Build the output file name for a DHL label from its text.

    Every ``WAYBILL <digit groups>`` occurrence is collected; the last one
    found is used, with the spaces between digit groups removed.

    Parameters:
        pdf_text (str): Text extracted from the label

    Returns:
        file_name (str): ``<waybill>.pdf``
        file_path (str): ``files/<waybill>.pdf``

    Raises:
        Exception: If no waybill number is present in the text
    """
    matches = re.findall(r'WAYBILL (\d+(?: \d+)*)', pdf_text)
    if not matches:
        # Fail with the same message the Estafeta parser uses instead of
        # crashing later on an unassigned variable.
        raise Exception("Pattern not found in the text.")
    # The last occurrence wins, as in the original loop.
    waybill = matches[-1].replace(" ", "")
    file_name = waybill + ".pdf"
    file_path = f"files/{file_name}"
    return file_name, file_path
def process_ups_pdf(page):
    """
    Rotate the given page by 90 degrees.

    Parameters:
        page: The PDF page to rotate; only its ``rotate`` method is used

    Returns:
        None. Whatever ``page.rotate`` returns is discarded.
    """
    page.rotate(90)
def process_pdf_from_image(file_name, page):
    """
    Identify a label through OCR when its text could not be matched directly.

    The page is rotated 90 degrees, saved under ``files/``, rendered to a
    temporary PNG and passed through Tesseract. If the OCR text matches the
    FedEx marker the page is handed to the FedEx handlers; otherwise the
    rendered image is resized, stored as ``<file>-resized.pdf`` and the
    tracking number (when found) becomes the file name.

    Parameters:
        file_name (str): Name used for the intermediate files in ``files/``
        page: The PDF page to analyse (rotated in place)

    Returns:
        file_name (str): Name derived from the label, or the incoming name
            when no tracking number is recognised
        file_path (str): Path of the PDF produced for the label
    """
    page.rotate(90)
    save_pdf_file(file_name, page)
    file_path = f"files/{file_name}"
    png_path = f"{file_path}.png"

    # Only the first rendered page is used for recognition.
    images = convert_from_path(file_path)
    image = images[0]
    image.save(png_path, "PNG")

    # Open the image and extract the text
    loaded_image = Image.open(png_path)
    try:
        extracted_text = pytesseract.image_to_string(loaded_image)
        image.close()
        os.remove(png_path)

        # NOTE(review): "Fed2x" is kept verbatim from the original; it may be
        # how the OCR reads the FedEx logo - confirm before changing it.
        if re.search("Fed2x", extracted_text, re.IGNORECASE):
            file_name, file_path = process_fedex_text(extracted_text)
            process_fedex_pdf(file_name, page)
        else:
            image_path = f"{file_path}-resized.pdf"
            loaded_image.resize((400, 500)).save(image_path, "PDF")
            file_path = image_path
            match = re.search(r'TRACKING #:\s+([A-Z\d\s]+)', extracted_text)
            if match:
                extracted_text = match.group(1)
                extracted_text = extracted_text.replace(" ", "").replace("BILLING", "").replace("\n", "")
                file_name = extracted_text + ".pdf"
            else:
                # Best effort: keep the incoming file name when no tracking
                # number can be read.
                print("Pattern not found in the text.")
    finally:
        # The image used to stay open in the FedEx branch and on errors.
        loaded_image.close()
    return file_name, file_path
def process_coppel_pdf(file_name, page):
    """
    Crop a Coppel label page to its fixed window and save it.

    Parameters:
        file_name (str): Name of the output file inside ``files/``
        page: The PDF page to crop (modified in place)
    """
    lower_left_corner = (0, 150)
    upper_right_corner = (290, 520)
    page.mediabox.lower_left = lower_left_corner
    page.mediabox.upper_right = upper_right_corner
    save_pdf_file(file_name, page)
def process_coppel_text(pdf_text):
    """
    Build the output file name for a Coppel label from its text.

    Parameters:
        pdf_text (str): Text extracted from the label; expected to contain
            ``TN: <code>``

    Returns:
        file_name (str): ``<code>.pdf``
        file_path (str): ``files/<code>.pdf``

    Raises:
        Exception: If the ``TN:`` pattern is not present in the text
    """
    match = re.search(r'TN: (\w+)', pdf_text)
    if not match:
        # Previously this only printed and then crashed on an unassigned
        # variable; raise the same error the Estafeta parser raises.
        raise Exception("Pattern not found in the text.")
    extracted_text = match.group(1)
    file_name = extracted_text + ".pdf"
    file_path = f"files/{file_name}"
    return file_name, file_path
def process_fedex_pdf(file_name, page):
    """
    Rotate a FedEx label page by -90 degrees, crop it and save it.

    Parameters:
        file_name (str): Name of the output file inside ``files/``
        page: The PDF page to adjust (modified in place)
    """
    page.rotate(-90)
    crop_corners = {
        "lower_left": (0, 0),
        "upper_right": (500, 650),
    }
    for corner, point in crop_corners.items():
        setattr(page.mediabox, corner, point)
    save_pdf_file(file_name, page)
def process_fedex_text(pdf_text):
    """
    Return the fixed file name and path used for FedEx labels.

    Parameters:
        pdf_text (str): Text of the label; currently not inspected

    Returns:
        file_name (str): Always ``fedex.pdf``
        file_path (str): Always ``files/fedex.pdf``
    """
    file_name = "fedex.pdf"
    file_path = "files/fedex.pdf"
    return file_name, file_path
def process_pdf_file(file):
    """
    Detect the carrier of an uploaded PDF label and produce its processed copy.

    Only the first page is read. Its text is searched, case-insensitively,
    for each known carrier name in a fixed order; when none is found the
    page goes through the image/OCR path and is reported as "ups".

    Parameters:
        file (File): The PDF file to process (its ``path`` and ``name`` are used)

    Returns:
        file_name (str): The name of the file
        file_path (str): The path of the file
        transport_company (str): The transport company
    """
    # Checked in this order; the first keyword found in the text wins.
    carrier_handlers = (
        ("estafeta", process_estafeta_text, process_estafeta_pdf),
        ("dhl", process_dhl_text, process_dhl_pdf),
        ("coppel", process_coppel_text, process_coppel_pdf),
    )

    with open(file.path, "rb") as pdf_stream:
        reader = PyPDF2.PdfReader(pdf_stream)
        first_page = reader.pages[0]
        pdf_text = first_page.extract_text()
        file_name = file.name

        for carrier, name_from_text, crop_and_save in carrier_handlers:
            if re.search(carrier, pdf_text, re.IGNORECASE):
                transport_company = carrier
                file_name, file_path = name_from_text(pdf_text)
                crop_and_save(file_name, first_page)
                break
        else:
            # No carrier keyword in the text layer: fall back to OCR.
            transport_company = "ups"
            file_name, file_path = process_pdf_from_image(file_name, first_page)

    return file_name, file_path, transport_company
def gif2pdf(file):
    """
    Convert a GIF (one PDF page per frame) into a PDF rotated by 90 degrees.

    Each frame is converted to RGB, written to its own temporary JPEG and
    placed on a page sized to the frame, treating one pixel as one point.
    The intermediate PDF is then re-read, every page is rotated by 90
    degrees, and the result is written to ``.files/<name>_rotated.pdf``.

    Parameters:
        file (File): The uploaded GIF (its ``path`` and ``name`` are used)

    Returns:
        file_name (str): The upload name up to its first dot
        rotated_pdf_path (str): Path of the rotated PDF
    """
    # NOTE(review): names with several dots are cut at the first one, so
    # "a.1.gif" and "a.2.gif" map to the same output - confirm if intended.
    file_name = file.name.split('.')[0]
    output_pdf_path = f".files/{file_name}.pdf"
    rotated_pdf_path = f".files/{file_name}_rotated.pdf"

    pdf = FPDF(unit="pt")
    frame_paths = []
    try:
        with Image.open(file.path) as gif:
            for frame in range(gif.n_frames):
                gif.seek(frame)
                # Convert to RGB format (because PDFs do not support palettes)
                rgb_image = gif.convert("RGB")
                width, height = rgb_image.size
                pdf.add_page(format=(width, height))
                # One file per frame: FPDF caches images by name, so reusing
                # a single temp file would repeat the first frame on every page.
                frame_path = f"temp_frame_{frame}.jpg"
                frame_paths.append(frame_path)
                rgb_image.save(frame_path)
                pdf.image(frame_path, x=0, y=0, w=width, h=height)
        pdf.output(output_pdf_path)
    finally:
        # Remove the temporary frames whether or not the conversion worked.
        for frame_path in frame_paths:
            if os.path.exists(frame_path):
                os.remove(frame_path)

    # Rotate the PDF pages by 90 degrees
    with open(output_pdf_path, "rb") as pdf_file:
        reader = PyPDF2.PdfReader(pdf_file)
        writer = PyPDF2.PdfWriter()
        for page in reader.pages:
            page.rotate(90)
            writer.add_page(page)
        # Written while the source stream is still open for the reader.
        with open(rotated_pdf_path, "wb") as rotated_pdf_file:
            writer.write(rotated_pdf_file)

    os.remove(output_pdf_path)
    return file_name, rotated_pdf_path
def remove_all_files_in_folder(folder_path):
    """
    Delete every regular file directly inside a folder.

    Sub-directories and their contents are left untouched.

    Parameters:
        folder_path (str): The folder to empty
    """
    candidates = [os.path.join(folder_path, entry) for entry in os.listdir(folder_path)]
    # filter() is lazy, so each path is checked right before it is removed.
    for regular_file in filter(os.path.isfile, candidates):
        os.remove(regular_file)
async def process_chat():
    """
    Run one upload round: ask for labels, process them and reply.

    Greets the user, keeps asking for files until the prompt returns
    something other than None, and rejects uploads that mix PDFs and GIFs.
    GIFs are converted to PDF; PDFs are processed per carrier. All results
    are sent back in a single message, after which the ``.files`` and
    ``files`` folders are emptied. Any exception raised while processing is
    reported to the user as an error message.
    """
    greeting = cl.Message(content="Hola! Sube las guías en formato PDF.\nPuedes subir hasta 20 guías a la vez")
    await greeting.send()

    # Ask again for as long as the prompt returns None.
    files = None
    while files is None:
        upload_prompt = cl.AskFileMessage(
            content="Sube las guías aquí!",
            accept=["application/pdf", "image/gif"],
            max_size_mb=5,
            max_files=20,
            timeout=30,
        )
        files = await upload_prompt.send()

    try:
        # A single batch must be all PDFs or all GIFs.
        uploaded_types = {item.type for item in files}
        if "application/pdf" in uploaded_types and "image/gif" in uploaded_types:
            await cl.Message(content="Solo ingresa PDFs o Gifs").send()
            return

        elements = []
        content = ""
        for file in files:
            if file.type != 'image/gif':
                file_name, file_path, transport_company = process_pdf_file(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                file_name = file_name.replace(".pdf", "")
                # Only the last processed label ends up in the message text.
                content = f"Guía de {transport_company.upper()}: **{file_name}**"
            else:
                file_name, file_path = gif2pdf(file)
                elements.append(cl.File(name=file_name, display="inline", path=file_path))
                content = "Archivos convertidos en PDF"

        await cl.Message(content=content, elements=elements).send()

        for folder in (".files", "files"):
            remove_all_files_in_folder(folder)
    except Exception as e:
        await cl.Message(content=f"Error: {e}").send()
@cl.on_chat_start
async def start_chat():
    """Run one upload round; registered with ``cl.on_chat_start``."""
    await process_chat()
@cl.on_message
async def main(message: str):
    """
    Run another upload round; registered with ``cl.on_message``.

    Parameters:
        message: The incoming message; its content is not used
    """
    await process_chat()