Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """ | |
| OCR Model Module | |
| Handles loading and inference of the Hurricane OCR / Typhoon OCR model | |
| Supports: GPU-only, CPU-only, and Hybrid (GPU+CPU) modes | |
| Supports: Base model or Fine-tuned LoRA model | |
| """ | |
| import sys | |
| import io | |
| import os | |
| # Fix Windows console encoding for Thai characters and emojis | |
| if sys.platform == 'win32': | |
| sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') | |
| sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') | |
| import torch | |
| import importlib | |
| # Import transformer symbols safely β some installs may not provide newer classes | |
| _transformers = importlib.import_module("transformers") | |
| AutoProcessor = getattr(_transformers, "AutoProcessor") | |
| AutoModelForImageTextToText = getattr(_transformers, "AutoModelForImageTextToText") | |
| AutoModelForVision2Seq = getattr(_transformers, "AutoModelForVision2Seq", None) | |
| BitsAndBytesConfig = getattr(_transformers, "BitsAndBytesConfig", None) | |
| AutoTokenizer = getattr(_transformers, "AutoTokenizer") | |
| from PIL import Image | |
| import numpy as np | |
| from typing import Optional, Dict, Any, Tuple, List, Union | |
| from pathlib import Path | |
| import time | |
| # ============================================================ | |
| # CONFIGURATION - Change these settings as needed | |
| # ============================================================ | |
| # Model Options: | |
| # - Set HURRICANE_MODEL_PATH to use fine-tuned Hurricane OCR model | |
| # - Default: Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA (LoRA adapter from HuggingFace) | |
| # - Set to None to use base Typhoon OCR model | |
| # Priority: Using Hurricane OCR LoRA from HuggingFace | |
| HURRICANE_MODEL_PATH = "Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA" # HuggingFace model | |
| # Object Detection Model (YOLOv8-based) | |
| # Set to HuggingFace model or local path for HurricaneOD_beta | |
| HURRICANE_OD_MODEL_PATH = "Rattatammanoon/hurricane-od-thai-plate-detector" # HuggingFace model | |
| # Device Mode Options: | |
| # "auto" - Automatically use GPU if available, fallback to CPU | |
| # "gpu" - Force GPU only (will error if no GPU) | |
| # "cpu" - Force CPU only | |
| # "hybrid" - Use GPU + CPU together (offload to CPU when VRAM is full) | |
| DEVICE_MODE = "hybrid" | |
| # Memory settings | |
| MAX_GPU_MEMORY = "3GB" # Maximum GPU memory (lower = more CPU offload, less total RAM) | |
| # Offline Mode Settings | |
| # Set to True to use only local files (no internet required) | |
| # Base model must be downloaded and cached first | |
| USE_OFFLINE_MODE = False # Set to True for offline usage | |
| LOCAL_BASE_MODEL_PATH = None # Optional: Path to local base model (e.g., "./models/thai-trocr") | |
| # ============================================================ | |
| def get_device_info() -> dict: | |
| """Get information about available devices""" | |
| info = { | |
| "cuda_available": torch.cuda.is_available(), | |
| "cuda_device_count": torch.cuda.device_count() if torch.cuda.is_available() else 0, | |
| "cuda_device_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else None, | |
| "cuda_memory_total": None, | |
| "cuda_memory_free": None, | |
| "cuda_memory_total_gb": 0, | |
| "cpu_count": os.cpu_count(), | |
| } | |
| if info["cuda_available"]: | |
| try: | |
| total = torch.cuda.get_device_properties(0).total_memory / (1024**3) | |
| allocated = torch.cuda.memory_allocated(0) / (1024**3) | |
| free = total - allocated | |
| info["cuda_memory_total"] = f"{total:.1f} GB" | |
| info["cuda_memory_free"] = f"{free:.1f} GB" | |
| info["cuda_memory_total_gb"] = total | |
| except: | |
| pass | |
| return info | |
class PlateDetector:
    """
    YOLOv8n-based License Plate Detector

    Detects license plates in images before OCR processing.

    Supports:
    - Pretrained YOLOv8 models (yolov8n.pt, etc.)
    - Custom trained models (HurricaneOD_beta.pt), either local or on
      HuggingFace Hub (repo id of the form "username/model-name")
    """

    def __init__(
        self,
        model_size: str = "n",
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        model_path: Optional[str] = None
    ):
        """
        Initialize YOLOv8 detector (weights are loaded lazily by load()).

        Args:
            model_size: YOLOv8 model size - "n" (nano), "s" (small), "m" (medium), "l" (large), "x" (xlarge)
            conf_threshold: Confidence threshold for detection (0.0-1.0)
            iou_threshold: IoU threshold for NMS (0.0-1.0)
            model_path: Path to custom trained model (e.g., HurricaneOD_beta.pt). If None, uses pretrained model.
        """
        self.model_size = model_size
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        # Keep model_path as a string to support HuggingFace repo ids
        # (e.g., "username/model-name"); it is converted to Path only for
        # local file paths inside load().
        self.model_path = model_path
        self.model = None
        self._is_loaded = False

    def load(self, device: str = "auto"):
        """Load YOLOv8 model weights (idempotent).

        Weight resolution priority:
          1. ``model_path`` as a HuggingFace repo id ("user/repo")
          2. ``model_path`` as an existing local file
          3. Known local HurricaneOD_beta locations
          4. Pretrained ``yolov8<size>.pt`` fallback

        Args:
            device: "auto", "cuda" or "cpu"; "auto" picks CUDA when available.
                NOTE(review): the resolved value is only printed — ultralytics
                selects the device at predict() time.

        Raises:
            ImportError: if ultralytics is not installed.
            RuntimeError: if no model could be loaded from any source.
        """
        if self._is_loaded:
            return
        try:
            from ultralytics import YOLO
            # Determine device
            if device == "auto":
                device = "cuda" if torch.cuda.is_available() else "cpu"
            # Check for custom trained model (HurricaneOD_beta)
            # Priority: 1) Provided model_path, 2) HuggingFace, 3) hurricane_ocr_model, 4) training folder, 5) pretrained
            model_path_to_load = None
            model_source = None
            is_huggingface = False
            # Check if model_path is a HuggingFace path FIRST (format: username/model-name):
            # exactly one "/", no backslashes, not starting with "." (relative path).
            if self.model_path and "/" in str(self.model_path) and "\\" not in str(self.model_path):
                parts = str(self.model_path).split("/")
                if len(parts) == 2 and not str(self.model_path).startswith("."):
                    # This looks like a HuggingFace path
                    model_path_to_load = str(self.model_path)
                    model_source = "HuggingFace Hub"
                    is_huggingface = True
            # Check if model_path is a local file path
            if not model_path_to_load and self.model_path:
                local_path = Path(self.model_path)
                if local_path.exists():
                    model_path_to_load = str(local_path)
                    model_source = "provided path"
            if not model_path_to_load:
                # Try to find HurricaneOD_beta model locally
                # Priority 1: hurricane_ocr_model (recommended location)
                hurricaneod_path = Path("hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt")
                # Priority 2: training folder weights
                training_best_path = Path("HurricaneOD/HurricaneOD_beta/weights/best.pt")
                training_last_path = Path("HurricaneOD/HurricaneOD_beta/weights/last.pt")
                if hurricaneod_path.exists():
                    model_path_to_load = str(hurricaneod_path)
                    model_source = "hurricane_ocr_model/HurricaneOD_beta"
                elif training_best_path.exists():
                    model_path_to_load = str(training_best_path)
                    model_source = "HurricaneOD/HurricaneOD_beta/weights (best.pt)"
                elif training_last_path.exists():
                    model_path_to_load = str(training_last_path)
                    model_source = "HurricaneOD/HurricaneOD_beta/weights (last.pt)"
            if model_path_to_load:
                if is_huggingface:
                    print(f"π Loading HurricaneOD_beta model from: HuggingFace Hub")
                    print(f" π¦ Model: {model_path_to_load}")
                    print(f" π‘ First load: downloads from HuggingFace (~6-10s)")
                    print(f" π‘ Next loads: uses cached model (~1-2s)")
                    print(f" π Cache: ~/.cache/huggingface/hub/")
                else:
                    print(f"π Loading HurricaneOD_beta model from: Local File")
                    print(f" π Source: {model_source}")
                    # Convert to absolute path for display (local files only)
                    abs_path = Path(model_path_to_load).resolve()
                    print(f" π Path: {abs_path}")
                print(f" π₯οΈ Device: {device}")
                # NOTE: module-level ``import time`` is used; the redundant
                # function-local import was removed.
                det_start = time.time()
                try:
                    # For HuggingFace models, download via huggingface_hub to track downloads
                    if is_huggingface:
                        try:
                            from huggingface_hub import hf_hub_download, list_repo_files
                            print(f" β³ Checking available files in HuggingFace repo...")
                            # List all files in the repository
                            try:
                                repo_files = list_repo_files(repo_id=model_path_to_load, repo_type="model")
                                pt_files = [f for f in repo_files if f.endswith('.pt')]
                                print(f" π Found .pt files: {pt_files}")
                            except Exception:
                                # Listing is best-effort; fall back to guessing
                                # filenames below. (Was a bare ``except:``.)
                                pt_files = []
                            # Try multiple possible filenames (most likely first)
                            possible_filenames = [
                                "HurricaneOD_beta.pt",  # Primary filename for HurricaneOD
                                "best.pt",
                                "model.pt",
                                "weights/best.pt"
                            ]
                            # If we found files, try those first
                            if pt_files:
                                possible_filenames = pt_files + possible_filenames
                            local_model_path = None
                            for filename in possible_filenames:
                                try:
                                    # FIX: these three messages previously printed the
                                    # literal "(unknown)" instead of the filename.
                                    print(f" β³ Trying to download: {filename}")
                                    local_model_path = hf_hub_download(
                                        repo_id=model_path_to_load,
                                        filename=filename,
                                        repo_type="model"
                                    )
                                    print(f" β Downloaded: {filename}")
                                    print(f" π₯ Saved to: {local_model_path}")
                                    break
                                except Exception as e:
                                    print(f" β οΈ {filename} not found: {e}")
                                    continue
                            if local_model_path:
                                model_path_to_load = local_model_path
                            else:
                                raise FileNotFoundError(
                                    f"No YOLO model file (.pt) found in {model_path_to_load}\n"
                                    f" Please upload one of these files to your HuggingFace repo:\n"
                                    f" - best.pt (recommended)\n"
                                    f" - HurricaneOD_beta.pt\n"
                                    f" - model.pt\n"
                                    f" Available files: {repo_files if 'repo_files' in locals() else 'unknown'}"
                                )
                        except ImportError:
                            print(f" β οΈ huggingface_hub not installed, downloads won't be tracked")
                            print(f" π‘ Install with: pip install huggingface_hub")
                            raise
                        except Exception as e:
                            print(f" β Could not download from HuggingFace: {e}")
                            print(f" π‘ Make sure you uploaded a .pt file to the repository")
                            print(f" π‘ Repository: https://huggingface.co/{model_path_to_load}")
                            raise
                    self.model = YOLO(model_path_to_load)
                    if self.model is None:
                        raise RuntimeError(f"YOLO returned None when loading {model_path_to_load}")
                except Exception as load_error:
                    raise RuntimeError(f"Failed to load YOLOv8 model from {model_path_to_load}: {load_error}")
                det_elapsed = time.time() - det_start
                print(f" β HurricaneOD_beta model loaded successfully! (took {det_elapsed:.2f} seconds)")
            else:
                # Fallback to pretrained YOLOv8 model (not recommended)
                model_name = f"yolov8{self.model_size}.pt"
                print(f"β οΈ HurricaneOD_beta model not found, using pretrained YOLOv8{self.model_size}")
                print(f" Device: {device}")
                print(f" β³ Downloading pretrained model (first time only, ~6MB)...")
                print(f" Note: For better results, train and use HurricaneOD_beta model")
                det_start = time.time()
                try:
                    self.model = YOLO(model_name)
                    if self.model is None:
                        raise RuntimeError(f"YOLO returned None when loading {model_name}")
                except Exception as load_error:
                    raise RuntimeError(f"Failed to load YOLOv8 model {model_name}: {load_error}")
                det_elapsed = time.time() - det_start
                print(f" β YOLOv8 detector loaded (pretrained) (took {det_elapsed:.2f} seconds)")
            # Verify model is loaded before marking as loaded
            if self.model is None:
                raise RuntimeError("YOLOv8 model is None after loading. Model loading failed.")
            self._is_loaded = True
        except ImportError:
            # Only reached when ultralytics itself is missing: the inner
            # huggingface_hub ImportError is wrapped into RuntimeError above.
            raise ImportError(
                "ultralytics not installed. Install with: pip install ultralytics\n"
                "Note: YOLOv8 will use pretrained COCO model. For better results, "
                "fine-tune on Thai license plate dataset."
            )
        except Exception as e:
            raise RuntimeError(f"Failed to load YOLOv8 model: {e}")

    def detect(self, image: Image.Image) -> List[Dict[str, Any]]:
        """
        Detect license plates in image (loads the model on first use).

        Args:
            image: PIL Image

        Returns:
            List of detections with bounding boxes, confidence scores
            Format: [{"bbox": [x1, y1, x2, y2], "confidence": float, "class": int}, ...]
        """
        if not self._is_loaded:
            self.load()
        # Check if model is loaded
        if self.model is None:
            raise RuntimeError(
                "Plate detector model is not loaded. "
                "Please ensure YOLOv8 model was loaded successfully."
            )
        # Convert PIL to numpy array
        img_array = np.array(image)
        # Run detection
        results = self.model.predict(
            img_array,
            conf=self.conf_threshold,
            iou=self.iou_threshold,
            verbose=False
        )
        detections = []
        if results and len(results) > 0:
            result = results[0]
            # Extract boxes, confidences, classes (move tensors to CPU first)
            if result.boxes is not None:
                boxes = result.boxes.xyxy.cpu().numpy()  # [x1, y1, x2, y2]
                confidences = result.boxes.conf.cpu().numpy()
                classes = result.boxes.cls.cpu().numpy().astype(int)
                for i in range(len(boxes)):
                    detections.append({
                        "bbox": boxes[i].tolist(),
                        "confidence": float(confidences[i]),
                        "class": int(classes[i])
                    })
        return detections

    def crop_plate(self, image: Image.Image, bbox: List[float], padding: int = 10) -> Image.Image:
        """
        Crop license plate region from image.

        Args:
            image: PIL Image
            bbox: Bounding box [x1, y1, x2, y2]
            padding: Padding pixels around the bbox (clamped to image bounds)

        Returns:
            Cropped PIL Image
        """
        x1, y1, x2, y2 = bbox
        # Add padding, clamped so the crop box stays inside the image
        width, height = image.size
        x1 = max(0, int(x1) - padding)
        y1 = max(0, int(y1) - padding)
        x2 = min(width, int(x2) + padding)
        y2 = min(height, int(y2) + padding)
        # Crop
        cropped = image.crop((x1, y1, x2, y2))
        return cropped

    def _get_coco_class_name(self, class_id: int) -> str:
        """Get COCO class name from class ID; "class_<id>" for unknown ids."""
        coco_classes = [
            'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
            'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
            'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
            'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
            'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
            'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
            'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
            'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
            'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
            'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
            'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush'
        ]
        if 0 <= class_id < len(coco_classes):
            return coco_classes[class_id]
        return f"class_{class_id}"

    def _select_best_plate_region(self, image: Image.Image, detections: List[Dict]) -> Optional[Dict]:
        """
        Select the best detection for license plate region.

        Strategy (multiplicative confidence bonuses):
        1. Prefer detections in bottom-center region (where plates usually are)
        2. Prefer vehicle classes (car, truck, bus, motorcycle)
        3. Use aspect ratio filtering (plates are usually wider than tall)
        4. Prefer detections with higher confidence

        Returns:
            The highest-scoring detection dict (augmented with "score" and
            "class_name"), or None if ``detections`` is empty.
        """
        if not detections:
            return None
        width, height = image.size
        center_x = width / 2
        bottom_y = height * 0.7  # Bottom 30% of image
        # Vehicle class IDs in COCO
        vehicle_classes = {2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'}
        scored_detections = []
        for det in detections:
            bbox = det["bbox"]
            x1, y1, x2, y2 = bbox
            center_bbox_x = (x1 + x2) / 2
            center_bbox_y = (y1 + y2) / 2
            # Calculate score starting from raw confidence
            score = det["confidence"]
            # Bonus for vehicle classes
            class_id = det.get("class", -1)
            if class_id in vehicle_classes:
                score *= 1.5
            # Bonus for bottom-center region
            distance_from_center = abs(center_bbox_x - center_x) / width
            distance_from_bottom = abs(center_bbox_y - bottom_y) / height
            if distance_from_center < 0.3:  # Within 30% of center
                score *= 1.3
            if center_bbox_y > height * 0.5:  # In bottom half
                score *= 1.2
            # Check aspect ratio (plates are usually wider)
            bbox_width = x2 - x1
            bbox_height = y2 - y1
            aspect_ratio = bbox_width / bbox_height if bbox_height > 0 else 1
            if 1.5 < aspect_ratio < 5.0:  # Reasonable plate aspect ratio
                score *= 1.4
            scored_detections.append({
                **det,
                "score": score,
                "class_name": self._get_coco_class_name(class_id)
            })
        # Sort by score, best first
        scored_detections.sort(key=lambda x: x["score"], reverse=True)
        # Return best detection
        if scored_detections:
            return scored_detections[0]
        return None

    def _fallback_region_detection(self, image: Image.Image) -> Optional[Image.Image]:
        """
        Fallback method: Use bottom-center region if YOLOv8 doesn't detect properly.
        This assumes the license plate is in the bottom-center region of the image.
        """
        width, height = image.size
        # Crop bottom-center region (typical plate location):
        # vertical band 70%-95% of height, horizontal band 20%-80% of width.
        x1 = int(width * 0.2)
        y1 = int(height * 0.7)
        x2 = int(width * 0.8)
        y2 = int(height * 0.95)
        cropped = image.crop((x1, y1, x2, y2))
        return cropped

    def _contour_based_detection(self, image: Image.Image) -> Optional[Image.Image]:
        """
        Alternative detection using contour detection.
        Looks for rectangular regions that might be license plates.
        Returns None when OpenCV is unavailable or nothing plate-like is found.
        """
        try:
            import cv2
            # Convert PIL to OpenCV format
            img_array = np.array(image.convert("RGB"))
            img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
            # Apply Otsu threshold
            _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            # Find contours
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            width, height = image.size
            best_contour = None
            best_score = 0
            for contour in contours:
                # Get bounding rect
                x, y, w, h = cv2.boundingRect(contour)
                # Filter by size and aspect ratio (typical plate dimensions)
                area = w * h
                aspect_ratio = w / h if h > 0 else 0
                # Plate-like characteristics
                if (area > width * height * 0.01 and  # At least 1% of image
                        area < width * height * 0.3 and  # At most 30% of image
                        1.5 < aspect_ratio < 5.0 and  # Reasonable aspect ratio
                        y > height * 0.5):  # In bottom half
                    # Score based on position and size; 2.5 is the ideal ratio
                    score = area * (1.0 - abs(aspect_ratio - 2.5) / 2.5)
                    if y > height * 0.6:  # Bonus for bottom region
                        score *= 1.5
                    if score > best_score:
                        best_score = score
                        best_contour = (x, y, w, h)
            if best_contour:
                x, y, w, h = best_contour
                # Add padding
                padding = 10
                x1 = max(0, x - padding)
                y1 = max(0, y - padding)
                x2 = min(width, x + w + padding)
                y2 = min(height, y + h + padding)
                cropped = image.crop((x1, y1, x2, y2))
                return cropped
        except ImportError:
            pass  # OpenCV not available
        except Exception:
            pass  # Contour detection failed; deliberate best-effort
        return None

    def detect_and_crop(self, image: Image.Image, return_all: bool = False) -> Tuple[Union[Optional[Image.Image], List[Image.Image]], List[Dict]]:
        """
        Detect and crop the best license plate from image.

        Uses smart selection strategy:
        1. Try YOLOv8 detection with smart region selection
        2. Contour-based detection if YOLOv8 finds nothing
        3. Fallback to bottom-center region if that also fails

        Args:
            image: PIL Image
            return_all: If True, return all detections, else return only the best one

        Returns:
            Tuple of (cropped_image, all_detections).
            With return_all=True the first element is a list of crops
            (([], []) when there are no detections); otherwise it is a
            single crop, or None when every strategy fails.
        """
        detections = self.detect(image)
        if return_all:
            if not detections:
                return [], []
            # Return all cropped plates
            cropped_images = []
            for det in detections:
                cropped = self.crop_plate(image, det["bbox"])
                cropped_images.append(cropped)
            return cropped_images, detections
        # Select best detection using smart strategy
        best_det = self._select_best_plate_region(image, detections)
        if best_det:
            # Use selected detection
            cropped = self.crop_plate(image, best_det["bbox"], padding=15)
            return cropped, [best_det]
        else:
            # Try contour-based detection first
            print(" β οΈ No YOLOv8 detection found, trying contour-based detection...")
            cropped = self._contour_based_detection(image)
            if cropped:
                # Create detection info for contour method
                width, height = image.size
                contour_det = {
                    "bbox": [0, 0, width, height],  # Approximate
                    "confidence": 0.6,
                    "class": -1,
                    "class_name": "contour_detection",
                    "score": 0.6,
                    "method": "contour"
                }
                return cropped, [contour_det]
            # Final fallback: use bottom-center region
            print(" β οΈ Contour detection failed, using fallback region detection")
            cropped = self._fallback_region_detection(image)
            if cropped:
                # Create dummy detection info for fallback
                width, height = image.size
                fallback_det = {
                    "bbox": [width * 0.2, height * 0.7, width * 0.8, height * 0.95],
                    "confidence": 0.5,
                    "class": -1,
                    "class_name": "fallback_region",
                    "score": 0.5,
                    "method": "region"
                }
                return cropped, [fallback_det]
            return None, []
| class OCRModel: | |
| """ | |
| OCR Model wrapper for Hurricane OCR / Typhoon OCR | |
| Supports multiple device modes: auto, gpu, cpu, hybrid | |
| Supports: Base model or Fine-tuned LoRA model | |
| """ | |
| BASE_MODEL_NAME = "scb10x/typhoon-ocr1.5-2b" | |
| MAX_IMAGE_SIZE = 1024 # Reduced from 1800 for lower RAM usage | |
| def __init__(self, device_mode: str = None, use_8bit: bool = True, max_gpu_memory: str = None, | |
| hurricane_model_path: str = None, use_detection: bool = True): | |
| """ | |
| Initialize the OCR model | |
| Args: | |
| device_mode: "auto", "gpu", "cpu", or "hybrid" (default: use global DEVICE_MODE) | |
| use_8bit: Whether to use 8-bit quantization for memory efficiency | |
| max_gpu_memory: Maximum GPU memory to use (e.g., "6GB") | |
| hurricane_model_path: Path to fine-tuned Hurricane OCR model (LoRA) | |
| use_detection: Enable YOLOv8 plate detection before OCR (default: True) | |
| """ | |
| self.device_mode = device_mode or DEVICE_MODE | |
| self.use_8bit = use_8bit | |
| self.max_gpu_memory = max_gpu_memory or MAX_GPU_MEMORY | |
| self.hurricane_model_path = hurricane_model_path or HURRICANE_MODEL_PATH | |
| self.use_detection = use_detection | |
| self.model = None | |
| self.processor = None | |
| self.tokenizer = None | |
| self.plate_detector = None | |
| self._is_loaded = False | |
| self._device = None | |
| self._device_info = get_device_info() | |
| self._mode_used = None | |
| self._using_hurricane = False | |
| def load(self) -> None: | |
| """Load the model and processor""" | |
| if self._is_loaded: | |
| print("β Model already loaded (using cached instance).") | |
| return | |
| import time | |
| start_time = time.time() | |
| # Check if Hurricane OCR model exists and what type it is | |
| # Support both local paths and HuggingFace Hub paths (username/model-name) | |
| use_hurricane = False | |
| is_huggingface_path = False | |
| if self.hurricane_model_path: | |
| # Check if it's a HuggingFace Hub path (format: username/model-name) | |
| # HuggingFace paths have exactly one "/" and no backslashes or dots at start | |
| if "/" in self.hurricane_model_path and "\\" not in self.hurricane_model_path: | |
| parts = self.hurricane_model_path.split("/") | |
| if len(parts) == 2 and not self.hurricane_model_path.startswith("."): | |
| # This looks like a HuggingFace path (e.g., "Rattatammanoon/hurricane-ocr-tlpr-v1-LoRA") | |
| is_huggingface_path = True | |
| use_hurricane = True | |
| print(f" π¦ Detected HuggingFace model path: {self.hurricane_model_path}") | |
| else: | |
| # Local path with slashes | |
| use_hurricane = os.path.exists(self.hurricane_model_path) | |
| else: | |
| # Local path | |
| use_hurricane = os.path.exists(self.hurricane_model_path) | |
| # Check if it's LoRA adapter or merged model | |
| is_lora_adapter = False | |
| is_merged_model = False | |
| merged_model_path = None | |
| adapter_config_path = None # Initialize to avoid UnboundLocalError | |
| is_trocr_base = False # Track if base model is TrOCR | |
| if use_hurricane: | |
| # For HuggingFace paths, we need to download and check the files | |
| # For local paths, we can check directly | |
| if is_huggingface_path: | |
| # HuggingFace model - assume it's a LoRA adapter | |
| # (HuggingFace will auto-detect type when loading) | |
| is_lora_adapter = True | |
| print(f" π¦ HuggingFace model detected - will load as LoRA adapter") | |
| print(f" π‘ Model will be downloaded from HuggingFace Hub on first load") | |
| else: | |
| # Local path - check files to determine type | |
| # Check for merged model first (has config.json but no adapter_config.json) | |
| config_json_path = os.path.join(self.hurricane_model_path, "config.json") | |
| adapter_config_path = os.path.join(self.hurricane_model_path, "adapter_config.json") | |
| if os.path.exists(config_json_path) and not os.path.exists(adapter_config_path): | |
| # This might be a merged model (full model, not just adapter) | |
| is_merged_model = True | |
| merged_model_path = self.hurricane_model_path | |
| print(f" π Detected merged model (full model, no base model needed)") | |
| print(f" π Model path: {merged_model_path}") | |
| elif os.path.exists(adapter_config_path): | |
| # Check for pre-merged model in merged/ subdirectory | |
| merged_subdir = os.path.join(self.hurricane_model_path, "merged") | |
| if os.path.exists(merged_subdir) and os.path.exists(os.path.join(merged_subdir, "config.json")): | |
| is_merged_model = True | |
| merged_model_path = merged_subdir | |
| print(f" π Found pre-merged model in merged/ subdirectory") | |
| print(f" π Will use merged model (no base model loading needed)") | |
| else: | |
| is_lora_adapter = True | |
| try: | |
| import json | |
| with open(adapter_config_path, 'r', encoding='utf-8') as f: | |
| adapter_config = json.load(f) | |
| base_model = adapter_config.get('base_model_name_or_path', '') | |
| is_trocr_base = 'trocr' in base_model.lower() | |
| model_type_str = "Thai TrOCR" if is_trocr_base else "Vision-Language (Qwen3VL/Typhoon)" | |
| print(f" π Detected LoRA adapter model") | |
| print(f" π Base model: {base_model}") | |
| print(f" π Model type: {model_type_str}") | |
| print(f" π‘ Note: Base model will be loaded first, then LoRA adapter") | |
| print(f" π‘ Base model is cached by HuggingFace (~/.cache/huggingface/)") | |
| print(f" π‘ First load: downloads base model (~2-5GB, 30-60s)") | |
| print(f" π‘ Next loads: uses cached base model (~5-10s)") | |
| except Exception as e: | |
| print(f" β οΈ Warning: Could not read adapter config: {e}") | |
| # Use Typhoon OCR (Transformers) with or without LoRA | |
| # Check device availability | |
| print("=" * 60) | |
| if use_hurricane and is_merged_model: | |
| print("π HURRICANE OCR - Merged Model (Full Model)") | |
| print(f" Model Path: {merged_model_path}") | |
| print(f" Type: Merged Model (LoRA merged into base model)") | |
| print(f" β No base model loading needed!") | |
| elif use_hurricane and is_lora_adapter: | |
| print("π HURRICANE OCR - Fine-tuned Model (Typhoon OCR + LoRA)") | |
| print(f" Model Path: {self.hurricane_model_path}") | |
| print(f" Type: LoRA Adapter (Parameter-Efficient Fine-Tuning)") | |
| print(f" β οΈ Will load base model first, then LoRA adapter") | |
| print(f" π‘ Base model is cached by HuggingFace (~/.cache/huggingface/)") | |
| print(f" π‘ First load: downloads base model (~2-5GB, 30-60s)") | |
| print(f" π‘ Next loads: uses cached base model (~5-10s)") | |
| elif use_hurricane: | |
| print("π HURRICANE OCR - Fine-tuned Model (Typhoon)") | |
| print(f" Model Path: {self.hurricane_model_path}") | |
| else: | |
| print("π TYPHOON OCR - Base Model") | |
| if self.hurricane_model_path: | |
| print(f" β οΈ Hurricane model not found: {self.hurricane_model_path}") | |
| print(f" β οΈ Falling back to base Typhoon OCR model") | |
| print("=" * 60) | |
| print("π₯οΈ Device Information:") | |
| print(f" CUDA Available: {self._device_info['cuda_available']}") | |
| print(f" CPU Cores: {self._device_info['cpu_count']}") | |
| if self._device_info['cuda_available']: | |
| print(f" GPU: {self._device_info['cuda_device_name']}") | |
| print(f" VRAM Total: {self._device_info['cuda_memory_total']}") | |
| print(f" VRAM Free: {self._device_info['cuda_memory_free']}") | |
| print(f" Requested Mode: {self.device_mode.upper()}") | |
| print("=" * 60) | |
| # Determine actual device configuration | |
| device_map = self._get_device_map() | |
| print("\nπ¦ Loading processor and tokenizer...") | |
| # If merged model, try loading from merged model path first | |
| # For LoRA adapter: use base model (from adapter_config) for processor/tokenizer | |
| model_path_to_load = merged_model_path if (is_merged_model and merged_model_path) else self.BASE_MODEL_NAME | |
| processor_model_path = model_path_to_load | |
| if use_hurricane and is_lora_adapter and adapter_config_path and os.path.exists(adapter_config_path): | |
| try: | |
| import json | |
| with open(adapter_config_path, "r", encoding="utf-8") as f: | |
| adapter_cfg = json.load(f) | |
| base_model_from_adapter = adapter_cfg.get("base_model_name_or_path") | |
| if base_model_from_adapter: | |
| processor_model_path = base_model_from_adapter | |
| print(f" π¦ Using base model for processor/tokenizer: {processor_model_path}") | |
| except Exception as e: | |
| print(f" β οΈ Could not read adapter config for processor: {e}") | |
| try: | |
| # Load tokenizer FIRST with fix_mistral_regex=True to avoid warning | |
| # This prevents the warning from being shown when processor loads tokenizer | |
| try: | |
| print(f" Loading tokenizer with fix_mistral_regex=True...") | |
| self.tokenizer = AutoTokenizer.from_pretrained( | |
| processor_model_path, | |
| trust_remote_code=True, | |
| fix_mistral_regex=True # Fix tokenizer regex pattern issue - MUST be set here | |
| ) | |
| print(f" β Tokenizer loaded with fix_mistral_regex=True") | |
| except Exception as tokenizer_error: | |
| print(f" β οΈ Could not load tokenizer separately: {tokenizer_error}") | |
| self.tokenizer = None | |
| # Load processor (will use tokenizer if already loaded) | |
| self.processor = AutoProcessor.from_pretrained( | |
| processor_model_path, | |
| trust_remote_code=True | |
| ) | |
| # If we loaded tokenizer separately, update processor's tokenizer | |
| if self.tokenizer is not None: | |
| self.processor.tokenizer = self.tokenizer | |
| print(f" β Processor loaded and updated with fixed tokenizer") | |
| else: | |
| # Fallback: use processor's tokenizer and try to fix it | |
| self.tokenizer = self.processor.tokenizer | |
| # Try to set fix_mistral_regex if available | |
| if hasattr(self.tokenizer, 'fix_mistral_regex'): | |
| self.tokenizer.fix_mistral_regex = True | |
| print(f" β Processor loaded (using processor's tokenizer)") | |
| print(f" β Processor and tokenizer loaded from: {model_path_to_load}") | |
| except Exception as e: | |
| print(f" β οΈ Could not load from {model_path_to_load}: {e}") | |
| print(f" β οΈ Falling back to base model") | |
| try: | |
| # Load tokenizer FIRST with fix_mistral_regex=True | |
| try: | |
| print(f" Loading base model tokenizer with fix_mistral_regex=True...") | |
| self.tokenizer = AutoTokenizer.from_pretrained( | |
| self.BASE_MODEL_NAME, | |
| trust_remote_code=True, | |
| fix_mistral_regex=True # Fix tokenizer regex pattern issue | |
| ) | |
| print(f" β Base tokenizer loaded with fix_mistral_regex=True") | |
| except Exception as tokenizer_error: | |
| print(f" β οΈ Could not load base tokenizer separately: {tokenizer_error}") | |
| self.tokenizer = None | |
| # Load processor | |
| self.processor = AutoProcessor.from_pretrained( | |
| self.BASE_MODEL_NAME, | |
| trust_remote_code=True | |
| ) | |
| # Update processor's tokenizer if we loaded it separately | |
| if self.tokenizer is not None: | |
| self.processor.tokenizer = self.tokenizer | |
| print(f" β Base processor loaded and updated with fixed tokenizer") | |
| else: | |
| self.tokenizer = self.processor.tokenizer | |
| if hasattr(self.tokenizer, 'fix_mistral_regex'): | |
| self.tokenizer.fix_mistral_regex = True | |
| print(f" β Base processor loaded (using processor's tokenizer)") | |
| except Exception as fallback_error: | |
| print(f" β Failed to load base model: {fallback_error}") | |
| raise | |
| print(" β Processor and tokenizer loaded successfully") | |
| print(f"\nπ€ Loading model in {self._mode_used.upper()} mode...") | |
| # Load model based on configuration | |
| if is_merged_model and merged_model_path: | |
| # Load merged model directly (no base model + LoRA needed) | |
| print(f" π¦ Loading merged model from: {merged_model_path}") | |
| if self._mode_used == "hybrid" and self._device_info['cuda_available']: | |
| self._load_merged_model(merged_model_path, device_map, mode="hybrid") | |
| elif self._mode_used == "gpu" and self._device_info['cuda_available']: | |
| self._load_merged_model(merged_model_path, device_map, mode="gpu") | |
| else: | |
| self._load_merged_model(merged_model_path, device_map="cpu", mode="cpu") | |
| else: | |
| # Load base model first, then LoRA adapter | |
| # Determine base model name for LoRA adapter | |
| base_model_name = self.BASE_MODEL_NAME | |
| if use_hurricane and is_lora_adapter: | |
| try: | |
| import json | |
| adapter_config_path = os.path.join(self.hurricane_model_path, "adapter_config.json") | |
| with open(adapter_config_path, 'r', encoding='utf-8') as f: | |
| adapter_config = json.load(f) | |
| base_model_name = adapter_config.get('base_model_name_or_path', self.BASE_MODEL_NAME) | |
| is_trocr_base = 'trocr' in base_model_name.lower() | |
| except: | |
| pass | |
| # Check for offline mode or local base model | |
| if USE_OFFLINE_MODE or LOCAL_BASE_MODEL_PATH: | |
| # Use local base model if provided | |
| if LOCAL_BASE_MODEL_PATH and os.path.exists(LOCAL_BASE_MODEL_PATH): | |
| print(f" π¦ Using local base model: {LOCAL_BASE_MODEL_PATH}") | |
| base_model_name = LOCAL_BASE_MODEL_PATH | |
| is_trocr_base = 'trocr' in base_model_name.lower() or os.path.exists(os.path.join(LOCAL_BASE_MODEL_PATH, "config.json")) | |
| elif USE_OFFLINE_MODE: | |
| print(f" π¦ Offline mode enabled - using cached model: {base_model_name}") | |
| print(f" β οΈ If model not cached, loading will fail") | |
| if self._mode_used == "hybrid" and self._device_info['cuda_available']: | |
| self._load_hybrid_mode(device_map, base_model_name=base_model_name, is_trocr=is_trocr_base) | |
| elif self._mode_used == "gpu" and self._device_info['cuda_available']: | |
| self._load_gpu_mode(device_map, base_model_name=base_model_name, is_trocr=is_trocr_base) | |
| else: | |
| self._load_cpu_mode(base_model_name=base_model_name, is_trocr=is_trocr_base) | |
| # Load Hurricane OCR LoRA adapter if available | |
| if use_hurricane and is_lora_adapter: | |
| self._load_hurricane_adapter() | |
| model_name = "Hurricane OCR" if self._using_hurricane else "Typhoon OCR" | |
| print(f"\nβ {model_name} loaded successfully!") | |
| print(f" Mode: {self._mode_used.upper()}") | |
| print(f" Fine-tuned: {'Yes (LoRA)' if self._using_hurricane else 'No (Base)'}") | |
| if hasattr(self.model, 'hf_device_map'): | |
| devices_used = set(str(v) for v in self.model.hf_device_map.values()) | |
| print(f" Devices: {', '.join(devices_used)}") | |
| # Load YOLOv8 detector if enabled | |
| if self.use_detection: | |
| try: | |
| # Priority 1: Use HuggingFace model (HURRICANE_OD_MODEL_PATH) | |
| # Priority 2: Check for local HurricaneOD_beta | |
| # Priority 3: Fallback to pretrained YOLOv8n | |
| model_path_to_use = HURRICANE_OD_MODEL_PATH | |
| # If HuggingFace path not set, check for local models | |
| if not model_path_to_use: | |
| hurricaneod_path = Path("hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt") | |
| training_best_path = Path("HurricaneOD/HurricaneOD_beta/weights/best.pt") | |
| training_last_path = Path("HurricaneOD/HurricaneOD_beta/weights/last.pt") | |
| if hurricaneod_path.exists(): | |
| model_path_to_use = str(hurricaneod_path) | |
| elif training_best_path.exists(): | |
| model_path_to_use = str(training_best_path) | |
| elif training_last_path.exists(): | |
| model_path_to_use = str(training_last_path) | |
| if model_path_to_use: | |
| self.plate_detector = PlateDetector( | |
| model_size="n", | |
| conf_threshold=0.25, | |
| model_path=model_path_to_use | |
| ) | |
| else: | |
| print(f" β οΈ HurricaneOD_beta model not found, using pretrained YOLOv8n") | |
| print(f" Expected: HuggingFace model or local paths") | |
| print(f" - HuggingFace: {HURRICANE_OD_MODEL_PATH}") | |
| print(f" - Local: hurricane_ocr_model/HurricaneOD_beta/HurricaneOD_beta.pt") | |
| self.plate_detector = PlateDetector(model_size="n", conf_threshold=0.25) | |
| device = "cuda" if self._device_info['cuda_available'] else "cpu" | |
| self.plate_detector.load(device=device) | |
| except Exception as e: | |
| print(f" β οΈ Warning: Could not load YOLOv8 detector: {e}") | |
| print(" Continuing without detection (will process full image)") | |
| self.use_detection = False | |
| self._is_loaded = True | |
| elapsed = time.time() - start_time | |
| print(f"\nβ±οΈ Total loading time: {elapsed:.2f} seconds") | |
    def _load_hurricane_adapter(self) -> None:
        """Attach the Hurricane OCR LoRA adapter to the already-loaded base model.

        Wraps ``self.model`` (the base Typhoon OCR model, which must already be
        loaded) in a ``PeftModel`` using the adapter weights at
        ``self.hurricane_model_path``, then marks ``self._using_hurricane``.

        Failure is deliberately non-fatal: if ``peft`` is not installed or the
        adapter cannot be loaded, a warning is printed and the base model is
        kept as-is.
        """
        try:
            # Imported lazily so the module still works when peft is absent.
            from peft import PeftModel
            print(f"\nπ Loading Hurricane OCR adapter from: {self.hurricane_model_path}")
            print(f" β οΈ Note: LoRA adapter requires base model to be loaded first")
            print(f" π‘ This is normal - LoRA adapters work on top of base models")
            print(f" π‘ Base model is cached by HuggingFace (~/.cache/huggingface/)")
            print(f" π‘ First load: downloads base model (~2-5GB, 30-60s)")
            print(f" π‘ Next loads: uses cached base model (~5-10s)")
            print(f" π‘ Tip: Use merge_and_unload() to create merged model (faster loading)")
            # Load the LoRA adapter: wraps the current base model in place.
            self.model = PeftModel.from_pretrained(
                self.model,
                self.hurricane_model_path,
                is_trainable=False  # Inference only
            )
            # Option to merge adapter for faster inference (saves merged model)
            # Uncomment the code below to merge and save merged model (one-time operation)
            # This will create a merged model that doesn't need base model loading
            # After merging, set HURRICANE_MODEL_PATH to the merged/ directory
            #
            # print(" π Merging LoRA adapter into base model...")
            # print(" β³ This may take a few minutes...")
            # merged_model = self.model.merge_and_unload()
            # merged_path = os.path.join(self.hurricane_model_path, "merged")
            # os.makedirs(merged_path, exist_ok=True)
            # print(f" πΎ Saving merged model to: {merged_path}")
            # merged_model.save_pretrained(merged_path)
            # self.processor.save_pretrained(merged_path)
            # print(" β Merged model saved!")
            # print(f" π‘ Next time, set HURRICANE_MODEL_PATH to: {merged_path}")
            # print(f" π‘ This will load faster (no base model needed)")
            self._using_hurricane = True
            print(" β Hurricane OCR adapter loaded successfully!")
        except ImportError:
            # peft is an optional dependency; fall back to the base model.
            print(" β οΈ PEFT not installed. Install with: pip install peft")
            print(" Using base Typhoon OCR model instead.")
        except Exception as e:
            # Any other adapter failure (bad path, shape mismatch, ...) is
            # also downgraded to a warning — the base model remains usable.
            print(f" β οΈ Failed to load Hurricane adapter: {e}")
            print(" Using base Typhoon OCR model instead.")
| def _get_device_map(self) -> Any: | |
| """Determine device map based on mode""" | |
| if self.device_mode == "cpu": | |
| self._mode_used = "cpu" | |
| self._device = "cpu" | |
| return "cpu" | |
| if not self._device_info['cuda_available']: | |
| print(" β οΈ GPU not available, falling back to CPU") | |
| self._mode_used = "cpu" | |
| self._device = "cpu" | |
| return "cpu" | |
| if self.device_mode == "gpu": | |
| self._mode_used = "gpu" | |
| self._device = "cuda" | |
| return "cuda:0" | |
| if self.device_mode == "hybrid": | |
| self._mode_used = "hybrid" | |
| self._device = "cuda" | |
| # Auto device map with memory limits | |
| return "auto" | |
| # Auto mode | |
| self._mode_used = "gpu" if self._device_info['cuda_available'] else "cpu" | |
| self._device = "cuda" if self._device_info['cuda_available'] else "cpu" | |
| return "auto" if self._device_info['cuda_available'] else "cpu" | |
| def _load_hybrid_mode(self, device_map, base_model_name=None, is_trocr=False): | |
| """Load model in hybrid GPU+CPU mode""" | |
| print(" Using HYBRID mode (GPU + CPU)") | |
| print(f" Max GPU Memory: {self.max_gpu_memory}") | |
| model_name = base_model_name or self.BASE_MODEL_NAME | |
| model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText | |
| if self.use_8bit: | |
| if BitsAndBytesConfig is None: | |
| raise ImportError("bitsandbytes or compatible BitsAndBytesConfig not available. Install bitsandbytes and retry (pip install bitsandbytes).") | |
| print(" Using 4-bit quantization (NF4) + CPU offloading") | |
| bnb_config = BitsAndBytesConfig( | |
| load_in_4bit=True, | |
| bnb_4bit_compute_dtype=torch.bfloat16, | |
| bnb_4bit_use_double_quant=True, | |
| bnb_4bit_quant_type="nf4", | |
| llm_int8_enable_fp32_cpu_offload=True # Enable CPU offloading | |
| ) | |
| self.model = model_class.from_pretrained( | |
| model_name, | |
| quantization_config=bnb_config, | |
| device_map=device_map, | |
| max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, | |
| trust_remote_code=True, | |
| offload_folder="offload_weights" | |
| ) | |
| else: | |
| print(" Using float16 + CPU offloading") | |
| self.model = model_class.from_pretrained( | |
| model_name, | |
| device_map=device_map, | |
| max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, | |
| trust_remote_code=True, | |
| dtype=torch.float16, | |
| offload_folder="offload_weights" | |
| ) | |
| def _load_gpu_mode(self, device_map, base_model_name=None, is_trocr=False): | |
| """Load model on GPU only""" | |
| print(" Using GPU-only mode") | |
| model_name = base_model_name or self.BASE_MODEL_NAME | |
| model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText | |
| # Prepare loading kwargs | |
| load_kwargs = { | |
| "device_map": device_map, | |
| "trust_remote_code": True | |
| } | |
| # Add offline mode if enabled | |
| if USE_OFFLINE_MODE: | |
| load_kwargs["local_files_only"] = True | |
| print(" π¦ Offline mode: local_files_only=True") | |
| if self.use_8bit: | |
| print(" Using 4-bit quantization (NF4)") | |
| bnb_config = BitsAndBytesConfig( | |
| load_in_4bit=True, | |
| bnb_4bit_compute_dtype=torch.bfloat16, | |
| bnb_4bit_use_double_quant=True, | |
| bnb_4bit_quant_type="nf4", | |
| llm_int8_enable_fp32_cpu_offload=False | |
| ) | |
| load_kwargs["quantization_config"] = bnb_config | |
| self.model = model_class.from_pretrained(model_name, **load_kwargs) | |
| else: | |
| print(" Using float16 precision") | |
| load_kwargs["dtype"] = torch.float16 | |
| self.model = model_class.from_pretrained(model_name, **load_kwargs) | |
| def _load_merged_model(self, model_path: str, device_map: Any, mode: str = "auto"): | |
| """Load merged model directly (no base model + LoRA needed)""" | |
| print(f" Using merged model (no base model loading needed)") | |
| if mode == "hybrid" and self._device_info['cuda_available']: | |
| print(" Using HYBRID mode (GPU + CPU)") | |
| print(f" Max GPU Memory: {self.max_gpu_memory}") | |
| self.model = AutoModelForImageTextToText.from_pretrained( | |
| model_path, | |
| device_map=device_map, | |
| max_memory={0: self.max_gpu_memory, "cpu": "4GB"}, | |
| trust_remote_code=True, | |
| dtype=torch.float16, | |
| offload_folder="offload_weights" | |
| ) | |
| elif mode == "gpu" and self._device_info['cuda_available']: | |
| print(" Using GPU-only mode") | |
| print(" Using float16 precision") | |
| self.model = AutoModelForImageTextToText.from_pretrained( | |
| model_path, | |
| device_map=device_map, | |
| trust_remote_code=True, | |
| dtype=torch.float16 | |
| ) | |
| else: | |
| print(" Using CPU-only mode") | |
| print(" Using float32 precision") | |
| self.model = AutoModelForImageTextToText.from_pretrained( | |
| model_path, | |
| device_map="cpu", | |
| trust_remote_code=True, | |
| dtype=torch.float32 | |
| ) | |
| self._using_hurricane = True | |
| def _load_cpu_mode(self, base_model_name=None, is_trocr=False): | |
| """Load model on CPU only""" | |
| print(" Using CPU-only mode") | |
| print(" Using float32 precision") | |
| self._mode_used = "cpu" | |
| self._device = "cpu" | |
| model_name = base_model_name or self.BASE_MODEL_NAME | |
| model_class = AutoModelForVision2Seq if (is_trocr and AutoModelForVision2Seq is not None) else AutoModelForImageTextToText | |
| # Prepare loading kwargs | |
| load_kwargs = { | |
| "device_map": "cpu", | |
| "trust_remote_code": True, | |
| "dtype": torch.float32 | |
| } | |
| # Add offline mode if enabled | |
| if USE_OFFLINE_MODE: | |
| load_kwargs["local_files_only"] = True | |
| print(" π¦ Offline mode: local_files_only=True") | |
| self.model = model_class.from_pretrained(model_name, **load_kwargs) | |
    def device(self) -> str:
        """Return the torch device string in use ("cuda" or "cpu")."""
        return self._device
    def mode_used(self) -> str:
        """Return the effective load mode actually in use ("gpu", "cpu", or "hybrid")."""
        return self._mode_used
    def device_info(self) -> dict:
        """Return the cached hardware-capability dict (includes at least 'cuda_available')."""
        return self._device_info
| def resize_image(self, img: Image.Image, max_size: int = None) -> Image.Image: | |
| """ | |
| Resize image if it exceeds max dimensions | |
| Args: | |
| img: PIL Image object | |
| max_size: Maximum dimension size (default: MAX_IMAGE_SIZE) | |
| Returns: | |
| Resized PIL Image | |
| """ | |
| if max_size is None: | |
| max_size = self.MAX_IMAGE_SIZE | |
| width, height = img.size | |
| if width > max_size or height > max_size: | |
| if width >= height: | |
| scale = max_size / float(width) | |
| new_size = (max_size, int(height * scale)) | |
| else: | |
| scale = max_size / float(height) | |
| new_size = (int(width * scale), max_size) | |
| img = img.resize(new_size, Image.Resampling.LANCZOS) | |
| print(f"Original size: {width, height} ==> Resized to: {img.size}") | |
| return img | |
| def perform_ocr( | |
| self, | |
| image: Image.Image, | |
| prompt: str = None, | |
| max_new_tokens: int = 128, # Reduced to 128 for lower RAM (enough for license plates) | |
| return_detection_info: bool = False, | |
| cropped_image: Image.Image = None, | |
| detection_info: Dict[str, Any] = None | |
| ) -> str: | |
| """ | |
| Perform OCR on an image with optional YOLOv8 plate detection | |
| Pipeline: | |
| 1. Step 1: License Plate Detection | |
| - If cropped_image is provided: Use it directly (detection already done in app.py) | |
| - If cropped_image is None: Run HurricaneOD_beta detection to crop plate | |
| - Result: img_to_ocr (cropped plate or full image) | |
| 2. Step 2: OCR Processing | |
| - Resize image if needed | |
| - Prepare prompt and messages | |
| - Run Hurricane OCR model to extract text | |
| - Result: OCR text output | |
| Note: This function is called from app.py which already does detection. | |
| If cropped_image is passed, it will be used directly (no duplicate detection). | |
| Args: | |
| image: PIL Image object (original full image) | |
| prompt: Custom prompt for OCR (default: Thai license plate extraction) | |
| max_new_tokens: Maximum tokens to generate (not used for PaddleOCR) | |
| return_detection_info: If True, return tuple (ocr_text, detection_info) | |
| cropped_image: Pre-cropped plate image (optional, to avoid duplicate detection) | |
| detection_info: Pre-computed detection info (optional, to avoid duplicate detection) | |
| Returns: | |
| OCR result text, or tuple (text, detection_info) if return_detection_info=True | |
| """ | |
| if not self._is_loaded: | |
| raise RuntimeError("Model not loaded. Call load() first.") | |
| # Use provided detection info or initialize default | |
| if detection_info is None: | |
| detection_info = { | |
| "detected": False, | |
| "bbox": None, | |
| "confidence": None, | |
| "used_full_image": True | |
| } | |
| # Step 1: Use pre-cropped image if provided, otherwise detect and crop | |
| print(" π Step 1: License Plate Detection...") | |
| if cropped_image is not None: | |
| # Use pre-cropped image (detection already done) | |
| img_to_ocr = cropped_image.convert("RGB") | |
| if detection_info.get("detected", False): | |
| print(f" π Using pre-cropped plate (confidence: {detection_info.get('confidence', 0):.2f})") | |
| else: | |
| # Detect and crop license plate if detection is enabled | |
| img_to_ocr = image.convert("RGB") | |
| if self.use_detection and self.plate_detector is not None: | |
| try: | |
| print(" π Running HurricaneOD_beta detection...") | |
| cropped_plate, detections = self.plate_detector.detect_and_crop(img_to_ocr, return_all=False) | |
| if cropped_plate is not None and len(detections) > 0: | |
| # Use cropped plate for OCR | |
| img_to_ocr = cropped_plate | |
| detection_info = { | |
| "detected": True, | |
| "bbox": detections[0]["bbox"], | |
| "confidence": detections[0]["confidence"], | |
| "used_full_image": False, | |
| "all_detections": detections | |
| } | |
| print(f" β Detected plate (confidence: {detections[0]['confidence']:.2f})") | |
| print(f" π Bounding box: {detections[0]['bbox']}") | |
| else: | |
| print(" β οΈ No plate detected, using full image") | |
| detection_info["used_full_image"] = True | |
| except Exception as e: | |
| print(f" β οΈ Detection error: {e}, using full image") | |
| detection_info["used_full_image"] = True | |
| else: | |
| print(" β οΈ Detection disabled or detector not available, using full image") | |
| detection_info["used_full_image"] = True | |
| print(f" β Step 1 completed - Image size: {img_to_ocr.size}") | |
| # Step 2: Perform OCR on (cropped) image | |
| try: | |
| print(" π Starting OCR processing...") | |
| # Check if model and processor are loaded | |
| if self.model is None: | |
| raise RuntimeError("Model is not loaded. Please load the model first.") | |
| if self.processor is None: | |
| raise RuntimeError("Processor is not loaded. Please load the processor first.") | |
| # Resize image if needed (reduce size for faster processing) | |
| print(" π Resizing image if needed...") | |
| # Reduce max size for faster OCR processing | |
| original_max_size = self.MAX_IMAGE_SIZE | |
| self.MAX_IMAGE_SIZE = min(original_max_size, 1024) # Limit to 1024px for faster processing | |
| img = self.resize_image(img_to_ocr) | |
| self.MAX_IMAGE_SIZE = original_max_size # Restore original | |
| print(f" β Image ready for OCR (size: {img.size})") | |
| # Detect model type (TrOCR or Vision-Language) | |
| is_trocr = False | |
| model_class_name = self.model.__class__.__name__ | |
| if "Vision2Seq" in model_class_name or "TrOCR" in model_class_name: | |
| is_trocr = True | |
| elif hasattr(self.processor, 'apply_chat_template'): | |
| # Check processor type | |
| processor_class_name = self.processor.__class__.__name__ | |
| if "TrOCR" in processor_class_name: | |
| is_trocr = True | |
| if is_trocr: | |
| # TrOCR format: Direct image processing (no chat template) | |
| print(" π Processing with TrOCR format (direct image-to-text)...") | |
| inputs = self.processor(images=img, return_tensors="pt") | |
| else: | |
| # Vision-Language format (Qwen3VL/Typhoon): Chat template | |
| # Default prompt for Thai license plate | |
| if prompt is None: | |
| prompt = """ΰΈΰΉΰΈ²ΰΈΰΉΰΈ₯ΰΈ°ΰΈΰΈΆΰΈΰΈΰΉΰΈΰΈ‘ΰΈΉΰΈ₯ΰΈΰΈ²ΰΈΰΈ£ΰΈΉΰΈΰΈΰΉΰΈ²ΰΈ’ΰΈΰΈ°ΰΉΰΈΰΈ΅ΰΈ’ΰΈΰΈ£ΰΈΰΉΰΈΰΈ’ΰΈΰΈ΅ΰΉ ΰΉΰΈΰΈ’ΰΈ£ΰΈ°ΰΈΰΈΈΰΈΰΉΰΈΰΈ‘ΰΈΉΰΈ₯ΰΈΰΉΰΈΰΉΰΈΰΈΰΈ΅ΰΉ: | |
| - ΰΉΰΈ₯ΰΈΰΈΰΈ°ΰΉΰΈΰΈ΅ΰΈ’ΰΈ (Plate Number): ΰΈΰΈ±ΰΈ§ΰΈΰΈ±ΰΈΰΈ©ΰΈ£ΰΉΰΈ₯ΰΈ°ΰΈΰΈ±ΰΈ§ΰΉΰΈ₯ΰΈΰΈΰΈΰΈΰΉΰΈ²ΰΈ’ ΰΉΰΈΰΉΰΈ "ΰΈΰΈ 1234" ΰΈ«ΰΈ£ΰΈ·ΰΈ "1ΰΈΰΈ 5678" | |
| - ΰΈΰΈ±ΰΈ§ΰΈΰΈ±ΰΈΰΈ©ΰΈ£ (Characters): ΰΈͺΰΉΰΈ§ΰΈΰΈΰΈ±ΰΈ§ΰΈΰΈ±ΰΈΰΈ©ΰΈ£ΰΉΰΈΰΈ’ ΰΉΰΈΰΉΰΈ "ΰΈΰΈ" ΰΈ«ΰΈ£ΰΈ·ΰΈ "1ΰΈΰΈ" | |
| - ΰΈΰΈ±ΰΈ§ΰΉΰΈ₯ΰΈ (Digits): ΰΈͺΰΉΰΈ§ΰΈΰΈΰΈ±ΰΈ§ΰΉΰΈ₯ΰΈ ΰΉΰΈΰΉΰΈ "1234" | |
| - ΰΈΰΈ±ΰΈΰΈ«ΰΈ§ΰΈ±ΰΈ (Province): ΰΈΰΈ·ΰΉΰΈΰΈΰΈ±ΰΈΰΈ«ΰΈ§ΰΈ±ΰΈΰΈΰΈΰΈΰΉΰΈ²ΰΈ’ ΰΉΰΈΰΉΰΈ "ΰΈΰΈ£ΰΈΈΰΈΰΉΰΈΰΈΰΈ‘ΰΈ«ΰΈ²ΰΈΰΈΰΈ£" | |
| - ΰΈΰΈ£ΰΈ°ΰΉΰΈ ΰΈΰΈ£ΰΈ (Vehicle Type): ΰΈΰΉΰΈ²ΰΈ‘ΰΈ΅ΰΈ£ΰΈ°ΰΈΰΈΈ | |
| - ΰΈͺΰΈ΅ΰΈΰΉΰΈ²ΰΈ’ (Plate Color): ΰΈΰΈ²ΰΈ§, ΰΉΰΈΰΈ΅ΰΈ’ΰΈ§, ΰΉΰΈ«ΰΈ₯ΰΈ·ΰΈΰΈ, ΰΉΰΈΰΈ ΰΈ―ΰΈ₯ΰΈ― | |
| ΰΈΰΈ£ΰΈΈΰΈΰΈ²ΰΈΰΉΰΈ²ΰΈΰΈΰΉΰΈΰΈΰΈ§ΰΈ²ΰΈ‘ΰΈΰΈ±ΰΉΰΈΰΈ«ΰΈ‘ΰΈΰΈΰΈ΅ΰΉΰΉΰΈ«ΰΉΰΈΰΈΰΈΰΈΰΉΰΈ²ΰΈ’ΰΈΰΈ°ΰΉΰΈΰΈ΅ΰΈ’ΰΈ: """ | |
| print(" π Preparing messages...") | |
| messages = [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "image", | |
| "image": img, | |
| }, | |
| { | |
| "type": "text", | |
| "text": prompt | |
| } | |
| ], | |
| } | |
| ] | |
| # Prepare inputs | |
| print(" π Applying chat template and tokenizing...") | |
| inputs = self.processor.apply_chat_template( | |
| messages, | |
| tokenize=True, | |
| add_generation_prompt=True, | |
| return_dict=True, | |
| return_tensors="pt" | |
| ) | |
| # Move inputs to model device | |
| # Get actual model device (handle device_map case) | |
| try: | |
| if hasattr(self.model, 'hf_device_map'): | |
| # Model is using device_map, get first device | |
| first_device = list(self.model.hf_device_map.values())[0] | |
| if isinstance(first_device, torch.device): | |
| model_device = first_device | |
| else: | |
| model_device = torch.device(first_device) | |
| else: | |
| model_device = next(self.model.parameters()).device | |
| except: | |
| # Fallback: try to get device from model | |
| try: | |
| model_device = self.model.device if hasattr(self.model, 'device') else torch.device('cpu') | |
| except: | |
| model_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f" π Moving inputs to device: {model_device}") | |
| inputs = {k: v.to(model_device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()} | |
| print(" β Inputs prepared") | |
| # Only remove image_grid_thw for TrOCR models (they don't use it) | |
| # Vision-Language models (Qwen3VL/Typhoon) NEED image_grid_thw! | |
| if is_trocr and "image_grid_thw" in inputs: | |
| inputs.pop("image_grid_thw") | |
| print(" π Removed unused key: image_grid_thw (TrOCR model)") | |
| # Generate output with optimized parameters for speed | |
| print(f" π Generating OCR output (max_new_tokens={min(max_new_tokens, 128)})...") | |
| print(" β³ This may take a while...") | |
| # Check model device | |
| print(f" π Model device: {model_device}") | |
| if model_device.type == 'cpu': | |
| print(" β οΈ WARNING: Model is on CPU! This will be VERY slow.") | |
| print(" π‘ Consider using GPU mode for faster inference") | |
| # Use optimized generation parameters for faster inference | |
| import time | |
| gen_start = time.time() | |
| # Get tokenizer eos and pad tokens | |
| tokenizer = self.processor.tokenizer | |
| pad_token_id = getattr(tokenizer, 'pad_token_id', None) or getattr(tokenizer, 'eos_token_id', None) | |
| eos_token_id = getattr(tokenizer, 'eos_token_id', None) | |
| with torch.no_grad(): # Disable gradient computation for faster inference | |
| # Use only valid parameters for Qwen3VL model | |
| # Note: early_stopping, temperature, top_p, top_k are not valid for greedy decoding (do_sample=False, num_beams=1) | |
| generation_kwargs = { | |
| "max_new_tokens": min(max_new_tokens, 128), # Capped at 128 for lower RAM (enough for license plates) | |
| "do_sample": False, # Use greedy decoding (faster than sampling) | |
| "use_cache": True, # Enable KV cache for faster generation | |
| "num_beams": 1, # Greedy search (fastest) | |
| } | |
| # Add token IDs if available | |
| if pad_token_id is not None: | |
| generation_kwargs["pad_token_id"] = pad_token_id | |
| if eos_token_id is not None: | |
| generation_kwargs["eos_token_id"] = eos_token_id | |
| generated_ids = self.model.generate(**inputs, **generation_kwargs) | |
| gen_elapsed = time.time() - gen_start | |
| print(f" β Generation completed in {gen_elapsed:.2f} seconds") | |
| if gen_elapsed > 60: | |
| print(f" β οΈ WARNING: Generation took {gen_elapsed:.2f} seconds (>1 minute)") | |
| print(f" π‘ This is unusually slow. Check if model is on GPU.") | |
| # Decode output based on model type | |
| print(" π Decoding output...") | |
| if is_trocr: | |
| # TrOCR: Direct decode | |
| ocr_result = self.processor.decode(generated_ids[0], skip_special_tokens=True) | |
| else: | |
| # Vision-Language: Trim input_ids and decode | |
| input_ids = inputs['input_ids'] if isinstance(inputs, dict) else inputs.input_ids | |
| generated_ids_trimmed = [ | |
| out_ids[len(in_ids):] | |
| for in_ids, out_ids in zip(input_ids, generated_ids) | |
| ] | |
| output_text = self.processor.batch_decode( | |
| generated_ids_trimmed, | |
| skip_special_tokens=True, | |
| clean_up_tokenization_spaces=False | |
| ) | |
| ocr_result = output_text[0] | |
| # Extract assistant response if present | |
| if "<|assistant|>" in ocr_result: | |
| ocr_result = ocr_result.split("<|assistant|>")[-1].strip() | |
| print(f" β OCR completed successfully") | |
| print(f" π OCR Result length: {len(ocr_result)} characters") | |
| except Exception as e: | |
| import traceback | |
| error_details = traceback.format_exc() | |
| print(f" β Error in Step 2 (OCR): {e}") | |
| print(f" π Error details:\n{error_details}") | |
| # Return error message instead of crashing | |
| ocr_result = f"[OCR ERROR] {str(e)}\n\nError details:\n{error_details}" | |
| # Memory cleanup after inference (reduce RAM usage across requests) | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| import gc | |
| gc.collect() | |
| # Return result with optional detection info | |
| if return_detection_info: | |
| return ocr_result, detection_info | |
| return ocr_result | |
    def is_loaded(self) -> bool:
        """Return True once ``load()`` has completed successfully.

        NOTE: this is a plain method, not a property — callers must invoke it
        as ``model.is_loaded()``; referencing ``model.is_loaded`` yields the
        always-truthy bound method object.
        """
        return self._is_loaded
# Global model instance for Gradio app — module-level singleton, created
# lazily by get_model() so importing this module does not load the model.
_global_model: Optional[OCRModel] = None
def get_model(device_mode: str = None, use_8bit: bool = True) -> OCRModel:
    """Return the process-wide OCRModel singleton, creating it on first use.

    Note: the construction arguments only take effect on the first call;
    later calls return the already-created instance unchanged.
    """
    global _global_model
    if _global_model is not None:
        return _global_model
    _global_model = OCRModel(device_mode=device_mode, use_8bit=use_8bit)
    return _global_model
def load_model(device_mode: str = None, use_8bit: bool = True) -> OCRModel:
    """
    Load and return global model instance

    Args:
        device_mode: "auto", "gpu", "cpu", or "hybrid" (default: uses global DEVICE_MODE)
        use_8bit: Whether to use 8-bit quantization (default: True, saves VRAM)

    Device Modes:
        - "auto": Use GPU if available, fallback to CPU
        - "gpu": Force GPU only
        - "cpu": Force CPU only
        - "hybrid": GPU + CPU working together (recommended for limited VRAM)
    """
    model = get_model(device_mode=device_mode, use_8bit=use_8bit)
    # BUG FIX: is_loaded is a method, not a property. The previous
    # `if not model.is_loaded:` tested the truthiness of the bound-method
    # object (always True), so load() was never triggered from here.
    if not model.is_loaded():
        model.load()
    return model
def check_gpu() -> str:
    """Build a human-readable GPU/CPU availability report.

    Queries ``get_device_info()`` (defined earlier in this module) and returns
    either a GPU summary with recommended device modes, or CPU-only setup
    instructions. The multi-line literals are user-facing text; their exact
    layout is intentional.
    """
    info = get_device_info()
    if info["cuda_available"]:
        return f"""β GPU Available!
Device: {info['cuda_device_name']}
VRAM Total: {info['cuda_memory_total']}
VRAM Free: {info['cuda_memory_free']}
CPU Cores: {info['cpu_count']}
Recommended Modes:
- VRAM >= 8GB: Use "gpu" mode
- VRAM 4-8GB: Use "hybrid" mode
- No GPU: Use "cpu" mode"""
    else:
        return f"""β GPU Not Available
CPU Cores: {info['cpu_count']}
Mode: CPU only (slower)
To use GPU, please install:
1. NVIDIA GPU drivers
2. CUDA Toolkit
3. PyTorch with CUDA support"""