|
|
|
|
|
import base64
|
|
|
import io
|
|
|
import json
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
from dataclasses import dataclass
|
|
|
from loguru import logger
|
|
|
from PIL import Image
|
|
|
import fitz
|
|
|
from openai import OpenAI
|
|
|
|
|
|
|
|
|
@dataclass
class VisionExtractionResult:
    """Result from vision-based extraction"""
    parameter_id: str      # stable identifier of the requested parameter
    parameter_name: str    # human-readable parameter name
    value: Any             # extracted value (number/bool/str/None per parameter type)
    source: str            # specific location on the page where the value was found
    page_number: int       # 1-based page the value came from
    confidence: float      # model-reported confidence, 0.0 - 1.0
    context: str           # surrounding text captured for verification
|
|
|
|
|
|
|
|
|
class VisionDocumentParser:
    """Extracts document parameters by sending rendered PDF pages to a
    multimodal (vision) chat model.

    Rendered page images are cached per (path, dpi) so repeated
    extractions over the same document do not re-render the PDF.
    """

    def __init__(self, openai_client: OpenAI, model: str = "gpt-4o"):
        """Store the OpenAI client and the vision model to use.

        Args:
            openai_client: Pre-configured OpenAI client instance.
            model: Name of a vision-capable chat model.
        """
        self.client = openai_client
        self.model = model
        # Page-image cache: key is f"{pdf_path}_{dpi}", value is a list of PIL images.
        self._image_cache = {}
        logger.info(f"VisionDocumentParser initialized with model: {model}")
