invoice_extractor / inference.py
github-actions[bot]
Sync from GitHub: e183541197d8e0dcce4d172968f2290122cd5d77
904c006
"""
Inference Processor - Handles VLM extraction, validation, and result formatting
"""
import torch
import time
import json
import codecs
import re
import cv2
import numpy as np
from PIL import Image
from qwen_vl_utils import process_vision_info
from typing import Dict, Tuple
from config import (
MAX_IMAGE_SIZE,
HP_VALID_RANGE,
ASSET_COST_VALID_RANGE,
COST_PER_GPU_HOUR
)
from model_manager import model_manager
# Single-step extraction prompt (original "simple" mode)
EXTRACTION_PROMPT = """
You are an expert at reading noisy, handwritten Indian invoices and quotations.
Your task is to extract text EXACTLY as it appears in the image.
Do NOT translate, summarize, normalize, or rewrite any text.
Preserve the original language (Hindi, Marathi, Kannada, English, etc.).
Carefully read the image and extract the following fields.
Return ONLY valid JSON in this format:
{
"dealer_name": string,
"model_name": string,
"horse_power": number,
"asset_cost": number
}
Critical rules:
- Dealer name must be copied exactly from the image in the original language and spelling.
- Model name must be copied exactly from the image without translation.
- Do NOT convert regional language text into English.
- Do NOT expand abbreviations or correct spelling.
- Only numbers may be normalized.
Extraction hints:
- Asset cost is the total amount, usually the largest number on the page, the total amount after TAX, final price or final cost.
- Dealer name is usually at the top header or company name.
- Model name often appears near words like Model, Tractor, Variant.
- Horse power must come ONLY from explicit HP text, never from model numbers.
- Horse power may appear as "HP", handwritten like "49 HP", "63hp", "HP-30".
- Remove commas and currency symbols from numbers only.
- If handwriting is unclear, make your best reasonable interpretation of the characters — but preserve language.
Output rules:
- Output ONLY valid JSON.
- Do NOT include markdown, explanations, or extra text.
"""
# Combined Chain of Thought prompt (reasoning mode) - Single call with reasoning and extraction
COMBINED_REASONING_EXTRACTION_PROMPT = """
Analyze this Indian tractor invoice using Chain of Thought reasoning.
First, share your observations about the 2 key fields:
MODEL NAME:
- How is it presented? (checkbox/handwritten/printed or any other way)
- If a checkboxes or anything marked?
- What exact text do you see?
- There could be only one model asssociated with a deal. If you see multiple model names listed, check if one of them is marked or highlighted in some way.
- Based on this, which model name you think is involved in the deal.
HORSE POWER:
- Where is HP mentioned?
- Explicit text like "49 HP" or in checkbox?
- Which value is marked?
- HP must come from explicit HP text only, never from model numbers
- If only one value for HP is associated with the correct Model name, it is the models HP.
- If multiple HP values are associated with the correct Model, the correct HP will be usuall marked.
After reasoning, extract the fields.
Return ONLY valid JSON:
{{
"reasoning": "your observations and thoughts here",
"dealer_name": "string",
"model_name": string,
"horse_power": number,
"asset_cost": number
}}
Rules for extraction:
- Copy dealer/model names EXACTLY in original language, don't translate
- HP as number only ("49 HP" → 49), use selected checkbox
- Asset cost as number (remove ₹, commas: "1,50,000" → 150000)
- Asset cost is the final total after TAX
- Dealer is usually at top header
- If handwriting unclear, make best interpretation but preserve language
Extraction hints:
- Dealer name is usually at the top header or company name.
- Model name often appears near words like Model, Tractor, Variant.
- Horse power must come ONLY from explicit HP text, never from model numbers.
- Horse power may appear as "HP", handwritten like "49 HP", "63hp", "HP-30".
- Remove commas and currency symbols from numbers only.
- If handwriting is unclear, make your best reasonable interpretation of the characters — but preserve language.
Output ONLY valid JSON, no markdown.
"""
class InferenceProcessor:
"""Handles VLM inference, validation, and result processing"""
@staticmethod
def enhance_image_opencv(image_path: str) -> str:
"""
Apply OpenCV preprocessing to enhance image quality
Returns path to enhanced image (same as input, modified in place)
"""
# Load image (BGR)
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"Could not read image from {image_path}")
# Convert to LAB color space (better for contrast)
lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
# CLAHE on L-channel
clahe = cv2.createCLAHE(clipLimit=2.5, tileGridSize=(8, 8))
l_enhanced = clahe.apply(l)
# Merge back
lab_enhanced = cv2.merge((l_enhanced, a, b))
contrast_enhanced = cv2.cvtColor(lab_enhanced, cv2.COLOR_LAB2BGR)
# Noise reduction
denoised = cv2.fastNlMeansDenoisingColored(
contrast_enhanced,
None,
h=10, hColor=10,
templateWindowSize=7,
searchWindowSize=21
)
# Sharpening (Unsharp Mask)
blur = cv2.GaussianBlur(denoised, (0, 0), sigmaX=1.2)
sharpened = cv2.addWeighted(denoised, 1.5, blur, -0.5, 0)
# Save enhanced image back to the same path
cv2.imwrite(image_path, sharpened)
print(f"✨ Image enhanced with OpenCV preprocessing")
return image_path
@staticmethod
def preprocess_image(image_path: str) -> Image.Image:
"""Load and resize image if needed"""
image = Image.open(image_path).convert("RGB")
# Resize if too large
if max(image.size) > MAX_IMAGE_SIZE:
ratio = MAX_IMAGE_SIZE / max(image.size)
new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
image = image.resize(new_size, Image.LANCZOS)
print(f"🔄 Image resized to {new_size}")
return image
@staticmethod
def run_vlm_extraction(image: Image.Image) -> Tuple[str, float]:
"""Run VLM model to extract invoice fields"""
if not model_manager.is_loaded():
raise RuntimeError("Models not loaded")
model = model_manager.vlm_model
processor = model_manager.processor
messages = [
{
"role": "user",
"content": [
{"type": "image", "image": image},
{"type": "text", "text": EXTRACTION_PROMPT}
]
}
]
# Apply chat template
text = processor.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
# Process vision input
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
start = time.time()
# Generate
generated_ids = model.generate(**inputs, max_new_tokens=256)
latency = time.time() - start
# Decode output
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed,
skip_special_tokens=True,
clean_up_tokenization_spaces=False
)
output_text = output_text[0] if isinstance(output_text, list) else output_text
# Clean up GPU memory
del inputs, generated_ids, generated_ids_trimmed
if torch.cuda.is_available():
torch.cuda.empty_cache()
return output_text, latency
@staticmethod
def run_vlm_reasoning_and_extraction(image: Image.Image) -> Tuple[str, str, float]:
"""
Run VLM model with combined Chain of Thought reasoning and extraction in single call
Returns: (reasoning_text, extraction_json_str, latency)
"""
if not model_manager.is_loaded():
raise RuntimeError("Models not loaded")
model = model_manager.vlm_model
processor = model_manager.processor
messages = [
{
"role": "user",
"content": [
{"type": "image", "image": image},
{"type": "text", "text": COMBINED_REASONING_EXTRACTION_PROMPT}
]
}
]
# Apply chat template
text = processor.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
# Process vision input
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
start = time.time()
# Generate with more tokens for combined reasoning + extraction
generated_ids = model.generate(**inputs, max_new_tokens=384)
latency = time.time() - start
# Decode output
generated_ids_trimmed = [
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
generated_ids_trimmed,
skip_special_tokens=True,
clean_up_tokenization_spaces=False
)
output_text = output_text[0] if isinstance(output_text, list) else output_text
# Clean up GPU memory
del inputs, generated_ids, generated_ids_trimmed
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Parse the combined output to separate reasoning from extraction
reasoning_text = ""
extraction_json = output_text
# First, remove markdown code fences if present
cleaned_output = output_text.strip()
if cleaned_output.startswith('```'):
# Remove opening ```json or ```
lines = cleaned_output.split('\n')
if lines[0].startswith('```'):
lines = lines[1:]
# Remove closing ```
if lines and lines[-1].strip() == '```':
lines = lines[:-1]
cleaned_output = '\n'.join(lines).strip()
try:
# Try to parse the cleaned JSON
parsed = json.loads(cleaned_output)
if "reasoning" in parsed:
reasoning_text = parsed["reasoning"]
# Remove reasoning from output to get clean extraction JSON
extraction_dict = {k: v for k, v in parsed.items() if k != "reasoning"}
extraction_json = json.dumps(extraction_dict)
else:
# No reasoning field, use entire output as extraction
extraction_json = cleaned_output
except json.JSONDecodeError:
# If parsing fails, try to find JSON pattern in the text
json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned_output, re.DOTALL)
if json_match:
json_str = json_match.group(0)
try:
parsed = json.loads(json_str)
if "reasoning" in parsed:
reasoning_text = parsed["reasoning"]
extraction_dict = {k: v for k, v in parsed.items() if k != "reasoning"}
extraction_json = json.dumps(extraction_dict)
else:
extraction_json = json_str
except:
extraction_json = json_str
# Everything before JSON is additional reasoning
prefix = cleaned_output[:json_match.start()].strip()
if prefix and not reasoning_text:
reasoning_text = prefix
print(f"🧠 Combined reasoning + extraction completed in {latency:.2f}s")
return reasoning_text, extraction_json, latency
@staticmethod
def extract_json_from_output(text: str) -> Dict:
"""Extract JSON from model output"""
# Handle single/double backticks
if text.count('```') in [1, 2]:
data = text.split('```')[1]
if data.startswith('json'):
data = data[4:]
try:
return json.loads(data.strip())
except:
pass
# Try markdown code blocks
markdown_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
if markdown_match:
try:
return json.loads(markdown_match.group(1))
except json.JSONDecodeError:
pass
# Find JSON blocks
json_matches = re.finditer(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', text, re.DOTALL)
for match in json_matches:
json_str = match.group(0)
try:
parsed = json.loads(json_str)
# Verify expected keys
if all(key in parsed for key in ["dealer_name", "model_name", "horse_power", "asset_cost"]):
return parsed
except json.JSONDecodeError:
continue
# Fallback
return {
"dealer_name": None,
"model_name": None,
"horse_power": None,
"asset_cost": None
}
@staticmethod
def clean_text(text) -> str:
"""Clean text field"""
if not text:
return None
text = str(text).strip()
text = re.sub(r"\s+", " ", text)
return text if len(text) > 1 else None
@staticmethod
def clean_number(num):
"""Clean number field"""
try:
if num is None:
return None
return int(float(num))
except:
return None
@staticmethod
def fix_horse_power(vlm_hp, model_name) -> Tuple:
"""Fix common HP extraction mistakes"""
# Accept if in valid range
if vlm_hp is not None and HP_VALID_RANGE[0] <= vlm_hp <= HP_VALID_RANGE[1]:
return vlm_hp, 1.0
# Try extracting from model name
if model_name:
match = re.search(r"HP[- ]?(\d+)", model_name, re.I)
if match:
hp = int(match.group(1))
if HP_VALID_RANGE[0] <= hp <= HP_VALID_RANGE[1]:
return hp, 0.8
return None, 0.2
@staticmethod
def validate_asset_cost(cost) -> Tuple:
"""Validate asset cost"""
if cost is None:
return None, 0.2
cost = InferenceProcessor.clean_number(cost)
if ASSET_COST_VALID_RANGE[0] <= cost <= ASSET_COST_VALID_RANGE[1]:
return cost, 1.0
return None, 0.3
@staticmethod
def validate_text_field(text) -> Tuple:
"""Validate text fields"""
text = InferenceProcessor.clean_text(text)
if not text or len(text) < 3:
return None, 0.3
return text, 1.0
@staticmethod
def validate_prediction(raw_json: Dict) -> Tuple[Dict, float, list]:
"""Validate and fix extracted fields"""
warnings = []
confidences = []
# Dealer
dealer, dealer_conf = InferenceProcessor.validate_text_field(raw_json.get("dealer_name"))
if dealer is None:
warnings.append("Dealer name invalid")
confidences.append(dealer_conf)
# Model
model_name, model_conf = InferenceProcessor.validate_text_field(raw_json.get("model_name"))
if model_name is None:
warnings.append("Model name invalid")
confidences.append(model_conf)
# Horse Power
hp_raw = InferenceProcessor.clean_number(raw_json.get("horse_power"))
hp, hp_conf = InferenceProcessor.fix_horse_power(hp_raw, model_name)
if hp is None:
warnings.append("Horse power invalid")
confidences.append(hp_conf)
# Asset Cost
cost_raw = InferenceProcessor.clean_number(raw_json.get("asset_cost"))
cost, cost_conf = InferenceProcessor.validate_asset_cost(cost_raw)
if cost is None:
warnings.append("Asset cost invalid")
confidences.append(cost_conf)
# Overall field confidence
field_confidence = round(sum(confidences) / len(confidences), 3)
validated = {
"dealer_name": dealer,
"model_name": model_name,
"horse_power": hp,
"asset_cost": cost
}
return validated, field_confidence, warnings
@staticmethod
def process_invoice(image_path: str, doc_id: str = None, enhance_image: bool = False, reasoning_mode: str = "simple") -> Dict:
"""
Complete invoice processing pipeline
Args:
image_path: Path to invoice image
doc_id: Document identifier (optional)
enhance_image: Whether to apply OpenCV enhancement (optional)
reasoning_mode: "simple" for single-step extraction, "reason" for Chain of Thought (optional)
Returns:
dict: Complete JSON output with all fields
"""
total_start = time.time()
timing_breakdown = {}
# Generate doc_id if not provided
if doc_id is None:
import os
doc_id = os.path.splitext(os.path.basename(image_path))[0]
# Step 0: Apply OpenCV Enhancement if requested
if enhance_image:
t0 = time.time()
image_path = InferenceProcessor.enhance_image_opencv(image_path)
timing_breakdown['opencv_enhancement'] = round(time.time() - t0, 3)
# Step 1: Preprocess image
t1 = time.time()
image = InferenceProcessor.preprocess_image(image_path)
timing_breakdown['image_preprocessing'] = round(time.time() - t1, 3)
# Step 2: YOLO Detection
t2 = time.time()
signature_info, stamp_info, signature_conf, stamp_conf = model_manager.detect_sign_stamp(image_path)
timing_breakdown['yolo_detection'] = round(time.time() - t2, 3)
# Step 3: VLM Extraction (either simple or with Chain of Thought reasoning)
t3 = time.time()
if reasoning_mode == "reason":
# Combined Chain of Thought: reasoning + extraction in single call
print("🧠 Using Chain of Thought reasoning mode (single call)")
reasoning_output, vlm_output, vlm_latency = InferenceProcessor.run_vlm_reasoning_and_extraction(image)
timing_breakdown['vlm_inference'] = round(vlm_latency, 3)
# Store reasoning for debugging/transparency
timing_breakdown['reasoning_output'] = reasoning_output
else:
# Single-step simple extraction (original approach)
print("⚡ Using simple mode (1-step)")
vlm_output, vlm_latency = InferenceProcessor.run_vlm_extraction(image)
timing_breakdown['vlm_inference'] = round(vlm_latency, 3)
# Clean up image
image.close()
del image
# Step 4: Parse JSON
t4 = time.time()
raw_json = InferenceProcessor.extract_json_from_output(vlm_output)
timing_breakdown['json_parsing'] = round(time.time() - t4, 3)
# Step 5: Validate and fix
t5 = time.time()
validated_fields, field_confidence, warnings = InferenceProcessor.validate_prediction(raw_json)
timing_breakdown['validation'] = round(time.time() - t5, 3)
# Add signature and stamp
validated_fields["signature"] = signature_info
validated_fields["stamp"] = stamp_info
# Calculate overall confidence - average of YOLO detection scores only
confidences = []
if signature_info["present"]:
confidences.append(signature_conf)
if stamp_info["present"]:
confidences.append(stamp_conf)
# If both sign and stamp detected, average them; otherwise use whichever is present
overall_confidence = round(sum(confidences) / len(confidences), 3) if confidences else 0.0
# Calculate time and cost
total_time = time.time() - total_start
cost_estimate = (COST_PER_GPU_HOUR * total_time) / 3600
# Build result
result = {
"doc_id": doc_id,
"fields": validated_fields,
"confidence": overall_confidence,
"processing_time_sec": round(total_time, 2),
"timing_breakdown": timing_breakdown,
"cost_estimate_usd": round(cost_estimate, 6),
"warnings": warnings if warnings else None
}
return result