import os

# Silence the Hugging Face tokenizers fork/parallelism warning; this must be set
# before transformers is imported.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import functools

from PIL import Image, ImageDraw
import gradio as gr

import torch
from docquery.pipeline import get_pipeline
from docquery.document import load_bytes, load_document, ImageDocument


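# The QA pipeline may return a single prediction dict or a list of them;
# ensure_list gives callers a uniform list to iterate over.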
def ensure_list(x):
    if isinstance(x, list):
        return x
    else:
        return [x]


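# Model checkpoints selectable in the UI, keyed by their display name.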
CHECKPOINTS = {
    "LayoutLMv1 🦉": "impira/layoutlm-document-qa",
    "Donut 🍩": "naver-clova-ix/donut-base-finetuned-docvqa",
}

PIPELINES = {}


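# Build a docquery pipeline for the chosen checkpoint (or fetch it from the
# PIPELINES cache), placing it on GPU when CUDA is available.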
def construct_pipeline(model):
    global PIPELINES
    if model in PIPELINES:
        return PIPELINES[model]

    device = "cuda" if torch.cuda.is_available() else "cpu"
    ret = get_pipeline(checkpoint=CHECKPOINTS[model], device=device)
    PIPELINES[model] = ret
    return ret


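# Memoize results so re-asking the same question of the same document and model
# does not re-run inference.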
@functools.lru_cache(1024)
def run_pipeline(model, question, document, top_k):
    pipeline = construct_pipeline(model)
    return pipeline(question=question, **document.context, top_k=top_k)


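# The document context stores, per page, the image plus OCR (word, box) pairs;
# pull out the word/box list for the first page.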
def lift_word_boxes(document):
    return document.context["image"][0][1]


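# Compute the union of a span of word boxes and grow it slightly so the
# highlight does not hug the text too tightly.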
def expand_bbox(word_boxes, padding=0.1):
    if len(word_boxes) == 0:
        return None

    min_x, min_y, max_x, max_y = zip(*[x[1] for x in word_boxes])
    min_x, min_y, max_x, max_y = [min(min_x), min(min_y), max(max_x), max(max_y)]

    if padding != 0:
        padding = max((max_x - min_x) * padding, (max_y - min_y) * padding)
        min_x = max(0, min_x - padding)
        min_y = max(0, min_y - padding)
        max_x = max_x + padding
        max_y = max_y + padding

    return [min_x, min_y, max_x, max_y]


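# Word boxes are normalized to a 0-1000 grid (the LayoutLM convention); scale
# them to pixel coordinates for drawing on the preview image.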
def normalize_bbox(box, width, height):
    pct = [c / 1000 for c in box]
    return [pct[0] * width, pct[1] * height, pct[2] * width, pct[3] * height]


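# Bundled example documents and the question asked when each one is selected.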
examples = [
    [
        "invoice.png",
        "What is the invoice number?",
    ],
    [
        "contract.jpeg",
        "What is the purchase amount?",
    ],
    [
        "statement.png",
        "What are net sales for 2020?",
    ],
]


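# Load a document from a local path or URL, returning (document, preview image,
# cleared output); on any failure, clear all three.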
def process_path(path):
    if path:
        try:
            document = load_document(path)
            return document, document.preview, None
        except Exception:
            pass
    return None, None, None


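# Uploaded files arrive as tempfile wrappers; hand their path to process_path.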
def process_upload(file):
    if file:
        return process_path(file.name)
    else:
        return None, None, None


colors = ["#64A087", "green", "black"]


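# Answer a question: run the pipeline (top 3 candidates), then draw a translucent
# green box over the word span of the best answer on a copy of the preview.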
def process_question(question, document, model=list(CHECKPOINTS.keys())[0]):
    if document is None:
        return None, None

    predictions = run_pipeline(model, question, document, 3)
    image = document.preview.copy()
    draw = ImageDraw.Draw(image, "RGBA")
    for i, p in enumerate(ensure_list(predictions)):
        if i > 0:
            # Only the top prediction is highlighted; the rest are still
            # returned in the JSON output.
            break

        if "start" in p and "end" in p:
            x1, y1, x2, y2 = normalize_bbox(
                expand_bbox(lift_word_boxes(document)[p["start"] : p["end"] + 1]),
                image.width,
                image.height,
            )
            draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255)))

    return image, predictions


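# Example images arrive from gr.Image as numpy arrays; wrap them in an
# ImageDocument and answer the paired question immediately.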
def load_example_document(img, question, model):
    document = ImageDocument(Image.fromarray(img))
    preview, answer = process_question(question, document, model)
    return document, question, preview, answer


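# Gradio UI: document input (upload, URL, or example), question box and model
# selector, and outputs showing the annotated preview plus raw predictions.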
with gr.Blocks() as demo:
    gr.Markdown("# DocQuery: Query Documents w/ NLP")
    document = gr.Variable()
    example_question = gr.Textbox(visible=False)
    example_image = gr.Image(visible=False)

    gr.Markdown("## 1. Upload a file or select an example")
    with gr.Row(equal_height=True):
        with gr.Column():
            upload = gr.File(label="Upload a file", interactive=True)
            url = gr.Textbox(label="... or a URL", interactive=True)
        gr.Examples(
            examples=examples,
            inputs=[example_image, example_question],
        )

    gr.Markdown("## 2. Ask a question")

    with gr.Row(equal_height=True):
        question = gr.Textbox(
            label="Question",
            placeholder="e.g. What is the invoice number?",
            lines=1,
            max_lines=1,
        )
        model = gr.Radio(
            choices=list(CHECKPOINTS.keys()),
            value=list(CHECKPOINTS.keys())[0],
            label="Model",
        )

    with gr.Row():
        clear_button = gr.Button("Clear", variant="secondary")
        submit_button = gr.Button("Submit", variant="primary", elem_id="submit-button")

    with gr.Row():
        image = gr.Image(visible=True)
        with gr.Column():
            output = gr.JSON(label="Output")

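    # Event wiring: clearing, uploads, URL entry, question submission, model
    # changes, and example selection all refresh the preview image and JSON output.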
    clear_button.click(
        lambda: (None, None, None, None),
        inputs=None,
        outputs=[image, document, question, output],
    )
    upload.change(fn=process_upload, inputs=[upload], outputs=[document, image, output])
    url.change(fn=process_path, inputs=[url], outputs=[document, image, output])

    question.submit(
        fn=process_question,
        inputs=[question, document, model],
        outputs=[image, output],
    )

    submit_button.click(
        process_question,
        inputs=[question, document, model],
        outputs=[image, output],
    )

    model.change(
        process_question, inputs=[question, document, model], outputs=[image, output]
    )

    example_image.change(
        fn=load_example_document,
        inputs=[example_image, example_question, model],
        outputs=[document, question, image, output],
    )

    gr.Markdown("### More Info")
    gr.Markdown(
        "DocQuery uses LayoutLMv1 fine-tuned on DocVQA, a document visual question"
        " answering dataset, as well as SQuAD, which boosts its English-language"
        " comprehension. To use it, simply upload an image or PDF, type a question,"
        " and click 'submit', or click one of the examples to load it."
    )
    gr.Markdown("[GitHub Repo](https://github.com/impira/docquery)")

if __name__ == "__main__":
    demo.launch()