|
|
|
|
|
|
|
|
|
def pdf_to_images(self, pdf_path: str, dpi: int = 200) -> List[Image.Image]:
|
|
|
|
|
|
try:
|
|
|
|
|
|
cache_key = f"{pdf_path}_{dpi}"
|
|
|
if cache_key in self._image_cache:
|
|
|
logger.info(f"✅ Using CACHED images for: {Path(pdf_path).name} (skipping conversion)")
|
|
|
return self._image_cache[cache_key]
|
|
|
|
|
|
logger.info(f"Converting PDF to images: {Path(pdf_path).name} (DPI: {dpi})")
|
|
|
|
|
|
|
|
|
doc = fitz.open(pdf_path)
|
|
|
images = []
|
|
|
|
|
|
|
|
|
for page_num in range(len(doc)):
|
|
|
page = doc[page_num]
|
|
|
|
|
|
|
|
|
|
|
|
zoom = dpi / 72
|
|
|
mat = fitz.Matrix(zoom, zoom)
|
|
|
|
|
|
|
|
|
pix = page.get_pixmap(matrix=mat)
|
|
|
|
|
|
|
|
|
img_data = pix.tobytes("png")
|
|
|
img = Image.open(io.BytesIO(img_data))
|
|
|
|
|
|
images.append(img)
|
|
|
|
|
|
doc.close()
|
|
|
|
|
|
|
|
|
self._image_cache[cache_key] = images
|
|
|
|
|
|
logger.success(f"Converted {len(images)} pages to images (PyMuPDF) - CACHED for reuse ✅")
|
|
|
return images
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error converting PDF to images: {str(e)}")
|
|
|
return []
|
|
|
|
|
|
|
|
|
def image_to_base64(self, image: Image.Image) -> str:
|
|
|
|
|
|
try:
|
|
|
buffered = io.BytesIO()
|
|
|
image.save(buffered, format="PNG")
|
|
|
img_str = base64.b64encode(buffered.getvalue()).decode()
|
|
|
return img_str
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error encoding image: {str(e)}")
|
|
|
return ""
|
|
|
|
|
|
|
|
|
def extract_all_parameters_from_page(
|
|
|
self,
|
|
|
image: Image.Image,
|
|
|
page_num: int,
|
|
|
parameters: List[Dict[str, str]]
|
|
|
) -> Dict[str, VisionExtractionResult]:
|
|
|
|
|
|
try:
|
|
|
|
|
|
param_descriptions = []
|
|
|
for i, param in enumerate(parameters, 1):
|
|
|
param_type = param.get('type', 'text')
|
|
|
type_hint = {
|
|
|
'boolean': '(true/false)',
|
|
|
'number': '(numeric value)',
|
|
|
'date': '(date format)',
|
|
|
'text': '(text value)'
|
|
|
}.get(param_type, '')
|
|
|
|
|
|
param_descriptions.append(
|
|
|
f"{i}. **{param['name']}** {type_hint}: {param['description']}"
|
|
|
)
|
|
|
|
|
|
params_text = "\n".join(param_descriptions)
|
|
|
|
|
|
prompt = f"""Analyze this document page and extract ALL of the following parameters that you can find:
|
|
|
|
|
|
{params_text}
|
|
|
|
|
|
IMPORTANT INSTRUCTIONS:
|
|
|
1. Return a JSON object with ONLY the parameters you found on this page
|
|
|
2. For each parameter found, provide:
|
|
|
- "value": The actual value (use correct data type: number, boolean, string, or null)
|
|
|
- "source": SPECIFIC location (e.g., "Account Summary Table - Settlement column, Row 2")
|
|
|
- "confidence": Your confidence level (0.0 to 1.0)
|
|
|
- "context": Brief surrounding text for verification
|
|
|
|
|
|
3. Skip parameters not visible on this page (don't include them in response)
|
|
|
4. Be precise with sources - include table names, section headers, row/column identifiers
|
|
|
5. For booleans, return true/false, NOT "yes"/"no" or 1/0
|
|
|
|
|
|
Return ONLY valid JSON, no markdown formatting:
|
|
|
{{
|
|
|
"parameter_id_1": {{
|
|
|
"found": true,
|
|
|
"value": <actual_value>,
|
|
|
"source": "Specific location with details",
|
|
|
"confidence": 0.95,
|
|
|
"context": "Surrounding text..."
|
|
|
}},
|
|
|
"parameter_id_2": {{
|
|
|
"found": true,
|
|
|
"value": <actual_value>,
|
|
|
"source": "Another specific location",
|
|
|
"confidence": 0.90,
|
|
|
"context": "More context..."
|
|
|
}}
|
|
|
}}
|
|
|
|
|
|
Parameter IDs to use: {', '.join([p['id'] for p in parameters])}"""
|
|
|
|
|
|
|
|
|
buffered = io.BytesIO()
|
|
|
image.save(buffered, format="PNG")
|
|
|
img_base64 = base64.b64encode(buffered.getvalue()).decode()
|
|
|
|
|
|
|
|
|
response = self.client.chat.completions.create(
|
|
|
model=self.model,
|
|
|
messages=[
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": [
|
|
|
{
|
|
|
"type": "image_url",
|
|
|
"image_url": {
|
|
|
"url": f"data:image/png;base64,{img_base64}"
|
|
|
}
|
|
|
},
|
|
|
{
|
|
|
"type": "text",
|
|
|
"text": prompt
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
max_tokens=2000,
|
|
|
temperature=0.0
|
|
|
)
|
|
|
|
|
|
|
|
|
content = response.choices[0].message.content.strip()
|
|
|
|
|
|
|
|
|
if content.startswith("```json"):
|
|
|
content = content[7:]
|
|
|
if content.startswith("```"):
|
|
|
content = content[3:]
|
|
|
if content.endswith("```"):
|
|
|
content = content[:-3]
|
|
|
content = content.strip()
|
|
|
|
|
|
|
|
|
results_dict = json.loads(content)
|
|
|
|
|
|
|
|
|
param_name_map = {p['id']: p['name'] for p in parameters}
|
|
|
|
|
|
|
|
|
extraction_results = {}
|
|
|
for param_id, result_data in results_dict.items():
|
|
|
if result_data.get('found', False):
|
|
|
extraction_results[param_id] = VisionExtractionResult(
|
|
|
parameter_id=param_id,
|
|
|
parameter_name=param_name_map.get(param_id, param_id),
|
|
|
value=result_data.get('value'),
|
|
|
source=result_data.get('source', f'Page {page_num}'),
|
|
|
page_number=page_num,
|
|
|
confidence=result_data.get('confidence', 0.7),
|
|
|
context=result_data.get('context', '')
|
|
|
)
|
|
|
|
|
|
logger.success(
|
|
|
f"Page {page_num}: Found {len(extraction_results)}/{len(parameters)} parameters "
|
|
|
f"in ONE call ⚡"
|
|
|
)
|
|
|
|
|
|
return extraction_results
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
logger.error(f"Failed to parse JSON from page {page_num}: {str(e)}")
|
|
|
return {}
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error extracting from page {page_num}: {str(e)}")
|
|
|
return {}
|
|
|
|
|
|
def extract_all_parameters_batch(
|
|
|
self,
|
|
|
pdf_path: str,
|
|
|
parameters: List[Dict[str, str]]
|
|
|
) -> Dict[str, VisionExtractionResult]:
|
|
|
|
|
|
try:
|
|
|
logger.info(
|
|
|
f"⚡ BATCH EXTRACTION: Processing {len(parameters)} parameters "
|
|
|
f"from {Path(pdf_path).name}"
|
|
|
)
|
|
|
|
|
|
|
|
|
images = self.pdf_to_images(pdf_path, dpi=200)
|
|
|
if not images:
|
|
|
logger.error("Failed to convert PDF to images")
|
|
|
return {}
|
|
|
|
|
|
|
|
|
best_results = {}
|
|
|
|
|
|
|
|
|
for page_num, image in enumerate(images, start=1):
|
|
|
logger.info(f"⚡ Page {page_num}/{len(images)}: Extracting ALL parameters...")
|
|
|
|
|
|
|
|
|
page_results = self.extract_all_parameters_from_page(
|
|
|
image=image,
|
|
|
page_num=page_num,
|
|
|
parameters=parameters
|
|
|
)
|
|
|
|
|
|
|
|
|
for param_id, result in page_results.items():
|
|
|
if param_id not in best_results:
|
|
|
best_results[param_id] = result
|
|
|
logger.info(f" ✓ {param_id}: {result.value} (conf: {result.confidence})")
|
|
|
elif result.confidence > best_results[param_id].confidence:
|
|
|
logger.info(
|
|
|
f" ↑ {param_id}: {result.value} (conf: {result.confidence}) "
|
|
|
f"[better than {best_results[param_id].confidence}]"
|
|
|
)
|
|
|
best_results[param_id] = result
|
|
|
|
|
|
found_count = len(best_results)
|
|
|
logger.success(
|
|
|
f"⚡ BATCH COMPLETE: Found {found_count}/{len(parameters)} parameters "
|
|
|
f"in {len(images)} API calls (vs {len(parameters) * len(images)} with old method!)"
|
|
|
)
|
|
|
|
|
|
return best_results
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error in batch extraction: {str(e)}")
|
|
|
return {}
|
|
|
|
|
|
def extract_parameter_from_page(
|
|
|
self,
|
|
|
image: Image.Image,
|
|
|
page_num: int,
|
|
|
parameter_name: str,
|
|
|
parameter_description: str,
|
|
|
parameter_type: str = "text"
|
|
|
) -> Optional[VisionExtractionResult]:
|
|
|
|
|
|
try:
|
|
|
|
|
|
img_base64 = self.image_to_base64(image)
|
|
|
if not img_base64:
|
|
|
return None
|
|
|
|
|
|
|
|
|
prompt = self._build_extraction_prompt(
|
|
|
parameter_name,
|
|
|
parameter_description,
|
|
|
parameter_type
|
|
|
)
|
|
|
|
|
|
|
|
|
response = self.client.chat.completions.create(
|
|
|
model=self.model,
|
|
|
messages=[
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": [
|
|
|
{
|
|
|
"type": "text",
|
|
|
"text": prompt
|
|
|
},
|
|
|
{
|
|
|
"type": "image_url",
|
|
|
"image_url": {
|
|
|
"url": f"data:image/png;base64,{img_base64}",
|
|
|
"detail": "high"
|
|
|
}
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
max_tokens=500,
|
|
|
temperature=0.0
|
|
|
)
|
|
|
|
|
|
|
|
|
result_text = response.choices[0].message.content
|
|
|
|
|
|
|
|
|
return self._parse_vision_response(
|
|
|
result_text,
|
|
|
parameter_name,
|
|
|
page_num
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error extracting {parameter_name} from page {page_num}: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
def _build_extraction_prompt(
|
|
|
self,
|
|
|
parameter_name: str,
|
|
|
parameter_description: str,
|
|
|
parameter_type: str
|
|
|
) -> str:
|
|
|
"""Build prompt for GPT-4 Vision extraction"""
|
|
|
|
|
|
prompt = f"""You are analyzing a financial document (Bureau Credit Report or GST Return).
|
|
|
|
|
|
**TASK:** Extract the following parameter from this document page.
|
|
|
|
|
|
**Parameter Name:** {parameter_name}
|
|
|
**Description:** {parameter_description}
|
|
|
**Expected Type:** {parameter_type}
|
|
|
|
|
|
**INSTRUCTIONS:**
|
|
|
1. Look for this parameter in the document
|
|
|
2. If found, extract the exact value
|
|
|
3. Note the specific section/location where you found it (e.g., "Account Summary Table, Row 3" or "DPD History Section")
|
|
|
4. Provide surrounding context (nearby text)
|
|
|
|
|
|
**OUTPUT FORMAT (JSON):**
|
|
|
{{
|
|
|
"found": true/false,
|
|
|
"value": <extracted value or null>,
|
|
|
"source": "<specific section/table/location>",
|
|
|
"confidence": <0.0-1.0>,
|
|
|
"context": "<surrounding text for verification>"
|
|
|
}}
|
|
|
|
|
|
**EXAMPLES:**
|
|
|
|
|
|
For "DPD 30 Days" in a credit report:
|
|
|
{{
|
|
|
"found": true,
|
|
|
"value": 2,
|
|
|
"source": "Payment History Table - DPD 30 Days column",
|
|
|
"confidence": 0.95,
|
|
|
"context": "DPD History: 0-30 days: 2 occurrences"
|
|
|
}}
|
|
|
|
|
|
For "Settlement/Write-off" flag:
|
|
|
{{
|
|
|
"found": true,
|
|
|
"value": false,
|
|
|
"source": "Account Status Summary - Settlement Status field",
|
|
|
"confidence": 0.90,
|
|
|
"context": "Settlement Status: Not Applicable, Write-off Status: No"
|
|
|
}}
|
|
|
|
|
|
If parameter not found on this page:
|
|
|
{{
|
|
|
"found": false,
|
|
|
"value": null,
|
|
|
"source": "Not found on this page",
|
|
|
"confidence": 0.0,
|
|
|
"context": ""
|
|
|
}}
|
|
|
|
|
|
**CRITICAL RULES:**
|
|
|
- Be precise with locations (section names, table names, row/column)
|
|
|
- Extract EXACT values, don't interpret
|
|
|
- For boolean parameters, return true/false
|
|
|
- For numeric parameters, return numbers (not strings)
|
|
|
- If unsure, set confidence < 0.7
|
|
|
- Return ONLY valid JSON, no other text
|
|
|
|
|
|
Now analyze the document image and extract the parameter:"""
|
|
|
|
|
|
return prompt
|
|
|
|
|
|
|
|
|
def _parse_vision_response(
|
|
|
self,
|
|
|
response_text: str,
|
|
|
parameter_id: str,
|
|
|
page_num: int
|
|
|
) -> Optional[VisionExtractionResult]:
|
|
|
"""Parse GPT-4 Vision response into structured result"""
|
|
|
try:
|
|
|
import json
|
|
|
|
|
|
|
|
|
json_text = response_text.strip()
|
|
|
if "```json" in json_text:
|
|
|
json_text = json_text.split("```json")[1].split("```")[0].strip()
|
|
|
elif "```" in json_text:
|
|
|
json_text = json_text.split("```")[1].split("```")[0].strip()
|
|
|
|
|
|
|
|
|
data = json.loads(json_text)
|
|
|
|
|
|
|
|
|
if not data.get("found", False):
|
|
|
return None
|
|
|
|
|
|
|
|
|
result = VisionExtractionResult(
|
|
|
parameter_id=parameter_id,
|
|
|
parameter_name=parameter_id.replace("_", " ").title(),
|
|
|
value=data.get("value"),
|
|
|
source=data.get("source", "Unknown location"),
|
|
|
page_number=page_num,
|
|
|
confidence=float(data.get("confidence", 0.5)),
|
|
|
context=data.get("context", "")
|
|
|
)
|
|
|
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error parsing vision response: {str(e)}")
|
|
|
logger.debug(f"Response text: {response_text}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
def extract_parameter_from_pdf(
|
|
|
self,
|
|
|
pdf_path: str,
|
|
|
parameter_name: str,
|
|
|
parameter_description: str,
|
|
|
parameter_type: str = "text",
|
|
|
search_all_pages: bool = True
|
|
|
) -> Optional[VisionExtractionResult]:
|
|
|
|
|
|
try:
|
|
|
logger.info(f"Extracting '{parameter_name}' from {Path(pdf_path).name}")
|
|
|
|
|
|
|
|
|
images = self.pdf_to_images(pdf_path, dpi=200)
|
|
|
if not images:
|
|
|
logger.error("Failed to convert PDF to images")
|
|
|
return None
|
|
|
|
|
|
|
|
|
results = []
|
|
|
|
|
|
for page_num, image in enumerate(images, start=1):
|
|
|
logger.info(f"Searching page {page_num}/{len(images)}...")
|
|
|
|
|
|
result = self.extract_parameter_from_page(
|
|
|
image=image,
|
|
|
page_num=page_num,
|
|
|
parameter_name=parameter_name,
|
|
|
parameter_description=parameter_description,
|
|
|
parameter_type=parameter_type
|
|
|
)
|
|
|
|
|
|
if result and result.value is not None:
|
|
|
logger.success(f"Found on page {page_num}: {result.value} (confidence: {result.confidence})")
|
|
|
results.append(result)
|
|
|
|
|
|
|
|
|
if not search_all_pages and result.confidence > 0.7:
|
|
|
break
|
|
|
|
|
|
|
|
|
if results:
|
|
|
best_result = max(results, key=lambda r: r.confidence)
|
|
|
logger.success(f"Best match: page {best_result.page_number}, confidence {best_result.confidence}")
|
|
|
return best_result
|
|
|
else:
|
|
|
logger.warning(f"Parameter '{parameter_name}' not found in document")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error extracting parameter from PDF: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
def extract_gst_sales_with_vision(
|
|
|
self,
|
|
|
pdf_path: str
|
|
|
) -> Optional[Dict[str, Any]]:
|
|
|
|
|
|
try:
|
|
|
logger.info(f"Extracting GST sales from {Path(pdf_path).name}")
|
|
|
|
|
|
|
|
|
images = self.pdf_to_images(pdf_path)
|
|
|
if not images:
|
|
|
return None
|
|
|
|
|
|
|
|
|
prompt = """You are analyzing a GSTR-3B (GST Return) document.
|
|
|
|
|
|
**TASK:** Extract the total taxable sales value from Table 3.1(a).
|
|
|
|
|
|
**WHAT TO LOOK FOR:**
|
|
|
- Table 3.1(a): "Details of Outward Supplies and inward supplies liable to reverse charge"
|
|
|
- Look for "Taxable value" or "Total Taxable value"
|
|
|
- This is usually in the first row of Table 3.1
|
|
|
|
|
|
**OUTPUT FORMAT (JSON):**
|
|
|
{{
|
|
|
"found": true/false,
|
|
|
"month": "<month and year, e.g., January 2025>",
|
|
|
"sales": <numeric value>,
|
|
|
"source": "GSTR-3B Table 3.1(a)",
|
|
|
"confidence": <0.0-1.0>
|
|
|
}}
|
|
|
|
|
|
**EXAMPLE:**
|
|
|
{{
|
|
|
"found": true,
|
|
|
"month": "January 2025",
|
|
|
"sales": 951381,
|
|
|
"source": "GSTR-3B Table 3.1(a) - Taxable outward supplies",
|
|
|
"confidence": 0.95
|
|
|
}}
|
|
|
|
|
|
Return ONLY valid JSON, no other text."""
|
|
|
|
|
|
|
|
|
for page_num, image in enumerate(images, start=1):
|
|
|
try:
|
|
|
img_base64 = self.image_to_base64(image)
|
|
|
|
|
|
response = self.client.chat.completions.create(
|
|
|
model=self.model,
|
|
|
messages=[
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": [
|
|
|
{"type": "text", "text": prompt},
|
|
|
{
|
|
|
"type": "image_url",
|
|
|
"image_url": {
|
|
|
"url": f"data:image/png;base64,{img_base64}",
|
|
|
"detail": "high"
|
|
|
}
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
],
|
|
|
max_tokens=300,
|
|
|
temperature=0.0
|
|
|
)
|
|
|
|
|
|
result_text = response.choices[0].message.content
|
|
|
|
|
|
|
|
|
import json
|
|
|
json_text = result_text.strip()
|
|
|
if "```json" in json_text:
|
|
|
json_text = json_text.split("```json")[1].split("```")[0].strip()
|
|
|
elif "```" in json_text:
|
|
|
json_text = json_text.split("```")[1].split("```")[0].strip()
|
|
|
|
|
|
data = json.loads(json_text)
|
|
|
|
|
|
if data.get("found") and data.get("sales"):
|
|
|
logger.success(f"Found GST sales on page {page_num}: {data['sales']}")
|
|
|
return {
|
|
|
"month": data.get("month", "Unknown"),
|
|
|
"sales": data["sales"],
|
|
|
"source": data.get("source", "GSTR-3B Table 3.1(a)")
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Page {page_num} - no sales data: {str(e)}")
|
|
|
continue
|
|
|
|
|
|
logger.warning("GST sales not found in document")
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"Error extracting GST sales: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|