# Orc1 / app.py
# mike23415's picture
# Update app.py
# 236b2b6 verified
import os
import sys
import gc
import numpy as np
import cv2
from PIL import Image, ImageEnhance, ImageFilter
import logging
import base64
import io
import pytesseract
from flask import Flask, request, jsonify
from flask_cors import CORS
import re
# Set up logging: root logger at INFO, plus a module-level logger used by
# every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create the Flask application and wrap it with flask_cors.CORS (no options
# are passed, so flask_cors' own defaults apply).
app = Flask(__name__)
CORS(app)
# Configure Tesseract path (adjust if needed)
# For Ubuntu/Debian: usually /usr/bin/tesseract
# For Windows: might need to set custom path
# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
def _upscale_to_min_side(gray, height, width, min_long_side):
    """Enlarge `gray` (cubic interpolation) when its longer side is below `min_long_side`."""
    longest_side = max(height, width)
    if not longest_side < min_long_side:
        return gray
    scale_factor = min_long_side / longest_side
    target_size = (int(width * scale_factor), int(height * scale_factor))
    return cv2.resize(gray, target_size, interpolation=cv2.INTER_CUBIC)


def preprocess_image_advanced(image, enhance_type="default"):
    """
    Convert an image to grayscale and apply one of the OCR preprocessing recipes.

    Args:
        image: A PIL image or an array accepted by OpenCV. Three-dimensional
            arrays are converted with COLOR_BGR2GRAY.
        enhance_type: "default", "document", "enhance" or "binary". Any other
            value returns the plain grayscale image.

    Returns:
        The processed single-channel image. If any step raises, the error is
        logged and an unprocessed grayscale version of the input is returned
        instead.
    """
    try:
        # Bring PIL input into OpenCV's BGR layout; arrays are used as given.
        if isinstance(image, Image.Image):
            if image.mode != 'RGB':
                image = image.convert('RGB')
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        else:
            cv_image = image

        is_color = len(cv_image.shape) == 3
        gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) if is_color else cv_image

        height, width = gray.shape

        if enhance_type == "default":
            # Small images are upscaled first, then lightly denoised and
            # given a slight contrast/brightness lift.
            gray = _upscale_to_min_side(gray, height, width, 600)
            gray = cv2.medianBlur(gray, 3)
            gray = cv2.convertScaleAbs(gray, alpha=1.1, beta=10)
        elif enhance_type == "document":
            # Document / exam-paper recipe: upscale, edge-preserving denoise,
            # CLAHE contrast, then a morphological close.
            gray = _upscale_to_min_side(gray, height, width, 800)
            gray = cv2.bilateralFilter(gray, 9, 75, 75)
            equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
            gray = equalizer.apply(gray)
            close_kernel = np.ones((1,1), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, close_kernel)
        elif enhance_type == "enhance":
            # Strongest recipe: larger upscale, heavier denoise, stronger
            # CLAHE and a 3x3 sharpening convolution.
            gray = _upscale_to_min_side(gray, height, width, 1000)
            gray = cv2.bilateralFilter(gray, 15, 80, 80)
            equalizer = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
            gray = equalizer.apply(gray)
            sharpen_kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
            gray = cv2.filter2D(gray, -1, sharpen_kernel)
        elif enhance_type == "binary":
            # Adaptive Gaussian threshold followed by a morphological close.
            gray = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                         cv2.THRESH_BINARY, 11, 2)
            close_kernel = np.ones((1,1), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, close_kernel)

        return gray
    except Exception as e:
        logger.error(f"Preprocessing error: {e}")
        # Fall back to an unprocessed grayscale version of the input.
        if isinstance(image, Image.Image):
            return np.array(image.convert('L'))
        if len(image.shape) == 3:
            return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image
# Look-alike characters Tesseract tends to emit in place of letters, mapped to
# the (upper-case) letter they usually stand for.
_CONFUSABLE_TO_LETTER = str.maketrans('0158|!', 'OISBII')

