Spaces:
Sleeping
Sleeping
| import os | |
| import hashlib | |
| from subprocess import call | |
| from shiny import App, reactive, render, ui | |
| from core.read_pdf import process_pdf, temp_dir | |
| last_pdf_md5_preprocess_stage = None | |
| def compute_hash(file_pth): | |
| with open(file_pth, 'rb') as file_to_check: | |
| # read contents of the file | |
| data = file_to_check.read() | |
| # pipe contents of the file through | |
| md5_returned = hashlib.md5(data).hexdigest() | |
| return md5_returned | |
| def ui_card(title, *args): | |
| return (ui.div( | |
| {"class": "card mb-4"}, | |
| ui.div(title, class_="card-header"), | |
| ui.div({"class": "card-body"}, *args), | |
| ), ) | |
| app_ui = ui.page_fluid( | |
| ui.h1("Document2Slide Demo"), | |
| ui_card( | |
| "Upload PDF", | |
| ui.input_file("input_pdf", "Choose a .pdf file to upload:", multiple=True), | |
| ui.output_text("upload_file_status", ), | |
| ), | |
| ui_card( | |
| "Preprocess", | |
| ui.p( | |
| ui.input_action_button("preprocess_action", "Preprocess file", class_="btn-primary"), | |
| ui.output_text("preprocess_result", ), | |
| ), | |
| ui.output_text("preprocess_status", ), | |
| ui.download_button("download_preprocessed", "Download preprocessed file"), | |
| ), | |
| ui_card( | |
| "Download the bullet points in Markdown format.", | |
| ui.download_button("download_bullet_point", "Download bullet point"), | |
| ), | |
| ui_card( | |
| "Download the beamer source code `.tex` of the slide", | |
| ui.download_button("download_beamer", "Download beamer source code"), | |
| ), | |
| ui_card( | |
| "Download the PDF of slide.", | |
| ui.download_button("download_slide", "Download slide generated"), | |
| ), | |
| ) | |
| def server(input, output, session): | |
| def upload_file_status(): | |
| file_infos = input.input_pdf() | |
| # print(file_infos) # [{'name': 'Poster.pdf', 'size': 598394, 'type': 'application/pdf', 'datapath': '/tmp/fileupload-2c21fv0a/tmpi91sy07h/0.pdf'}] | |
| if not file_infos: | |
| return "There is no file provided currently." | |
| elif file_infos[0]['type'] != 'application/pdf': | |
| return "the file you provide is not in PDF format, upload another one!" | |
| else: | |
| return "PDF file successfully uploaded!" | |
| def preprocess_status(): | |
| global last_pdf_md5_preprocess_stage | |
| file_infos = input.input_pdf() | |
| file_md5 = compute_hash(file_infos[0]['datapath']) if file_infos else None | |
| if (file_infos is not None) and file_infos[0]['type'] == 'application/pdf' and (file_md5 != last_pdf_md5_preprocess_stage): | |
| return "Ready to preprocess the PDF!" | |
| elif file_md5 == last_pdf_md5_preprocess_stage: | |
| return "PDF already preprocessed! You can continue!" | |
| else: | |
| return "No PDF ready currently, please upload a PDF!" | |
| # Take a dependency on the button | |
| async def preprocess_result(): | |
| global last_pdf_md5_preprocess_stage | |
| file_infos = input.input_pdf() | |
| if (file_infos is not None) and file_infos[0]['type'] == 'application/pdf': | |
| file_name = file_infos[0]['name'] | |
| original_pdf_pth = file_infos[0]['datapath'] | |
| dir_name = os.path.dirname(original_pdf_pth) | |
| new_pdf_pth = os.path.join(dir_name, file_name) | |
| os.rename(original_pdf_pth, new_pdf_pth) | |
| file_infos[0]['datapath'] = new_pdf_pth | |
| file_md5 = compute_hash(file_infos[0]['datapath']) | |
| try: | |
| if file_md5 != last_pdf_md5_preprocess_stage: | |
| process_pdf(pdf_pth=new_pdf_pth, file_name=file_name) | |
| last_pdf_md5_preprocess_stage = file_md5 | |
| return "Process successfully!" | |
| else: | |
| return "Already processed!!!" | |
| except: | |
| return "Something wrong happen, please switch to another file!" | |
| else: | |
| return "No PDF provided!" | |
| def download_preprocessed(): | |
| file_infos = input.input_pdf() | |
| file_name = file_infos[0]['name'][:-4] | |
| preprocessed_file_dir = os.path.join(temp_dir, file_name) | |
| if os.path.exists(preprocessed_file_dir): # this dir exists | |
| args = ['zip', '-r', file_name + '.zip', './' + file_name] | |
| call(args, cwd=temp_dir) | |
| return str(os.path.join(temp_dir, file_name + '.zip')) | |
| app = App(app_ui, server) | |