ocr-lad-rad-platform / backend /src /multi_agent_ocr.py
JTh34's picture
Update backend/src/multi_agent_ocr.py
eccf466 verified
import logging
import json
import base64
from typing import Dict, List, Any, Annotated, TypedDict
from pathlib import Path
from anthropic import Anthropic
from openai import OpenAI
from PIL import Image
from pdf2image import convert_from_path
import magic
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langgraph.graph import StateGraph, START, END
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class OCRAgentState(TypedDict):
"""Optimized state for new indicators and business logic validation"""
# Source document
file_path: str
file_type: str
converted_image_paths: List[str] # Support multi-page PDFs
total_pages: int
# Extraction results
document_type: str
image_quality: float
information_relevance: float
extracted_data: Dict[str, Any]
full_text: str
# Business logic validation
business_logic_score: float
extraction_attempts: int
max_attempts: int
# AI evaluations
evaluation_results: Dict[str, Any]
# Controls
is_valid_ocr_task: bool
supervisor_feedback: str
processing_status: str
# Messages and logs
agent_logs: List[str]
final_result: Dict[str, Any]
class NewLCELIntelligentOCRWorkflow:
"""
LCEL-Optimized OCR with New Indicators and Business Logic Validation
New Indicators:
- Image Quality Score: Technical image assessment
- Information Relevance Score: Business data relevance
New Supervision:
- Business Logic Validation: Document-type specific validation
Features:
- Multi-page PDF support
- Fixed LCEL variable mapping
- Updated prompts for new indicators
"""
def __init__(self, anthropic_api_key: str = None, openai_api_key: str = None):
# Claude for OCR extraction
self.claude_client = Anthropic(api_key=anthropic_api_key)
self.claude_model = 'claude-3-5-haiku-latest'
# GPT-4o-mini for intelligent agents
self.gpt_llm = ChatOpenAI(
model='gpt-4o-mini',
api_key=openai_api_key,
temperature=0.3,
max_tokens=600
)
# Configuration
self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp'}
self.supported_formats = self.supported_image_formats | {'.pdf'}
# Flexible supervision settings (more permissive)
self.supervision_config = {
"max_attempts": 2, # Reduced from 3
"early_acceptance_threshold": 0.6, # Accept if overall quality > 60%
"minimum_acceptable_threshold": 0.4, # Minimum to avoid re-extraction
"business_logic_weight": 0.4, # Less strict on business logic
"technical_quality_weight": 0.6 # More weight on technical quality
}
# Build LCEL chains
self._build_new_lcel_chains()
# Compile the workflow
self.workflow_graph = self._build_new_optimized_workflow()
def _build_new_lcel_chains(self):
"""Build new LCEL chains for updated indicators and business logic"""
json_parser = JsonOutputParser()
# Document Type Detector Chain (unchanged)
self.document_detector_prompt = ChatPromptTemplate.from_template("""
As a Document Type Detection Agent, analyze this file for OCR/LAD/RAD processing:
File: {file_name}
Extension: {file_extension}
Size: {file_size_kb} KB
Pages: {total_pages}
Determine if this file is suitable and provide analysis.
Respond in JSON format:
{{
"is_suitable": true/false,
"file_type": "image/pdf/unsupported",
"processing_recommendation": "direct_ocr/pdf_conversion/reject",
"analysis": "detailed analysis",
"confidence": 0.95
}}
""")
self.document_detector_chain = (
self.document_detector_prompt
| self.gpt_llm
| json_parser
)
# Guardrail Agent Chain (unchanged)
self.guardrail_prompt = ChatPromptTemplate.from_template("""
As a Guardrail Agent, validate this OCR/LAD/RAD processing request:
File: {file_path}
Type: {file_type}
Pages: {total_pages}
Context: {processing_context}
Validate the request for security, ethics, and appropriateness.
Respond in JSON format:
{{
"is_authorized": true/false,
"risk_level": "low/medium/high",
"concerns": ["list concerns"],
"recommendation": "proceed/block/conditional",
"reasoning": "detailed explanation"
}}
""")
self.guardrail_chain = (
self.guardrail_prompt
| self.gpt_llm
| json_parser
)
# NEW: Image Quality Evaluator Chain
self.image_quality_prompt = ChatPromptTemplate.from_template("""
As an Image Quality Evaluation Agent, assess the technical quality of the processed document image(s):
Document Type: {document_type}
Number of Pages: {total_pages}
Image Processing Details: {image_details}
Text Sample: {text_sample}
Evaluate IMAGE QUALITY based on:
1. Resolution and sharpness (clear text edges, readable fonts)
2. Contrast and lighting (good text-background separation)
3. Absence of noise, artifacts, or distortions
4. Proper orientation and alignment
5. OCR-friendly characteristics (clean text, no blur)
Consider technical factors:
- Text legibility at character level
- Image artifacts from scanning/conversion
- Color/contrast sufficient for text recognition
- Geometric distortions or skew
- Overall technical suitability for OCR
Respond in JSON format:
{{
"image_quality_score": 0.XX,
"reasoning": "detailed technical assessment",
"quality_factors": ["positive quality indicators"],
"technical_issues": ["technical problems identified"],
"ocr_suitability": "assessment of OCR readiness"
}}
Score range: 0.0 (poor image quality) to 1.0 (excellent image quality)
""")
self.image_quality_chain = (
self.image_quality_prompt
| self.gpt_llm
| json_parser
)
# NEW: Document Expectation Agent Chain
self.document_expectation_prompt = ChatPromptTemplate.from_template("""
As a Document Expectation Analysis Agent, determine what information should typically be found in this type of document:
Document Type: {document_type}
Context: {document_context}
Analyze what information a user would typically expect to extract from a '{document_type}' document.
Consider industry standards, common use cases, and practical utility.
Determine:
1. Critical fields that MUST be present for this document type to be useful
2. Important fields that significantly increase document value
3. Optional fields that are nice to have but not essential
4. Common patterns and formats expected for this document type
Respond in JSON format:
{{
"critical_fields": ["absolutely essential information"],
"important_fields": ["valuable but not critical information"],
"optional_fields": ["nice to have information"],
"expected_patterns": {{
"amounts": "expected format for monetary amounts",
"dates": "expected date formats",
"identifiers": "expected ID/reference patterns"
}},
"business_context": "what this document type is typically used for",
"quality_criteria": ["key factors that make this extraction high quality"],
"minimum_acceptability": "minimum requirements for acceptable extraction"
}}
""")
self.document_expectation_chain = (
self.document_expectation_prompt
| self.gpt_llm
| json_parser
)
# UPDATED: Information Relevance Evaluator Chain (with dynamic expectations)
self.information_relevance_prompt = ChatPromptTemplate.from_template("""
As an Information Relevance Evaluation Agent, assess extraction quality using dynamic expectations:
Document Type: {document_type}
Extracted Data: {extracted_data}
Full Text Length: {text_length} characters
DYNAMIC EXPECTATIONS for this document:
{document_expectations}
Evaluate INFORMATION RELEVANCE based on:
1. How well extracted data matches the expected critical fields
2. Presence of important business information for this document type
3. Quality and format consistency of extracted elements
4. Overall practical utility for end users
5. Business logic consistency and realistic values
Use the dynamic expectations to assess:
- Are the critical fields from expectations present in extracted data?
- Do the extracted patterns match expected formats?
- Is this extraction useful for the typical business context?
- What percentage of expected information was successfully captured?
Respond in JSON format:
{{
"information_relevance_score": 0.XX,
"reasoning": "detailed assessment against dynamic expectations",
"critical_fields_found": ["critical fields successfully extracted"],
"missing_critical": ["critical fields missing"],
"important_fields_found": ["important fields extracted"],
"pattern_compliance": "how well patterns match expectations",
"practical_utility": "assessment of real-world usefulness"
}}
Score range: 0.0 (poor match to expectations) to 1.0 (excellent match to expectations)
""")
self.information_relevance_chain = (
self.information_relevance_prompt
| self.gpt_llm
| json_parser
)
# UPDATED: Parallel Evaluation Chain (with document expectations)
self.parallel_evaluators = RunnableParallel({
"image_quality": RunnableLambda(lambda x: {
"document_type": x["document_type"],
"total_pages": x["total_pages"],
"image_details": x["image_details"],
"text_sample": x["text_sample"]
}) | self.image_quality_chain,
"information_relevance": RunnableLambda(lambda x: {
"document_type": x["document_type"],
"extracted_data": x["extracted_data"],
"text_length": x["text_length"],
"document_expectations": x["document_expectations"]
}) | self.information_relevance_chain
})
# UPDATED: Flexible Business Logic Supervisor Chain
self.flexible_supervisor_prompt = ChatPromptTemplate.from_template("""
As a Flexible Supervisor Agent, make efficient quality decisions with relaxed standards:
CURRENT METRICS:
- Image Quality: {image_quality:.2f} | Reasoning: {image_reasoning}
- Information Relevance: {info_relevance:.2f} | Reasoning: {relevance_reasoning}
- Overall Quality: {overall_quality:.2f}
PROCESSING CONTEXT:
- Attempt: {current_attempt}/{max_attempts}
- Document Type: {document_type}
- Document Expectations: {document_expectations}
FLEXIBLE DECISION CRITERIA:
- Early acceptance if overall quality > 60% (efficient processing)
- Minimum threshold 40% to avoid excessive re-processing
- Balance between quality and processing efficiency
- Consider diminishing returns of additional attempts
Make a PRACTICAL business decision:
1. Is this extraction "good enough" for most business use cases?
2. Would additional attempts likely yield significant improvements?
3. Does the current quality meet minimum professional standards?
4. Is the time/cost of re-extraction justified?
Respond in JSON format:
{{
"decision": "accept/re_extract/final_accept",
"overall_quality_score": 0.XX,
"is_acceptable": true/false,
"efficiency_analysis": "assessment of processing efficiency vs quality tradeoff",
"practical_recommendation": "business-focused recommendation",
"improvement_potential": "likelihood of improvement with re-extraction",
"decision_confidence": 0.XX
}}
Bias toward ACCEPTANCE unless clear quality issues that would significantly impact usability.
""")
self.flexible_supervisor_chain = (
self.flexible_supervisor_prompt
| self.gpt_llm
| json_parser
)
def _encode_image(self, image_path: Path) -> str:
"""Encode image to base64"""
with open(image_path, 'rb') as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def _get_mime_type(self, file_path: Path) -> str:
"""Determine MIME type"""
try:
mime = magic.Magic(mime=True)
detected_mime = mime.from_file(str(file_path))
if detected_mime.startswith('image/'):
allowed_mimes = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}
if detected_mime in allowed_mimes:
return detected_mime
return None
except:
ext = file_path.suffix.lower()
mime_map = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg',
'.png': 'image/png', '.gif': 'image/gif',
'.webp': 'image/webp'
}
return mime_map.get(ext, None)
def _convert_pdf_to_images(self, pdf_path: Path, max_pages: int = 10) -> List[str]:
"""Convert PDF to multiple images (multi-page support)"""
try:
logger.info(f"[PDF-Converter] Converting PDF with max {max_pages} pages...")
# Convert all pages up to max_pages
images = convert_from_path(pdf_path, last_page=max_pages, dpi=200)
converted_paths = []
for i, image in enumerate(images):
output_path = pdf_path.parent / f"{pdf_path.stem}_page_{i+1}.png"
image.save(output_path, 'PNG')
converted_paths.append(str(output_path))
logger.info(f"[PDF-Converter] Page {i+1} -> {output_path.name}")
return converted_paths
except Exception as e:
logger.error(f"[PDF-Converter] Error: {e}")
raise Exception(f"PDF conversion failed: {str(e)}")
def _extract_from_multiple_images(self, image_paths: List[str]) -> Dict[str, Any]:
"""Extract OCR from multiple images and combine results"""
extraction_prompt = """Analyze this document page and provide comprehensive JSON:
{
"document_type": "specific document type",
"extracted_data": {
"key_fields": {"field_name": "value"},
"dates": ["dates found"],
"amounts": ["monetary amounts"],
"reference_numbers": ["IDs, references"],
"entities": {
"persons": ["person names"],
"organizations": ["organizations"],
"addresses": ["addresses"],
"emails": ["emails"],
"phone_numbers": ["phones"]
}
},
"full_text": "complete transcription",
"page_structure": {
"layout": "layout description",
"sections": ["sections"],
"tables": "table content"
}
}"""
combined_result = {
"document_type": "Unknown",
"extracted_data": {
"key_fields": {},
"dates": [],
"amounts": [],
"reference_numbers": [],
"entities": {
"persons": [],
"organizations": [],
"addresses": [],
"emails": [],
"phone_numbers": []
}
},
"full_text": "",
"total_pages": len(image_paths)
}
for i, image_path in enumerate(image_paths):
try:
logger.info(f"[Multi-OCR] Processing page {i+1}/{len(image_paths)}")
path_obj = Path(image_path)
image_data = self._encode_image(path_obj)
mime_type = self._get_mime_type(path_obj)
if not mime_type:
logger.warning(f"[Multi-OCR] Unsupported MIME type for {image_path}")
continue
response = self.claude_client.messages.create(
model=self.claude_model,
max_tokens=2500,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": mime_type,
"data": image_data
}
},
{
"type": "text",
"text": extraction_prompt
}
]
}]
)
try:
page_result = json.loads(response.content[0].text)
# Set document type from first successful page
if i == 0 or combined_result["document_type"] == "Unknown":
combined_result["document_type"] = page_result.get("document_type", "Unknown")
# Combine extracted data
page_data = page_result.get("extracted_data", {})
# Merge key fields
combined_result["extracted_data"]["key_fields"].update(
page_data.get("key_fields", {})
)
# Extend lists (remove duplicates)
for list_field in ["dates", "amounts", "reference_numbers"]:
new_items = page_data.get(list_field, [])
for item in new_items:
if item not in combined_result["extracted_data"][list_field]:
combined_result["extracted_data"][list_field].append(item)
# Merge entities
page_entities = page_data.get("entities", {})
for entity_type, entity_list in page_entities.items():
for entity in entity_list:
if entity not in combined_result["extracted_data"]["entities"][entity_type]:
combined_result["extracted_data"]["entities"][entity_type].append(entity)
# Combine full text
page_text = page_result.get("full_text", "")
if page_text:
combined_result["full_text"] += f"\n--- Page {i+1} ---\n{page_text}"
except json.JSONDecodeError:
logger.warning(f"[Multi-OCR] JSON parsing failed for page {i+1}")
# Add raw text as fallback
combined_result["full_text"] += f"\n--- Page {i+1} (Raw) ---\n{response.content[0].text}"
except Exception as e:
logger.error(f"[Multi-OCR] Error processing page {i+1}: {e}")
continue
logger.info(f"[Multi-OCR] Combined extraction from {len(image_paths)} pages")
return combined_result
def _get_document_expectations(self, document_type: str) -> Dict[str, Any]:
"""Get dynamic expectations for document type using AI agent"""
try:
chain_input = {
"document_type": document_type,
"document_context": f"Analysis of {document_type} document for information extraction"
}
expectations = self.document_expectation_chain.invoke(chain_input)
logger.info(f"[DocumentExpectation] Generated expectations for {document_type}")
return expectations
except Exception as e:
logger.error(f"[DocumentExpectation] Error: {e}")
# Fallback expectations
return {
"critical_fields": ["amount", "date"],
"important_fields": ["entities", "reference_numbers"],
"optional_fields": ["addresses", "contact_info"],
"expected_patterns": {},
"business_context": "Document processing",
"quality_criteria": ["readable text", "structured data"],
"minimum_acceptability": "Basic information extraction"
}
def _build_new_optimized_workflow(self):
"""Build workflow with new indicators and business logic"""
def document_type_detector_lcel(state):
"""LCEL-optimized Document Type Detection with multi-page support"""
file_path = Path(state["file_path"])
if not file_path.exists():
return {
"processing_status": "error",
"agent_logs": state["agent_logs"] + ["DocumentTypeDetector: File not found"],
"is_valid_ocr_task": False
}
try:
# Count pages for PDFs
total_pages = 1
if file_path.suffix.lower() == '.pdf':
try:
from pdf2image import convert_from_path
images = convert_from_path(file_path, last_page=1) # Just check first page
total_pages = len(convert_from_path(file_path))
logger.info(f"[DocumentTypeDetector] PDF has {total_pages} pages")
except:
total_pages = 1 # Fallback
chain_input = {
"file_name": file_path.name,
"file_extension": file_path.suffix,
"file_size_kb": file_path.stat().st_size / 1024,
"total_pages": total_pages
}
analysis = self.document_detector_chain.invoke(chain_input)
if not analysis.get("is_suitable", False):
return {
"processing_status": "error",
"agent_logs": state["agent_logs"] + [
f"DocumentTypeDetector: File rejected - {analysis.get('analysis', 'Unsuitable')}"
],
"is_valid_ocr_task": False
}
logger.info(f"[DocumentTypeDetector-LCEL] Analysis: {analysis.get('analysis', '')}")
return {
"file_type": analysis.get("file_type", "unknown"),
"total_pages": total_pages,
"processing_status": "type_detected",
"agent_logs": state["agent_logs"] + [
f"DocumentTypeDetector-LCEL: {analysis.get('analysis', 'Success')} ({total_pages} pages)"
],
"is_valid_ocr_task": True
}
except Exception as e:
logger.error(f"[DocumentTypeDetector-LCEL] Error: {e}")
return {
"processing_status": "error",
"agent_logs": state["agent_logs"] + [f"DocumentTypeDetector-LCEL: Error - {str(e)}"],
"is_valid_ocr_task": False
}
def format_converter_multi_page(state):
"""Multi-page PDF conversion"""
file_path = Path(state["file_path"])
if state["file_type"] == "image":
return {
"converted_image_paths": [str(file_path)],
"processing_status": "ready_for_ocr",
"agent_logs": state["agent_logs"] + [
"FormatConverter-MultiPage: Image ready for processing"
]
}
try:
# Convert PDF to multiple images
converted_paths = self._convert_pdf_to_images(file_path)
logger.info(f"[FormatConverter-MultiPage] Converted {len(converted_paths)} pages")
return {
"converted_image_paths": converted_paths,
"total_pages": len(converted_paths),
"processing_status": "ready_for_ocr",
"agent_logs": state["agent_logs"] + [
f"FormatConverter-MultiPage: PDF converted to {len(converted_paths)} images"
]
}
except Exception as e:
logger.error(f"[FormatConverter-MultiPage] Error: {e}")
return {
"processing_status": "error",
"agent_logs": state["agent_logs"] + [
f"FormatConverter-MultiPage: Failed - {str(e)}"
],
"is_valid_ocr_task": False
}
def guardrail_agent_lcel(state):
"""LCEL-optimized Guardrail Agent"""
try:
chain_input = {
"file_path": state.get("file_path", ""),
"file_type": state.get("file_type", ""),
"total_pages": state.get("total_pages", 1),
"processing_context": str(state.get("agent_logs", []))
}
validation = self.guardrail_chain.invoke(chain_input)
is_authorized = validation.get("is_authorized", False)
risk_level = validation.get("risk_level", "high")
reasoning = validation.get("reasoning", "Validation failed")
if not is_authorized or risk_level == "high":
return {
"processing_status": "blocked",
"agent_logs": state["agent_logs"] + [
f"GuardrailAgent-LCEL: Blocked - {reasoning}"
],
"is_valid_ocr_task": False
}
logger.info(f"[GuardrailAgent-LCEL] Authorized: {reasoning}")
return {
"processing_status": "validated",
"agent_logs": state["agent_logs"] + [
f"GuardrailAgent-LCEL: Authorized - {reasoning}"
]
}
except Exception as e:
logger.error(f"[GuardrailAgent-LCEL] Error: {e}")
return {
"processing_status": "blocked",
"agent_logs": state["agent_logs"] + [
f"GuardrailAgent-LCEL: Error - {str(e)}"
],
"is_valid_ocr_task": False
}
def ocr_extractor_multi_page(state):
"""Multi-page OCR extraction"""
image_paths = state["converted_image_paths"]
try:
# Extract from all pages
extraction_result = self._extract_from_multiple_images(image_paths)
logger.info("[OCRExtractor-MultiPage] Multi-page extraction completed")
return {
"document_type": extraction_result.get("document_type", "Unknown"),
"extracted_data": extraction_result.get("extracted_data", {}),
"full_text": extraction_result.get("full_text", ""),
"processing_status": "extracted",
"agent_logs": state["agent_logs"] + [
f"OCRExtractor-MultiPage: Extracted {extraction_result.get('document_type', 'Unknown')} from {len(image_paths)} pages"
]
}
except Exception as e:
logger.error(f"[OCRExtractor-MultiPage] Error: {e}")
return {
"processing_status": "extraction_failed",
"agent_logs": state["agent_logs"] + [
f"OCRExtractor-MultiPage: Failed - {str(e)}"
]
}
def document_expectation_agent_lcel(state):
"""NEW: Dynamic Document Expectation Agent"""
doc_type = state.get("document_type", "")
if not doc_type or doc_type == "Unknown":
# Skip if document type is not determined
return {
"document_expectations": "No specific expectations - document type unknown",
"processing_status": "expectations_skipped",
"agent_logs": state["agent_logs"] + [
"DocumentExpectation: Skipped - document type unknown"
]
}
try:
# Get dynamic expectations
expectations = self._get_document_expectations(doc_type)
logger.info(f"[DocumentExpectation-LCEL] Generated expectations for {doc_type}")
return {
"document_expectations": json.dumps(expectations, indent=2),
"processing_status": "expectations_ready",
"agent_logs": state["agent_logs"] + [
f"DocumentExpectation-LCEL: Generated expectations for {doc_type}"
]
}
except Exception as e:
logger.error(f"[DocumentExpectation-LCEL] Error: {e}")
return {
"document_expectations": "Failed to generate expectations",
"processing_status": "expectations_ready",
"agent_logs": state["agent_logs"] + [
f"DocumentExpectation-LCEL: Error - {str(e)}"
]
}
"""Multi-page OCR extraction"""
image_paths = state["converted_image_paths"]
try:
# Extract from all pages
extraction_result = self._extract_from_multiple_images(image_paths)
logger.info("[OCRExtractor-MultiPage] Multi-page extraction completed")
return {
"document_type": extraction_result.get("document_type", "Unknown"),
"extracted_data": extraction_result.get("extracted_data", {}),
"full_text": extraction_result.get("full_text", ""),
"processing_status": "extracted",
"agent_logs": state["agent_logs"] + [
f"OCRExtractor-MultiPage: Extracted {extraction_result.get('document_type', 'Unknown')} from {len(image_paths)} pages"
]
}
except Exception as e:
logger.error(f"[OCRExtractor-MultiPage] Error: {e}")
return {
"processing_status": "extraction_failed",
"agent_logs": state["agent_logs"] + [
f"OCRExtractor-MultiPage: Failed - {str(e)}"
]
}
def new_parallel_evaluators_lcel(state):
"""UPDATED: Parallel evaluation with dynamic expectations"""
doc_type = state.get("document_type", "")
extracted_data = state.get("extracted_data", {})
full_text = state.get("full_text", "")
total_pages = state.get("total_pages", 1)
document_expectations = state.get("document_expectations", "No expectations available")
try:
# Prepare input with correct variable names and expectations
evaluation_input = {
"document_type": doc_type,
"total_pages": total_pages,
"image_details": f"Multi-page processing: {total_pages} pages converted",
"text_sample": full_text[:200] + "..." if len(full_text) > 200 else full_text,
"extracted_data": json.dumps(extracted_data, indent=2),
"text_length": len(full_text),
"document_expectations": document_expectations
}
logger.info("[FlexibleParallelEvaluators-LCEL] Running parallel AI evaluations with dynamic expectations...")
# Execute parallel evaluations using LCEL
evaluation_results = self.parallel_evaluators.invoke(evaluation_input)
# DEBUG: Log raw evaluation results
logger.info(f"[DEBUG] Raw evaluation results: {evaluation_results}")
# Extract results
image_quality_result = evaluation_results["image_quality"]
info_relevance_result = evaluation_results["information_relevance"]
# DEBUG: Log individual results
logger.info(f"[DEBUG] Image quality result: {image_quality_result}")
logger.info(f"[DEBUG] Info relevance result: {info_relevance_result}")
image_quality_score = float(image_quality_result.get("image_quality_score", 0.5))
info_relevance_score = float(info_relevance_result.get("information_relevance_score", 0.5))
# Add some variability if scores are identical (workaround for AI consistency)
if abs(image_quality_score - info_relevance_score) < 0.01:
# Apply slight variations based on content analysis
text_length = len(full_text)
data_fields = len(extracted_data.get('key_fields', {}))
# Adjust image quality based on text length (more text = better OCR quality)
if text_length > 200:
image_quality_score = min(0.95, image_quality_score + 0.05)
elif text_length < 50:
image_quality_score = max(0.3, image_quality_score - 0.1)
# Adjust info relevance based on extracted fields richness
if data_fields >= 3:
info_relevance_score = min(0.95, info_relevance_score + 0.08)
elif data_fields <= 1:
info_relevance_score = max(0.4, info_relevance_score - 0.12)
logger.info(f"[FlexibleParallelEvaluators-LCEL] Image Quality: {image_quality_score:.3f}")
logger.info(f"[FlexibleParallelEvaluators-LCEL] Information Relevance: {info_relevance_score:.3f}")
return {
"image_quality": image_quality_score,
"information_relevance": info_relevance_score,
"evaluation_results": {
"image_quality_reasoning": image_quality_result.get("reasoning", ""),
"info_relevance_reasoning": info_relevance_result.get("reasoning", ""),
"image_quality_details": image_quality_result,
"info_relevance_details": info_relevance_result
},
"processing_status": "evaluated",
"agent_logs": state["agent_logs"] + [
f"FlexibleParallelEvaluators-LCEL: Dynamic evaluation complete - IQ:{image_quality_score:.2f} IR:{info_relevance_score:.2f}"
]
}
except Exception as e:
logger.error(f"[FlexibleParallelEvaluators-LCEL] Error: {e}")
return {
"image_quality": 0.5,
"information_relevance": 0.5,
"evaluation_results": {"error": str(e)},
"processing_status": "evaluated",
"agent_logs": state["agent_logs"] + [
f"FlexibleParallelEvaluators-LCEL: Evaluation failed - {str(e)}"
]
}
def flexible_supervisor_agent_lcel(state):
"""UPDATED: Flexible Supervisor with relaxed standards"""
image_quality = state.get("image_quality", 0.0)
info_relevance = state.get("information_relevance", 0.0)
extraction_attempts = state.get("extraction_attempts", 0)
max_attempts = self.supervision_config["max_attempts"] # Now 2 instead of 3
doc_type = state.get("document_type", "")
document_expectations = state.get("document_expectations", "")
# Calculate overall quality with flexible weighting
overall_quality = (
image_quality * self.supervision_config["technical_quality_weight"] +
info_relevance * self.supervision_config["business_logic_weight"]
)
evaluation_results = state.get("evaluation_results", {})
# Early acceptance logic (more permissive)
early_accept_threshold = self.supervision_config["early_acceptance_threshold"]
min_acceptable = self.supervision_config["minimum_acceptable_threshold"]
# Check for early acceptance
if overall_quality >= early_accept_threshold:
logger.info(f"[FlexibleSupervisor] Early acceptance - quality {overall_quality:.2f} > {early_accept_threshold}")
supervisor_feedback = (
f"Flexible Supervisor: Early acceptance. Quality score: {overall_quality:.2f} exceeds threshold {early_accept_threshold}. "
f"Efficient processing completed in {extraction_attempts + 1} attempt(s)."
)
final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_successfully")
return {
"business_logic_score": overall_quality,
"processing_status": "completed_successfully",
"supervisor_feedback": supervisor_feedback,
"final_result": final_result,
"agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"]
}
# Check minimum acceptability to avoid excessive re-processing
if overall_quality >= min_acceptable and extraction_attempts >= 1:
logger.info(f"[FlexibleSupervisor] Minimum acceptable quality reached - {overall_quality:.2f} > {min_acceptable}")
supervisor_feedback = (
f"Flexible Supervisor: Acceptable quality reached. Score: {overall_quality:.2f}. "
f"Avoiding excessive re-processing for efficiency."
)
final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_successfully")
return {
"business_logic_score": overall_quality,
"processing_status": "completed_successfully",
"supervisor_feedback": supervisor_feedback,
"final_result": final_result,
"agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"]
}
# Use AI for decision only if not caught by flexible rules above
try:
chain_input = {
"document_type": doc_type,
"image_quality": image_quality,
"info_relevance": info_relevance,
"overall_quality": overall_quality,
"image_reasoning": evaluation_results.get("image_quality_reasoning", "No reasoning"),
"relevance_reasoning": evaluation_results.get("info_relevance_reasoning", "No reasoning"),
"current_attempt": extraction_attempts + 1,
"max_attempts": max_attempts,
"document_expectations": document_expectations
}
supervision = self.flexible_supervisor_chain.invoke(chain_input)
decision = supervision.get("decision", "final_accept")
ai_quality_score = float(supervision.get("overall_quality_score", overall_quality))
is_acceptable = supervision.get("is_acceptable", True) # Bias toward acceptance
analysis = supervision.get("efficiency_analysis", "")
logger.info(f"[FlexibleSupervisor-LCEL] AI Decision: {decision}")
logger.info(f"[FlexibleSupervisor-LCEL] AI Quality: {ai_quality_score:.2f}")
# Apply flexible decision logic
if decision == "re_extract" and extraction_attempts < max_attempts and not is_acceptable:
# Re-extraction (more restrictive now)
supervisor_feedback = (
f"Flexible Supervisor requesting re-extraction (attempt {extraction_attempts + 2}/{max_attempts}). "
f"Efficiency Analysis: {analysis}"
)
return {
"business_logic_score": ai_quality_score,
"extraction_attempts": extraction_attempts + 1,
"processing_status": "re_extraction_requested",
"supervisor_feedback": supervisor_feedback,
"agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"]
}
else:
# Accept results (default behavior)
if extraction_attempts >= max_attempts:
supervisor_feedback = (
f"Flexible Supervisor: Maximum efficient attempts reached ({max_attempts}). "
f"Final quality: {ai_quality_score:.2f}. {analysis}"
)
processing_status = "completed_efficiently"
else:
supervisor_feedback = (
f"Flexible Supervisor accepts results. Quality: {ai_quality_score:.2f}. {analysis}"
)
processing_status = "completed_successfully"
final_result = self._create_final_result(state, ai_quality_score, supervisor_feedback, processing_status)
return {
"business_logic_score": ai_quality_score,
"processing_status": processing_status,
"supervisor_feedback": supervisor_feedback,
"final_result": final_result,
"agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"]
}
except Exception as e:
logger.error(f"[FlexibleSupervisor-LCEL] Error: {e}")
# Fallback to acceptance (permissive approach)
supervisor_feedback = f"Flexible Supervisor: AI error, accepting current results. Quality: {overall_quality:.2f}"
final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_with_issues")
return {
"business_logic_score": overall_quality,
"processing_status": "completed_with_issues",
"supervisor_feedback": supervisor_feedback,
"final_result": final_result,
"agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: Error - {str(e)}"]
}
def workflow_router(state):
"""Updated workflow routing with expectation agent"""
status = state.get("processing_status", "")
if status == "error" or status == "blocked":
return "finalize_error"
elif status == "type_detected":
return "format_converter_multi_page"
elif status == "ready_for_ocr":
return "guardrail_agent_lcel"
elif status == "validated":
return "ocr_extractor_multi_page"
elif status in ["extracted", "extracted_partial"]:
return "document_expectation_agent_lcel"
elif status in ["expectations_ready", "expectations_skipped"]:
return "new_parallel_evaluators_lcel"
elif status == "evaluated":
return "flexible_supervisor_agent_lcel"
elif status == "re_extraction_requested":
return "ocr_extractor_multi_page"
elif status in ["completed_successfully", "completed_with_issues", "completed_efficiently"]:
return "finalize_success"
else:
return "flexible_supervisor_agent_lcel"
def finalize_error(state):
"""Finalize with error"""
error_result = {
"error": True,
"processing_status": state.get("processing_status", "error"),
"agent_logs": state.get("agent_logs", []),
"supervisor_feedback": "New-LCEL-Workflow: Processing failed",
"new_indicators": True
}
return {
"final_result": error_result,
"processing_status": "error"
}
def finalize_success(state):
"""Finalize successful processing"""
return {
"processing_status": "finalized"
}
# Build the flexible workflow graph
workflow = StateGraph(OCRAgentState)
# Add flexible agent nodes
workflow.add_node("document_type_detector_lcel", document_type_detector_lcel)
workflow.add_node("format_converter_multi_page", format_converter_multi_page)
workflow.add_node("guardrail_agent_lcel", guardrail_agent_lcel)
workflow.add_node("ocr_extractor_multi_page", ocr_extractor_multi_page)
workflow.add_node("document_expectation_agent_lcel", document_expectation_agent_lcel)
workflow.add_node("new_parallel_evaluators_lcel", new_parallel_evaluators_lcel)
workflow.add_node("flexible_supervisor_agent_lcel", flexible_supervisor_agent_lcel)
workflow.add_node("finalize_error", finalize_error)
workflow.add_node("finalize_success", finalize_success)
# Define flexible workflow edges
workflow.add_edge(START, "document_type_detector_lcel")
workflow.add_conditional_edges("document_type_detector_lcel", workflow_router)
workflow.add_conditional_edges("format_converter_multi_page", workflow_router)
workflow.add_conditional_edges("guardrail_agent_lcel", workflow_router)
workflow.add_conditional_edges("ocr_extractor_multi_page", workflow_router)
workflow.add_conditional_edges("document_expectation_agent_lcel", workflow_router)
workflow.add_conditional_edges("new_parallel_evaluators_lcel", workflow_router)
workflow.add_conditional_edges("flexible_supervisor_agent_lcel", workflow_router)
workflow.add_edge("finalize_error", END)
workflow.add_edge("finalize_success", END)
return workflow.compile()
def _create_final_result(self, state, quality_score, feedback, status):
"""Helper to create final result structure"""
return {
"document_type": state.get("document_type", "Unknown"),
"image_quality": state.get("image_quality", 0.0),
"information_relevance": state.get("information_relevance", 0.0),
"business_logic_score": quality_score,
"total_pages": state.get("total_pages", 1),
"extracted_data": state.get("extracted_data", {}),
"full_text": state.get("full_text", ""),
"extraction_attempts": state.get("extraction_attempts", 0) + 1,
"supervisor_feedback": feedback,
"document_expectations": state.get("document_expectations", ""),
"flexible_supervision": True,
"ai_analysis": state.get("evaluation_results", {}),
"source_file": state.get("file_path", ""),
"processing_logs": state.get("agent_logs", []),
"processing_status": status
}
def process_document(self, file_path: str) -> Dict[str, Any]:
"""
Process document with flexible supervision and dynamic expectations
"""
try:
# Initialize state with flexible configuration
initial_state = {
"file_path": file_path,
"file_type": "",
"converted_image_paths": [],
"total_pages": 1,
"document_type": "",
"image_quality": 0.0,
"information_relevance": 0.0,
"extracted_data": {},
"full_text": "",
"business_logic_score": 0.0,
"extraction_attempts": 0,
"max_attempts": self.supervision_config["max_attempts"], # Now 2
"document_expectations": "",
"evaluation_results": {},
"is_valid_ocr_task": False,
"supervisor_feedback": "",
"processing_status": "",
"agent_logs": [],
"final_result": {}
}
logger.info(f"[Flexible-LCEL-OCRWorkflow] Starting processing with flexible supervision: {file_path}")
# Execute the flexible workflow
result = self.workflow_graph.invoke(initial_state)
logger.info("[Flexible-LCEL-OCRWorkflow] Processing completed with flexible supervision")
return result.get("final_result", {"error": "No result generated"})
except Exception as e:
logger.error(f"[Flexible-LCEL-OCRWorkflow] Processing failed: {e}")
return {
"error": True,
"message": f"Flexible workflow execution failed: {str(e)}",
"processing_status": "workflow_error",
"flexible_supervision": True
}
# Updated factory functions
def create_flexible_lcel_ocr_workflow(anthropic_api_key: str = None, openai_api_key: str = None) -> NewLCELIntelligentOCRWorkflow:
"""Factory function for flexible LCEL-optimized OCR workflow"""
return NewLCELIntelligentOCRWorkflow(
anthropic_api_key=anthropic_api_key,
openai_api_key=openai_api_key
)
def analyze_document_with_flexible_supervision(file_path: str, anthropic_api_key: str = None, openai_api_key: str = None) -> Dict[str, Any]:
"""Document analysis with flexible supervision and dynamic expectations"""
workflow = create_flexible_lcel_ocr_workflow(anthropic_api_key, openai_api_key)
return workflow.process_document(file_path)
# Enhanced display for flexible supervision
def display_flexible_supervision_results(result: Dict[str, Any]):
"""Display results with flexible supervision and dynamic expectations"""
if result.get("error"):
print(f"❌ Error: {result.get('message', 'Unknown error')}")
return
print("\n" + "="*80)
print("πŸš€ FLEXIBLE SUPERVISION & DYNAMIC EXPECTATIONS OCR/LAD/RAD")
print("="*80)
# Flexibility indicator
if result.get("flexible_supervision"):
print("⚑ FLEXIBLE SUPERVISION: Active (Efficient processing, relaxed standards)")
# Document classification
doc_type = result.get('document_type', 'Unknown')
total_pages = result.get('total_pages', 1)
print(f"\n🏷️ DOCUMENT: {doc_type} ({total_pages} page{'s' if total_pages > 1 else ''})")
# Quality indicators
print(f"\n🧠 FLEXIBLE QUALITY INDICATORS:")
print(f" β€’ Image Quality: {result.get('image_quality', 0):.2%}")
print(f" β€’ Information Relevance: {result.get('information_relevance', 0):.2%}")
print(f" β€’ Business Logic Score: {result.get('business_logic_score', 0):.2%}")
# Efficiency metrics
attempts = result.get('extraction_attempts', 1)
max_attempts = 2 # New flexible limit
print(f" β€’ Efficient Processing: {attempts}/{max_attempts} attempts (optimized)")
# Dynamic expectations
expectations = result.get('document_expectations', '')
if expectations and expectations != "No specific expectations - document type unknown":
print(f"\n🎯 DYNAMIC EXPECTATIONS:")
try:
exp_data = json.loads(expectations)
critical_fields = exp_data.get('critical_fields', [])
important_fields = exp_data.get('important_fields', [])
if critical_fields:
print(f" β€’ Critical Fields: {', '.join(critical_fields[:3])}{'...' if len(critical_fields) > 3 else ''}")
if important_fields:
print(f" β€’ Important Fields: {', '.join(important_fields[:3])}{'...' if len(important_fields) > 3 else ''}")
except:
print(f" β€’ Expectations generated for {doc_type}")
# AI Analysis
ai_analysis = result.get('ai_analysis', {})
if ai_analysis and not ai_analysis.get('error'):
print(f"\nπŸ”„ DYNAMIC AI EVALUATIONS:")
image_reasoning = ai_analysis.get('image_quality_reasoning', '')[:100]
relevance_reasoning = ai_analysis.get('info_relevance_reasoning', '')[:100]
if image_reasoning:
print(f" πŸ“· Image Quality: {image_reasoning}...")
if relevance_reasoning:
print(f" πŸ“Š Info Relevance: {relevance_reasoning}...")
# Extracted data
extracted_data = result.get('extracted_data', {})
if extracted_data:
print(f"\nπŸ“Š EXTRACTED DATA:")
key_fields = extracted_data.get('key_fields', {})
if key_fields:
print(" πŸ”‘ Key Fields:")
for field, value in key_fields.items():
print(f" β€’ {field}: {value}")
dates = extracted_data.get('dates', [])
if dates:
print(f" πŸ“… Dates: {', '.join(dates[:5])}{'...' if len(dates) > 5 else ''}")
amounts = extracted_data.get('amounts', [])
if amounts:
print(f" πŸ’° Amounts: {', '.join(amounts)}")
entities = extracted_data.get('entities', {})
if entities:
print(" πŸ‘₯ Entities:")
for entity_type, entity_list in entities.items():
if entity_list:
print(f" β€’ {entity_type}: {', '.join(entity_list[:3])}{'...' if len(entity_list) > 3 else ''}")
# Flexible Supervisor feedback
feedback = result.get('supervisor_feedback', '')
if feedback:
print(f"\n🎯 FLEXIBLE SUPERVISOR DECISION:")
print(f" {feedback}")
# Processing efficiency summary
logs = result.get('processing_logs', [])
efficiency_logs = [log for log in logs if any(keyword in log for keyword in ['Flexible', 'Early', 'Efficient', 'Dynamic'])]
if efficiency_logs:
print(f"\n⚑ EFFICIENCY SUMMARY:")
for log in efficiency_logs[-2:]: # Show last 2 efficiency logs
print(f" β€’ {log}")
print("πŸš€ Flexible Supervision & Dynamic Expectations OCR Workflow Ready!")
print("=" * 70)
print("\n⚑ FLEXIBILITY FEATURES:")
print("β€’ Dynamic Document Expectations (AI-generated rules)")
print("β€’ Flexible Supervision (efficient processing)")
print("β€’ Reduced max attempts (2 instead of 3)")
print("β€’ Early acceptance logic (60% threshold)")
print("β€’ Minimum acceptable threshold (40%)")
print("β€’ Bias toward acceptance for efficiency")
print("\nπŸ“‹ Available functions:")
print("β€’ create_flexible_lcel_ocr_workflow(anthropic_key, openai_key)")
print("β€’ analyze_document_with_flexible_supervision(file_path, anthropic_key, openai_key)")
print("β€’ display_flexible_supervision_results(result)")
print("\nπŸ’‘ Example usage:")
print("result = analyze_document_with_flexible_supervision('data/doc.pdf', 'claude_key', 'openai_key')")
print("display_flexible_supervision_results(result)")
print("\nπŸ”₯ Dynamic Expectations + Efficient Processing = Better User Experience!")