# A run of look-alike characters with a letter directly on both sides, i.e.
# embedded in a word. Runs at the edge of a token ("Q1", "25kHz", "Hello!")
# are deliberately not matched: there they are far more likely to be genuine.
_EMBEDDED_CONFUSABLE_RE = re.compile(r'(?<=[A-Za-z])[0158|!]+(?=[A-Za-z])')

_LOWER_UPPER_RE = re.compile(r'([a-z])([A-Z])')
_LETTER_DIGIT_RE = re.compile(r'([a-zA-Z])(\d)')
_DIGIT_LETTER_RE = re.compile(r'(\d)([a-zA-Z])')
_WHITESPACE_RE = re.compile(r'\s+')

# "aWhat" / "cState" style sub-question labels glued to the first word. The
# look-behind keeps ordinary words ending in a-e ("andExplain") untouched.
_SUBQUESTION_LABEL_RE = re.compile(r'(?<![A-Za-z])([a-e])(What|Mention|State|Explain)')

# "Q1" .. "Q9" question labels (seen as "Q 1" after letter/digit splitting).
_QUESTION_LABEL_RE = re.compile(r'\bQ ?([1-9])\b\.?')

# Known garbled phrases and their corrections, in application order.
# NOTE: matching is by substring, so a short key such as 'inall' also fires
# inside longer words (e.g. "finally").
_PHRASE_FIXES = (
    ('thebest', 'the best'),
    ('inall', 'in all'),
    ('whichts', 'which is'),
    ('ENDTERM', 'END TERM'),
    ('MaxtmumMarks', 'Maximum Marks'),
    ('Attemptfivequestions', 'Attempt five questions'),
    ('Sx525', '5×5=25'),
    ('bjDefine', 'b) Define'),
    ('foracoin', 'for a coin'),
    ('tossingexperiment', 'tossing experiment'),
    ('reasonsforhigher', 'reasons for higher'),
    ('noiseinmixers', 'noise in mixers'),
    ('typesofanalog', 'types of analog'),
    ('advantagesofVSBAM', 'advantages of VSBAM'),
    ('jointprobability', 'joint probability'),
    ('conditionalprobability', 'conditional probability'),
    ('twoproperties', 'two properties'),
    ('GaussianProcess', 'Gaussian Process'),
    ('fourproperties', 'four properties'),
    ('powerspectraldensity', 'power spectral density'),
    ('poissionprocess', 'Poisson process'),
    ('weinerprocess', 'Wiener process'),
    ('analogmodulation', 'analog modulation'),
    ('suitablediagram', 'suitable diagram'),
    ('needmodulation', 'need modulation'),
    ('DSBSCmodulation', 'DSBSC modulation'),
    ('demodulationwith', 'demodulation with'),
    ('coherentdetection', 'coherent detection'),
    ('theirdrawbacks', 'their drawbacks'),
    ('broadcastradio', 'broadcast radio'),
    ('transmitterradiates', 'transmitter radiates'),
    ('kWpowerwhen', 'kW power when'),
    ('modulationpercentage', 'modulation percentage'),
    ('carrierpower', 'carrier power'),
    ('carrierfrequency', 'carrier frequency'),
    ('frequencymodulated', 'frequency modulated'),
    ('sinusoidalsignal', 'sinusoidal signal'),
    ('KHzresulting', 'KHz resulting'),
    ('maximumfrequency', 'maximum frequency'),
    ('deviationof', 'deviation of'),
    ('approximatebandwidth', 'approximate bandwidth'),
    ('modulatedsignal', 'modulated signal'),
    ('narrowbandFM', 'narrowband FM'),
    ('widebandFM', 'wideband FM'),
    ('twomethods', 'two methods'),
    ('producingFM', 'producing FM'),
    ('ratiodetector', 'ratio detector'),
    ('preemphasis', 'pre-emphasis'),
    ('deemphasis', 'de-emphasis'),
    ('processprovideoferall', 'process provide overall'),
    ('SNRimprovement', 'SNR improvement'),
    ('FMsystems', 'FM systems'),
    ('shortnoteon', 'short note on'),
    ('captureeffect', 'capture effect'),
    ('thresholdeffect', 'threshold effect'),
    ('externalnoise', 'external noise'),
    ('externalsources', 'external sources'),
    ('desemphasis', 'de-emphasis'),
)


