| | import os |
| |
|
| | os.environ["TOKENIZERS_PARALLELISM"] = "false" |
| |
|
| | from PIL import Image, ImageDraw |
| | import traceback |
| |
|
| | import gradio as gr |
| | from gradio import processing_utils |
| |
|
| | import torch |
| | from docquery import pipeline |
| | from docquery.document import load_bytes, load_document, ImageDocument |
| | from docquery.ocr_reader import get_ocr_reader |
| |
|
| |
|
| | def ensure_list(x): |
| | if isinstance(x, list): |
| | return x |
| | else: |
| | return [x] |
| |
|
| |
|
| | CHECKPOINTS = { |
| | "LayoutLMv1 for Invoices 🧾": "impira/layoutlm-invoices", |
| | } |
| |
|
| | PIPELINES = {} |
| |
|
| |
|
| | def construct_pipeline(task, model): |
| | global PIPELINES |
| | if model in PIPELINES: |
| | return PIPELINES[model] |
| |
|
| | device = "cuda" if torch.cuda.is_available() else "cpu" |
| | ret = pipeline(task=task, model=CHECKPOINTS[model], device=device) |
| | PIPELINES[model] = ret |
| | return ret |
| |
|
| |
|
| | def run_pipeline(model, question, document, top_k): |
| | pipeline = construct_pipeline("document-question-answering", model) |
| | return pipeline(question=question, **document.context, top_k=top_k) |
| |
|
| |
|
| | |
| | |
| | def lift_word_boxes(document, page): |
| | return document.context["image"][page][1] |
| |
|
| |
|
| | def expand_bbox(word_boxes): |
| | if len(word_boxes) == 0: |
| | return None |
| |
|
| | min_x, min_y, max_x, max_y = zip(*[x[1] for x in word_boxes]) |
| | min_x, min_y, max_x, max_y = [min(min_x), min(min_y), max(max_x), max(max_y)] |
| | return [min_x, min_y, max_x, max_y] |
| |
|
| |
|
| | |
| | def normalize_bbox(box, width, height, padding=0.005): |
| | min_x, min_y, max_x, max_y = [c / 1000 for c in box] |
| | if padding != 0: |
| | min_x = max(0, min_x - padding) |
| | min_y = max(0, min_y - padding) |
| | max_x = min(max_x + padding, 1) |
| | max_y = min(max_y + padding, 1) |
| | return [min_x * width, min_y * height, max_x * width, max_y * height] |
| |
|
| |
|
| | EXAMPLES = [ |
| | [ |
| | "acze_tech.png", |
| | "Tech Invoice", |
| | ], |
| | [ |
| | "acze.png", |
| | "Commercial Goods Invoice", |
| | ], |
| | [ |
| | "north_sea.png", |
| | "Energy Invoice", |
| | ], |
| | ] |
| |
|
| | QUESTION_FILES = { |
| | "Tech Invoice": "acze_tech.pdf", |
| | "Energy Invoice": "north_sea.pdf", |
| | } |
| |
|
| | for q in QUESTION_FILES.keys(): |
| | assert any(x[1] == q for x in EXAMPLES) |
| |
|
| | FIELDS = { |
| | "Vendor Name": ["Vendor Name - Logo?", "Vendor Name - Address?"], |
| | "Vendor Address": ["Vendor Address?"], |
| | "Customer Name": ["Customer Name?"], |
| | "Customer Address": ["Customer Address?"], |
| | "Invoice Number": ["Invoice Number?"], |
| | "Invoice Date": ["Invoice Date?"], |
| | "Due Date": ["Due Date?"], |
| | "Subtotal": ["Subtotal?"], |
| | "Total Tax": ["Total Tax?"], |
| | "Invoice Total": ["Invoice Total?"], |
| | "Amount Due": ["Amount Due?"], |
| | "Payment Terms": ["Payment Terms?"], |
| | "Remit To Name": ["Remit To Name?"], |
| | "Remit To Address": ["Remit To Address?"], |
| | } |
| |
|
| |
|
| | def empty_table(fields): |
| | return {"value": [[name, None] for name in fields.keys()], "interactive": False} |
| |
|
| |
|
| | def process_document(document, fields, model, error=None): |
| | if document is not None and error is None: |
| | preview, json_output, table = process_fields(document, fields, model) |
| | return ( |
| | document, |
| | fields, |
| | preview, |
| | gr.update(visible=True), |
| | gr.update(visible=False, value=None), |
| | json_output, |
| | table, |
| | ) |
| | else: |
| | return ( |
| | None, |
| | fields, |
| | None, |
| | gr.update(visible=False), |
| | gr.update(visible=True, value=error) if error is not None else None, |
| | None, |
| | gr.update(**empty_table(fields)), |
| | ) |
| |
|
| |
|
| | def process_path(path, fields, model): |
| | error = None |
| | document = None |
| | if path: |
| | try: |
| | document = load_document(path) |
| | except Exception as e: |
| | traceback.print_exc() |
| | error = str(e) |
| |
|
| | return process_document(document, fields, model, error) |
| |
|
| |
|
| | def process_upload(file, fields, model): |
| | return process_path(file.name if file else None, fields, model) |
| |
|
| |
|
| | colors = ["#64A087", "green", "black"] |
| |
|
| |
|
| | def annotate_page(prediction, pages, document): |
| | if prediction is not None and "word_ids" in prediction: |
| | image = pages[prediction["page"]] |
| | draw = ImageDraw.Draw(image, "RGBA") |
| | word_boxes = lift_word_boxes(document, prediction["page"]) |
| | x1, y1, x2, y2 = normalize_bbox( |
| | expand_bbox([word_boxes[i] for i in prediction["word_ids"]]), |
| | image.width, |
| | image.height, |
| | ) |
| | draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255))) |
| |
|
| |
|
| | def process_question( |
| | question, document, img_gallery, model, fields, output, output_table |
| | ): |
| | field_name = question |
| | if field_name is not None: |
| | fields = {field_name: [question], **fields} |
| |
|
| | if not question or document is None: |
| | return None, document, fields, output, gr.update(value=output_table) |
| |
|
| | text_value = None |
| | pages = [processing_utils.decode_base64_to_image(p) for p in img_gallery] |
| | prediction = run_pipeline(model, question, document, 1) |
| | annotate_page(prediction, pages, document) |
| |
|
| | output = {field_name: prediction, **output} |
| | table = [[field_name, prediction.get("answer")]] + output_table.values.tolist() |
| | return ( |
| | None, |
| | gr.update(visible=True, value=pages), |
| | fields, |
| | output, |
| | gr.update(value=table, interactive=False), |
| | ) |
| |
|
| |
|
| | def process_fields(document, fields, model=list(CHECKPOINTS.keys())[0]): |
| | pages = [x.copy().convert("RGB") for x in document.preview] |
| |
|
| | ret = {} |
| | table = [] |
| |
|
| | for (field_name, questions) in fields.items(): |
| | answers = [ |
| | a |
| | for q in questions |
| | for a in ensure_list(run_pipeline(model, q, document, top_k=1)) |
| | if a.get("score", 1) > 0.5 |
| | ] |
| | answers.sort(key=lambda x: -x.get("score", 0) if x else 0) |
| | top = answers[0] if len(answers) > 0 else None |
| | annotate_page(top, pages, document) |
| | ret[field_name] = top |
| | table.append([field_name, top.get("answer") if top is not None else None]) |
| |
|
| | return ( |
| | gr.update(visible=True, value=pages), |
| | gr.update(visible=True, value=ret), |
| | table |
| | ) |
| |
|
| |
|
| | def load_example_document(img, title, fields, model): |
| | document = None |
| | if img is not None: |
| | if title in QUESTION_FILES: |
| | document = load_document(QUESTION_FILES[title]) |
| | else: |
| | document = ImageDocument(Image.fromarray(img), ocr_reader=get_ocr_reader()) |
| |
|
| | return process_document(document, fields, model) |
| |
|
| |
|
| | CSS = """ |
| | #question input { |
| | font-size: 16px; |
| | } |
| | #url-textbox, #question-textbox { |
| | padding: 0 !important; |
| | } |
| | #short-upload-box .w-full { |
| | min-height: 10rem !important; |
| | } |
| | /* I think something like this can be used to re-shape |
| | * the table |
| | */ |
| | /* |
| | .gr-samples-table tr { |
| | display: inline; |
| | } |
| | .gr-samples-table .p-2 { |
| | width: 100px; |
| | } |
| | */ |
| | #select-a-file { |
| | width: 100%; |
| | } |
| | #file-clear { |
| | padding-top: 2px !important; |
| | padding-bottom: 2px !important; |
| | padding-left: 8px !important; |
| | padding-right: 8px !important; |
| | margin-top: 10px; |
| | } |
| | .gradio-container .gr-button-primary { |
| | background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); |
| | border: 1px solid #B0DCCC; |
| | border-radius: 8px; |
| | color: #1B8700; |
| | } |
| | .gradio-container.dark button#submit-button { |
| | background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); |
| | border: 1px solid #B0DCCC; |
| | border-radius: 8px; |
| | color: #1B8700 |
| | } |
| | |
| | table.gr-samples-table tr td { |
| | border: none; |
| | outline: none; |
| | } |
| | |
| | table.gr-samples-table tr td:first-of-type { |
| | width: 0%; |
| | } |
| | |
| | div#short-upload-box div.absolute { |
| | display: none !important; |
| | } |
| | |
| | gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div { |
| | gap: 0px 2%; |
| | } |
| | |
| | gradio-app div div div div.w-full, .gradio-app div div div div.w-full { |
| | gap: 0px; |
| | } |
| | |
| | gradio-app h2, .gradio-app h2 { |
| | padding-top: 10px; |
| | } |
| | |
| | #answer { |
| | overflow-y: scroll; |
| | color: white; |
| | background: #666; |
| | border-color: #666; |
| | font-size: 20px; |
| | font-weight: bold; |
| | } |
| | |
| | #answer span { |
| | color: white; |
| | } |
| | |
| | #answer textarea { |
| | color:white; |
| | background: #777; |
| | border-color: #777; |
| | font-size: 18px; |
| | } |
| | |
| | #url-error input { |
| | color: red; |
| | } |
| | |
| | #results-table { |
| | max-height: 600px; |
| | overflow-y: scroll; |
| | } |
| | |
| | """ |
| |
|
| | with gr.Blocks(css=CSS) as demo: |
| | gr.Markdown("# DocQuery for Invoices") |
| | gr.Markdown( |
| | "DocQuery (created by [Impira](https://impira.com?utm_source=huggingface&utm_medium=referral&utm_campaign=invoices_space))" |
| | " uses LayoutLMv1 fine-tuned on an invoice dataset" |
| | " as well as DocVQA and SQuAD, which boot its general comprehension skills. The model is an enhanced" |
| | " QA architecture that supports selecting blocks of text which may be non-consecutive, which is a major" |
| | " issue when dealing with invoice documents (e.g. addresses)." |
| | " To use it, simply upload an image or PDF invoice and the model will predict values for several fields." |
| | " You can also create additional fields by simply typing in a question." |
| | " DocQuery is available on [Github](https://github.com/impira/docquery)." |
| | ) |
| |
|
| | document = gr.Variable() |
| | fields = gr.Variable(value={**FIELDS}) |
| | example_question = gr.Textbox(visible=False) |
| | example_image = gr.Image(visible=False) |
| |
|
| | with gr.Row(equal_height=True): |
| | with gr.Column(): |
| | with gr.Row(): |
| | gr.Markdown("## Select an invoice", elem_id="select-a-file") |
| | img_clear_button = gr.Button( |
| | "Clear", variant="secondary", elem_id="file-clear", visible=False |
| | ) |
| | image = gr.Gallery(visible=False) |
| | with gr.Row(equal_height=True): |
| | with gr.Column(): |
| | with gr.Row(): |
| | url = gr.Textbox( |
| | show_label=False, |
| | placeholder="URL", |
| | lines=1, |
| | max_lines=1, |
| | elem_id="url-textbox", |
| | ) |
| | submit = gr.Button("Get") |
| | url_error = gr.Textbox( |
| | visible=False, |
| | elem_id="url-error", |
| | max_lines=1, |
| | interactive=False, |
| | label="Error", |
| | ) |
| | gr.Markdown("— or —") |
| | upload = gr.File(label=None, interactive=True, elem_id="short-upload-box") |
| | gr.Examples( |
| | examples=EXAMPLES, |
| | inputs=[example_image, example_question], |
| | ) |
| |
|
| | with gr.Column() as col: |
| | gr.Markdown("## Results") |
| | with gr.Tabs(): |
| | with gr.TabItem("Table"): |
| | output_table = gr.Dataframe( |
| | headers=["Field", "Value"], |
| | **empty_table(fields.value), |
| | elem_id="results-table" |
| | ) |
| |
|
| | with gr.TabItem("JSON"): |
| | output = gr.JSON(label="Output", visible=True) |
| |
|
| | model = gr.Radio( |
| | choices=list(CHECKPOINTS.keys()), |
| | value=list(CHECKPOINTS.keys())[0], |
| | label="Model", |
| | visible=False, |
| | ) |
| |
|
| | gr.Markdown("### Ask a question") |
| | with gr.Row(): |
| | question = gr.Textbox( |
| | label="Question", |
| | show_label=False, |
| | placeholder="e.g. What is the invoice number?", |
| | lines=1, |
| | max_lines=1, |
| | elem_id="question-textbox", |
| | ) |
| | clear_button = gr.Button("Clear", variant="secondary", visible=False) |
| | submit_button = gr.Button( |
| | "Add", variant="primary", elem_id="submit-button" |
| | ) |
| |
|
| | for cb in [img_clear_button, clear_button]: |
| | cb.click( |
| | lambda _: ( |
| | gr.update(visible=False, value=None), |
| | None, |
| | |
| | gr.update(value=None), |
| | gr.update(**empty_table(fields.value)), |
| | gr.update(visible=False), |
| | None, |
| | None, |
| | None, |
| | gr.update(visible=False, value=None), |
| | None, |
| | ), |
| | inputs=clear_button, |
| | outputs=[ |
| | image, |
| | document, |
| | |
| | output, |
| | output_table, |
| | img_clear_button, |
| | example_image, |
| | upload, |
| | url, |
| | url_error, |
| | question, |
| | ], |
| | ) |
| |
|
| | submit_outputs = [ |
| | document, |
| | fields, |
| | image, |
| | img_clear_button, |
| | url_error, |
| | output, |
| | output_table, |
| | ] |
| |
|
| | upload.change( |
| | fn=process_upload, |
| | inputs=[upload, fields, model], |
| | outputs=submit_outputs, |
| | ) |
| |
|
| | submit.click( |
| | fn=process_path, |
| | inputs=[url, fields, model], |
| | outputs=submit_outputs, |
| | ) |
| |
|
| | for action in [question.submit, submit_button.click]: |
| | action( |
| | fn=process_question, |
| | inputs=[question, document, image, model, fields, output, output_table], |
| | outputs=[question, image, fields, output, output_table], |
| | ) |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | example_image.change( |
| | fn=load_example_document, |
| | inputs=[example_image, example_question, fields, model], |
| | outputs=submit_outputs, |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch(enable_queue=False) |
| |
|