# -*- coding: utf-8 -*- """ OCR Model Module Handles loading and inference of the Hurricane OCR / Typhoon OCR model Supports: GPU-only, CPU-only, and Hybrid (GPU+CPU) modes Supports: Base model or Fine-tuned LoRA model """ import sys import io import os # Fix Windows console encoding for Thai characters and emojis if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') import torch import importlib # Import transformer symbols safely — some installs may not provide newer classes _transformers = importlib.import_module("transformers") AutoProcessor = getattr(_transformers, "AutoProcessor") AutoModelForImageTextToText = getattr(_transformers, "AutoModelForImageTextToText") AutoModelForVision2Seq = getattr(_transformers, "AutoModelForVision2Seq", None) BitsAndBytesConfig = getattr(_transformers, "BitsAndBytesConfig", None) AutoTokenizer = getattr(_transformers, "AutoTokenizer") from PIL import Image import numpy as np from typing import Optional, Dict, Any, Tuple, List, Union from pathlib import Path import time # ============================================================ # CONFIGURATION - Change these settings as needed # ============================================================ # Model Options: # - Set HURRICANE_MODEL_PATH to use fine-tuned Hurricane OCR model # - Default: Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA (LoRA adapter from HuggingFace) # - Set to None to use base Typhoon OCR model # Priority: Using Hurricane OCR LoRA from HuggingFace HURRICANE_MODEL_PATH = "Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA" # HuggingFace model # Object Detection Model (YOLOv8-based) # Set to HuggingFace model or local path for HurricaneOD_beta HURRICANE_OD_MODEL_PATH = "Rattatammanoon/hurricane-od-thai-plate-detector" # HuggingFace model # Device Mode Options: # "auto" - Automatically use GPU if available, fallback to CPU # "gpu" - Force GPU only (will error if no GPU) # "cpu" - Force CPU only # "hybrid" - Use GPU + CPU together (offload to CPU when VRAM is full) DEVICE_MODE = "hybrid" # Memory settings MAX_GPU_MEMORY = "3GB" # Maximum GPU memory (lower = more CPU offload, less total RAM) # Offline Mode Settings # Set to True to use only local files (no internet required) # Base model must be downloaded and cached first USE_OFFLINE_MODE = False # Set to True for offline usage LOCAL_BASE_MODEL_PATH = None # Optional: Path to local base model (e.g., "./models/thai-trocr") # ============================================================ def get_device_info() -> dict: """Get information about available devices""" info = { "cuda_available": torch.cuda.is_available(), "cuda_device_count": torch.cuda.device_count() if torch.cuda.is_available() else 0, "cuda_device_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else None, "cuda_memory_total": None, "cuda_memory_free": None, "cuda_memory_total_gb": 0, "cpu_count": os.cpu_count(), } if info["cuda_available"]: try: total = torch.cuda.get_device_properties(0).total_memory / (1024**3) allocated = torch.cuda.memory_allocated(0) / (1024**3) free = total - allocated info["cuda_memory_total"] = f"{total:.1f} GB" info["cuda_memory_free"] = f"{free:.1f} GB" info["cuda_memory_total_gb"] = total except: pass return info class PlateDetector: """ YOLOv8n-based License Plate Detector Detects license plates in images before OCR processing Supports: - Pretrained YOLOv8 models (yolov8n.pt, etc.) - Custom trained models (HurricaneOD_beta.pt) """ def __init__( self, model_size: str = "n", conf_threshold: float = 0.25, iou_threshold: float = 0.45, model_path: Optional[str] = None ): """ Initialize YOLOv8 detector Args: model_size: YOLOv8 model size - "n" (nano), "s" (small), "m" (medium), "l" (large), "x" (xlarge) conf_threshold: Confidence threshold for detection (0.0-1.0) iou_threshold: IoU threshold for NMS (0.0-1.0) model_path: Path to custom trained model (e.g., HurricaneOD_beta.pt). If None, uses pretrained model. """ self.model_size = model_size self.conf_threshold = conf_threshold self.iou_threshold = iou_threshold # Keep model_path as string to support HuggingFace paths (e.g., "username/model-name") # Will convert to Path only for local file paths in load() method self.model_path = model_path self.model = None self._is_loaded = False def load(self, device: str = "auto"): """Load YOLOv8 model""" if self._is_loaded: return try: from ultralytics import YOLO # Determine device if device == "auto": device = "cuda" if torch.cuda.is_available() else "cpu" # Check for custom trained model (HurricaneOD_beta) # Priority: 1) Provided model_path, 2) HuggingFace, 3) hurricane_ocr_model, 4) training folder, 5) pretrained model_path_to_load = None model_source = None is_huggingface = False # Check if model_path is a HuggingFace path FIRST (format: username/model-name) if self.model_path and "/" in str(self.model_path) and "\\" not in str(self.model_path): parts = str(self.model_path).split("/") if len(parts) == 2 and not str(self.model_path).startswith("."): # This looks like a HuggingFace path model_path_to_load = str(self.model_path) model_source = "HuggingFace Hub" is_huggingface = True # Check if model_path is a local file path if not model_path_to_load and self.model_path: local_path = Path(self.model_path) if local_path.exists(): model_path_to_load = str(local_path) model_source = "provided path" if not model_path_to_load: # Try to find HurricaneOD_beta model locally # Priority 1: hurricane_ocr_model (recommended location) hurricaneod_path = Path("hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt") # Priority 2: training folder weights training_best_path = Path("HurricaneOD/HurricaneOD_beta/weights/best.pt") training_last_path = Path("HurricaneOD/HurricaneOD_beta/weights/last.pt") if hurricaneod_path.exists(): model_path_to_load = str(hurricaneod_path) model_source = "hurricane_ocr_model/HurricaneOD_beta" elif training_best_path.exists(): model_path_to_load = str(training_best_path) model_source = "HurricaneOD/HurricaneOD_beta/weights (best.pt)" elif training_last_path.exists(): model_path_to_load = str(training_last_path) model_source = "HurricaneOD/HurricaneOD_beta/weights (last.pt)" if model_path_to_load: if is_huggingface: print(f"🔍 Loading HurricaneOD_beta model from: HuggingFace Hub") print(f" 📦 Model: {model_path_to_load}") print(f" 💡 First load: downloads from HuggingFace (~6-10s)") print(f" 💡 Next loads: uses cached model (~1-2s)") print(f" 📍 Cache: ~/.cache/huggingface/hub/") else: print(f"🔍 Loading HurricaneOD_beta model from: Local File") print(f" 📂 Source: {model_source}") # Convert to absolute path for display (local files only) abs_path = Path(model_path_to_load).resolve() print(f" 📍 Path: {abs_path}") print(f" 🖥️ Device: {device}") import time det_start = time.time() try: # For HuggingFace models, download via huggingface_hub to track downloads if is_huggingface: try: from huggingface_hub import hf_hub_download, list_repo_files print(f" ⏳ Checking available files in HuggingFace repo...") # List all files in the repository try: repo_files = list_repo_files(repo_id=model_path_to_load, repo_type="model") pt_files = [f for f in repo_files if f.endswith('.pt')] print(f" 📋 Found .pt files: {pt_files}") except: pt_files = [] # Try multiple possible filenames (most likely first) possible_filenames = [ "HurricaneOD_beta.pt", # Primary filename for HurricaneOD "best.pt", "model.pt", "weights/best.pt" ] # If we found files, use the first .pt file if pt_files: possible_filenames = pt_files + possible_filenames local_model_path = None for filename in possible_filenames: try: print(f" ⏳ Trying to download: {filename}") local_model_path = hf_hub_download( repo_id=model_path_to_load, filename=filename, repo_type="model" ) print(f" ✅ Downloaded: {filename}") print(f" 📥 Saved to: {local_model_path}") break except Exception as e: print(f" ⚠️ {filename} not found: {e}") continue if local_model_path: model_path_to_load = local_model_path else: raise FileNotFoundError( f"No YOLO model file (.pt) found in {model_path_to_load}\n" f" Please upload one of these files to your HuggingFace repo:\n" f" - best.pt (recommended)\n" f" - HurricaneOD_beta.pt\n" f" - model.pt\n" f" Available files: {repo_files if 'repo_files' in locals() else 'unknown'}" ) except ImportError: print(f" ⚠️ huggingface_hub not installed, downloads won't be tracked") print(f" 💡 Install with: pip install huggingface_hub") raise except Exception as e: print(f" ❌ Could not download from HuggingFace: {e}") print(f" 💡 Make sure you uploaded a .pt file to the repository") print(f" 💡 Repository: https://huggingface.co/{model_path_to_load}") raise self.model = YOLO(model_path_to_load) if self.model is None: raise RuntimeError(f"YOLO returned None when loading {model_path_to_load}") except Exception as load_error: raise RuntimeError(f"Failed to load YOLOv8 model from {model_path_to_load}: {load_error}") det_elapsed = time.time() - det_start print(f" ✅ HurricaneOD_beta model loaded successfully! (took {det_elapsed:.2f} seconds)") else: # Fallback to pretrained YOLOv8 model (not recommended) model_name = f"yolov8{self.model_size}.pt" print(f"⚠️ HurricaneOD_beta model not found, using pretrained YOLOv8{self.model_size}") print(f" Device: {device}") print(f" ⏳ Downloading pretrained model (first time only, ~6MB)...") print(f" Note: For better results, train and use HurricaneOD_beta model") import time det_start = time.time() try: self.model = YOLO(model_name) if self.model is None: raise RuntimeError(f"YOLO returned None when loading {model_name}") except Exception as load_error: raise RuntimeError(f"Failed to load YOLOv8 model {model_name}: {load_error}") det_elapsed = time.time() - det_start print(f" ✅ YOLOv8 detector loaded (pretrained) (took {det_elapsed:.2f} seconds)") # Verify model is loaded before marking as loaded if self.model is None: raise RuntimeError("YOLOv8 model is None after loading. Model loading failed.") self._is_loaded = True except ImportError: raise ImportError( "ultralytics not installed. Install with: pip install ultralytics\n" "Note: YOLOv8 will use pretrained COCO model. For better results, " "fine-tune on Thai license plate dataset." ) except Exception as e: raise RuntimeError(f"Failed to load YOLOv8 model: {e}") def detect(self, image: Image.Image) -> List[Dict[str, Any]]: """ Detect license plates in image Args: image: PIL Image Returns: List of detections with bounding boxes, confidence scores Format: [{"bbox": [x1, y1, x2, y2], "confidence": float, "class": int}, ...] """ if not self._is_loaded: self.load() # Check if model is loaded if self.model is None: raise RuntimeError( "Plate detector model is not loaded. " "Please ensure YOLOv8 model was loaded successfully." ) # Convert PIL to numpy array img_array = np.array(image) # Run detection results = self.model.predict( img_array, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False ) detections = [] if results and len(results) > 0: result = results[0] # Extract boxes, confidences, classes if result.boxes is not None: boxes = result.boxes.xyxy.cpu().numpy() # [x1, y1, x2, y2] confidences = result.boxes.conf.cpu().numpy() classes = result.boxes.cls.cpu().numpy().astype(int) for i in range(len(boxes)): detections.append({ "bbox": boxes[i].tolist(), "confidence": float(confidences[i]), "class": int(classes[i]) }) return detections def crop_plate(self, image: Image.Image, bbox: List[float], padding: int = 10) -> Image.Image: """ Crop license plate region from image Args: image: PIL Image bbox: Bounding box [x1, y1, x2, y2] padding: Padding pixels around the bbox Returns: Cropped PIL Image """ x1, y1, x2, y2 = bbox # Add padding width, height = image.size x1 = max(0, int(x1) - padding) y1 = max(0, int(y1) - padding) x2 = min(width, int(x2) + padding) y2 = min(height, int(y2) + padding) # Crop cropped = image.crop((x1, y1, x2, y2)) return cropped def _get_coco_class_name(self, class_id: int) -> str: """Get COCO class name from class ID""" coco_classes = [ 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush' ] if 0 <= class_id < len(coco_classes): return coco_classes[class_id] return f"class_{class_id}" def _select_best_plate_region(self, image: Image.Image, detections: List[Dict]) -> Optional[Dict]: """ Select the best detection for license plate region Strategy: 1. Prefer detections in bottom-center region (where plates usually are) 2. Prefer vehicle classes (car, truck, bus, motorcycle) 3. Use aspect ratio filtering (plates are usually wider than tall) 4. Prefer detections with higher confidence """ if not detections: return None width, height = image.size center_x = width / 2 bottom_y = height * 0.7 # Bottom 30% of image # Vehicle class IDs in COCO vehicle_classes = {2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'} scored_detections = [] for det in detections: bbox = det["bbox"] x1, y1, x2, y2 = bbox center_bbox_x = (x1 + x2) / 2 center_bbox_y = (y1 + y2) / 2 # Calculate score score = det["confidence"] # Bonus for vehicle classes class_id = det.get("class", -1) if class_id in vehicle_classes: score *= 1.5 # Bonus for bottom-center region distance_from_center = abs(center_bbox_x - center_x) / width distance_from_bottom = abs(center_bbox_y - bottom_y) / height if distance_from_center < 0.3: # Within 30% of center score *= 1.3 if center_bbox_y > height * 0.5: # In bottom half score *= 1.2 # Check aspect ratio (plates are usually wider) bbox_width = x2 - x1 bbox_height = y2 - y1 aspect_ratio = bbox_width / bbox_height if bbox_height > 0 else 1 if 1.5 < aspect_ratio < 5.0: # Reasonable plate aspect ratio score *= 1.4 scored_detections.append({ **det, "score": score, "class_name": self._get_coco_class_name(class_id) }) # Sort by score scored_detections.sort(key=lambda x: x["score"], reverse=True) # Return best detection if scored_detections: return scored_detections[0] return None def _fallback_region_detection(self, image: Image.Image) -> Optional[Image.Image]: """ Fallback method: Use bottom-center region if YOLOv8 doesn't detect properly This assumes license plate is in the bottom-center region of the image """ width, height = image.size # Crop bottom-center region (typical plate location) # Use bottom 30% and center 60% of image x1 = int(width * 0.2) y1 = int(height * 0.7) x2 = int(width * 0.8) y2 = int(height * 0.95) cropped = image.crop((x1, y1, x2, y2)) return cropped def _contour_based_detection(self, image: Image.Image) -> Optional[Image.Image]: """ Alternative detection using contour detection Looks for rectangular regions that might be license plates """ try: import cv2 # Convert PIL to OpenCV format img_array = np.array(image.convert("RGB")) img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) # Apply threshold _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # Find contours contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) width, height = image.size best_contour = None best_score = 0 for contour in contours: # Get bounding rect x, y, w, h = cv2.boundingRect(contour) # Filter by size and aspect ratio (typical plate dimensions) area = w * h aspect_ratio = w / h if h > 0 else 0 # Plate-like characteristics if (area > width * height * 0.01 and # At least 1% of image area < width * height * 0.3 and # At most 30% of image 1.5 < aspect_ratio < 5.0 and # Reasonable aspect ratio y > height * 0.5): # In bottom half # Score based on position and size score = area * (1.0 - abs(aspect_ratio - 2.5) / 2.5) if y > height * 0.6: # Bonus for bottom region score *= 1.5 if score > best_score: best_score = score best_contour = (x, y, w, h) if best_contour: x, y, w, h = best_contour # Add padding padding = 10 x1 = max(0, x - padding) y1 = max(0, y - padding) x2 = min(width, x + w + padding) y2 = min(height, y + h + padding) cropped = image.crop((x1, y1, x2, y2)) return cropped except ImportError: pass # OpenCV not available except Exception: pass # Contour detection failed return None def detect_and_crop(self, image: Image.Image, return_all: bool = False) -> Tuple[Optional[Image.Image], List[Dict]]: """ Detect and crop the best license plate from image Uses smart selection strategy: 1. Try YOLOv8 detection with smart region selection 2. Fallback to bottom-center region if detection fails Args: image: PIL Image return_all: If True, return all detections, else return only the best one Returns: Tuple of (cropped_image, all_detections) If no detection, returns (None, []) """ detections = self.detect(image) if return_all: if not detections: return [], [] # Return all cropped plates cropped_images = [] for det in detections: cropped = self.crop_plate(image, det["bbox"]) cropped_images.append(cropped) return cropped_images, detections # Select best detection using smart strategy best_det = self._select_best_plate_region(image, detections) if best_det: # Use selected detection cropped = self.crop_plate(image, best_det["bbox"], padding=15) return cropped, [best_det] else: # Try contour-based detection first print(" ⚠️ No YOLOv8 detection found, trying contour-based detection...") cropped = self._contour_based_detection(image) if cropped: # Create detection info for contour method width, height = image.size contour_det = { "bbox": [0, 0, width, height], # Approximate "confidence": 0.6, "class": -1, "class_name": "contour_detection", "score": 0.6, "method": "contour" } return cropped, [contour_det] # Final fallback: use bottom-center region print(" ⚠️ Contour detection failed, using fallback region detection") cropped = self._fallback_region_detection(image) if cropped: # Create dummy detection info for fallback width, height = image.size fallback_det = { "bbox": [width * 0.2, height * 0.7, width * 0.8, height * 0.95], "confidence": 0.5, "class": -1, "class_name": "fallback_region", "score": 0.5, "method": "region" } return cropped, [fallback_det] return None, [] class OCRModel: """ OCR Model wrapper for Hurricane OCR / Typhoon OCR Supports multiple device modes: auto, gpu, cpu, hybrid Supports: Base model or Fine-tuned LoRA model """ BASE_MODEL_NAME = "scb10x/typhoon-ocr1.5-2b" MAX_IMAGE_SIZE = 1024 # Reduced from 1800 for lower RAM usage def __init__(self, device_mode: str = None, use_8bit: bool = True, max_gpu_memory: str = None, hurricane_model_path: str = None, use_detection: bool = True): """ Initialize the OCR model Args: device_mode: "auto", "gpu", "cpu", or "hybrid" (default: use global DEVICE_MODE) use_8bit: Whether to use 8-bit quantization for memory efficiency max_gpu_memory: Maximum GPU memory to use (e.g., "6GB") hurricane_model_path: Path to fine-tuned Hurricane OCR model (LoRA) use_detection: Enable YOLOv8 plate detection before OCR (default: True) """ self.device_mode = device_mode or DEVICE_MODE self.use_8bit = use_8bit self.max_gpu_memory = max_gpu_memory or MAX_GPU_MEMORY self.hurricane_model_path = hurricane_model_path or HURRICANE_MODEL_PATH self.use_detection = use_detection self.model = None self.processor = None self.tokenizer = None self.plate_detector = None self._is_loaded = False self._device = None self._device_info = get_device_info() self._mode_used = None self._using_hurricane = False def load(self) -> None: """Load the model and processor""" if self._is_loaded: print("✅ Model already loaded (using cached instance).") return import time start_time = time.time() # Check if Hurricane OCR model exists and what type it is # Support both local paths and HuggingFace Hub paths (username/model-name) use_hurricane = False is_huggingface_path = False if self.hurricane_model_path: # Check if it's a HuggingFace Hub path (format: username/model-name) # HuggingFace paths have exactly one "/" and no backslashes or dots at start if "/" in self.hurricane_model_path and "\\" not in self.hurricane_model_path: parts = self.hurricane_model_path.split("/") if len(parts) == 2 and not self.hurricane_model_path.startswith("."): # This looks like a HuggingFace path (e.g., "Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA") is_huggingface_path = True use_hurricane = True print(f" 📦 Detected HuggingFace model path: {self.hurricane_model_path}") else: # Local path with slashes use_hurricane = os.path.exists(self.hurricane_model_path) else: # Local path use_hurricane = os.path.exists(self.hurricane_model_path) # Check if it's LoRA adapter or merged model is_lora_adapter = False is_merged_model = False merged_model_path = None adapter_config_path = None # Initialize to avoid UnboundLocalError is_trocr_base = False # Track if base model is TrOCR if use_hurricane: # For HuggingFace paths, we need to download and check the files # For local paths, we can check directly if is_huggingface_path: # HuggingFace model - assume it's a LoRA adapter # (HuggingFace will auto-detect type when loading) is_lora_adapter = True print(f" 📦 HuggingFace model detected - will load as LoRA adapter") print(f" 💡 Model will be downloaded from HuggingFace Hub on first load") else: # Local path - check files to determine type # Check for merged model first (has config.json but no adapter_config.json) config_json_path = os.path.join(self.hurricane_model_path, "config.json") adapter_config_path = os.path.join(self.hurricane_model_path, "adapter_config.json") if os.path.exists(config_json_path) and not os.path.exists(adapter_config_path): # This might be a merged model (full model, not just adapter) is_merged_model = True merged_model_path = self.hurricane_model_path print(f" 📋 Detected merged model (full model, no base model needed)") print(f" 📋 Model path: {merged_model_path}") elif os.path.exists(adapter_config_path): # Check for pre-merged model in merged/ subdirectory merged_subdir = os.path.join(self.hurricane_model_path, "merged") if os.path.exists(merged_subdir) and os.path.exists(os.path.join(merged_subdir, "config.json")): is_merged_model = True merged_model_path = merged_subdir print(f" 📋 Found pre-merged model in merged/ subdirectory") print(f" 📋 Will use merged model (no base model loading needed)") else: is_lora_adapter = True try: import json with open(adapter_config_path, 'r', encoding='utf-8') as f: adapter_config = json.load(f) base_model = adapter_config.get('base_model_name_or_path', '') is_trocr_base = 'trocr' in base_model.lower() model_type_str = "Thai TrOCR" if is_trocr_base else "Vision-Language (Qwen3VL/Typhoon)" print(f" 📋 Detected LoRA adapter model") print(f" 📋 Base model: {base_model}") print(f" 📋 Model type: {model_type_str}") print(f" 💡 Note: Base model will be loaded first, then LoRA adapter") print(f" 💡 Base model is cached by HuggingFace (~/.cache/huggingface/)") print(f" 💡 First load: downloads base model (~2-5GB, 30-60s)") print(f" 💡 Next loads: uses cached base model (~5-10s)") except Exception as e: print(f" ⚠️ Warning: Could not read adapter config: {e}") # Use Typhoon OCR (Transformers) with or without LoRA # Check device availability print("=" * 60) if use_hurricane and is_merged_model: print("🌀 HURRICANE OCR - Merged Model (Full Model)") print(f" Model Path: {merged_model_path}") print(f" Type: Merged Model (LoRA merged into base model)") print(f" ✅ No base model loading needed!") elif use_hurricane and is_lora_adapter: print("🌀 HURRICANE OCR - Fine-tuned Model (Typhoon OCR + LoRA)") print(f" Model Path: {self.hurricane_model_path}") print(f" Type: LoRA Adapter (Parameter-Efficient Fine-Tuning)") print(f" ⚠️ Will load base model first, then LoRA adapter") print(f" 💡 Base model is cached by HuggingFace (~/.cache/huggingface/)") print(f" 💡 First load: downloads base model (~2-5GB, 30-60s)") print(f" 💡 Next loads: uses cached base model (~5-10s)") elif use_hurricane: print("🌀 HURRICANE OCR - Fine-tuned Model (Typhoon)") print(f" Model Path: {self.hurricane_model_path}") else: print("🌊 TYPHOON OCR - Base Model") if self.hurricane_model_path: print(f" ⚠️ Hurricane model not found: {self.hurricane_model_path}") print(f" ⚠️ Falling back to base Typhoon OCR model") print("=" * 60) print("🖥️ Device Information:") print(f" CUDA Available: {self._device_info['cuda_available']}") print(f" CPU Cores: {self._device_info['cpu_count']}") if self._device_info['cuda_available']: print(f" GPU: {self._device_info['cuda_device_name']}") print(f" VRAM Total: {self._device_info['cuda_memory_total']}") print(f" VRAM Free: {self._device_info['cuda_memory_free']}") print(f" Requested Mode: {self.device_mode.upper()}") print("=" * 60) # Determine actual device configuration device_map = self._get_device_map() print("\n📦 Loading processor and tokenizer...") # If merged model, try loading from merged model path first # For LoRA adapter: use base model (from adapter_config) for processor/tokenizer model_path_to_load = merged_model_path if (is_merged_model and merged_model_path) else self.BASE_MODEL_NAME processor_model_path = model_path_to_load if use_hurricane and is_lora_adapter and adapter_config_path and os.path.exists(adapter_config_path): try: import json with open(adapter_config_path, "r", encoding="utf-8") as f: adapter_cfg = json.load(f) base_model_from_adapter = adapter_cfg.get("base_model_name_or_path") if base_model_from_adapter: processor_model_path = base_model_from_adapter print(f" 📦 Using base model for processor/tokenizer: {processor_model_path}") except Exception as e: print(f" ⚠️ Could not read adapter config for processor: {e}") try: # Load tokenizer FIRST with fix_mistral_regex=True to avoid warning # This prevents the warning from being shown when processor loads tokenizer try: print(f" Loading tokenizer with fix_mistral_regex=True...") self.tokenizer = AutoTokenizer.from_pretrained( processor_model_path, trust_remote_code=True, fix_mistral_regex=True # Fix tokenizer regex pattern issue - MUST be set here ) print(f" ✓ Tokenizer loaded with fix_mistral_regex=True") except Exception as tokenizer_error: print(f" ⚠️ Could not load tokenizer separately: {tokenizer_error}") self.tokenizer = None # Load processor (will use tokenizer if already loaded) self.processor = AutoProcessor.from_pretrained( processor_model_path, trust_remote_code=True ) # If we loaded tokenizer separately, update processor's tokenizer if self.tokenizer is not None: self.processor.tokenizer = self.tokenizer print(f" ✓ Processor loaded and updated with fixed tokenizer") else: # Fallback: use processor's tokenizer and try to fix it self.tokenizer = self.processor.tokenizer # Try to set fix_mistral_regex if available if hasattr(self.tokenizer, 'fix_mistral_regex'): self.tokenizer.fix_mistral_regex = True print(f" ✓ Processor loaded (using processor's tokenizer)") print(f" ✓ Processor and tokenizer loaded from: {model_path_to_load}") except Exception as e: print(f" ⚠️ Could not load from {model_path_to_load}: {e}") print(f" ⚠️ Falling back to base model") try: # Load tokenizer FIRST with fix_mistral_regex=True try: print(f" Loading base model tokenizer with fix_mistral_regex=True...") self.tokenizer = AutoTokenizer.from_pretrained( self.BASE_MODEL_NAME, trust_remote_code=True, fix_mistral_regex=True # Fix tokenizer regex pattern issue ) print(f" ✓ Base tokenizer loaded with fix_mistral_regex=True") except Exception as tokenizer_error: print(f" ⚠️ Could not load base tokenizer separately: {tokenizer_error}") self.tokenizer = None # Load processor self.processor = AutoProcessor.from_pretrained( self.BASE_MODEL_NAME, trust_remote_code=True ) # Update processor's tokenizer if we loaded it separately if self.tokenizer is not None: self.processor.tokenizer = self.tokenizer print(f" ✓ Base processor loaded and updated with fixed tokenizer") else: self.tokenizer = self.processor.tokenizer if hasattr(self.tokenizer, 'fix_mistral_regex'): self.tokenizer.fix_mistral_regex = True print(f" ✓ Base processor loaded (using processor's tokenizer)") except Exception as fallback_error: print(f" ❌ Failed to load base model: {fallback_error}") raise print(" ✓ Processor and tokenizer loaded successfully") print(f"\n🤖 Loading model in {self._mode_used.upper()} mode...") # Load model based on configuration if is_merged_model and merged_model_path: # Load merged model directly (no base model + LoRA needed) print(f" 📦 Loading merged model from: {merged_model_path}") if self._mode_used == "hybrid" and self._device_info['cuda_available']: self._load_merged_model(merged_model_path, device_map, mode="hybrid") elif self._mode_used == "gpu" and self._device_info['cuda_available']: self._load_merged_model(merged_model_path, device_map, mode="gpu") else: self._load_merged_model(merged_model_path, device_map="cpu", mode="cpu") else: # Load base model first, then LoRA adapter # Determine base model name for LoRA adapter base_model_name = self.BASE_MODEL_NAME if use_hurricane and is_lora_adapter: try: import json adapter_config_path = os.path.join(self.hurricane_model_path, "adapter_config.json") with open(adapter_config_path, 'r', encoding='utf-8') as f: adapter_config = json.load(f) base_model_name = adapter_config.get('base_model_name_or_path', self.BASE_MODEL_NAME) is_trocr_base = 'trocr' in base_model_name.lower() except: pass # Check for offline mode or local base model if USE_OFFLINE_MODE or LOCAL_BASE_MODEL_PATH: # Use local base model if provided if LOCAL_BASE_MODEL_PATH and os.path.exists(LOCAL_BASE_MODEL_PATH): print(f" 📦 Using local base model: {LOCAL_BASE_MODEL_PATH}") base_model_name = LOCAL_BASE_MODEL_PATH is_trocr_base = 'trocr' in base_model_name.lower() or os.path.exists(os.path.join(LOCAL_BASE_MODEL_PATH, "config.json")) elif USE_OFFLINE_MODE: print(f" 📦 Offline mode enabled - using cached model: {base_model_name}") print(f" ⚠️ If model not cached, loading will fail") if self._mode_used == "hybrid" and self._device_info['cuda_available']: self._load_hybrid_mode(device_map, base_model_name=base_model_name, is_trocr=is_trocr_base) elif self._mode_used == "gpu" and self._device_info['cuda_available']: self._load_gpu_mode(device_map, base_model_name=base_model_name, is_trocr=is_trocr_base) else: self._load_cpu_mode(base_model_name=base_model_name, is_trocr=is_trocr_base) # Load Hurricane OCR LoRA adapter if available if use_hurricane and is_lora_adapter: self._load_hurricane_adapter() model_name = "Hurricane OCR" if self._using_hurricane else "Typhoon OCR" print(f"\n✅ {model_name} loaded successfully!") print(f" Mode: {self._mode_used.upper()}") print(f" Fine-tuned: {'Yes (LoRA)' if self._using_hurricane else 'No (Base)'}") if hasattr(self.model, 'hf_device_map'): devices_used = set(str(v) for v in self.model.hf_device_map.values()) print(f" Devices: {', '.join(devices_used)}") # Load YOLOv8 detector if enabled if self.use_detection: try: # Priority 1: Use HuggingFace model (HURRICANE_OD_MODEL_PATH) # Priority 2: Check for local HurricaneOD_beta # Priority 3: Fallback to pretrained YOLOv8n model_path_to_use = HURRICANE_OD_MODEL_PATH # If HuggingFace path not set, check for local models if not model_path_to_use: hurricaneod_path = Path("hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt") training_best_path = Path("HurricaneOD/HurricaneOD_beta/weights/best.pt") training_last_path = Path("HurricaneOD/HurricaneOD_beta/weights/last.pt") if hurricaneod_path.exists(): model_path_to_use = str(hurricaneod_path) elif training_best_path.exists(): model_path_to_use = str(training_best_path) elif training_last_path.exists(): model_path_to_use = str(training_last_path) if model_path_to_use: self.plate_detector = PlateDetector( model_size="n", conf_threshold=0.25, model_path=model_path_to_use ) else: print(f" ⚠️ HurricaneOD_beta model not found, using pretrained YOLOv8n") print(f" Expected: HuggingFace model or local paths") print(f" - HuggingFace: {HURRICANE_OD_MODEL_PATH}") print(f" - Local: hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt") self.plate_detector = PlateDetector(model_size="n", conf_threshold=0.25) device = "cuda" if self._device_info['cuda_available'] else "cpu" self.plate_detector.load(device=device) except Exception as e: print(f" ⚠️ Warning: Could not load YOLOv8 detector: {e}") print(" Continuing without detection (will process full image)") self.use_detection = False self._is_loaded = True elapsed = time.time() - start_time print(f"\n⏱️ Total loading time: {elapsed:.2f} seconds") def _load_hurricane_adapter(self) -> None: """Load Hurricane OCR LoRA adapter""" try: from peft import PeftModel print(f"\n🌀 Loading Hurricane OCR adapter from: {self.hurricane_model_path}") print(f" ⚠️ Note: LoRA adapter requires base model to be loaded first") print(f" 💡 This is normal - LoRA adapters work on top of base models") print(f" 💡 Base model is cached by HuggingFace (~/.cache/huggingface/)") print(f" 💡 First load: downloads base model (~2-5GB, 30-60s)") print(f" 💡 Next loads: uses cached base model (~5-10s)") print(f" 💡 Tip: Use merge_and_unload() to create merged model (faster loading)") # Load the LoRA adapter self.model = PeftModel.from_pretrained( self.model, self.hurricane_model_path, is_trainable=False # Inference only ) # Option to merge adapter for faster inference (saves merged model) # Uncomment the code below to merge and save merged model (one-time operation) # This will create a merged model that doesn't need base model loading # After merging, set HURRICANE_MODEL_PATH to the merged/ directory # # print(" 🔄 Merging LoRA adapter into base model...") # print(" ⏳ This may take a few minutes...") # merged_model = self.model.merge_and_unload() # merged_path = os.path.join(self.hurricane_model_path, "merged") # os.makedirs(merged_path, exist_ok=True) # print(f" 💾 Saving merged model to: {merged_path}") # merged_model.save_pretrained(merged_path) # self.processor.save_pretrained(merged_path) # print(" ✅ Merged model saved!") # print(f" 💡 Next time, set HURRICANE_MODEL_PATH to: {merged_path}") # print(f" 💡 This will load faster (no base model needed)") self._using_hurricane = True print(" ✓ Hurricane OCR adapter loaded successfully!") except ImportError: print(" ⚠️ PEFT not installed. Install with: pip install peft") print(" Using base Typhoon OCR model instead.") except Exception as e: print(f" ⚠️ Failed to load Hurricane adapter: {e}") print(" Using base Typhoon OCR model instead.") def _get_device_map(self) -> Any: """Determine device map based on mode""" if self.device_mode == "cpu": self._mode_used = "cpu" self._device = "cpu" return "cpu" if not self._device_info['cuda_available']: print(" ⚠️ GPU not available, falling back to CPU") self._mode_used = "cpu" self._device = "cpu" return "cpu" if self.device_mode == "gpu": self._mode_used = "gpu" self._device = "cuda" return "cuda:0" if self.device_mode == "hybrid": self._mode_used = "hybrid" self._device = "cuda" # Auto device map with memory limits return "auto" # Auto mode self._mode_used = "gpu" if self._device_info['cuda_available'] else "cpu" self._device = "cuda" if self._device_info['cuda_available'] else "cpu" return "auto" if self._device_info['cuda_available'] else "cpu" def _load_hybrid_mode(self, device_map, base_model_name=None, is_trocr=False): """Load model in hybrid GPU+CPU mode""" print(" Using HYBRID mode (GPU + CPU)") print(f" Max GPU Memory: {self.max_gpu_memory}") model_name = base_model_name or self.BASE_MODEL_NAME model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText if self.use_8bit: if BitsAndBytesConfig is None: raise ImportError("bitsandbytes or compatible BitsAndBytesConfig not available. Install bitsandbytes and retry (pip install bitsandbytes).") print(" Using 4-bit quantization (NF4) + CPU offloading") bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", llm_int8_enable_fp32_cpu_offload=True # Enable CPU offloading ) self.model = model_class.from_pretrained( model_name, quantization_config=bnb_config, device_map=device_map, max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, trust_remote_code=True, offload_folder="offload_weights" ) else: print(" Using float16 + CPU offloading") self.model = model_class.from_pretrained( model_name, device_map=device_map, max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, trust_remote_code=True, dtype=torch.float16, offload_folder="offload_weights" ) def _load_gpu_mode(self, device_map, base_model_name=None, is_trocr=False): """Load model on GPU only""" print(" Using GPU-only mode") model_name = base_model_name or self.BASE_MODEL_NAME model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText # Prepare loading kwargs load_kwargs = { "device_map": device_map, "trust_remote_code": True } # Add offline mode if enabled if USE_OFFLINE_MODE: load_kwargs["local_files_only"] = True print(" 📦 Offline mode: local_files_only=True") if self.use_8bit: print(" Using 4-bit quantization (NF4)") bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", llm_int8_enable_fp32_cpu_offload=False ) load_kwargs["quantization_config"] = bnb_config self.model = model_class.from_pretrained(model_name, **load_kwargs) else: print(" Using float16 precision") load_kwargs["dtype"] = torch.float16 self.model = model_class.from_pretrained(model_name, **load_kwargs) def _load_merged_model(self, model_path: str, device_map: Any, mode: str = "auto"): """Load merged model directly (no base model + LoRA needed)""" print(f" Using merged model (no base model loading needed)") if mode == "hybrid" and self._device_info['cuda_available']: print(" Using HYBRID mode (GPU + CPU)") print(f" Max GPU Memory: {self.max_gpu_memory}") self.model = AutoModelForImageTextToText.from_pretrained( model_path, device_map=device_map, max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, trust_remote_code=True, dtype=torch.float16, offload_folder="offload_weights" ) elif mode == "gpu" and self._device_info['cuda_available']: print(" Using GPU-only mode") print(" Using float16 precision") self.model = AutoModelForImageTextToText.from_pretrained( model_path, device_map=device_map, trust_remote_code=True, dtype=torch.float16 ) else: print(" Using CPU-only mode") print(" Using float32 precision") self.model = AutoModelForImageTextToText.from_pretrained( model_path, device_map="cpu", trust_remote_code=True, dtype=torch.float32 ) self._using_hurricane = True def _load_cpu_mode(self, base_model_name=None, is_trocr=False): """Load model on CPU only""" print(" Using CPU-only mode") print(" Using float32 precision") self._mode_used = "cpu" self._device = "cpu" model_name = base_model_name or self.BASE_MODEL_NAME model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText # Prepare loading kwargs load_kwargs = { "device_map": "cpu", "trust_remote_code": True, "dtype": torch.float32 } # Add offline mode if enabled if USE_OFFLINE_MODE: load_kwargs["local_files_only"] = True print(" 📦 Offline mode: local_files_only=True") self.model = model_class.from_pretrained(model_name, **load_kwargs) @property def device(self) -> str: """Get current device""" return self._device @property def mode_used(self) -> str: """Get actual mode being used""" return self._mode_used @property def device_info(self) -> dict: """Get device information""" return self._device_info def resize_image(self, img: Image.Image, max_size: int = None) -> Image.Image: """ Resize image if it exceeds max dimensions Args: img: PIL Image object max_size: Maximum dimension size (default: MAX_IMAGE_SIZE) Returns: Resized PIL Image """ if max_size is None: max_size = self.MAX_IMAGE_SIZE width, height = img.size if width > max_size or height > max_size: if width >= height: scale = max_size / float(width) new_size = (max_size, int(height * scale)) else: scale = max_size / float(height) new_size = (int(width * scale), max_size) img = img.resize(new_size, Image.Resampling.LANCZOS) print(f"Original size: {width, height} ==> Resized to: {img.size}") return img def perform_ocr( self, image: Image.Image, prompt: str = None, max_new_tokens: int = 128, # Reduced to 128 for lower RAM (enough for license plates) return_detection_info: bool = False, cropped_image: Image.Image = None, detection_info: Dict[str, Any] = None ) -> str: """ Perform OCR on an image with optional YOLOv8 plate detection Pipeline: 1. Step 1: License Plate Detection - If cropped_image is provided: Use it directly (detection already done in app.py) - If cropped_image is None: Run HurricaneOD_beta detection to crop plate - Result: img_to_ocr (cropped plate or full image) 2. Step 2: OCR Processing - Resize image if needed - Prepare prompt and messages - Run Hurricane OCR model to extract text - Result: OCR text output Note: This function is called from app.py which already does detection. If cropped_image is passed, it will be used directly (no duplicate detection). Args: image: PIL Image object (original full image) prompt: Custom prompt for OCR (default: Thai license plate extraction) max_new_tokens: Maximum tokens to generate (not used for PaddleOCR) return_detection_info: If True, return tuple (ocr_text, detection_info) cropped_image: Pre-cropped plate image (optional, to avoid duplicate detection) detection_info: Pre-computed detection info (optional, to avoid duplicate detection) Returns: OCR result text, or tuple (text, detection_info) if return_detection_info=True """ if not self._is_loaded: raise RuntimeError("Model not loaded. Call load() first.") # Use provided detection info or initialize default if detection_info is None: detection_info = { "detected": False, "bbox": None, "confidence": None, "used_full_image": True } # Step 1: Use pre-cropped image if provided, otherwise detect and crop print(" 🔄 Step 1: License Plate Detection...") if cropped_image is not None: # Use pre-cropped image (detection already done) img_to_ocr = cropped_image.convert("RGB") if detection_info.get("detected", False): print(f" 🔍 Using pre-cropped plate (confidence: {detection_info.get('confidence', 0):.2f})") else: # Detect and crop license plate if detection is enabled img_to_ocr = image.convert("RGB") if self.use_detection and self.plate_detector is not None: try: print(" 🔍 Running HurricaneOD_beta detection...") cropped_plate, detections = self.plate_detector.detect_and_crop(img_to_ocr, return_all=False) if cropped_plate is not None and len(detections) > 0: # Use cropped plate for OCR img_to_ocr = cropped_plate detection_info = { "detected": True, "bbox": detections[0]["bbox"], "confidence": detections[0]["confidence"], "used_full_image": False, "all_detections": detections } print(f" ✅ Detected plate (confidence: {detections[0]['confidence']:.2f})") print(f" 📐 Bounding box: {detections[0]['bbox']}") else: print(" ⚠️ No plate detected, using full image") detection_info["used_full_image"] = True except Exception as e: print(f" ⚠️ Detection error: {e}, using full image") detection_info["used_full_image"] = True else: print(" ⚠️ Detection disabled or detector not available, using full image") detection_info["used_full_image"] = True print(f" ✓ Step 1 completed - Image size: {img_to_ocr.size}") # Step 2: Perform OCR on (cropped) image try: print(" 🔄 Starting OCR processing...") # Check if model and processor are loaded if self.model is None: raise RuntimeError("Model is not loaded. Please load the model first.") if self.processor is None: raise RuntimeError("Processor is not loaded. Please load the processor first.") # Resize image if needed (reduce size for faster processing) print(" 🔄 Resizing image if needed...") # Reduce max size for faster OCR processing original_max_size = self.MAX_IMAGE_SIZE self.MAX_IMAGE_SIZE = min(original_max_size, 1024) # Limit to 1024px for faster processing img = self.resize_image(img_to_ocr) self.MAX_IMAGE_SIZE = original_max_size # Restore original print(f" ✓ Image ready for OCR (size: {img.size})") # Detect model type (TrOCR or Vision-Language) is_trocr = False model_class_name = self.model.__class__.__name__ if "Vision2Seq" in model_class_name or "TrOCR" in model_class_name: is_trocr = True elif hasattr(self.processor, 'apply_chat_template'): # Check processor type processor_class_name = self.processor.__class__.__name__ if "TrOCR" in processor_class_name: is_trocr = True if is_trocr: # TrOCR format: Direct image processing (no chat template) print(" 🔄 Processing with TrOCR format (direct image-to-text)...") inputs = self.processor(images=img, return_tensors="pt") else: # Vision-Language format (Qwen3VL/Typhoon): Chat template # Default prompt for Thai license plate if prompt is None: prompt = """อ่านและดึงข้อมูลจากรูปป้ายทะเบียนรถไทยนี้ โดยระบุข้อมูลต่อไปนี้: - เลขทะเบียน (Plate Number): ตัวอักษรและตัวเลขบนป้าย เช่น "กก 1234" หรือ "1กก 5678" - ตัวอักษร (Characters): ส่วนตัวอักษรไทย เช่น "กก" หรือ "1กก" - ตัวเลข (Digits): ส่วนตัวเลข เช่น "1234" - จังหวัด (Province): ชื่อจังหวัดบนป้าย เช่น "กรุงเทพมหานคร" - ประเภทรถ (Vehicle Type): ถ้ามีระบุ - สีป้าย (Plate Color): ขาว, เขียว, เหลือง, แดง ฯลฯ กรุณาอ่านข้อความทั้งหมดที่เห็นบนป้ายทะเบียน: """ print(" 🔄 Preparing messages...") messages = [ { "role": "user", "content": [ { "type": "image", "image": img, }, { "type": "text", "text": prompt } ], } ] # Prepare inputs print(" 🔄 Applying chat template and tokenizing...") inputs = self.processor.apply_chat_template( messages, tokenize=True, add_generation_prompt=True, return_dict=True, return_tensors="pt" ) # Move inputs to model device # Get actual model device (handle device_map case) try: if hasattr(self.model, 'hf_device_map'): # Model is using device_map, get first device first_device = list(self.model.hf_device_map.values())[0] if isinstance(first_device, torch.device): model_device = first_device else: model_device = torch.device(first_device) else: model_device = next(self.model.parameters()).device except: # Fallback: try to get device from model try: model_device = self.model.device if hasattr(self.model, 'device') else torch.device('cpu') except: model_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') print(f" 🔄 Moving inputs to device: {model_device}") inputs = {k: v.to(model_device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()} print(" ✓ Inputs prepared") # Only remove image_grid_thw for TrOCR models (they don't use it) # Vision-Language models (Qwen3VL/Typhoon) NEED image_grid_thw! if is_trocr and "image_grid_thw" in inputs: inputs.pop("image_grid_thw") print(" 🔍 Removed unused key: image_grid_thw (TrOCR model)") # Generate output with optimized parameters for speed print(f" 🔄 Generating OCR output (max_new_tokens={min(max_new_tokens, 128)})...") print(" ⏳ This may take a while...") # Check model device print(f" 📍 Model device: {model_device}") if model_device.type == 'cpu': print(" ⚠️ WARNING: Model is on CPU! This will be VERY slow.") print(" 💡 Consider using GPU mode for faster inference") # Use optimized generation parameters for faster inference import time gen_start = time.time() # Get tokenizer eos and pad tokens tokenizer = self.processor.tokenizer pad_token_id = getattr(tokenizer, 'pad_token_id', None) or getattr(tokenizer, 'eos_token_id', None) eos_token_id = getattr(tokenizer, 'eos_token_id', None) with torch.no_grad(): # Disable gradient computation for faster inference # Use only valid parameters for Qwen3VL model # Note: early_stopping, temperature, top_p, top_k are not valid for greedy decoding (do_sample=False, num_beams=1) generation_kwargs = { "max_new_tokens": min(max_new_tokens, 128), # Capped at 128 for lower RAM (enough for license plates) "do_sample": False, # Use greedy decoding (faster than sampling) "use_cache": True, # Enable KV cache for faster generation "num_beams": 1, # Greedy search (fastest) } # Add token IDs if available if pad_token_id is not None: generation_kwargs["pad_token_id"] = pad_token_id if eos_token_id is not None: generation_kwargs["eos_token_id"] = eos_token_id generated_ids = self.model.generate(**inputs, **generation_kwargs) gen_elapsed = time.time() - gen_start print(f" ✓ Generation completed in {gen_elapsed:.2f} seconds") if gen_elapsed > 60: print(f" ⚠️ WARNING: Generation took {gen_elapsed:.2f} seconds (>1 minute)") print(f" 💡 This is unusually slow. Check if model is on GPU.") # Decode output based on model type print(" 🔄 Decoding output...") if is_trocr: # TrOCR: Direct decode ocr_result = self.processor.decode(generated_ids[0], skip_special_tokens=True) else: # Vision-Language: Trim input_ids and decode input_ids = inputs['input_ids'] if isinstance(inputs, dict) else inputs.input_ids generated_ids_trimmed = [ out_ids[len(in_ids):] for in_ids, out_ids in zip(input_ids, generated_ids) ] output_text = self.processor.batch_decode( generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False ) ocr_result = output_text[0] # Extract assistant response if present if "<|assistant|>" in ocr_result: ocr_result = ocr_result.split("<|assistant|>")[-1].strip() print(f" ✓ OCR completed successfully") print(f" 📝 OCR Result length: {len(ocr_result)} characters") except Exception as e: import traceback error_details = traceback.format_exc() print(f" ❌ Error in Step 2 (OCR): {e}") print(f" 📋 Error details:\n{error_details}") # Return error message instead of crashing ocr_result = f"[OCR ERROR] {str(e)}\n\nError details:\n{error_details}" # Memory cleanup after inference (reduce RAM usage across requests) if torch.cuda.is_available(): torch.cuda.empty_cache() import gc gc.collect() # Return result with optional detection info if return_detection_info: return ocr_result, detection_info return ocr_result @property def is_loaded(self) -> bool: """Check if model is loaded""" return self._is_loaded # Global model instance for Gradio app _global_model: Optional[OCRModel] = None def get_model(device_mode: str = None, use_8bit: bool = True) -> OCRModel: """Get or create global model instance""" global _global_model if _global_model is None: _global_model = OCRModel(device_mode=device_mode, use_8bit=use_8bit) return _global_model def load_model(device_mode: str = None, use_8bit: bool = True) -> OCRModel: """ Load and return global model instance Args: device_mode: "auto", "gpu", "cpu", or "hybrid" (default: uses global DEVICE_MODE) use_8bit: Whether to use 8-bit quantization (default: True, saves VRAM) Device Modes: - "auto": Use GPU if available, fallback to CPU - "gpu": Force GPU only - "cpu": Force CPU only - "hybrid": GPU + CPU working together (recommended for limited VRAM) """ model = get_model(device_mode=device_mode, use_8bit=use_8bit) if not model.is_loaded: model.load() return model def check_gpu() -> str: """Check GPU availability and return status message""" info = get_device_info() if info["cuda_available"]: return f"""✅ GPU Available! Device: {info['cuda_device_name']} VRAM Total: {info['cuda_memory_total']} VRAM Free: {info['cuda_memory_free']} CPU Cores: {info['cpu_count']} Recommended Modes: - VRAM >= 8GB: Use "gpu" mode - VRAM 4-8GB: Use "hybrid" mode - No GPU: Use "cpu" mode""" else: return f"""❌ GPU Not Available CPU Cores: {info['cpu_count']} Mode: CPU only (slower) To use GPU, please install: 1. NVIDIA GPU drivers 2. CUDA Toolkit 3. PyTorch with CUDA support"""