def _replace_embedded_confusables(match):
    """Turn a run of look-alike characters into letters, following the case of its neighbours."""
    letters = match.group(0).translate(_CONFUSABLE_TO_LETTER)
    source = match.string
    # The look-behind/look-ahead guarantee that both neighbours exist.
    if source[match.start() - 1].isupper() and source[match.end()].isupper():
        return letters
    # Lower-case neighbours: an upper-case letter here would be torn apart by
    # the lower/upper splitting pass ("c0mpulsory" -> "c Ompulsory").
    return letters.lower()


def _normalize_ocr_runs(text):
    """Fix embedded look-alike characters and split glued word/number runs."""
    text = _EMBEDDED_CONFUSABLE_RE.sub(_replace_embedded_confusables, text)
    text = text.replace('vv', 'w')
    text = _LOWER_UPPER_RE.sub(r'\1 \2', text)
    text = _LETTER_DIGIT_RE.sub(r'\1 \2', text)
    return _DIGIT_LETTER_RE.sub(r'\1 \2', text)


# The phrase table is applied to text that has already been through
# _normalize_ocr_runs, so its keys are put through the same pass. Without
# this, keys such as 'MaxtmumMarks' or 'kWpowerwhen' could never match (the
# text would read 'Maxtmum Marks' / 'k Wpowerwhen' by then).
_NORMALIZED_PHRASE_FIXES = tuple(
    (_normalize_ocr_runs(garbled), fixed) for garbled, fixed in _PHRASE_FIXES
)


def post_process_text(text):
    """
    Post-process OCR text to fix common recognition and spacing errors.

    Steps, in order:
      1. Sub-question labels glued to a verb ("aWhat") become "a) What".
      2. Look-alike characters embedded between letters become letters
         ("B00K" -> "BOOK"); digits and punctuation at the edge of a token
         are left alone.
      3. "vv" becomes "w"; lower/upper, letter/digit and digit/letter
         boundaries get a space.
      4. Known garbled phrases are replaced from the phrase table.
      5. "Q1".."Q9" labels are written as "Q1.".
      6. Whitespace (including newlines) is collapsed to single spaces.

    Args:
        text: Raw OCR output.

    Returns:
        The cleaned single-line string. Empty, whitespace-only or None input
        is returned unchanged.
    """
    if not text or not text.strip():
        return text

    # Labels are resolved on the raw text, before splitting would detach them.
    processed_text = _SUBQUESTION_LABEL_RE.sub(r'\1) \2', text)
    processed_text = _normalize_ocr_runs(processed_text)

    for garbled, fixed in _NORMALIZED_PHRASE_FIXES:
        processed_text = processed_text.replace(garbled, fixed)

    # Done last so the inserted "Q1." is not split back into "Q 1.".
    processed_text = _QUESTION_LABEL_RE.sub(r'Q\1.', processed_text)

    return _WHITESPACE_RE.sub(' ', processed_text).strip()
def _run_tesseract(image, lang, custom_config):
    """Run Tesseract once for plain text and once for per-word data with the same config."""
    text = pytesseract.image_to_string(image, lang=lang, config=custom_config)
    data = pytesseract.image_to_data(image, lang=lang, config=custom_config,
                                     output_type=pytesseract.Output.DICT)
    return text, data


