| | import os
|
| | import json
|
| | import numpy as np
|
| | from PIL import Image
|
| | import pandas as pd
|
| | from IPython.display import Image
|
| | from ultralytics import YOLO
|
| | import torch
|
| | from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments
|
| | from datasets import load_dataset
|
| | import cv2
|
| | import pytesseract
|
| | from PIL import Image, ImageEnhance
|
| | import numpy as np
|
| |
|
| |
|
| | pytesseract.pytesseract.tesseract_cmd = r'C:/Program Files/Tesseract-OCR/tesseract.exe'
|
| |
|
| | def ocr_core(image):
|
| |
|
| | data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
|
| | df = pd.DataFrame(data)
|
| | df = df[df['conf'] != -1]
|
| | df['left_diff'] = df.groupby('block_num')['left'].diff().fillna(0).astype(int)
|
| | df['prev_width'] = df['width'].shift(1).fillna(0).astype(int)
|
| | df['spacing'] = (df['left_diff'] - df['prev_width']).fillna(0).astype(int)
|
| | df['text'] = df.apply(lambda x: '\n' + x['text'] if (x['word_num'] == 1) & (x['block_num'] != 1) else x['text'], axis=1)
|
| | df['text'] = df.apply(lambda x: ',' + x['text'] if x['spacing'] > 100 else x['text'], axis=1)
|
| | ocr_text = ""
|
| | for text in df['text']:
|
| | ocr_text += text + ' '
|
| | return ocr_text
|
| |
|
| | def improve_ocr_accuracy(img):
|
| |
|
| | img =Image.open(img)
|
| |
|
| |
|
| | img = img.resize((img.width * 4, img.height * 4))
|
| |
|
| |
|
| | enhancer = ImageEnhance.Contrast(img)
|
| | img = enhancer.enhance(2)
|
| |
|
| | _, thresh = cv2.threshold(np.array(img), 127, 255, cv2.THRESH_BINARY_INV)
|
| |
|
| | return thresh
|
| |
|
| |
|
| | def create_ocr_outputs():
|
| | directory_path = os.getcwd() + '/data/processed/hand_labeled_tables/hand_labeled_tables'
|
| |
|
| | for root, dirs, files in os.walk(directory_path):
|
| |
|
| | print(f"Current directory: {root}")
|
| |
|
| |
|
| | print("Subdirectories:")
|
| | for dir in dirs:
|
| | print(f"- {dir}")
|
| |
|
| |
|
| | print("Files:")
|
| | for image_path in files:
|
| | print(f"- {image_path}")
|
| | full_path = os.path.join(root, image_path)
|
| |
|
| | preprocessed_image = improve_ocr_accuracy(full_path)
|
| |
|
| | ocr_text = ocr_core(preprocessed_image)
|
| | with open(os.getcwd() + f"/data/processed/annotations/{image_path.split('.')[0]}.txt", 'wb') as f:
|
| | f.write(ocr_text.encode('utf-8'))
|
| |
|
| | print("\n")
|
| |
|
| |
|
| | def prepare_dataset(ocr_dir, csv_dir, output_file):
|
| | with open(output_file, 'w', encoding='utf-8') as jsonl_file:
|
| | for filename in os.listdir(ocr_dir):
|
| | if filename.endswith('.txt'):
|
| | ocr_path = os.path.join(ocr_dir, filename)
|
| | csv_path = os.path.join(csv_dir, filename)
|
| | print(csv_path)
|
| |
|
| |
|
| |
|
| |
|
| | with open(ocr_path, 'r', encoding='utf-8') as ocr_file:
|
| | ocr_text = ocr_file.read()
|
| |
|
| | with open(csv_path, 'r', encoding='utf-8') as csv_file:
|
| | csv_text = csv_file.read()
|
| |
|
| | json_object = {
|
| | "prompt": ocr_text,
|
| | "completion": csv_text
|
| | }
|
| | jsonl_file.write(json.dumps(json_object) + '\n')
|
| |
|
| | def tokenize_function(examples):
|
| |
|
| | inputs = tokenizer(examples['prompt'], truncation=True, padding='max_length', max_length=1012)
|
| |
|
| |
|
| | inputs['labels'] = inputs['input_ids'].copy()
|
| | return inputs
|
| |
|
| |
|
| | if __name__ == '__name__':
|
| |
|
| |
|
| | device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
| | print(f"Using device: {device}")
|
| |
|
| |
|
| | model = YOLO('yolov8l.pt')
|
| |
|
| |
|
| | results = model.train(
|
| | data='config.yaml',
|
| | epochs=10,
|
| | imgsz=640,
|
| | batch=8,
|
| | name='yolov8l_custom',
|
| | device=device
|
| | )
|
| |
|
| |
|
| | metrics = model.val()
|
| | print(metrics.box.map)
|
| | torch.save(model, os.getcwd() + '/models/trained_yolov8.pt')
|
| |
|
| | create_ocr_outputs()
|
| |
|
| |
|
| | ocr_dir = os.getcwd() + '/data/processed/annotations'
|
| | csv_dir = os.getcwd() + '/data/processed/hand_labeled_tables'
|
| | output_file = 'dataset.jsonl'
|
| | prepare_dataset(ocr_dir, csv_dir, output_file)
|
| |
|
| |
|
| |
|
| | dataset = load_dataset('json', data_files={'train': 'dataset.jsonl'})
|
| | dataset = dataset['train'].train_test_split(test_size=0.1)
|
| |
|
| |
|
| | model_name = 'gpt2'
|
| | tokenizer = GPT2Tokenizer.from_pretrained(model_name)
|
| |
|
| |
|
| | tokenizer.add_special_tokens({'pad_token': '[PAD]'})
|
| |
|
| | tokenized_dataset = dataset.map(tokenize_function, batched=True)
|
| |
|
| |
|
| | model = GPT2LMHeadModel.from_pretrained(model_name)
|
| |
|
| |
|
| | model.resize_token_embeddings(len(tokenizer))
|
| |
|
| | training_args = TrainingArguments(
|
| | output_dir='./results',
|
| | num_train_epochs=3,
|
| | per_device_train_batch_size=2,
|
| | per_device_eval_batch_size=2,
|
| | warmup_steps=500,
|
| | weight_decay=0.01,
|
| | logging_dir='./logs',
|
| | logging_steps=10,
|
| | evaluation_strategy="epoch",
|
| | save_strategy="epoch",
|
| | load_best_model_at_end=True,
|
| | metric_for_best_model="eval_loss",
|
| | )
|
| |
|
| |
|
| | trainer = Trainer(
|
| | model=model,
|
| | args=training_args,
|
| | train_dataset=tokenized_dataset['train'],
|
| | eval_dataset=tokenized_dataset['test'],
|
| | )
|
| |
|
| |
|
| | trainer.train()
|
| |
|
| |
|
| | eval_results = trainer.evaluate()
|
| | print(f"Evaluation results: {eval_results}")
|
| |
|
| |
|
| | model.save_pretrained(os.getcwd() + '/models/gpt')
|
| | tokenizer.save_pretrained(os.getcwd() + '/models/gpt') |