# Hugging Face Hub file-view residue (page chrome, not part of the program):
#   keesephillips's picture
#   Upload folder using huggingface_hub
#   9cdae5b verified
#   raw
#   history blame
#   10.2 kB
import os
import json
import numpy as np
from PIL import Image
import pandas as pd
from IPython.display import Image
from ultralytics import YOLO
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments
from datasets import load_dataset
import cv2
import pytesseract
from PIL import Image, ImageEnhance
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score
def ocr_core(image):
    """
    Run Tesseract OCR on the preprocessed image and return the extracted text.

    Words are emitted in the order Tesseract reports them, each followed by a
    single space. A newline is prefixed to every word with ``word_num == 1``
    outside block 1, and a comma is prefixed to every word whose computed gap
    to the preceding word is greater than 100 (a rough column separator).

    Inputs:
        image: The preprocessed image to run OCR on. Anything accepted by
            ``pytesseract.image_to_data`` works; in this file the caller
            passes the array returned by ``improve_ocr_accuracy``.
    Returns:
        str: The text extracted from the image, or an empty string when
            Tesseract reports no usable words.
    """
    data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
    df = pd.DataFrame(data)
    # Nothing was reported at all: bail out before touching any column.
    if df.empty:
        return ""

    # Drop rows with conf == -1. The explicit copy keeps the column
    # assignments below from writing into a slice of the unfiltered frame.
    df = df[df['conf'] != -1].copy()
    if df.empty:
        return ""

    # Horizontal gap to the previous word: distance between consecutive
    # `left` values within a block, minus the previous row's width.
    df['left_diff'] = df.groupby('block_num')['left'].diff().fillna(0).astype(int)
    df['prev_width'] = df['width'].shift(1).fillna(0).astype(int)
    df['spacing'] = (df['left_diff'] - df['prev_width']).fillna(0).astype(int)

    # Vectorised equivalents of the former row-wise apply() calls. The order
    # matters: the newline prefix is applied first, the comma prefix second.
    starts_row = (df['word_num'] == 1) & (df['block_num'] != 1)
    df['text'] = df['text'].mask(starts_row, '\n' + df['text'])
    wide_gap = df['spacing'] > 100
    df['text'] = df['text'].mask(wide_gap, ',' + df['text'])

    # Every word is followed by one space (including the last one).
    return ''.join(f"{word} " for word in df['text'])
def improve_ocr_accuracy(img):
    """
    Preprocess the image to improve OCR accuracy by resizing, increasing contrast, and thresholding.

    Inputs:
        img (str): The path to the image file.
    Returns:
        np.ndarray: The preprocessed image after an inverted binary threshold
            (cut-off 127, maximum value 255).
    """
    # Open through a context manager so the underlying file handle is
    # released as soon as the pixel data has been read by resize().
    with Image.open(img) as source:
        # Increase image size 4x in each dimension (can improve accuracy for small text)
        enlarged = source.resize((source.width * 4, source.height * 4))

    # Increase contrast (factor 2)
    contrasted = ImageEnhance.Contrast(enlarged).enhance(2)

    # The threshold value returned by OpenCV is not needed; keep only the image.
    _, thresh = cv2.threshold(np.array(contrasted), 127, 255, cv2.THRESH_BINARY_INV)
    return thresh
def create_ocr_outputs():
    """
    Process images in a directory, run OCR on them, and save the extracted text
    to corresponding text files.

    Walks ``<cwd>/data/processed/hand_labeled_tables/hand_labeled_tables``,
    runs ``improve_ocr_accuracy`` and ``ocr_core`` on every file found, and
    writes the UTF-8 encoded result to
    ``<cwd>/data/processed/annotations/<name>.txt``, where ``<name>`` is the
    part of the file name before its first dot. The annotations directory is
    created if it does not exist. Progress is printed to stdout.

    Inputs:
    Returns:
    """
    base_dir = os.getcwd()
    directory_path = base_dir + '/data/processed/hand_labeled_tables/hand_labeled_tables'
    annotations_dir = base_dir + '/data/processed/annotations'
    # Without this the first open() below fails on a fresh checkout.
    os.makedirs(annotations_dir, exist_ok=True)

    for root, dirs, files in os.walk(directory_path):
        # Print the current directory
        print(f"Current directory: {root}")
        # Print all subdirectories in the current directory
        print("Subdirectories:")
        for subdir in dirs:
            print(f"- {subdir}")
        # Print all files in the current directory
        print("Files:")
        for image_path in files:
            print(f"- {image_path}")
            full_path = os.path.join(root, image_path)
            # Preprocess the image, then OCR it
            preprocessed_image = improve_ocr_accuracy(full_path)
            ocr_text = ocr_core(preprocessed_image)
            # NOTE(review): the stem is everything before the FIRST dot, so
            # "a.b.png" and "a.c.png" both map to "a.txt"; kept as-is because
            # downstream file matching may rely on this naming.
            with open(annotations_dir + f"/{image_path.split('.')[0]}.txt", 'wb') as f:
                f.write(ocr_text.encode('utf-8'))
        print("\n")  # Add a blank line for readability