def extract_text_tesseract_adaptive(image, lang='eng', psm=6):
    """
    Adaptive OCR that tries multiple Tesseract configurations in turn.

    Strategy 1 (character whitelist) and strategy 2 (no whitelist, inversion
    disabled) are accepted only if they produce non-blank text; strategy 3
    (basic config) is returned whatever it produces.

    Args:
        image: Image passed straight to pytesseract.
        lang: Tesseract language code.
        psm: Tesseract page segmentation mode.

    Returns:
        The dict built by process_ocr_result for the first accepted strategy,
        or a dict with empty text, zero confidence and zero word count when
        every strategy raised.
    """
    empty_result = {'text': '', 'raw_text': '', 'confidence': 0.0, 'word_count': 0}
    try:
        whitelist_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz .,!?-:;()[]{}=+×÷%/'
        gated_strategies = (
            # The whitelist contains a space, so the whole -c argument is
            # quoted: the config string is tokenised shell-style and an
            # unquoted value would be cut off at that space, dropping the
            # space and every punctuation character from the whitelist.
            # NOTE(review): assumes POSIX-style tokenising; verify on Windows.
            ("whitelist", "Strategy 1 (whitelist)",
             f'--oem 3 --psm {psm} -c "tessedit_char_whitelist={whitelist_chars}"'),
            ("no_whitelist", "Strategy 2 (no whitelist)",
             f'--oem 3 --psm {psm} -c tessedit_do_invert=0'),
        )
        for strategy, label, custom_config in gated_strategies:
            try:
                text, data = _run_tesseract(image, lang, custom_config)
                # Only accept the strategy if it recognised something.
                if text.strip():
                    logger.info(f"{label} successful")
                    return process_ocr_result(text, data, strategy)
            except Exception as e:
                logger.warning(f"{label} failed: {e}")

        # Strategy 3: basic configuration, returned even when the text is blank.
        try:
            text, data = _run_tesseract(image, lang, f'--oem 3 --psm {psm}')
            logger.info("Strategy 3 (basic) used as fallback")
            return process_ocr_result(text, data, "basic")
        except Exception as e:
            logger.error(f"All OCR strategies failed: {e}")
            return dict(empty_result)
    except Exception as e:
        logger.error(f"Adaptive OCR error: {e}")
        return dict(empty_result)
def process_ocr_result(text, data, strategy):
    """
    Build the common OCR result dict from Tesseract's text and word data.

    Args:
        text: Raw text returned by Tesseract.
        data: Mapping with 'conf' and 'text' sequences (the DICT output of
            pytesseract.image_to_data).
        strategy: Label of the configuration that produced the result.

    Returns:
        Dict with the post-processed 'text', the untouched 'raw_text', the
        mean positive word confidence scaled to 0..1 as 'confidence', the
        number of non-blank entries in data['text'] as 'word_count', and
        'strategy'.
    """
    # Confidence values may arrive as ints, floats or numeric strings such as
    # '96.06'; int() would raise on the latter and truncate floats, so parse
    # as float and skip anything non-numeric. Non-positive values are ignored,
    # as before.
    confidences = []
    for raw_conf in data['conf']:
        try:
            conf = float(raw_conf)
        except (TypeError, ValueError):
            continue
        if conf > 0:
            confidences.append(conf)
    avg_confidence = sum(confidences) / len(confidences) if confidences else 0

    cleaned_text = post_process_text(text)
    return {
        'text': cleaned_text,
        'raw_text': text,
        'confidence': avg_confidence / 100.0,
        'word_count': len([w for w in data['text'] if w.strip()]),
        'strategy': strategy
    }
