#!/usr/bin/env python3
"""
OCR Document Verification with Batch Processing & Required Document Checklist

Usage:
    # Single file (backward compatible)
    python ocrupdated2.py --file image.jpg --inputkeywords "keyword1 keyword2" --fuzzy --debug

    # Multiple files with a required-document checklist
    python ocrupdated2.py --file doc1.pdf doc2.jpg doc3.png --inputkeywords "Shaikh Anisa Rahat" --required PAN HSC AgeNationalityDomicile --fuzzy --debug

NOTE: Separate required document types with spaces, NOT commas:
    ✅ --required PAN Aadhaar HSC
    ❌ --required PAN, Aadhaar, HSC
"""
import argparse
import difflib
import multiprocessing
import os
import re
import shutil
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor

from paddleocr import PaddleOCR

# Optional PDF support
try:
    import fitz  # PyMuPDF
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    print("Warning: PyMuPDF not installed. PDF support disabled. Install with: pip install PyMuPDF")
# Document keywords used for classification
DOC_KEYWORDS = {
    "Aadhaar": [
        "uidai", "aadhaar", "aadhar", "government of india", "भारत सरकार",
        "आधार", "यूआईडीएआई", "प्रधानमंत्री", "जन्म तिथि", "पता", "लिंग",
        "unique identification authority", "aadhaar number", "enrollment number"
    ],
    "PAN": [
        "permanent account number", "income tax", "incometaxindia", "pan",
        "income tax department", "आयकर विभाग", "स्थायी खाता संख्या",
        "taxpayer", "father's name", "पिता का नाम", "signature", "inc"
    ],
    "Driving_License": [
        "driving licence", "motor vehicles act", "rto", "mcwg", "lmv",
        "transport department", "licence no", "valid till", "date of issue",
        "ड्राइविंग लाइसेंस", "परिवहन विभाग", "challan", "regional transport office"
    ],
    "Passport": [
        "passport", "republic of india", "ministry of external affairs",
        "passport number", "date of issue", "date of expiry", "surname",
        "given names", "nationality indian", "पासपोर्ट", "गणराज्य", "विदेश मंत्रालय",
        "consular", "visa"
    ],
    "SSC": [
        "secondary school certificate", "statement of marks", "ssc", "10th", "class x",
        "board of secondary education", "maharashtra state board", "matriculation",
        "roll number", "seat number", "subject code", "marks obtained", "grade", "pass"
    ],
    "HSC": [
        "higher secondary certificate", "statement of marks", "hsc", "12th", "class xii",
        "board of higher secondary education", "maharashtra state board", "intermediate",
        "stream", "science", "commerce", "arts", "marks obtained", "grade", "percentage"
    ],
    "AgeNationalityDomicile": [
        "certificate of age nationality and domicile", "domicile certificate",
        "age nationality domicile", "tehsildar", "executive magistrate", "collector",
        "certificate of residence", "domiciled in the state of", "citizen of india",
        "residence proof", "maharashtra domicile", "satara", "karad", "taluka", "district"
    ],
    "Ration_Card": [
        "ration card", "food and civil supplies", "apl", "bpl", "aay", "antyodaya",
        "ration card number", "family members", "head of family",
        "राशन कार्ड", "खाद्य पुरवठा", "नागरी पुरवठा विभाग", "fps", "fair price shop"
    ],
    "Cast_Certificate": [
        "CASTE CERTIFICATE", "FORM - 8", "Rule No. 5(6)",
        "De-Notified Tribe (Vimukt Jati)", "Nomadic Tribe/Other Backward Class",
        "Special Backward Category", "recognised as", "Government Resolution",
        "Sub Divisional Officer", "belonging to the State of Maharashtra"
    ],
    "Income_Certificate": [
        # Marathi keywords, kept verbatim for matching; approximate English glosses:
        "१ वर्षासाठी उत्पन्नाचे प्रमाणपत्र",   # income certificate for 1 year
        "ऑफिस ऑफ नायब तहसीलदार",            # office of the Naib Tahsildar
        "वार्षिक उत्पन्न",                    # annual income
        "मिळालेले १ वर्षाचे उत्पन्न",          # income received over 1 year
        "कुटुंबातील सर्व सदस्यांचे",           # of all family members
        "प्रमाणित करण्यात येते की",           # it is hereby certified that
        "वैध राहील",                         # shall remain valid
        "Signature valid",
        "Digitally Signed by"
    ],
    "PCM_Score_Card": [
        "MAH-MHT CET (PCM Group)", "State Common Entrance Test Cell", "Score Card",
        "Physics", "Chemistry", "Mathematics", "Total Percentile",
        "Normalization document", "Centralized Admission Process (CAP)",
        "IP address of the Computer"
    ]
}
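
# Extending the checklist only requires a new entry in the dict above. A
# hypothetical sketch (the "Voter_ID" key and its keyword strings are
# illustrative, not an official list):
#
#   DOC_KEYWORDS["Voter_ID"] = [
#       "election commission of india", "elector photo identity card",
#       "epic no", "मतदाता पहचान पत्र",
#   ]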
# Validate keyword uniqueness (warn about overlaps that could skew scoring)
_keyword_sets = {k: set(v) for k, v in DOC_KEYWORDS.items()}
for doc1 in DOC_KEYWORDS:
    for doc2 in DOC_KEYWORDS:
        if doc1 < doc2:
            overlap = _keyword_sets[doc1].intersection(_keyword_sets[doc2])
            if overlap:
                print(f"⚠️ Warning: Overlap between {doc1} and {doc2}: {overlap}")
# Pre-compiled regex patterns for performance
NOISE_PATTERN = re.compile(r'^[b-df-hj-np-tv-xz]{4,}$')    # consonant-only runs = likely OCR garbage
TOKEN_PATTERN = re.compile(r'[\u0900-\u097F]{2,}|\w{3,}')  # Devanagari (2+ chars) or word chars (3+)
STOPWORDS = {'the', 'and', 'of', 'in', 'to', 'for', 'is', 'on', 'by', 'with', 'at', 'from', 'a', 'an', 'this'}
def normalize_text(text):
    """Robust multilingual tokenization with noise filtering."""
    text = text.lower()
    # Extract Devanagari runs (2+ chars) or alphanumeric words (3+ chars)
    tokens = TOKEN_PATTERN.findall(text)
    # Remove common English stopwords
    tokens = [t for t in tokens if t not in STOPWORDS]
    # Remove OCR noise (tokens of 4+ consecutive consonants are treated as garbage)
    tokens = [t for t in tokens if not NOISE_PATTERN.match(t)]
    return tokens
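
# Illustrative behaviour (hand-checked against the patterns above):
#   normalize_text("Government of India आधार Card no. 1234")
#   # -> ['government', 'india', 'आधार', 'card', '1234']
#   # ("of" and "no" fall under the 3-character minimum for Latin tokens)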
def pdf_to_images(pdf_path, max_pages=3):
    """Convert PDF pages to high-resolution temporary images."""
    if not PDF_SUPPORT:
        raise ValueError("PDF support not available. Install PyMuPDF")
    doc = fitz.open(pdf_path)
    total_pages = len(doc)
    pages_to_process = min(total_pages, max_pages)
    image_paths = []
    temp_dir = tempfile.mkdtemp(prefix="ocr_pdf_")
    for page_num in range(pages_to_process):
        page = doc.load_page(page_num)
        zoom = 2  # 2x resolution for better OCR
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)
        img_path = os.path.join(temp_dir, f"page_{page_num + 1}.png")
        pix.save(img_path)
        image_paths.append(img_path)
    doc.close()
    return image_paths, total_pages, temp_dir
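
# Usage sketch (the PDF path is hypothetical; the caller owns the returned
# temp directory and must remove it, as get_ocr_text() does in its finally block):
#   images, n_pages, tmp = pdf_to_images("application.pdf", max_pages=2)
#   try:
#       ...  # OCR each path in `images`
#   finally:
#       shutil.rmtree(tmp)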
def process_page_ocr(img_path, page_num, ocr, debug):
    """Process a single page with OCR (for parallel execution)."""
    try:
        if debug:
            print(f"\n--- Processing PDF Page {page_num} ---")
        result = ocr.predict(input=img_path)
        texts = []
        for res in result:
            texts.extend(res['rec_texts'])
        return texts
    except Exception as e:
        print(f"❌ ERROR: OCR failed on page {page_num}: {str(e)}")
        return []
def get_ocr_text(file_path, ocr, max_pages=3, debug=False):
    """Process an image or PDF with OCR, returning all extracted text lines."""
    all_texts = []
    temp_dir = None
    try:
        if file_path.lower().endswith('.pdf'):
            if not PDF_SUPPORT:
                print("Error: PDF file provided but PyMuPDF not installed")
                return []
            image_paths, total_pages, temp_dir = pdf_to_images(file_path, max_pages)
            print(f"Processing PDF: {total_pages} pages total, processing first {len(image_paths)} pages...")
            # Process pages in parallel, then reassemble in page order.
            # NOTE: this assumes the OCR engine tolerates concurrent predict() calls.
            max_workers = min(len(image_paths), 4)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_to_page = {
                    executor.submit(process_page_ocr, img_path, i + 1, ocr, debug): i
                    for i, img_path in enumerate(image_paths)
                }
                page_results = [None] * len(image_paths)
                for future in future_to_page:
                    page_idx = future_to_page[future]
                    try:
                        page_results[page_idx] = future.result()
                    except Exception as e:
                        print(f"❌ ERROR: Failed to process page {page_idx + 1}: {str(e)}")
                        page_results[page_idx] = []
            for texts in page_results:
                all_texts.extend(texts)
        else:
            result = ocr.predict(input=file_path)
            for res in result:
                all_texts.extend(res['rec_texts'])
    except Exception as e:
        print(f"❌ ERROR: Failed to process file {file_path}: {str(e)}")
        return []
    finally:
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
    return all_texts
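
# Usage sketch (file name hypothetical; engine construction mirrors main() below):
#   engine = PaddleOCR(lang="mr")
#   lines = get_ocr_text("scan.jpg", engine)    # raw OCR text lines
#   tokens = normalize_text(" ".join(lines))    # normalized tokens for scoring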
def fuzzy_match(token, target_set, threshold=0.75):
    """
    Multi-level matching for OCR errors:
      1. Exact match
      2. Close match via difflib (Ratcliff/Obershelp similarity)
      3. Substring containment
      4. Devanagari character-level similarity
    """
    if token in target_set:
        return token
    # Close-match comparison via difflib
    matches = difflib.get_close_matches(token, target_set, n=1, cutoff=threshold)
    if matches:
        return matches[0]
    # Substring match (handles concatenated words)
    for ocr_token in target_set:
        if token in ocr_token or ocr_token in token:
            return ocr_token
    # Devanagari-specific fuzzy matching
    if any('\u0900' <= c <= '\u097F' for c in token):
        for ocr_token in target_set:
            if len(ocr_token) > 3:
                similarity = difflib.SequenceMatcher(None, token, ocr_token).ratio()
                if similarity > threshold:
                    return ocr_token
    return None
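
# Illustrative behaviour (hand-checked, not official test vectors):
#   fuzzy_match("aadhar", {"aadhaar", "pan"})   # -> "aadhaar" (difflib close match)
#   fuzzy_match("taxdepartment", {"tax"})       # -> "tax" (substring containment)
#   fuzzy_match("xyz", {"aadhaar", "pan"})      # -> None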
def calculate_doc_type(ocr_tokens, debug=False):
    """
    Document classification with tie-breaking: only documents that are
    actually tied (within 5 percentage points of the best score) are
    compared on the keywords unique to each of them.
    """
    ocr_set = set(ocr_tokens)
    ocr_combined = " ".join(ocr_tokens)
    scores = {}
    # Pre-calculate lowercased keyword sets once
    doc_keyword_sets = {}
    for doc_type, keywords in DOC_KEYWORDS.items():
        doc_keyword_sets[doc_type] = set(k.lower() for k in keywords)
    for doc_type, kw_set in doc_keyword_sets.items():
        # Primary: exact/fuzzy token matches (weight 2 for exact, 1.5 for fuzzy)
        primary_matches = 0
        for kw in kw_set:
            if kw in ocr_set:
                primary_matches += 2
            elif fuzzy_match(kw, ocr_set):
                primary_matches += 1.5
        # Secondary: multi-word phrase matches in the combined text.
        # NOTE: ocr_combined is built from stopword-filtered tokens, so phrases
        # containing stopwords (e.g. "government of india") cannot match here;
        # they can still score via the fuzzy substring path above.
        phrase_matches = sum(1 for kw in kw_set if " " in kw and kw in ocr_combined)
        # Tertiary: title keyword bonus
        title_keywords = [kw for kw in kw_set if any(word in kw for word in ["certificate", "card", "licence", "passport"])]
        title_match = sum(1 for kw in title_keywords if kw in ocr_combined)
        # Weighted score as a percentage of the maximum primary score
        max_possible = len(kw_set) * 2
        weighted_score = ((primary_matches + phrase_matches + title_match) / max_possible) * 100
        scores[doc_type] = weighted_score
        if debug:
            print(f"  {doc_type:<25}: {weighted_score:>6.1f}% ({primary_matches:.1f} + {phrase_matches} + {title_match})")
    # Sort by score, descending
    sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    best_type, best_score = sorted_scores[0]
    # Tie-breaking: count matches on keywords unique to each tied document
    if len(sorted_scores) > 1 and (sorted_scores[0][1] - sorted_scores[1][1]) < 5:
        if debug:
            print(f"\n⚠️ Tie detected between '{sorted_scores[0][0]}' and '{sorted_scores[1][0]}'!")
        tied_docs = [(doc_type, score) for doc_type, score in sorted_scores
                     if (best_score - score) < 5]
        if debug:
            print(f"Tied documents: {[f'{doc}({score:.1f}%)' for doc, score in tied_docs]}")
        unique_counts = {}
        for doc_type, _ in tied_docs:
            kw_set = doc_keyword_sets[doc_type]
            other_tied_keywords = set()
            for other_doc, _ in tied_docs:
                if other_doc != doc_type:
                    other_tied_keywords.update(doc_keyword_sets[other_doc])
            unique_keywords = kw_set - other_tied_keywords
            unique_matches = sum(1 for kw in unique_keywords if fuzzy_match(kw, ocr_set))
            unique_counts[doc_type] = unique_matches
            if debug:
                print(f"  {doc_type:<25}: {unique_matches} unique matches ({len(unique_keywords)} available)")
        if unique_counts and max(unique_counts.values()) > 0:
            sorted_unique = sorted(unique_counts.items(), key=lambda x: x[1], reverse=True)
            if len(sorted_unique) > 1 and sorted_unique[0][1] > sorted_unique[1][1]:
                best_type = sorted_unique[0][0]
                best_score = scores[best_type]
                if debug:
                    print(f"✓ Tie broken: {best_type} wins with {unique_counts[best_type]} unique matches")
    return best_type, best_score
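
# Illustrative call (the winner depends on DOC_KEYWORDS above, so treat "PAN"
# as the likely result for this input, not a guaranteed one):
#   tokens = normalize_text("Income Tax Department Permanent Account Number")
#   doc_type, score = calculate_doc_type(tokens)  # -> doc_type == "PAN"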
def verify_keywords(ocr_tokens, user_keywords, use_fuzzy=False):
    """
    Sequence-aware matching for multi-keyword inputs: first check whether the
    keywords appear consecutively in the OCR text, then fall back to matching
    each keyword individually.
    """
    ocr_set = set(ocr_tokens)
    ocr_combined = " ".join(ocr_tokens)
    results = []
    if len(user_keywords) > 1:
        # Lowercase ASCII keywords; leave Devanagari keywords untouched
        user_phrase = " ".join([kw.lower() if all(ord(c) < 128 for c in kw) else kw for kw in user_keywords])
        if user_phrase in ocr_combined:
            for kw in user_keywords:
                results.append({
                    'keyword': kw,
                    'matched': True,
                    'matched_text': kw
                })
            return results
        if use_fuzzy:
            # Compare the phrase against every window of n consecutive tokens
            n = len(user_keywords)
            ocr_phrases = [" ".join(ocr_tokens[i:i + n]) for i in range(len(ocr_tokens) - n + 1)]
            phrase_match = fuzzy_match(user_phrase, set(ocr_phrases))
            if phrase_match:
                for kw in user_keywords:
                    results.append({
                        'keyword': kw,
                        'matched': True,
                        'matched_text': kw
                    })
                return results
    # Fallback: match each keyword individually
    for kw in user_keywords:
        kw_processed = kw.lower() if all(ord(c) < 128 for c in kw) else kw
        matched = False
        matched_text = None
        if kw_processed in ocr_set:
            matched = True
            matched_text = kw_processed
        elif " " in kw_processed and kw_processed in ocr_combined:
            matched = True
            matched_text = kw_processed
        elif use_fuzzy:
            matched_text = fuzzy_match(kw_processed, ocr_set)
            if matched_text:
                matched = True
        results.append({
            'keyword': kw,
            'matched': matched,
            'matched_text': (matched_text or kw_processed) if matched else None
        })
    return results
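
# Illustrative behaviour (tokens hand-picked):
#   verify_keywords(["shaikh", "anisa", "rahat"], ["Shaikh", "Anisa"])
#   # -> both keywords matched via the consecutive-phrase check, since
#   #    "shaikh anisa" occurs in the joined token string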
def main():
    parser = argparse.ArgumentParser(description='OCR Document Verification with PDF Support')
    parser.add_argument('--file', nargs='+', required=True, help='Paths to image or PDF files')
    parser.add_argument('--inputkeywords', required=True, help='Space-separated keywords to verify')
    parser.add_argument('--required', nargs='+', help='List of required document types')
    parser.add_argument('--fuzzy', action='store_true', help='Enable fuzzy matching')
    parser.add_argument('--debug', action='store_true', help='Show detailed OCR and scoring output')
    parser.add_argument('--pages', type=int, default=3, help='Max pages to process for PDFs')
    args = parser.parse_args()
    # Clean the required list (tolerate comma-separated input despite the docstring)
    required_list = []
    if args.required:
        for item in args.required:
            parts = [part.strip() for part in item.split(',') if part.strip()]
            required_list.extend(parts)
    required_set = set(required_list)
    # Initialize the OCR engine once and reuse it for all files.
    # Model loading is validated on the first real file; calling predict()
    # with an empty input path raises in most PaddleOCR versions.
    print("Initializing OCR engine (first run may take a few seconds)...")
    try:
        ocr_engine = PaddleOCR(
            lang="mr",
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            max_batch_size=16,
            num_workers=min(4, multiprocessing.cpu_count()),
        )
    except Exception as e:
        print(f"❌ CRITICAL ERROR: Failed to initialize OCR engine: {str(e)}")
        print("Please ensure PaddleOCR is installed correctly and models are downloaded.")
        sys.exit(1)
    # Process each file and collect results
    file_results = []
    found_documents = set()
    all_matched_keywords_per_file = []
    print(f"\n{'='*60}")
    print(f"PROCESSING {len(args.file)} FILES")
    print(f"{'='*60}\n")
    for idx, file_path in enumerate(args.file, 1):
        print(f"--- FILE {idx}/{len(args.file)}: {os.path.basename(file_path)} ---")
        # Check that the file exists
        if not os.path.exists(file_path):
            print(f"❌ ERROR: File not found: {file_path}\n")
            file_results.append({
                'file': file_path,
                'doc_type': 'Unknown',
                'doc_score': 0,
                'keywords_matched': [],
                'status': 'ERROR'
            })
            continue
        # Extract text from the file
        ocr_texts = get_ocr_text(file_path, ocr_engine, args.pages, args.debug)
        if not ocr_texts:
            print(f"⚠️ No text extracted from {file_path}")
            print("   Possible causes:")
            print("   - File is corrupted or empty")
            print("   - OCR engine failed to process the file")
            print("   - Text is not in a supported language/format")
            print("   Try running with the --debug flag to see detailed OCR output\n")
            file_results.append({
                'file': file_path,
                'doc_type': 'Unknown',
                'doc_score': 0,
                'keywords_matched': [],
                'status': 'ERROR'
            })
            continue
        # Flag unusually short OCR output even without --debug
        if len(ocr_texts) < 5 and not args.debug:
            print(f"  ℹ️ Only {len(ocr_texts)} lines of text extracted. Run with --debug to see details.")
        # Normalize tokens
        ocr_tokens = normalize_text(" ".join(ocr_texts))
        print(f"  Extracted {len(ocr_tokens)} valid tokens from OCR text")
        # Debug: show normalized tokens
        if args.debug:
            print("=" * 60)
            print("NORMALIZED TOKENS:")
            print("=" * 60)
            print(f"Total tokens: {len(ocr_tokens)}")
            print(f"First 50 tokens: {', '.join(ocr_tokens[:50])}{'...' if len(ocr_tokens) > 50 else ''}")
            print("=" * 60 + "\n")
        # Document classification
        if args.debug:
            print("=" * 60)
            print("DOCUMENT TYPE SCORING:")
            print("=" * 60)
        doc_type, doc_score = calculate_doc_type(ocr_tokens, debug=args.debug)
        found_documents.add(doc_type)
        if args.debug:
            print("=" * 60 + "\n")
        # Keyword verification
        user_keywords = [kw.strip() for kw in args.inputkeywords.split()]
        verification_results = verify_keywords(ocr_tokens, user_keywords, args.fuzzy)
        # Status: ALL keywords must match in this file
        all_matched = all(r['matched'] for r in verification_results)
        status = "VERIFIED" if all_matched else "NOT VERIFIED"
        # Store results for this file
        file_results.append({
            'file': file_path,
            'doc_type': doc_type,
            'doc_score': doc_score,
            'keywords_matched': verification_results,
            'status': status,
            'all_keywords_matched': all_matched
        })
        # Track which keywords were matched in this file
        matched_keywords_in_file = {r['keyword'] for r in verification_results if r['matched']}
        all_matched_keywords_per_file.append(matched_keywords_in_file)
        # Per-file report
        print(f"\n{'='*60}")
        print(f"Document Type: {doc_type} ({doc_score:.1f}% confidence)")
        print(f"{'='*60}")
        print(f"{'Keyword':<25} | {'Status':<10} | {'Matched Text'}")
        print(f"{'-'*60}")
        for r in verification_results:
            status_icon = "✓" if r['matched'] else "✗"
            matched_text = r['matched_text'] if r['matched_text'] else "Not found"
            print(f"{r['keyword']:<25} | {status_icon:<10} | {matched_text}")
        print(f"{'='*60}")
        print(f"File Status: {status}")
        print(f"{'='*60}\n")
    # FINAL SUMMARY
    print(f"\n{'='*60}")
    print("FINAL SUMMARY")
    print(f"{'='*60}")
    # Required-documents check
    if required_set:
        missing_docs = required_set - found_documents
        print(f"\nRequired Documents: {', '.join(sorted(required_set))}")
        print(f"Found Documents: {', '.join(sorted(found_documents)) if found_documents else 'None'}")
        if missing_docs:
            print(f"❌ Missing Documents: {', '.join(sorted(missing_docs))}")
            docs_status = "NOT VERIFIED"
        else:
            print("✅ All required documents found!")
            docs_status = "VERIFIED"
    else:
        docs_status = "N/A (no required list specified)"
        missing_docs = set()
    # Keyword verification across ALL files
    all_user_keywords = set(args.inputkeywords.split())
    keywords_found_across_files = set()
    for file_keyword_set in all_matched_keywords_per_file:
        keywords_found_across_files.update(file_keyword_set)
    missing_keywords = all_user_keywords - keywords_found_across_files
    print(f"\nKeywords to Find: {', '.join(sorted(all_user_keywords))}")
    print(f"Keywords Found (across all files): {', '.join(sorted(keywords_found_across_files)) if keywords_found_across_files else 'None'}")
    if missing_keywords:
        print(f"❌ Missing Keywords: {', '.join(sorted(missing_keywords))}")
        keywords_status = "NOT VERIFIED"
    else:
        print("✅ All keywords found across uploaded documents!")
        keywords_status = "VERIFIED"
    # Overall status: the required-documents check only applies when a
    # checklist was provided
    docs_ok = (not required_set) or (docs_status == "VERIFIED")
    overall_status = "VERIFIED" if (docs_ok and keywords_status == "VERIFIED") else "NOT VERIFIED"
    print(f"\n{'='*60}")
    print(f"Documents Status: {docs_status}")
    print(f"Keywords Status: {keywords_status}")
    print(f"OVERALL STATUS: {overall_status}")
    print(f"{'='*60}")
if __name__ == "__main__":
    main()