import os import json import numpy as np from PIL import Image import pandas as pd from IPython.display import Image from ultralytics import YOLO import torch from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments from datasets import load_dataset import cv2 import pytesseract from PIL import Image, ImageEnhance import numpy as np from sklearn.metrics import precision_score, recall_score, f1_score def ocr_core(image): """ Run Tesseract OCR on the preprocessed image and return the extracted text. Inputs: image (PIL.Image): The preprocessed image to run OCR on. Returns: str: The text extracted from the image. """ data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT) df = pd.DataFrame(data) df = df[df['conf'] != -1] df['left_diff'] = df.groupby('block_num')['left'].diff().fillna(0).astype(int) df['prev_width'] = df['width'].shift(1).fillna(0).astype(int) df['spacing'] = (df['left_diff'] - df['prev_width']).fillna(0).astype(int) df['text'] = df.apply(lambda x: '\n' + x['text'] if (x['word_num'] == 1) & (x['block_num'] != 1) else x['text'], axis=1) df['text'] = df.apply(lambda x: ',' + x['text'] if x['spacing'] > 100 else x['text'], axis=1) ocr_text = "" for text in df['text']: ocr_text += text + ' ' return ocr_text def improve_ocr_accuracy(img): """ Preprocess the image to improve OCR accuracy by resizing, increasing contrast, and thresholding. Inputs: img (str): The path to the image file. Returns: np.ndarray: The preprocessed image as a binary thresholded array. """ # Read image with PIL (for color preservation) img = Image.open(img) # Increase image size (can improve accuracy for small text) img = img.resize((img.width * 4, img.height * 4)) # Increase contrast enhancer = ImageEnhance.Contrast(img) img = enhancer.enhance(2) _, thresh = cv2.threshold(np.array(img), 127, 255, cv2.THRESH_BINARY_INV) return thresh def create_ocr_outputs(): """ Process images in a directory, run OCR on them, and save the extracted text to corresponding text files. Inputs: Returns: """ directory_path = os.getcwd() + '/data/processed/hand_labeled_tables/hand_labeled_tables' for root, dirs, files in os.walk(directory_path): # Print the current directory print(f"Current directory: {root}") # Print all subdirectories in the current directory print("Subdirectories:") for dir in dirs: print(f"- {dir}") # Print all files in the current directory print("Files:") for image_path in files: print(f"- {image_path}") full_path = os.path.join(root, image_path) # Preprocess the image preprocessed_image = improve_ocr_accuracy(full_path) ocr_text = ocr_core(preprocessed_image) with open(os.getcwd() + f"/data/processed/annotations/{image_path.split('.')[0]}.txt", 'wb') as f: f.write(ocr_text.encode('utf-8')) print("\n") # Add a blank line for readability def prepare_dataset(ocr_dir, csv_dir, output_file): """ Prepare a dataset by combining OCR text files and corresponding CSV files into a JSONL format. Inputs: ocr_dir (str): The directory containing OCR text files. csv_dir (str): The directory containing CSV files. output_file (str): The path to the output JSONL file. Returns: """ with open(output_file, 'w', encoding='utf-8') as jsonl_file: for filename in os.listdir(ocr_dir): if filename.endswith('.txt'): ocr_path = os.path.join(ocr_dir, filename) csv_path = os.path.join(csv_dir, filename) print(csv_path) with open(ocr_path, 'r', encoding='utf-8') as ocr_file: ocr_text = ocr_file.read() with open(csv_path, 'r', encoding='utf-8') as csv_file: csv_text = csv_file.read() json_object = { "prompt": ocr_text, "completion": csv_text } jsonl_file.write(json.dumps(json_object) + '\n') def tokenize_function(examples): """ Tokenize the inputs and create labels for the tokenized inputs. Inputs: examples (dict): A dictionary containing 'prompt' and 'completion' keys. Returns: dict: A dictionary containing tokenized inputs and labels. """ # Tokenize the inputs inputs = tokenizer(examples['prompt'], truncation=True, padding='max_length', max_length=1012) # Create labels which are the same as input_ids inputs['labels'] = inputs['input_ids'].copy() return inputs def calculate_metrics(model, tokenizer, texts, labels): """ Calculate evaluation metrics for the model based on the provided texts and labels. Inputs: model (GPT2LMHeadModel): The language model to evaluate. tokenizer (GPT2Tokenizer): The tokenizer for the model. texts (list): A list of input texts. labels (list): A list of corresponding labels. Returns: list: A list containing precision, recall, F1 score. """ model.eval() all_predictions = [] all_labels = [] total_loss = 0 total_tokens = 0 with torch.no_grad(): for text, label in zip(texts, labels): # Tokenize input and label input_ids = tokenizer.encode(text, return_tensors="pt") label_ids = tokenizer.encode(label, return_tensors="pt")[0] # Generate prediction output = model.generate(input_ids, max_length=input_ids.shape[1] + len(label_ids), num_return_sequences=1) predicted_ids = output[0][input_ids.shape[1]:] # Convert ids to tokens predicted_tokens = tokenizer.convert_ids_to_tokens(predicted_ids) label_tokens = tokenizer.convert_ids_to_tokens(label_ids) # Extend predictions and labels all_predictions.extend(predicted_tokens) all_labels.extend(label_tokens) # Calculate loss outputs = model(input_ids=input_ids, labels=label_ids.unsqueeze(0)) loss = outputs.loss total_loss += loss.item() * len(label_ids) total_tokens += len(label_ids) # Calculate metrics precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0) recall = recall_score(all_labels, all_predictions, average='weighted', zero_division=0) f1 = f1_score(all_labels, all_predictions, average='weighted', zero_division=0) return precision, recall, f1 if __name__ == '__main__': # Ensure you have installed Tesseract OCR and set the path pytesseract.pytesseract.tesseract_cmd = r'C:/Program Files/Tesseract-OCR/tesseract.exe' # Update this path for your system # Ensure CUDA is available device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {device}") # Load a pretrained YOLOv8 model model = YOLO('yolov8l.pt') # Train the model on your custom dataset results = model.train( data='config.yaml', epochs=1, imgsz=640, batch=8, name='yolov8l_custom', device=device ) metrics = model.val() print(metrics.box.map) results = model.val() model.save(os.getcwd() + '/models/trained_yolov8.pt') create_ocr_outputs() # Usage ocr_dir = os.getcwd() + '/data/processed/annotations' csv_dir = os.getcwd() + '/data/processed/hand_labeled_tables' output_file = 'dataset.jsonl' prepare_dataset(ocr_dir, csv_dir, output_file) # Load the dataset dataset = load_dataset('json', data_files={'train': 'dataset.jsonl'}) dataset = dataset['train'].train_test_split(test_size=0.1) tokenizer = GPT2Tokenizer.from_pretrained('gpt2') tokenizer.add_special_tokens({'pad_token': '[PAD]'}) tokenized_dataset = dataset.map(tokenize_function, batched=True) gpt_model = GPT2LMHeadModel.from_pretrained('gpt2') gpt_model.resize_token_embeddings(len(tokenizer)) training_args = TrainingArguments( output_dir='./results', num_train_epochs=3, per_device_train_batch_size=2, per_device_eval_batch_size=2, warmup_steps=500, weight_decay=0.01, logging_dir='./logs', logging_steps=10, evaluation_strategy="epoch", # Evaluate at the end of each epoch save_strategy="epoch", # Save at the end of each epoch load_best_model_at_end=True, # Load the best model when finished training (based on evaluation) metric_for_best_model="eval_loss", # Use eval_loss to determine the best model ) # Trainer trainer = Trainer( model=model, args=training_args, train_dataset=tokenized_dataset['train'], eval_dataset=tokenized_dataset['test'], ) # Train the model trainer.train() # Evaluate the model eval_results = trainer.evaluate() print(f"Evaluation results: {eval_results}") # Save the model gpt_model.save_pretrained(os.getcwd() + '/models/gpt') tokenizer.save_pretrained(os.getcwd() + '/models/gpt') # Calculate metrics precision, recall, f1 = calculate_metrics(gpt_model, tokenizer, dataset['test']['text'], dataset['test']['label']) # Display metrics print(f"Precision: {precision:.4f}") print(f"Recall: {recall:.4f}") print(f"F1 Score: {f1:.4f}")