def process_image_smart_improved(image, enhance_type="default", lang='eng'):
    """
    Preprocess an image and run OCR, retrying with other settings when
    confidence is low.

    Flow:
      1. Preprocess with `enhance_type` and OCR with PSM 6.
      2. If that produced no text or confidence < 0.6, and `enhance_type` is
         not already "document", retry with "document" preprocessing; the
         retry replaces the first result only if it is more confident.
      3. If there is still no text or confidence < 0.5, also try PSM 4 and
         PSM 13 on the originally preprocessed image.
      4. Return the most confident candidate.

    Args:
        image: PIL image or array accepted by preprocess_image_advanced.
        enhance_type: Preprocessing recipe requested by the caller.
        lang: Tesseract language code forwarded to every OCR attempt.
            Defaults to 'eng', which is what was always used before this
            parameter existed.

    Returns:
        The winning OCR result dict with 'method' and 'preprocessing' added.
        'preprocessing' names the recipe that actually produced the winning
        text. When nothing was recognised or an error occurred, a dict with
        empty text and 'method' set to 'none' or 'error'.
    """
    try:
        processed_img = preprocess_image_advanced(image, enhance_type)

        # Each candidate is (method label, preprocessing used, OCR result).
        candidates = []

        # Mode 6: block of text (best for documents)
        result = extract_text_tesseract_adaptive(processed_img, lang=lang, psm=6)
        if result['text']:
            candidates.append(('psm_6', enhance_type, result))

        # Low confidence: retry with document-specific preprocessing.
        if not candidates or candidates[0][2]['confidence'] < 0.6:
            if enhance_type != "document":
                doc_processed = preprocess_image_advanced(image, "document")
                result = extract_text_tesseract_adaptive(doc_processed, lang=lang, psm=6)
                baseline = candidates[0][2]['confidence'] if candidates else 0
                if result['text'] and result['confidence'] > baseline:
                    # Record "document" so the caller is told which recipe
                    # produced the text rather than the one it asked for.
                    candidates = [('psm_6_document', 'document', result)]

        # Still poor: try other page segmentation modes
        # (4: single column of text, 13: single text line).
        if not candidates or candidates[0][2]['confidence'] < 0.5:
            for psm in (4, 13):
                result = extract_text_tesseract_adaptive(processed_img, lang=lang, psm=psm)
                if result['text']:
                    candidates.append((f'psm_{psm}', enhance_type, result))

        if not candidates:
            return {
                'text': '', 'raw_text': '', 'confidence': 0.0, 'word_count': 0,
                'method': 'none', 'preprocessing': enhance_type
            }

        best_method, best_preprocessing, best_result = max(
            candidates, key=lambda candidate: candidate[2]['confidence']
        )
        best_result['method'] = best_method
        best_result['preprocessing'] = best_preprocessing
        return best_result
    except Exception as e:
        logger.error(f"Smart processing error: {e}")
        return {
            'text': '', 'raw_text': '', 'confidence': 0.0, 'word_count': 0,
            'method': 'error', 'preprocessing': enhance_type
        }
# Alternative: Image-specific preprocessing detector
def detect_image_type(image):
    """
    Pick a preprocessing recipe from simple image statistics.

    Returns "enhance" when the longer side is under 600 px or the grayscale
    standard deviation is under 50, "document" when more than 10% of pixels
    are Canny edges, and "default" otherwise or when analysis fails.
    """
    try:
        if isinstance(image, Image.Image):
            pixels = np.array(image.convert('RGB'))
        else:
            pixels = image

        if len(pixels.shape) == 3:
            gray = cv2.cvtColor(pixels, cv2.COLOR_RGB2GRAY)
        else:
            gray = pixels
        height, width = gray.shape

        # All three measurements are taken before any decision is made.
        small_image = max(height, width) < 600
        low_contrast = gray.std() < 50
        edge_map = cv2.Canny(gray, 50, 150)
        edge_fraction = np.sum(edge_map > 0) / (height * width)
        text_heavy = edge_fraction > 0.1

        if small_image or low_contrast:
            return "enhance"
        if text_heavy:
            return "document"
        return "default"
    except Exception as e:
        logger.warning(f"Image type detection failed: {e}")
        return "default"
