File size: 9,750 Bytes
ebcc7d1
 
 
 
 
 
 
 
 
 
 
 
9a88738
ebcc7d1
 
 
 
 
 
 
 
 
 
 
 
 
9a88738
 
 
 
ebcc7d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a88738
 
 
 
 
ebcc7d1
9a88738
 
ebcc7d1
9a88738
 
 
 
ebcc7d1
9a88738
 
 
ebcc7d1
 
 
 
 
 
9a88738
ebcc7d1
 
 
 
 
 
 
 
 
 
9a88738
 
ebcc7d1
 
 
 
 
 
 
 
 
9a88738
 
ebcc7d1
 
 
 
 
 
 
 
 
9a88738
 
ebcc7d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
import torch
from torch.utils.data import DataLoader
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import re
import cv2
import string
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from vit import LineDataset, collate_fn
from loguru import logger
import os
from configs import hf_token

class Inference:
    def __init__(self, model_path, processor_path, target_size=(256, 64), batch_size=32):
        """
        Initialize the TextGenerator with model and processor paths.
        
        Args:
            model_path (str): Path to the pre-trained model
            processor_path (str): Path to the pre-trained processor
            target_size (tuple): Target size for input images (height, width)
            batch_size (int): Batch size for inference
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # self.model_path = self._get_absolute_path(model_path)
        # self.processor_path = self._get_absolute_path(processor_path)
        self.model_path = model_path
        self.processor_path = processor_path
        self.target_size = target_size
        self.batch_size = batch_size
        
        # Initialize model and processor
        self.processor = None
        self.model = None
        self._initialize_model()

    def _get_absolute_path(self, path):
        """Convert relative path to absolute path"""
        if os.path.isabs(path):
            return path
        # If it's a relative path, make it absolute relative to the current working directory
        return os.path.join(os.getcwd(), path.lstrip('./'))


    def _initialize_model(self):
        """
        Load the processor and the model, then put the model in eval mode.

        The model is loaded with up to three strategies, tried in order:
        safetensors weights with a device map (``"auto"`` when CUDA is
        available), safetensors weights without a device map, and finally
        with ``use_safetensors=False``. The first strategy that succeeds is
        kept; a failure of an earlier strategy is only logged as a warning.

        Raises:
            Exception: Whatever the processor load, the last model-loading
                strategy, or the device move raised. The error is logged
                together with its traceback and then re-raised.
        """
        logger.info("Loading model...")
        logger.info(f"Loading model from: {self.model_path}")
        logger.info(f"Loading processor from: {self.processor_path}")

        # Ordered fallbacks: (message logged on success, extra from_pretrained kwargs).
        load_attempts = [
            (
                "Model loaded with safetensors=True and device_map",
                {
                    "use_safetensors": True,
                    "device_map": "auto" if torch.cuda.is_available() else None,
                },
            ),
            ("Model loaded with safetensors=True", {"use_safetensors": True}),
            # The last resort must really disable safetensors; it previously
            # passed use_safetensors=True and so merely repeated attempt 2.
            ("Model loaded with safetensors=False", {"use_safetensors": False}),
        ]

        try:
            # Load processor
            self.processor = TrOCRProcessor.from_pretrained(
                self.processor_path,
                do_rescale=False,
                use_fast=True,
                token=hf_token,
            )
            logger.info("Processor loaded successfully")

            logger.info("Attempting to load model...")
            final_attempt = len(load_attempts)
            for attempt_no, (success_message, load_kwargs) in enumerate(load_attempts, start=1):
                try:
                    self.model = VisionEncoderDecoderModel.from_pretrained(
                        self.model_path,
                        token=hf_token,
                        **load_kwargs,
                    )
                except Exception as load_error:
                    if attempt_no == final_attempt:
                        # Nothing left to try: surface the last failure.
                        logger.error(f"All loading methods failed: {load_error}")
                        raise
                    logger.warning(f"Method {attempt_no} failed: {load_error}")
                else:
                    logger.info(success_message)
                    break

            # device_map may already have placed the model; only move it when
            # its reported device differs from the one inputs will be sent to.
            if not hasattr(self.model, 'device') or str(self.model.device) != str(self.device):
                logger.info(f"Moving model to device: {self.device}")
                self.model.to(self.device)

            self.model.eval()
            logger.info("Model loaded successfully and moved to device")

        except Exception as e:
            logger.error(f"Error loading model or processor: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise
    def preprocess_images(self, line_segments):
        """
        Prepare line images for inference.
        
        Args:
            line_segments (dict): Dictionary containing line segment information
            
        Returns:
            tuple: (keys, line_images) - keys and corresponding images
        """
        keys = list(line_segments.keys())
        line_images = [line_segments[k]["image"] for k in keys]
        return keys, line_images
    
    def create_dataloader(self, line_images):
        """
        Wrap line images in a ``LineDataset`` and return a DataLoader over it.

        Args:
            line_images (list): List of line images to run inference on.

        Returns:
            DataLoader: Loader that yields batches of ``self.batch_size``
            items, unshuffled, collated with ``collate_fn``.
        """
        # LineDataset takes a label list alongside the images; inference has
        # no ground truth, so every image is paired with an empty string.
        placeholder_labels = ["" for _ in range(len(line_images))]

        inference_dataset = LineDataset(
            self.processor,
            self.model,
            line_images,
            placeholder_labels,
            self.target_size,
            apply_augmentation=False,
        )

        # shuffle=False keeps the outputs aligned with the input image order.
        return DataLoader(
            inference_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            collate_fn=collate_fn,
        )
    
    def generate_texts(self, dataloader):
        """
        Generate texts from images using the model.
        
        Args:
            dataloader (DataLoader): DataLoader containing preprocessed images
            
        Returns:
            list: List of generated texts
        """
        generated_texts = []
        
        with torch.no_grad():
            for batch in dataloader:
                pixel_values = batch["pixel_values"].to(self.device)
                generated_ids = self.model.generate(pixel_values)
                generated_texts_batch = self.processor.batch_decode(
                    generated_ids, 
                    skip_special_tokens=True
                )
                generated_texts.extend(generated_texts_batch)
        
        return generated_texts
    
    def update_line_segments(self, line_segments, keys, generated_texts):
        """
        Update line segments dictionary with generated transcriptions.
        
        Args:
            line_segments (dict): Original line segments dictionary
            keys (list): List of keys corresponding to the line segments
            generated_texts (list): List of generated texts
            
        Returns:
            dict: Updated line segments dictionary with transcriptions
        """
        for key, text in zip(keys, generated_texts):
            line_segments[key]["transcription"] = text
        
        return line_segments
    
    def generate_texts_from_images(self, line_segments):
        """
        Transcribe every line segment and store the text on the segment.

        Chains the steps of this class: extract the images, batch them,
        generate texts, and write the texts back onto the segments.

        Args:
            line_segments (dict): Dictionary containing line segment
                information, with an "image" key for each segment.

        Returns:
            dict: The line segments dictionary, with a "transcription" key
            added to the segments that received a generated text.
        """
        logger.info("Starting text generation from images...")

        keys, line_images = self.preprocess_images(line_segments)
        generated_texts = self.generate_texts(self.create_dataloader(line_images))

        return self.update_line_segments(line_segments, keys, generated_texts)
    
    def generate_single_image(self, image):
        """
        Generate text from a single image.
        
        Args:
            image: PIL Image or numpy array
            
        Returns:
            str: Generated text
        """
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        
        # Create a temporary line_segments-like structure
        temp_segments = {"temp_key": {"image": image}}
        
        # Use the main method
        result = self.generate_texts_from_images(temp_segments)
        
        return result["temp_key"]["transcription"]