def prepare_dataset(ocr_dir, csv_dir, output_file):
    """
    Prepare a dataset by combining OCR text files and corresponding CSV
    files into a JSONL format.

    For every ``*.txt`` file in ``ocr_dir`` (processed in sorted order so the
    output is deterministic) one JSON line ``{"prompt": ..., "completion": ...}``
    is written. The completion is read from the file of the same name in
    ``csv_dir``; if that file does not exist but ``<stem>.csv`` does, the
    ``.csv`` file is used instead.

    Inputs:
        ocr_dir (str): The directory containing OCR text files.
        csv_dir (str): The directory containing CSV files.
        output_file (str): The path to the output JSONL file.
    Returns:
    Raises:
        FileNotFoundError: If no matching label file exists for an OCR file.
    """
    with open(output_file, 'w', encoding='utf-8') as jsonl_file:
        for filename in sorted(os.listdir(ocr_dir)):
            if not filename.endswith('.txt'):
                continue

            ocr_path = os.path.join(ocr_dir, filename)
            # Same-name lookup first (the original behaviour); fall back to
            # the .csv extension so "<stem>.txt" can pair with "<stem>.csv".
            csv_path = os.path.join(csv_dir, filename)
            if not os.path.exists(csv_path):
                stem = os.path.splitext(filename)[0]
                csv_candidate = os.path.join(csv_dir, stem + '.csv')
                if os.path.exists(csv_candidate):
                    csv_path = csv_candidate
            print(csv_path)

            with open(ocr_path, 'r', encoding='utf-8') as ocr_file:
                ocr_text = ocr_file.read()
            with open(csv_path, 'r', encoding='utf-8') as csv_file:
                csv_text = csv_file.read()

            json_object = {
                "prompt": ocr_text,
                "completion": csv_text
            }
            jsonl_file.write(json.dumps(json_object) + '\n')
def tokenize_function(examples):
    """
    Tokenize the inputs and create labels for the tokenized inputs.

    Relies on the module-level ``tokenizer`` created in the ``__main__``
    section of this file. Only the 'prompt' field is tokenized (truncated and
    padded to 1012 tokens). Labels equal the input ids, except that padding
    positions are set to -100 so they are ignored by the language-model loss.

    Inputs:
        examples (dict): A dictionary containing 'prompt' and 'completion' keys.
            'prompt' may be a single string or a list of strings (batched map).
    Returns:
        dict: A dictionary containing tokenized inputs and labels.
    """
    # NOTE(review): 'completion' is never tokenized, so the model is trained
    # on the OCR text alone. Confirm whether prompt + completion was intended.
    # Tokenize the inputs
    inputs = tokenizer(examples['prompt'], truncation=True, padding='max_length', max_length=1012)

    input_ids = inputs['input_ids']
    attention_mask = inputs.get('attention_mask')

    if attention_mask is None:
        # No mask available: fall back to plain copies of the input ids.
        inputs['labels'] = input_ids.copy()
    elif input_ids and isinstance(input_ids[0], list):
        # Batched call: one id list and one mask list per example.
        inputs['labels'] = [
            [token if keep else -100 for token, keep in zip(ids, mask)]
            for ids, mask in zip(input_ids, attention_mask)
        ]
    else:
        # Single example: flat id list and flat mask.
        inputs['labels'] = [
            token if keep else -100 for token, keep in zip(input_ids, attention_mask)
        ]
    return inputs
def calculate_metrics(model, tokenizer, texts, labels):
    """
    Calculate evaluation metrics for the model based on the provided texts and labels.

    For each (text, label) pair the model generates a continuation of ``text``
    that is at most as long as the tokenized ``label``. Generated tokens are
    compared position by position with the label tokens; positions the model
    did not produce are counted as a placeholder token (i.e. as mismatches).

    Inputs:
        model (GPT2LMHeadModel): The language model to evaluate.
        tokenizer (GPT2Tokenizer): The tokenizer for the model.
        texts (list): A list of input texts.
        labels (list): A list of corresponding labels.
    Returns:
        tuple: (precision, recall, f1), each a support-weighted average over
            token classes. (0.0, 0.0, 0.0) when there is nothing to score.
    """
    # Placeholder used where the model produced fewer tokens than the label.
    missing_token = "<no-prediction>"

    model.eval()
    # Inputs must live on the same device as the model (it may be on GPU
    # after training).
    device = next(model.parameters()).device

    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for text, label in zip(texts, labels):
            # Tokenize input and label
            input_ids = tokenizer.encode(text, return_tensors="pt").to(device)
            label_ids = tokenizer.encode(label, return_tensors="pt")[0]
            if len(label_ids) == 0:
                # Nothing to compare against; generating zero new tokens is
                # not meaningful either.
                continue

            # Generate prediction
            prompt_len = input_ids.shape[1]
            output = model.generate(
                input_ids,
                max_length=prompt_len + len(label_ids),
                num_return_sequences=1,
            )
            predicted_ids = output[0][prompt_len:].tolist()

            # Convert ids to tokens
            predicted_tokens = tokenizer.convert_ids_to_tokens(predicted_ids)
            label_tokens = tokenizer.convert_ids_to_tokens(label_ids.tolist())

            # Generation can stop early; sklearn requires equal-length
            # sequences, so align the prediction to the label length.
            predicted_tokens = predicted_tokens[:len(label_tokens)]
            shortfall = len(label_tokens) - len(predicted_tokens)
            predicted_tokens.extend([missing_token] * shortfall)

            # Extend predictions and labels
            all_predictions.extend(predicted_tokens)
            all_labels.extend(label_tokens)

    if not all_labels:
        return 0.0, 0.0, 0.0

    # Calculate metrics
    precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_predictions, average='weighted', zero_division=0)
    return precision, recall, f1