# Enhanced OCR endpoint with auto-detection
def ocr_endpoint_enhanced():
    """
    OCR endpoint with automatic image type detection

    NOTE(review): this function is an incomplete template.
    - No route decorator precedes it in the visible source, so Flask does not
      register it.
    - The parameter handling is only a placeholder comment. `enhancement` is
      assigned further down, which makes it a local name, so the
      `enhancement == 'auto'` check reads it before assignment; the resulting
      error is caught by the broad `except` and returned as a 500 response.
    - `image` and `language` are likewise never assigned here, and no
      module-level definitions of them are visible.
    """
    try:
        logger.info("OCR request received")
        # ... (existing parameter handling code) ...
        # Auto-detect optimal enhancement if not specified
        if enhancement == 'auto':
            enhancement = detect_image_type(image)
            logger.info(f"Auto-detected enhancement type: {enhancement}")
        # Process image with improved OCR
        logger.info("Starting adaptive OCR processing")
        result = process_image_smart_improved(image, enhancement)
        # Add debugging info
        response = {
            "success": True,
            "text": result['text'],
            "confidence": round(result['confidence'], 3),
            "character_count": len(result['text']),
            "word_count": result.get('word_count', 0),
            "method_used": result.get('method', 'unknown'),
            "preprocessing_used": result.get('preprocessing', 'unknown'),
            "ocr_strategy": result.get('strategy', 'unknown'),  # New field
            "language": language,
            "engine": "PyTesseract Adaptive"
        }
        return jsonify(response)
    except Exception as e:
        # Any failure (including the unbound names noted above) ends up here.
        logger.error(f"OCR processing error: {str(e)}")
        return jsonify({"error": str(e), "success": False}), 500
@app.route('/')
def home():
    """Root endpoint: return static service metadata as JSON.

    The endpoint listing includes `/languages`, which this file defines and
    the 404 handler already advertises.
    """
    return jsonify({
        "service": "Enhanced PyTesseract OCR",
        "status": "running",
        "version": "2.0.0",
        "engine": "PyTesseract",
        "description": "Advanced OCR service with improved text processing for documents and exam papers",
        "endpoints": {
            "health": "/health",
            "ocr": "/ocr (POST)",
            "batch_ocr": "/ocr/batch (POST)",
            "languages": "/languages"
        },
        "supported_formats": ["PNG", "JPEG", "JPG", "BMP", "TIFF", "GIF"],
        "preprocessing_types": ["default", "document", "enhance", "binary"],
        "languages": ["eng", "fra", "deu", "spa", "ita", "por"],
        "features": [
            "Advanced text post-processing",
            "Document-optimized preprocessing",
            "Smart character correction",
            "Word separation for concatenated text",
            "Exam paper and form optimization",
            "Multiple OCR modes with fallback",
            "Improved spacing and formatting"
        ]
    })
@app.route('/health', methods=['GET'])
def health_check():
    """Report whether Tesseract can be queried for its version.

    Responds with status "healthy" and the version string on success, or
    status "error" with the exception text and HTTP 500 otherwise.
    """
    service_name = "Enhanced PyTesseract OCR"
    try:
        version = pytesseract.get_tesseract_version()
        healthy_body = {
            "status": "healthy",
            "tesseract_version": version.public,
            "service": service_name
        }
        return jsonify(healthy_body)
    except Exception as e:
        failure_body = {
            "status": "error",
            "error": str(e),
            "service": service_name
        }
        return jsonify(failure_body), 500
