File size: 4,380 Bytes
d8b7b87
 
 
4dbb811
fad436e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28a746e
 
fad436e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28a746e
 
fad436e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d8b7b87
fad436e
d8b7b87
 
fad436e
 
d8b7b87
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import json
import re
import logging
from typing import List, Dict, Any
# Ensure langchain is available for paddlex/paddleocr
try:
    import langchain
    import langchain_community
except ImportError:
    logging.warning("LangChain modules not found. PaddleOCR might fail.")

from core.ocr_engine import OCREngine
from core.vlm_engine import GroqVLMEngine
from core.ner_engine import NEREngine

# Global instances (Lazy load)
# Module-level singletons for the three engines; each stays None until its
# get_*() accessor below constructs it on first use, so importing this module
# never triggers model loading.
_ocr = None  # OCREngine instance, created by get_ocr()
_vlm = None  # GroqVLMEngine instance, created by get_vlm()
_ner = None  # NEREngine instance, created by get_ner()

def get_ocr():
    """Return the shared OCREngine, constructing it on first call.

    Uses an ``is None`` identity check (instead of truthiness) so an engine
    object that happens to evaluate falsy is not silently re-created on
    every call.
    """
    global _ocr
    if _ocr is None:
        _ocr = OCREngine()
    return _ocr

def get_vlm():
    """Return the shared GroqVLMEngine, constructing it on first call.

    Uses an ``is None`` identity check (instead of truthiness) so an engine
    object that happens to evaluate falsy is not silently re-created on
    every call.
    """
    global _vlm
    if _vlm is None:
        _vlm = GroqVLMEngine()
    return _vlm

def get_ner():
    """Return the shared NEREngine, constructing it on first call.

    Uses an ``is None`` identity check (instead of truthiness) so an engine
    object that happens to evaluate falsy is not silently re-created on
    every call.
    """
    global _ner
    if _ner is None:
        _ner = NEREngine()
    return _ner

def process_image_pipeline(image_paths: List[str]) -> Dict[str, Any]:
    """Extract contact fields from each image, VLM-first with OCR+NER fallback.

    For every path: try the Groq VLM; on failure, run OCR and feed the raw
    text to NER. Structured hits are merged into one accumulator, the raw
    text per image is kept under "extracted_text", and the whole result is
    de-duplicated via cleanup_results() before returning.
    """
    logging.info(f"Pipeline: Starting processing for {len(image_paths)} images.")
    vlm_engine = get_vlm()
    ocr_engine = get_ocr()
    ner_engine = get_ner()

    results: Dict[str, Any] = {
        "name": [],
        "contact_number": [],
        "Designation": [],
        "email": [],
        "Location": [],
        "Link": [],
        "Company": [],
        "extracted_text": {},
        "status_message": "Primary: Groq VLM",
    }
    raw_texts: Dict[str, str] = {}

    for image_path in image_paths:
        img_name = os.path.basename(image_path)

        # Primary path: vision-language model returns structured fields.
        logging.info(f"Pipeline: Attempting VLM extraction for {img_name}")
        structured = vlm_engine.process(image_path)
        if structured:
            merge_structured_data(results, structured)
            raw_texts[image_path] = json.dumps(structured)
            logging.info(f"Pipeline: VLM success for {img_name}")
            continue

        # Fallback path: OCR the image, then NER over the raw text.
        logging.warning(f"Pipeline: VLM failed or skipped for {img_name}. Falling back to OCR+NER.")
        text = ocr_engine.extract_text(image_path)
        raw_texts[image_path] = text
        if not text:
            logging.error(f"Pipeline: Both VLM and OCR failed for {img_name}")
            continue

        logging.info(f"Pipeline: OCR success for {img_name}, attempting NER.")
        entities = ner_engine.extract_entities(text)
        if entities:
            merge_structured_data(results, entities)
            logging.info(f"Pipeline: NER success for {img_name}")
        else:
            logging.warning(f"Pipeline: NER failed to extract entities for {img_name}")
        # Any image that needed the fallback flips the overall status.
        results["status_message"] = "Fallback: OCR+NER"

    results["extracted_text"] = raw_texts
    cleaned = cleanup_results(results)
    logging.info(f"Pipeline: Completed. Extracted data for {sum(1 for v in cleaned.values() if isinstance(v, list) and v)} fields.")
    return cleaned

def merge_structured_data(main_data: Dict, new_data: Dict):
    """Fold the fields of *new_data* into *main_data* in place.

    Each source key is normalised with str.capitalize() and translated
    through a small alias table (falling back to the lower-cased key).
    Keys that do not resolve to an existing *main_data* field are silently
    dropped. List values are concatenated onto the target list; truthy
    scalars are appended; falsy scalars are ignored.
    """
    aliases = {
        "Name": "name",
        "Contact": "contact_number",
        "Designation": "Designation",
        "Email": "email",
        "Address": "Location",
        "Link": "Link",
        "Company": "Company",
    }

    for raw_key, value in new_data.items():
        target = aliases.get(raw_key.capitalize(), raw_key.lower())
        if target not in main_data:
            continue
        bucket = main_data[target]
        if isinstance(value, list):
            bucket.extend(value)
        elif value:
            bucket.append(value)

def cleanup_results(results: Dict) -> Dict:
    """Normalise every list-valued field of *results* in place and return it.

    Each list item is stringified and stripped; case-insensitive duplicates
    and placeholder values ("", "not found", "none", "null", "[]") are
    removed while first-seen order and casing are preserved. Non-list
    fields are left untouched.
    """
    placeholders = {"", "not found", "none", "null", "[]"}

    for field, value in results.items():
        if not isinstance(value, list):
            continue
        kept = []
        seen_lower = set()
        for raw in value:
            text = str(raw).strip()
            lowered = text.lower()
            if lowered in seen_lower or lowered in placeholders:
                continue
            kept.append(text)
            seen_lower.add(lowered)
        results[field] = kept
    return results

def extract_contact_details(text: str) -> Dict[str, List[str]]:
    """Regex fallback: pull email addresses and phone numbers out of *text*.

    Returns {"emails": [...], "phone_numbers": [...]} with the full matched
    substrings.

    Fix: the optional country-code prefix is now a NON-capturing group
    ``(?:...)``. With the original capturing group, ``findall`` returned
    only the group's captures — fragments like "+1 " or "" — instead of
    the complete phone numbers.
    """
    email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    phone_regex = re.compile(r'(?:\+?\d{1,3}[-.\s()]?)?\(?\d{3,5}\)?[-.\s()]?\d{3,5}[-.\s()]?\d{3,5}')

    return {
        "emails": email_regex.findall(text),
        "phone_numbers": phone_regex.findall(text)
    }