from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import asyncio
import json
import os
import cv2
import numpy as np
import base64
import math
import contextlib
import requests
import unicodedata
import time

# ============================================================
# Model Architecture — SYNCED with Training8.ipynb
# ResNet50 + Transformer Decoder (d_model=384, nhead=6)
# ============================================================
class PositionalEncoding1D(nn.Module):
    def __init__(self, d_model, max_len=512):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
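
# Hedged sketch: a quick shape check for the positional encoding above.
# Defined but never called at import time; run manually to confirm the
# encoding is added element-wise and preserves the input shape.
def _demo_positional_encoding():
    pe = PositionalEncoding1D(d_model=384)
    x = torch.zeros(2, 10, 384)          # (batch, seq_len, d_model)
    assert pe(x).shape == (2, 10, 384)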

class OCRTransformerModel(nn.Module):
    def __init__(self, vocab_size, d_model=384, nhead=6,
                 num_decoder_layers=4, dim_feedforward=1024, dropout=0.2):
        super().__init__()
        # ResNet50 encoder (matches Training8.ipynb exactly)
        backbone = models.resnet50(weights=None)
        self.encoder = nn.Sequential(*list(backbone.children())[:-2])
        # ResNet50 outputs 2048 channels → project to d_model=384
        self.enc_proj = nn.Conv2d(2048, d_model, kernel_size=1)
        self.token_embed = nn.Embedding(vocab_size, d_model)
        self.pos_decoder = PositionalEncoding1D(d_model)
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, images, tgt):
        feat = self.encoder(images)                # (B, 2048, H/32, W/32)
        feat = self.enc_proj(feat)                 # (B, d_model, H/32, W/32)
        memory = feat.flatten(2).permute(0, 2, 1)  # (B, H/32 * W/32, d_model)
        tgt = self.token_embed(tgt)
        tgt = self.pos_decoder(tgt)
        # Causal mask: each position may only attend to earlier tokens
        mask = torch.triu(torch.ones(tgt.size(1), tgt.size(1), device=tgt.device), 1).bool()
        out = self.decoder(tgt, memory, tgt_mask=mask)
        return self.output_layer(out)
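
# Hedged sketch: a shape walk-through of the model above at the input size
# produced by preprocess_image below (64×400). vocab_size=100 is a
# hypothetical value; defined but never called at import time.
def _demo_model_shapes():
    demo = OCRTransformerModel(vocab_size=100)
    images = torch.zeros(1, 3, 64, 400)        # (B, C, H, W)
    tgt = torch.zeros(1, 5, dtype=torch.long)  # five previously decoded tokens
    with torch.no_grad():
        logits = demo(images, tgt)
    assert logits.shape == (1, 5, 100)         # (B, T, vocab_size)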

# ============================================================
# Global Resources
# ============================================================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = None
stoi = None
itos = None

# English Engine Resources (TrOCR)
model_eng = None
processor_eng = None
hf_token = None
lexicon = []
lexicon_set = set()

# ============================================================
# ImageNet Normalization — MUST match Training8.ipynb
# ============================================================
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# ============================================================
# Resource Loading
# ============================================================
def load_hf_token():
    """Load Hugging Face token from hf_token.txt or environment for faster authorized downloads."""
    token = os.getenv("HF_TOKEN")
    if not token and os.path.exists("hf_token.txt"):
        with open("hf_token.txt", "r") as f:
            token = f.read().strip()
    if token:
        # Set token directly as env var — avoids calling /whoami-v2, which hits rate limits on Docker restarts
        os.environ["HF_TOKEN"] = token
        print("[OK] Hugging Face Token Loaded! Authorized for faster downloads.")
    return token

async def query_inference_api(image_bytes, token):
    """
    Call the Hugging Face Inference API (Serverless) for English OCR.
    This offloads the 2.2 GB model from local RAM/CPU to HF GPUs.
    """
    API_URL = "https://api-inference.huggingface.co/models/microsoft/trocr-large-handwritten"
    headers = {"Authorization": f"Bearer {token}"}
    # Retry loop: a 503 means the model is still warming up on the HF side
    for attempt in range(3):
        response = requests.post(API_URL, headers=headers, data=image_bytes)
        if response.status_code == 200:
            return response.json()[0].get("generated_text", "").strip()
        elif response.status_code == 503:
            print(f"[RETRY] Model is loading on HF Side: {response.json()}")
            await asyncio.sleep(5)
        else:
            print(f"[FAIL] Inference API Error ({response.status_code}): {response.text}")
            break
    return "Error: Inference API failed"

async def load_resources():
    global model, stoi, itos, hf_token, lexicon, lexicon_set, model_eng, processor_eng
    hf_token = load_hf_token()

    # --- Load Lexicon ---
    for lex_path in ["lexicon.txt", "Hindi-DS/lexicon.txt"]:
        if os.path.exists(lex_path):
            with open(lex_path, "r", encoding="utf-8") as f:
                lexicon = [unicodedata.normalize("NFC", l.strip()) for l in f if l.strip()]
            lexicon_set = set(lexicon)
            print(f"[OK] Lexicon Loaded ({len(lexicon)} words)")
            break

    # --- Load Hindi Engine ---
    checkpoint_path = "best_model_finetuned.pt"
    vocab_file = "vocab.json"
    stoi, itos = {}, {}
    if os.path.exists(vocab_file):
        with open(vocab_file, 'r', encoding='utf-8') as f:
            vlist = json.load(f)
        stoi = {val: i for i, val in enumerate(vlist)}
        itos = {i: val for i, val in enumerate(vlist)}
    if not os.path.exists(checkpoint_path):
        from huggingface_hub import hf_hub_download
        checkpoint_path = hf_hub_download(repo_id="Angstormy/parsify-ocr-weights",
                                          filename="best_model_finetuned.pt", token=hf_token)
    checkpoint = torch.load(checkpoint_path, map_location=device)
    # Smart checkpoint parsing (supports various training formats)
    if isinstance(checkpoint, dict):
        state_dict = checkpoint.get('state_dict', checkpoint.get('model_state_dict', checkpoint))
        # Support d_model stored in the checkpoint
        d_model = checkpoint.get('d_model', 384)
        # Load the vocabulary from the checkpoint if it exists there
        if 'stoi' in checkpoint and not stoi:
            stoi = checkpoint['stoi']
            itos = {int(k): v for k, v in checkpoint.get('itos', {}).items()}
    else:
        state_dict = checkpoint
        d_model = 384
    # Dynamic vocab sizing (ensures the model matches the checkpoint exactly)
    if 'output_layer.bias' in state_dict:
        vocab_size = state_dict['output_layer.bias'].size(0)
    elif 'token_embed.weight' in state_dict:
        vocab_size = state_dict['token_embed.weight'].size(0)
    else:
        vocab_size = len(stoi)
    print(f"[INFO] Initializing model with vocab_size={vocab_size} (Mapping has {len(stoi)} tokens)")
    model = OCRTransformerModel(vocab_size, d_model=d_model).to(device)
    # Load the weights
    try:
        model.load_state_dict(state_dict, strict=True)
    except RuntimeError:
        # If strict loading fails, fall back to non-strict (handles minor version diffs)
        print("[WARN] Strict loading failed, attempting non-strict...")
        model.load_state_dict(state_dict, strict=False)
    model.eval()
    print(f"[OK] Hindi Engine loaded — ResNet50 + d_model={d_model} ({vocab_size} classes)")

    # --- Load English Engine (Local) ---
    print("\n" + "=" * 60)
    print("🧠 ALLOCATING MEMORY FOR LOCAL ENGLISH MODEL")
    print("=" * 60)
    from transformers import VisionEncoderDecoderModel, TrOCRProcessor
    eng_model_path = "trocr-large-english"
    if os.path.exists(eng_model_path):
        start_load = time.time()
        print("⏳ [1/2] Fetching tiny processor config from Hugging Face...")
        processor_eng = TrOCRProcessor.from_pretrained("microsoft/trocr-large-handwritten")
        print(f"⏳ [2/2] Loading 2.5 GB model weights from '{eng_model_path}' into {str(device).upper()} RAM...")
        print("   -> Please wait. This blocks the server and usually takes 10-30 seconds...")
        model_eng = VisionEncoderDecoderModel.from_pretrained(eng_model_path).to(device)
        model_eng.eval()
        elapsed = time.time() - start_load
        print(f"✅ SUCCESS! English Engine fully loaded into RAM in {elapsed:.1f} seconds!")
        print("=" * 60 + "\n")
    else:
        print(f"[WARN] English model folder '{eng_model_path}' not found! The /predict endpoint will fail for English.")
        print("=" * 60 + "\n")

@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    await load_resources()
    yield

app = FastAPI(lifespan=lifespan)

# Allow CORS for React development
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ============================================================
# Preprocessing & Robustness helpers
# ============================================================
def ink_crop(img_gray, margin=2):
    """Crop to ink bounding box using Otsu (for bounding box only)."""
    _, binary = cv2.threshold(img_gray, 0, 255,
                              cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    coords = cv2.findNonZero(binary)
    if coords is not None:
        x, y, w, h = cv2.boundingRect(coords)
        x1 = max(0, x - margin)
        y1 = max(0, y - margin)
        x2 = min(img_gray.shape[1], x + w + margin)
        y2 = min(img_gray.shape[0], y + h + margin)
        return y1, y2, x1, x2
    return 0, img_gray.shape[0], 0, img_gray.shape[1]
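
# Hedged sketch: ink_crop on a synthetic page. A black block on a white
# background should be cropped to the block plus the margin. Defined but
# never called at import time.
def _demo_ink_crop():
    canvas = np.full((100, 200), 255, np.uint8)  # white page
    canvas[40:60, 80:120] = 0                    # black "ink" block
    assert ink_crop(canvas, margin=2) == (38, 62, 78, 122)  # (y1, y2, x1, x2)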

def preprocess_image(image_input):
    """
    Preprocessing pipeline — handles both raw bytes and pre-decoded BGR images.
    """
    if isinstance(image_input, bytes):
        nparr = np.frombuffer(image_input, np.uint8)
        img_bgr = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    else:
        img_bgr = image_input
    if img_bgr is None:
        return None, None
    # 1. Grayscale; use the image as-is (do not apply another destructive ink_crop)
    img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    img_gray_cropped = img_gray
    # 2. CLAHE (clipLimit=2.0, tileGridSize=4×4) BEFORE padding
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(4, 4))
    img_enhanced = clahe.apply(img_gray_cropped)
    # 3. Aspect-ratio resize (height=64, max_width=400)
    IMG_HEIGHT = 64
    MAX_WIDTH = 400
    h, w = img_enhanced.shape
    new_w = min(int(w * (IMG_HEIGHT / h)), MAX_WIDTH)
    pil_img = Image.fromarray(img_enhanced).convert("L")
    resample_method = Image.Resampling.LANCZOS if hasattr(Image, "Resampling") else Image.LANCZOS
    pil_img = pil_img.resize((new_w, IMG_HEIGHT), resample_method)
    # 4. Grayscale → 3-channel
    pil_img_rgb = Image.merge('RGB', [pil_img, pil_img, pil_img])
    # 5. ToTensor + ImageNet Normalize
    img_tensor = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ])(pil_img_rgb)
    # 6. Black pad to MAX_WIDTH (value=0.0 — matches Training8)
    if img_tensor.shape[2] < MAX_WIDTH:
        img_tensor = torch.nn.functional.pad(
            img_tensor, (0, MAX_WIDTH - img_tensor.shape[2], 0, 0), value=0.0)
    img_tensor = img_tensor.unsqueeze(0).to(device)
    # Debug view (pre-normalization image, so it stays human-readable)
    debug_arr = np.array(pil_img_rgb)
    debug_arr = cv2.cvtColor(debug_arr, cv2.COLOR_RGB2BGR)
    _, buffer = cv2.imencode('.png', debug_arr)
    debug_raw_b64 = base64.b64encode(buffer).decode()
    return img_tensor, debug_raw_b64
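
# Hedged sketch: regardless of input aspect ratio, preprocess_image should
# yield a (1, 3, 64, 400) tensor plus a base64 PNG debug view. Defined but
# never called at import time.
def _demo_preprocess_shape():
    canvas = np.full((120, 500, 3), 255, np.uint8)  # white BGR canvas
    tensor, b64 = preprocess_image(canvas)
    assert tensor.shape == (1, 3, 64, 400)
    assert isinstance(b64, str)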

# ============================================================
# Inference Logic & Post-Processing
# ============================================================
def levenshtein(a, b):
    """Standard Levenshtein distance for word correction."""
    m, n = len(a), len(b)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            cost = 0 if a[i-1] == b[j-1] else 1
            dp[i][j] = min(dp[i-1][j] + 1, dp[i][j-1] + 1, dp[i-1][j-1] + cost)
    return dp[m][n]
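
# Quick sanity checks for the distance above (classic textbook cases).
# Defined but never called at import time.
def _demo_levenshtein():
    assert levenshtein("kitten", "sitting") == 3  # two substitutions + one insertion
    assert levenshtein("abc", "abc") == 0
    assert levenshtein("", "abc") == 3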

def lexicon_correct(pred, max_edit_dist=1):
    """Corrects OCR predictions using the loaded lexicon."""
    # Manual grammatical rules have been removed so the raw tensor predictions
    # stay fully transparent for pure ML evaluation.
    if not lexicon_set:
        return pred
    pred = unicodedata.normalize("NFC", pred.strip())
    if not pred or pred in lexicon_set:
        return pred
    best_word, best_dist = pred, max_edit_dist + 1
    for word in lexicon:
        # Length difference lower-bounds the edit distance, so skip those early
        if abs(len(word) - len(pred)) > max_edit_dist:
            continue
        dist = levenshtein(pred, word)
        if dist < best_dist:
            best_dist, best_word = dist, word
    return best_word if best_dist <= max_edit_dist else pred
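
# Hedged usage sketch: with a loaded lexicon containing "नमस्ते", a one-edit
# misread such as "नमस्ता" is corrected, while words further than
# max_edit_dist away are returned unchanged. The demo mutates the module
# lexicon, so it is defined but never called at import time.
def _demo_lexicon_correct():
    global lexicon, lexicon_set
    lexicon = ["नमस्ते"]  # hypothetical one-word lexicon
    lexicon_set = set(lexicon)
    assert lexicon_correct("नमस्ता") == "नमस्ते"  # edit distance 1 → corrected
    assert lexicon_correct("पानी") == "पानी"      # too far → left as-is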

# ============================================================
# Beam Search Decoder
# ============================================================
def beam_search_decode(model, images, k=3, max_len=25):
    """
    Ultra-Accuracy Beam Search Decoder.
    Optimized for high-precision diagnostic output and memory efficiency.
    Assumes a single image per call (batch size 1).
    """
    BOS_VAL = stoi.get("<bos>", 1)
    EOS_VAL = stoi.get("<eos>", 2)
    PAD_VAL = stoi.get("<pad>", 0)
    # Initial beam: (sequence_tensor, score, diagnostic_history).
    # History is tracked per beam to ensure the matrix stays in sync with the winning path.
    beams = [(torch.full((1, 1), BOS_VAL, dtype=torch.long, device=device), 0.0, [])]
    for step_idx in range(max_len):
        step_start_time = time.time()
        candidates = []
        for seq, score, history in beams:
            # Keep beams that already reached EOS as-is
            if seq[0, -1].item() == EOS_VAL:
                candidates.append((seq, score, history))
                continue
            # Single forward pass for the current beam (inference only, no gradients)
            with torch.no_grad(), (torch.amp.autocast('cuda') if device.type == 'cuda' else contextlib.nullcontext()):
                logits = model(images, seq)
            # Extract log-probabilities for the next token
            log_probs = torch.log_softmax(logits[:, -1, :], dim=-1)
            top_lp, top_i = log_probs[0].topk(k)
            # --- Vector Diagnostic Generation ---
            # Computed once per active beam to avoid redundant loops
            current_diagnostics = [
                {
                    "char": itos.get(idx.item(), '<?>'),
                    "confidence": round(torch.exp(lp).item(), 4)
                }
                for lp, idx in zip(top_lp, top_i)
            ]
            # Branch into k candidates
            for lp, idx in zip(top_lp, top_i):
                new_seq = torch.cat([seq, idx.unsqueeze(0).unsqueeze(0)], dim=1)
                new_score = score + lp.item()
                # Append diagnostic data only to the relevant path
                new_history = history + [{"step": step_idx + 1, "top_candidates": current_diagnostics}]
                candidates.append((new_seq, new_score, new_history))
        # Sort by cumulative score and prune to keep the top k beams
        beams = sorted(candidates, key=lambda x: x[1], reverse=True)[:k]
        # Record the step duration (seconds) on each surviving beam
        step_duration_sec = time.time() - step_start_time
        new_beams = []
        for seq, score, history in beams:
            if history:
                history[-1]["duration_sec"] = round(step_duration_sec, 4)
            new_beams.append((seq, score, history))
        beams = new_beams
        # Stop if all surviving beams have reached EOS
        if all(b[0][0, -1].item() == EOS_VAL for b in beams):
            break
    # Pick the absolute best path
    best_seq, _, best_history = beams[0]
    ids = best_seq[0].tolist()
    out_chars = []
    for i in ids:
        if i == EOS_VAL:
            break
        if i in (PAD_VAL, BOS_VAL):
            continue
        out_chars.append(itos.get(i, ""))
    prediction = "".join(out_chars)
    return prediction, best_history

def greedy_decode(model, images, max_len=25):
    """Legacy Greedy Decode (Backup/English logic): beam search with k=1."""
    return beam_search_decode(model, images, k=1, max_len=max_len)
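
# Hedged usage sketch for the decoders above. Requires load_resources() to
# have populated the global model/stoi/itos first; defined but never called
# at import time.
def _demo_decode(image_bytes):
    images, _ = preprocess_image(image_bytes)
    word, history = beam_search_decode(model, images, k=3)
    print(f"'{word}' decoded in {len(history)} beam steps")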

# ============================================================
# English Preprocessing
# ============================================================
def preprocess_english(image_bytes):
    """
    Absolute Raw Vision: crop and pass.
    No padding, no aspect-ratio manipulation — let the TrOCR processor handle it.
    """
    nparr = np.frombuffer(image_bytes, np.uint8)
    img_bgr = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    # 1. Natural Laser Crop (removes excess background)
    _, thresh = cv2.threshold(img_gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    coords = cv2.findNonZero(thresh)
    if coords is not None:
        x, y, w, h = cv2.boundingRect(coords)
        pad = 20
        y1, y2 = max(0, y - pad), min(img_bgr.shape[0], y + h + pad)
        x1, x2 = max(0, x - pad), min(img_bgr.shape[1], x + w + pad)
        cropped = img_bgr[y1:y2, x1:x2]
    else:
        cropped = img_bgr
    # 2. Raw PIL conversion
    pil_img = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
    pixel_values = processor_eng(images=pil_img, return_tensors="pt").pixel_values.to(device)
    # Debug view (denormalized with the TrOCR processor's mean/std of 0.5)
    debug_arr = pixel_values.squeeze(0).cpu().numpy().transpose(1, 2, 0)
    debug_arr = (debug_arr * 0.5 + 0.5) * 255
    debug_arr = debug_arr.clip(0, 255).astype(np.uint8)
    debug_arr = cv2.cvtColor(debug_arr, cv2.COLOR_RGB2BGR)
    _, buffer = cv2.imencode('.png', debug_arr)
    return pixel_values, base64.b64encode(buffer).decode('utf-8')

# ============================================================
# Script Detection
# ============================================================
def detect_script(image_bytes):
    """
    Intelligent Script Identification v4: Peak Prominence + Sentence Awareness.
    Devanagari's headline stroke (shirorekha) tends to produce a strong,
    prominent horizontal-projection peak near the top of the text; the
    heuristic looks for that peak plus a top-heavy ink distribution.
    """
    nparr = np.frombuffer(image_bytes, np.uint8)
    img_gray = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
    if img_gray is None:
        return "hindi"
    _, thresh = cv2.threshold(img_gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    coords = cv2.findNonZero(thresh)
    if coords is not None:
        x, y, w, h = cv2.boundingRect(coords)
        cropped_thresh = thresh[y:y+h, x:x+w]
        # Bridge gaps between characters so a headline reads as one long stroke
        kernel = np.ones((1, 10), np.uint8)
        dilated = cv2.dilate(cropped_thresh, kernel, iterations=1)
        h_proj = np.sum(dilated, axis=1)
        # Search the top 45% of the text for the headline peak
        search_range = int(h * 0.45)
        if search_range > 0:
            top_h_proj = h_proj[:search_range]
            max_density = np.max(top_h_proj)
            avg_density = np.mean(h_proj)
            density_ratio = max_density / (w * 255) if w > 0 else 0
            prominence = max_density / (avg_density + 1e-6)
            top_half_ink = np.sum(cropped_thresh[:h//2])
            bottom_half_ink = np.sum(cropped_thresh[h//2:])
            weight_ratio = top_half_ink / (bottom_half_ink + 1e-6)
            print(f"DEBUG [Detect v4]: Ratio={density_ratio:.2f}, Prominence={prominence:.2f}, Weight={weight_ratio:.2f}")
            if (density_ratio > 0.35 and prominence > 1.8) or weight_ratio > 1.5:
                print("DEBUG [Detect]: HINDI Identified.")
                return "hindi"
    print("DEBUG [Detect]: ENGLISH Identified.")
    return "english"

# ============================================================
# Prediction Endpoint
# ============================================================
@app.post("/predict")
async def predict_ocr(file: UploadFile = File(...), lang: str = "hindi"):
    try:
        image_bytes = await file.read()
        # --- Language Selection ---
        final_lang = lang
        inference_steps = []
        if final_lang == "english":
            if model_eng is None or processor_eng is None:
                return {"error": "English local model not loaded. Run download_model.py first."}
            # Preprocess and prepare for TrOCR
            pixel_values, debug_b64 = preprocess_english(image_bytes)
            # Local inference
            start_eng = time.time()
            with torch.no_grad():
                generated_ids = model_eng.generate(pixel_values)
            prediction = processor_eng.batch_decode(generated_ids, skip_special_tokens=True)[0]
            eng_duration_sec = time.time() - start_eng
            final_prediction = prediction
            inference_steps = [{"word": prediction, "steps": [{"step": "Total", "top_candidates": [{"char": "Full Sequence", "confidence": 1.0}], "duration_sec": round(eng_duration_sec, 3)}]}]
            print(f"ROUTING TO '{final_lang}': Local Inference -> FINAL: '{final_prediction}' ({eng_duration_sec:.3f}s)")
        else:
            if model is None:
                return {"error": "Hindi model not loaded"}
            img_bgr = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
            if img_bgr is None:
                return {"error": "Could not decode image"}
            # Process the entire image as a single sequence
            images, debug_b64 = preprocess_image(img_bgr)
            results = []
            all_steps = []
            if images is not None:
                pred, steps = beam_search_decode(model, images, k=3)
                if pred:
                    results.append(pred)
                    all_steps.append({"word": pred, "steps": steps})
            final_prediction = " ".join(results)
            inference_steps = all_steps
            print(f"ROUTING TO '{final_lang}': Full-Sequence -> FINAL: '{final_prediction}'")
        return {
            "prediction": final_prediction,
            "raw_model_prediction": final_prediction,
            "engine_view": f"data:image/png;base64,{debug_b64}",
            "detected_lang": final_lang,
            "inference_steps": inference_steps
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/predict_sentence")  # NOTE: this route path is an assumption
async def predict_sentence(file: UploadFile = File(...),
                           text_threshold: float = 0.7,
                           link_threshold: float = 0.4,
                           low_text: float = 0.4):
    # The three threshold parameters are accepted for API compatibility but are
    # currently unused; the whole image is decoded as a single sequence.
    try:
        if model is None:
            return {"error": "Hindi model not loaded"}
        image_bytes = await file.read()
        img_bgr = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
        if img_bgr is None:
            return {"error": "Could not decode image"}
        # Process the entire image as a single sequence
        images, debug_b64 = preprocess_image(img_bgr)
        results = []
        all_steps = []
        if images is not None:
            pred, steps = beam_search_decode(model, images, k=3)
            if pred:
                results.append(pred)
                all_steps.append({"word": pred, "steps": steps})
        return {
            "prediction": " ".join(results),
            "words": results,
            "engine_view": f"data:image/png;base64,{debug_b64}",
            "inference_steps": all_steps,
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
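
# Hedged usage example (assumes the server is running locally on port 7860;
# "lang" is a query parameter and "file" is multipart form data):
#   curl -X POST "http://localhost:7860/predict?lang=hindi" -F "file=@word.png"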