@app.route('/ocr', methods=['POST'])
def ocr_endpoint():
    """Run OCR on a single image.

    Accepts either a multipart upload with an 'image' file field, or a JSON
    object with 'image_base64' (raw base64 or a data URL). Optional
    'enhancement', 'language' and 'include_raw' are read from the form or the
    JSON object.

    Responses:
        200 with the recognised text and metadata,
        400 for a missing/invalid image, invalid JSON body or unknown
            enhancement type,
        500 for unexpected processing errors.

    NOTE(review): `language` is echoed in the response but is not passed to
    process_image_smart_improved by this handler.
    """
    try:
        logger.info("OCR request received")

        has_upload = 'image' in request.files
        # Check if image is provided
        if not has_upload and not request.is_json:
            return jsonify({"error": "No image provided. Use 'image' field for file upload or JSON with 'image_base64'"}), 400

        # Get parameters
        payload = None
        if request.is_json:
            # silent=True turns a malformed body into None so it is reported
            # as a client error (400) instead of surfacing as a 500.
            payload = request.get_json(silent=True)
            if not isinstance(payload, dict):
                return jsonify({"error": "Invalid JSON body. Expected an object with 'image_base64'"}), 400
            enhancement = payload.get('enhancement', 'default')
            language = payload.get('language', 'eng')
            include_raw = payload.get('include_raw', False)
        else:
            enhancement = request.form.get('enhancement', 'default')
            language = request.form.get('language', 'eng')
            include_raw = request.form.get('include_raw', 'false').lower() == 'true'

        # Validate parameters
        valid_enhancements = ['default', 'document', 'enhance', 'binary']
        if enhancement not in valid_enhancements:
            return jsonify({"error": f"Invalid enhancement type. Use: {', '.join(valid_enhancements)}"}), 400

        # Load image
        try:
            if has_upload:
                image_file = request.files['image']
                if image_file.filename == '':
                    return jsonify({"error": "No file selected"}), 400
                image_data = image_file.read()
                image = Image.open(io.BytesIO(image_data))
            else:
                image_data = payload.get('image_base64')
                # A missing or non-string field gets the explicit "no image"
                # message rather than an opaque decoding error.
                if not isinstance(image_data, str) or not image_data:
                    return jsonify({"error": "No image provided. Use 'image' field for file upload or JSON with 'image_base64'"}), 400
                # Strip a "data:image/...;base64," prefix if present.
                if image_data.startswith('data:image'):
                    image_data = image_data.split(',')[1]
                image_bytes = base64.b64decode(image_data)
                image = Image.open(io.BytesIO(image_bytes))
            # Validate image
            if image.size[0] == 0 or image.size[1] == 0:
                return jsonify({"error": "Invalid image dimensions"}), 400
        except Exception as e:
            return jsonify({"error": f"Invalid image: {str(e)}"}), 400

        # Process image with improved OCR
        logger.info("Starting enhanced OCR processing")
        result = process_image_smart_improved(image, enhancement)

        # Clean up
        del image
        gc.collect()

        logger.info(f"OCR completed. Text length: {len(result['text'])}, Confidence: {result['confidence']:.2f}")
        response = {
            "success": True,
            "text": result['text'],
            "confidence": round(result['confidence'], 3),
            "character_count": len(result['text']),
            "word_count": result.get('word_count', 0),
            "method_used": result.get('method', 'unknown'),
            "preprocessing_used": result.get('preprocessing', 'unknown'),
            "language": language,
            "engine": "PyTesseract Enhanced"
        }
        # Include raw text if requested
        if include_raw and 'raw_text' in result:
            response["raw_text"] = result['raw_text']
        return jsonify(response)
    except Exception as e:
        logger.error(f"OCR processing error: {str(e)}")
        gc.collect()
        return jsonify({"error": str(e), "success": False}), 500