if __name__ == '__main__':
    # End-to-end pipeline: train/validate YOLOv8, OCR the labeled images,
    # build a JSONL dataset, fine-tune GPT-2 on it and report token metrics.
    #
    # NOTE: these statements intentionally stay at module level rather than in
    # a main() function: tokenize_function reads `tokenizer` as a module global.

    # Ensure you have installed Tesseract OCR and set the path.
    # The Windows install location is only applied when it actually exists;
    # otherwise pytesseract keeps its default command lookup.
    tesseract_path = r'C:/Program Files/Tesseract-OCR/tesseract.exe'  # Update this path for your system
    if os.path.exists(tesseract_path):
        pytesseract.pytesseract.tesseract_cmd = tesseract_path

    # Use CUDA when available, otherwise fall back to the CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load a pretrained YOLOv8 model
    yolo_model = YOLO('yolov8l.pt')
    # Train the model on your custom dataset
    results = yolo_model.train(
        data='config.yaml',
        epochs=1,
        imgsz=640,
        batch=8,
        name='yolov8l_custom',
        device=device
    )
    # Validate once (the original ran validation twice and discarded the
    # second result).
    metrics = yolo_model.val()
    print(metrics.box.map)

    models_dir = os.getcwd() + '/models'
    # Make sure the target directory exists before saving into it.
    os.makedirs(models_dir, exist_ok=True)
    yolo_model.save(models_dir + '/trained_yolov8.pt')

    create_ocr_outputs()

    # Build the prompt/completion dataset from OCR text and label files
    ocr_dir = os.getcwd() + '/data/processed/annotations'
    csv_dir = os.getcwd() + '/data/processed/hand_labeled_tables'
    output_file = 'dataset.jsonl'
    prepare_dataset(ocr_dir, csv_dir, output_file)

    # Load the dataset and hold out 10% for evaluation
    dataset = load_dataset('json', data_files={'train': 'dataset.jsonl'})
    dataset = dataset['train'].train_test_split(test_size=0.1)

    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    gpt_model = GPT2LMHeadModel.from_pretrained('gpt2')
    # The added [PAD] token enlarges the vocabulary, so the embedding matrix
    # has to grow with it.
    gpt_model.resize_token_embeddings(len(tokenizer))

    # NOTE(review): recent transformers releases rename `evaluation_strategy`
    # to `eval_strategy`; confirm against the installed version.
    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=3,
        per_device_train_batch_size=2,
        per_device_eval_batch_size=2,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch",  # Evaluate at the end of each epoch
        save_strategy="epoch",  # Save at the end of each epoch
        load_best_model_at_end=True,  # Load the best model when finished training (based on evaluation)
        metric_for_best_model="eval_loss",  # Use eval_loss to determine the best model
    )

    # Trainer: must receive the GPT-2 model (the original passed the YOLO
    # detector here).
    trainer = Trainer(
        model=gpt_model,
        args=training_args,
        train_dataset=tokenized_dataset['train'],
        eval_dataset=tokenized_dataset['test'],
    )

    # Train the model
    trainer.train()

    # Evaluate the model
    eval_results = trainer.evaluate()
    print(f"Evaluation results: {eval_results}")

    # Save the model
    gpt_model.save_pretrained(os.getcwd() + '/models/gpt')
    tokenizer.save_pretrained(os.getcwd() + '/models/gpt')

    # Calculate metrics. The dataset columns are 'prompt' and 'completion'
    # (see prepare_dataset); 'text'/'label' do not exist.
    precision, recall, f1 = calculate_metrics(
        gpt_model,
        tokenizer,
        dataset['test']['prompt'],
        dataset['test']['completion'],
    )

    # Display metrics
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")