@app.route('/ocr/batch', methods=['POST'])
def batch_ocr_endpoint():
    """Run OCR on up to five uploaded images.

    Expects a multipart request with an 'images' file field (repeated) and
    optional 'enhancement', 'language' and 'include_raw' form fields. Each
    image is processed independently; a failure on one image is recorded in
    its result entry and does not abort the batch.

    Responses:
        200 with per-image results and success/failure counts,
        400 for missing images, too many images or an unknown enhancement,
        500 for unexpected errors outside the per-image loop.

    NOTE(review): `language` is echoed in the response but is not passed to
    process_image_smart_improved by this handler.
    """
    try:
        logger.info("Batch OCR request received")
        if 'images' not in request.files:
            return jsonify({"error": "No images provided. Use 'images' field for multiple file upload"}), 400
        images = request.files.getlist('images')
        if not images:
            return jsonify({"error": "No images found"}), 400

        # Limit batch size
        max_batch_size = 5
        if len(images) > max_batch_size:
            return jsonify({"error": f"Maximum {max_batch_size} images allowed per batch"}), 400

        enhancement = request.form.get('enhancement', 'default')
        language = request.form.get('language', 'eng')
        include_raw = request.form.get('include_raw', 'false').lower() == 'true'

        # Same check as the single-image endpoint: an unknown value would
        # otherwise run with no enhancement at all and still be reported back
        # as "enhancement_used".
        valid_enhancements = ['default', 'document', 'enhance', 'binary']
        if enhancement not in valid_enhancements:
            return jsonify({"error": f"Invalid enhancement type. Use: {', '.join(valid_enhancements)}"}), 400

        results = []
        for i, image_file in enumerate(images):
            try:
                logger.info(f"Processing image {i+1}/{len(images)}")
                if image_file.filename == '':
                    results.append({
                        "index": i,
                        "filename": "empty_file",
                        "error": "Empty filename",
                        "success": False
                    })
                    continue
                image_data = image_file.read()
                image = Image.open(io.BytesIO(image_data))
                # Process with enhanced OCR
                result = process_image_smart_improved(image, enhancement)
                batch_result = {
                    "index": i,
                    "filename": image_file.filename,
                    "text": result['text'],
                    "confidence": round(result['confidence'], 3),
                    "character_count": len(result['text']),
                    "word_count": result.get('word_count', 0),
                    "method_used": result.get('method', 'unknown'),
                    "success": True
                }
                if include_raw and 'raw_text' in result:
                    batch_result["raw_text"] = result['raw_text']
                results.append(batch_result)
                # Clean up
                del image
                gc.collect()
            except Exception as e:
                # Record the failure for this image and keep going.
                logger.error(f"Error processing image {i}: {str(e)}")
                results.append({
                    "index": i,
                    "filename": image_file.filename if hasattr(image_file, 'filename') else f"image_{i}",
                    "error": str(e),
                    "success": False
                })
                gc.collect()

        successful_count = sum(1 for r in results if r["success"])
        return jsonify({
            "success": True,
            "results": results,
            "total_processed": len(results),
            "successful": successful_count,
            "failed": len(results) - successful_count,
            "enhancement_used": enhancement,
            "language": language,
            "engine": "PyTesseract Enhanced"
        })
    except Exception as e:
        logger.error(f"Batch OCR error: {str(e)}")
        gc.collect()
        return jsonify({"error": str(e), "success": False}), 500
@app.route('/languages', methods=['GET'])
def get_languages():
    """Return the static table of language codes this service advertises."""
    try:
        code_name_pairs = (
            ('eng', 'English'),
            ('fra', 'French'),
            ('deu', 'German'),
            ('spa', 'Spanish'),
            ('ita', 'Italian'),
            ('por', 'Portuguese'),
            ('rus', 'Russian'),
            ('chi_sim', 'Chinese Simplified'),
            ('chi_tra', 'Chinese Traditional'),
            ('jpn', 'Japanese'),
            ('kor', 'Korean'),
            ('ara', 'Arabic'),
            ('hin', 'Hindi'),
        )
        body = {
            "available_languages": dict(code_name_pairs),
            "default": "eng",
            "note": "Language support depends on your Tesseract installation"
        }
        return jsonify(body)
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.errorhandler(404)
def not_found(error):
    """Answer unknown URLs with a JSON 404 that lists the available endpoints."""
    endpoint_help = {
        "GET /": "Service information",
        "GET /health": "Health check",
        "POST /ocr": "Single image OCR",
        "POST /ocr/batch": "Batch image OCR",
        "GET /languages": "Available languages"
    }
    body = {
        "error": "Endpoint not found",
        "available_endpoints": endpoint_help
    }
    return jsonify(body), 404
@app.errorhandler(500)
def internal_error(error):
    """Answer unhandled server errors with a generic JSON 500 after a garbage collection pass."""
    gc.collect()
    body = {
        "error": "Internal server error",
        "message": "Please check server logs"
    }
    return jsonify(body), 500
if __name__ == '__main__':
    # Script entry point: listen on all interfaces, on $PORT or 7860.
    logger.info("Starting Enhanced PyTesseract OCR service...")
    listen_port = int(os.environ.get('PORT', 7860))
    server_options = {
        'host': '0.0.0.0',
        'port': listen_port,
        'debug': False,
        'threaded': True,
    }
    app.run(**server_options)