import logging import json import base64 from typing import Dict, List, Any, Annotated, TypedDict from pathlib import Path from anthropic import Anthropic from openai import OpenAI from PIL import Image from pdf2image import convert_from_path import magic from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI from langchain_core.output_parsers import JsonOutputParser, StrOutputParser from langgraph.graph import StateGraph, START, END logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class OCRAgentState(TypedDict): """Optimized state for new indicators and business logic validation""" # Source document file_path: str file_type: str converted_image_paths: List[str] # Support multi-page PDFs total_pages: int # Extraction results document_type: str image_quality: float information_relevance: float extracted_data: Dict[str, Any] full_text: str # Business logic validation business_logic_score: float extraction_attempts: int max_attempts: int # AI evaluations evaluation_results: Dict[str, Any] # Controls is_valid_ocr_task: bool supervisor_feedback: str processing_status: str # Messages and logs agent_logs: List[str] final_result: Dict[str, Any] class NewLCELIntelligentOCRWorkflow: """ LCEL-Optimized OCR with New Indicators and Business Logic Validation New Indicators: - Image Quality Score: Technical image assessment - Information Relevance Score: Business data relevance New Supervision: - Business Logic Validation: Document-type specific validation Features: - Multi-page PDF support - Fixed LCEL variable mapping - Updated prompts for new indicators """ def __init__(self, anthropic_api_key: str = None, openai_api_key: str = None): # Claude for OCR extraction self.claude_client = Anthropic(api_key=anthropic_api_key) self.claude_model = 'claude-3-5-haiku-latest' # 
GPT-4o-mini for intelligent agents self.gpt_llm = ChatOpenAI( model='gpt-4o-mini', api_key=openai_api_key, temperature=0.3, max_tokens=600 ) # Configuration self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.webp'} self.supported_formats = self.supported_image_formats | {'.pdf'} # Flexible supervision settings (more permissive) self.supervision_config = { "max_attempts": 2, # Reduced from 3 "early_acceptance_threshold": 0.6, # Accept if overall quality > 60% "minimum_acceptable_threshold": 0.4, # Minimum to avoid re-extraction "business_logic_weight": 0.4, # Less strict on business logic "technical_quality_weight": 0.6 # More weight on technical quality } # Build LCEL chains self._build_new_lcel_chains() # Compile the workflow self.workflow_graph = self._build_new_optimized_workflow() def _build_new_lcel_chains(self): """Build new LCEL chains for updated indicators and business logic""" json_parser = JsonOutputParser() # Document Type Detector Chain (unchanged) self.document_detector_prompt = ChatPromptTemplate.from_template(""" As a Document Type Detection Agent, analyze this file for OCR/LAD/RAD processing: File: {file_name} Extension: {file_extension} Size: {file_size_kb} KB Pages: {total_pages} Determine if this file is suitable and provide analysis. Respond in JSON format: {{ "is_suitable": true/false, "file_type": "image/pdf/unsupported", "processing_recommendation": "direct_ocr/pdf_conversion/reject", "analysis": "detailed analysis", "confidence": 0.95 }} """) self.document_detector_chain = ( self.document_detector_prompt | self.gpt_llm | json_parser ) # Guardrail Agent Chain (unchanged) self.guardrail_prompt = ChatPromptTemplate.from_template(""" As a Guardrail Agent, validate this OCR/LAD/RAD processing request: File: {file_path} Type: {file_type} Pages: {total_pages} Context: {processing_context} Validate the request for security, ethics, and appropriateness. 
Respond in JSON format: {{ "is_authorized": true/false, "risk_level": "low/medium/high", "concerns": ["list concerns"], "recommendation": "proceed/block/conditional", "reasoning": "detailed explanation" }} """) self.guardrail_chain = ( self.guardrail_prompt | self.gpt_llm | json_parser ) # NEW: Image Quality Evaluator Chain self.image_quality_prompt = ChatPromptTemplate.from_template(""" As an Image Quality Evaluation Agent, assess the technical quality of the processed document image(s): Document Type: {document_type} Number of Pages: {total_pages} Image Processing Details: {image_details} Text Sample: {text_sample} Evaluate IMAGE QUALITY based on: 1. Resolution and sharpness (clear text edges, readable fonts) 2. Contrast and lighting (good text-background separation) 3. Absence of noise, artifacts, or distortions 4. Proper orientation and alignment 5. OCR-friendly characteristics (clean text, no blur) Consider technical factors: - Text legibility at character level - Image artifacts from scanning/conversion - Color/contrast sufficient for text recognition - Geometric distortions or skew - Overall technical suitability for OCR Respond in JSON format: {{ "image_quality_score": 0.XX, "reasoning": "detailed technical assessment", "quality_factors": ["positive quality indicators"], "technical_issues": ["technical problems identified"], "ocr_suitability": "assessment of OCR readiness" }} Score range: 0.0 (poor image quality) to 1.0 (excellent image quality) """) self.image_quality_chain = ( self.image_quality_prompt | self.gpt_llm | json_parser ) # NEW: Document Expectation Agent Chain self.document_expectation_prompt = ChatPromptTemplate.from_template(""" As a Document Expectation Analysis Agent, determine what information should typically be found in this type of document: Document Type: {document_type} Context: {document_context} Analyze what information a user would typically expect to extract from a '{document_type}' document. 
Consider industry standards, common use cases, and practical utility. Determine: 1. Critical fields that MUST be present for this document type to be useful 2. Important fields that significantly increase document value 3. Optional fields that are nice to have but not essential 4. Common patterns and formats expected for this document type Respond in JSON format: {{ "critical_fields": ["absolutely essential information"], "important_fields": ["valuable but not critical information"], "optional_fields": ["nice to have information"], "expected_patterns": {{ "amounts": "expected format for monetary amounts", "dates": "expected date formats", "identifiers": "expected ID/reference patterns" }}, "business_context": "what this document type is typically used for", "quality_criteria": ["key factors that make this extraction high quality"], "minimum_acceptability": "minimum requirements for acceptable extraction" }} """) self.document_expectation_chain = ( self.document_expectation_prompt | self.gpt_llm | json_parser ) # UPDATED: Information Relevance Evaluator Chain (with dynamic expectations) self.information_relevance_prompt = ChatPromptTemplate.from_template(""" As an Information Relevance Evaluation Agent, assess extraction quality using dynamic expectations: Document Type: {document_type} Extracted Data: {extracted_data} Full Text Length: {text_length} characters DYNAMIC EXPECTATIONS for this document: {document_expectations} Evaluate INFORMATION RELEVANCE based on: 1. How well extracted data matches the expected critical fields 2. Presence of important business information for this document type 3. Quality and format consistency of extracted elements 4. Overall practical utility for end users 5. Business logic consistency and realistic values Use the dynamic expectations to assess: - Are the critical fields from expectations present in extracted data? - Do the extracted patterns match expected formats? - Is this extraction useful for the typical business context? 
- What percentage of expected information was successfully captured? Respond in JSON format: {{ "information_relevance_score": 0.XX, "reasoning": "detailed assessment against dynamic expectations", "critical_fields_found": ["critical fields successfully extracted"], "missing_critical": ["critical fields missing"], "important_fields_found": ["important fields extracted"], "pattern_compliance": "how well patterns match expectations", "practical_utility": "assessment of real-world usefulness" }} Score range: 0.0 (poor match to expectations) to 1.0 (excellent match to expectations) """) self.information_relevance_chain = ( self.information_relevance_prompt | self.gpt_llm | json_parser ) # UPDATED: Parallel Evaluation Chain (with document expectations) self.parallel_evaluators = RunnableParallel({ "image_quality": RunnableLambda(lambda x: { "document_type": x["document_type"], "total_pages": x["total_pages"], "image_details": x["image_details"], "text_sample": x["text_sample"] }) | self.image_quality_chain, "information_relevance": RunnableLambda(lambda x: { "document_type": x["document_type"], "extracted_data": x["extracted_data"], "text_length": x["text_length"], "document_expectations": x["document_expectations"] }) | self.information_relevance_chain }) # UPDATED: Flexible Business Logic Supervisor Chain self.flexible_supervisor_prompt = ChatPromptTemplate.from_template(""" As a Flexible Supervisor Agent, make efficient quality decisions with relaxed standards: CURRENT METRICS: - Image Quality: {image_quality:.2f} | Reasoning: {image_reasoning} - Information Relevance: {info_relevance:.2f} | Reasoning: {relevance_reasoning} - Overall Quality: {overall_quality:.2f} PROCESSING CONTEXT: - Attempt: {current_attempt}/{max_attempts} - Document Type: {document_type} - Document Expectations: {document_expectations} FLEXIBLE DECISION CRITERIA: - Early acceptance if overall quality > 60% (efficient processing) - Minimum threshold 40% to avoid excessive re-processing - Balance 
between quality and processing efficiency - Consider diminishing returns of additional attempts Make a PRACTICAL business decision: 1. Is this extraction "good enough" for most business use cases? 2. Would additional attempts likely yield significant improvements? 3. Does the current quality meet minimum professional standards? 4. Is the time/cost of re-extraction justified? Respond in JSON format: {{ "decision": "accept/re_extract/final_accept", "overall_quality_score": 0.XX, "is_acceptable": true/false, "efficiency_analysis": "assessment of processing efficiency vs quality tradeoff", "practical_recommendation": "business-focused recommendation", "improvement_potential": "likelihood of improvement with re-extraction", "decision_confidence": 0.XX }} Bias toward ACCEPTANCE unless clear quality issues that would significantly impact usability. """) self.flexible_supervisor_chain = ( self.flexible_supervisor_prompt | self.gpt_llm | json_parser ) def _encode_image(self, image_path: Path) -> str: """Encode image to base64""" with open(image_path, 'rb') as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def _get_mime_type(self, file_path: Path) -> str: """Determine MIME type""" try: mime = magic.Magic(mime=True) detected_mime = mime.from_file(str(file_path)) if detected_mime.startswith('image/'): allowed_mimes = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'} if detected_mime in allowed_mimes: return detected_mime return None except: ext = file_path.suffix.lower() mime_map = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.webp': 'image/webp' } return mime_map.get(ext, None) def _convert_pdf_to_images(self, pdf_path: Path, max_pages: int = 10) -> List[str]: """Convert PDF to multiple images (multi-page support)""" try: logger.info(f"[PDF-Converter] Converting PDF with max {max_pages} pages...") # Convert all pages up to max_pages images = convert_from_path(pdf_path, last_page=max_pages, dpi=200) 
converted_paths = [] for i, image in enumerate(images): output_path = pdf_path.parent / f"{pdf_path.stem}_page_{i+1}.png" image.save(output_path, 'PNG') converted_paths.append(str(output_path)) logger.info(f"[PDF-Converter] Page {i+1} -> {output_path.name}") return converted_paths except Exception as e: logger.error(f"[PDF-Converter] Error: {e}") raise Exception(f"PDF conversion failed: {str(e)}") def _extract_from_multiple_images(self, image_paths: List[str]) -> Dict[str, Any]: """Extract OCR from multiple images and combine results""" extraction_prompt = """Analyze this document page and provide comprehensive JSON: { "document_type": "specific document type", "extracted_data": { "key_fields": {"field_name": "value"}, "dates": ["dates found"], "amounts": ["monetary amounts"], "reference_numbers": ["IDs, references"], "entities": { "persons": ["person names"], "organizations": ["organizations"], "addresses": ["addresses"], "emails": ["emails"], "phone_numbers": ["phones"] } }, "full_text": "complete transcription", "page_structure": { "layout": "layout description", "sections": ["sections"], "tables": "table content" } }""" combined_result = { "document_type": "Unknown", "extracted_data": { "key_fields": {}, "dates": [], "amounts": [], "reference_numbers": [], "entities": { "persons": [], "organizations": [], "addresses": [], "emails": [], "phone_numbers": [] } }, "full_text": "", "total_pages": len(image_paths) } for i, image_path in enumerate(image_paths): try: logger.info(f"[Multi-OCR] Processing page {i+1}/{len(image_paths)}") path_obj = Path(image_path) image_data = self._encode_image(path_obj) mime_type = self._get_mime_type(path_obj) if not mime_type: logger.warning(f"[Multi-OCR] Unsupported MIME type for {image_path}") continue response = self.claude_client.messages.create( model=self.claude_model, max_tokens=2500, messages=[{ "role": "user", "content": [ { "type": "image", "source": { "type": "base64", "media_type": mime_type, "data": image_data } }, { 
"type": "text", "text": extraction_prompt } ] }] ) try: page_result = json.loads(response.content[0].text) # Set document type from first successful page if i == 0 or combined_result["document_type"] == "Unknown": combined_result["document_type"] = page_result.get("document_type", "Unknown") # Combine extracted data page_data = page_result.get("extracted_data", {}) # Merge key fields combined_result["extracted_data"]["key_fields"].update( page_data.get("key_fields", {}) ) # Extend lists (remove duplicates) for list_field in ["dates", "amounts", "reference_numbers"]: new_items = page_data.get(list_field, []) for item in new_items: if item not in combined_result["extracted_data"][list_field]: combined_result["extracted_data"][list_field].append(item) # Merge entities page_entities = page_data.get("entities", {}) for entity_type, entity_list in page_entities.items(): for entity in entity_list: if entity not in combined_result["extracted_data"]["entities"][entity_type]: combined_result["extracted_data"]["entities"][entity_type].append(entity) # Combine full text page_text = page_result.get("full_text", "") if page_text: combined_result["full_text"] += f"\n--- Page {i+1} ---\n{page_text}" except json.JSONDecodeError: logger.warning(f"[Multi-OCR] JSON parsing failed for page {i+1}") # Add raw text as fallback combined_result["full_text"] += f"\n--- Page {i+1} (Raw) ---\n{response.content[0].text}" except Exception as e: logger.error(f"[Multi-OCR] Error processing page {i+1}: {e}") continue logger.info(f"[Multi-OCR] Combined extraction from {len(image_paths)} pages") return combined_result def _get_document_expectations(self, document_type: str) -> Dict[str, Any]: """Get dynamic expectations for document type using AI agent""" try: chain_input = { "document_type": document_type, "document_context": f"Analysis of {document_type} document for information extraction" } expectations = self.document_expectation_chain.invoke(chain_input) logger.info(f"[DocumentExpectation] 
Generated expectations for {document_type}") return expectations except Exception as e: logger.error(f"[DocumentExpectation] Error: {e}") # Fallback expectations return { "critical_fields": ["amount", "date"], "important_fields": ["entities", "reference_numbers"], "optional_fields": ["addresses", "contact_info"], "expected_patterns": {}, "business_context": "Document processing", "quality_criteria": ["readable text", "structured data"], "minimum_acceptability": "Basic information extraction" } def _build_new_optimized_workflow(self): """Build workflow with new indicators and business logic""" def document_type_detector_lcel(state): """LCEL-optimized Document Type Detection with multi-page support""" file_path = Path(state["file_path"]) if not file_path.exists(): return { "processing_status": "error", "agent_logs": state["agent_logs"] + ["DocumentTypeDetector: File not found"], "is_valid_ocr_task": False } try: # Count pages for PDFs total_pages = 1 if file_path.suffix.lower() == '.pdf': try: from pdf2image import convert_from_path images = convert_from_path(file_path, last_page=1) # Just check first page total_pages = len(convert_from_path(file_path)) logger.info(f"[DocumentTypeDetector] PDF has {total_pages} pages") except: total_pages = 1 # Fallback chain_input = { "file_name": file_path.name, "file_extension": file_path.suffix, "file_size_kb": file_path.stat().st_size / 1024, "total_pages": total_pages } analysis = self.document_detector_chain.invoke(chain_input) if not analysis.get("is_suitable", False): return { "processing_status": "error", "agent_logs": state["agent_logs"] + [ f"DocumentTypeDetector: File rejected - {analysis.get('analysis', 'Unsuitable')}" ], "is_valid_ocr_task": False } logger.info(f"[DocumentTypeDetector-LCEL] Analysis: {analysis.get('analysis', '')}") return { "file_type": analysis.get("file_type", "unknown"), "total_pages": total_pages, "processing_status": "type_detected", "agent_logs": state["agent_logs"] + [ 
f"DocumentTypeDetector-LCEL: {analysis.get('analysis', 'Success')} ({total_pages} pages)" ], "is_valid_ocr_task": True } except Exception as e: logger.error(f"[DocumentTypeDetector-LCEL] Error: {e}") return { "processing_status": "error", "agent_logs": state["agent_logs"] + [f"DocumentTypeDetector-LCEL: Error - {str(e)}"], "is_valid_ocr_task": False } def format_converter_multi_page(state): """Multi-page PDF conversion""" file_path = Path(state["file_path"]) if state["file_type"] == "image": return { "converted_image_paths": [str(file_path)], "processing_status": "ready_for_ocr", "agent_logs": state["agent_logs"] + [ "FormatConverter-MultiPage: Image ready for processing" ] } try: # Convert PDF to multiple images converted_paths = self._convert_pdf_to_images(file_path) logger.info(f"[FormatConverter-MultiPage] Converted {len(converted_paths)} pages") return { "converted_image_paths": converted_paths, "total_pages": len(converted_paths), "processing_status": "ready_for_ocr", "agent_logs": state["agent_logs"] + [ f"FormatConverter-MultiPage: PDF converted to {len(converted_paths)} images" ] } except Exception as e: logger.error(f"[FormatConverter-MultiPage] Error: {e}") return { "processing_status": "error", "agent_logs": state["agent_logs"] + [ f"FormatConverter-MultiPage: Failed - {str(e)}" ], "is_valid_ocr_task": False } def guardrail_agent_lcel(state): """LCEL-optimized Guardrail Agent""" try: chain_input = { "file_path": state.get("file_path", ""), "file_type": state.get("file_type", ""), "total_pages": state.get("total_pages", 1), "processing_context": str(state.get("agent_logs", [])) } validation = self.guardrail_chain.invoke(chain_input) is_authorized = validation.get("is_authorized", False) risk_level = validation.get("risk_level", "high") reasoning = validation.get("reasoning", "Validation failed") if not is_authorized or risk_level == "high": return { "processing_status": "blocked", "agent_logs": state["agent_logs"] + [ f"GuardrailAgent-LCEL: Blocked - 
{reasoning}" ], "is_valid_ocr_task": False } logger.info(f"[GuardrailAgent-LCEL] Authorized: {reasoning}") return { "processing_status": "validated", "agent_logs": state["agent_logs"] + [ f"GuardrailAgent-LCEL: Authorized - {reasoning}" ] } except Exception as e: logger.error(f"[GuardrailAgent-LCEL] Error: {e}") return { "processing_status": "blocked", "agent_logs": state["agent_logs"] + [ f"GuardrailAgent-LCEL: Error - {str(e)}" ], "is_valid_ocr_task": False } def ocr_extractor_multi_page(state): """Multi-page OCR extraction""" image_paths = state["converted_image_paths"] try: # Extract from all pages extraction_result = self._extract_from_multiple_images(image_paths) logger.info("[OCRExtractor-MultiPage] Multi-page extraction completed") return { "document_type": extraction_result.get("document_type", "Unknown"), "extracted_data": extraction_result.get("extracted_data", {}), "full_text": extraction_result.get("full_text", ""), "processing_status": "extracted", "agent_logs": state["agent_logs"] + [ f"OCRExtractor-MultiPage: Extracted {extraction_result.get('document_type', 'Unknown')} from {len(image_paths)} pages" ] } except Exception as e: logger.error(f"[OCRExtractor-MultiPage] Error: {e}") return { "processing_status": "extraction_failed", "agent_logs": state["agent_logs"] + [ f"OCRExtractor-MultiPage: Failed - {str(e)}" ] } def document_expectation_agent_lcel(state): """NEW: Dynamic Document Expectation Agent""" doc_type = state.get("document_type", "") if not doc_type or doc_type == "Unknown": # Skip if document type is not determined return { "document_expectations": "No specific expectations - document type unknown", "processing_status": "expectations_skipped", "agent_logs": state["agent_logs"] + [ "DocumentExpectation: Skipped - document type unknown" ] } try: # Get dynamic expectations expectations = self._get_document_expectations(doc_type) logger.info(f"[DocumentExpectation-LCEL] Generated expectations for {doc_type}") return { "document_expectations": 
json.dumps(expectations, indent=2), "processing_status": "expectations_ready", "agent_logs": state["agent_logs"] + [ f"DocumentExpectation-LCEL: Generated expectations for {doc_type}" ] } except Exception as e: logger.error(f"[DocumentExpectation-LCEL] Error: {e}") return { "document_expectations": "Failed to generate expectations", "processing_status": "expectations_ready", "agent_logs": state["agent_logs"] + [ f"DocumentExpectation-LCEL: Error - {str(e)}" ] } """Multi-page OCR extraction""" image_paths = state["converted_image_paths"] try: # Extract from all pages extraction_result = self._extract_from_multiple_images(image_paths) logger.info("[OCRExtractor-MultiPage] Multi-page extraction completed") return { "document_type": extraction_result.get("document_type", "Unknown"), "extracted_data": extraction_result.get("extracted_data", {}), "full_text": extraction_result.get("full_text", ""), "processing_status": "extracted", "agent_logs": state["agent_logs"] + [ f"OCRExtractor-MultiPage: Extracted {extraction_result.get('document_type', 'Unknown')} from {len(image_paths)} pages" ] } except Exception as e: logger.error(f"[OCRExtractor-MultiPage] Error: {e}") return { "processing_status": "extraction_failed", "agent_logs": state["agent_logs"] + [ f"OCRExtractor-MultiPage: Failed - {str(e)}" ] } def new_parallel_evaluators_lcel(state): """UPDATED: Parallel evaluation with dynamic expectations""" doc_type = state.get("document_type", "") extracted_data = state.get("extracted_data", {}) full_text = state.get("full_text", "") total_pages = state.get("total_pages", 1) document_expectations = state.get("document_expectations", "No expectations available") try: # Prepare input with correct variable names and expectations evaluation_input = { "document_type": doc_type, "total_pages": total_pages, "image_details": f"Multi-page processing: {total_pages} pages converted", "text_sample": full_text[:200] + "..." 
if len(full_text) > 200 else full_text, "extracted_data": json.dumps(extracted_data, indent=2), "text_length": len(full_text), "document_expectations": document_expectations } logger.info("[FlexibleParallelEvaluators-LCEL] Running parallel AI evaluations with dynamic expectations...") # Execute parallel evaluations using LCEL evaluation_results = self.parallel_evaluators.invoke(evaluation_input) # DEBUG: Log raw evaluation results logger.info(f"[DEBUG] Raw evaluation results: {evaluation_results}") # Extract results image_quality_result = evaluation_results["image_quality"] info_relevance_result = evaluation_results["information_relevance"] # DEBUG: Log individual results logger.info(f"[DEBUG] Image quality result: {image_quality_result}") logger.info(f"[DEBUG] Info relevance result: {info_relevance_result}") image_quality_score = float(image_quality_result.get("image_quality_score", 0.5)) info_relevance_score = float(info_relevance_result.get("information_relevance_score", 0.5)) # Add some variability if scores are identical (workaround for AI consistency) if abs(image_quality_score - info_relevance_score) < 0.01: # Apply slight variations based on content analysis text_length = len(full_text) data_fields = len(extracted_data.get('key_fields', {})) # Adjust image quality based on text length (more text = better OCR quality) if text_length > 200: image_quality_score = min(0.95, image_quality_score + 0.05) elif text_length < 50: image_quality_score = max(0.3, image_quality_score - 0.1) # Adjust info relevance based on extracted fields richness if data_fields >= 3: info_relevance_score = min(0.95, info_relevance_score + 0.08) elif data_fields <= 1: info_relevance_score = max(0.4, info_relevance_score - 0.12) logger.info(f"[FlexibleParallelEvaluators-LCEL] Image Quality: {image_quality_score:.3f}") logger.info(f"[FlexibleParallelEvaluators-LCEL] Information Relevance: {info_relevance_score:.3f}") return { "image_quality": image_quality_score, "information_relevance": 
info_relevance_score, "evaluation_results": { "image_quality_reasoning": image_quality_result.get("reasoning", ""), "info_relevance_reasoning": info_relevance_result.get("reasoning", ""), "image_quality_details": image_quality_result, "info_relevance_details": info_relevance_result }, "processing_status": "evaluated", "agent_logs": state["agent_logs"] + [ f"FlexibleParallelEvaluators-LCEL: Dynamic evaluation complete - IQ:{image_quality_score:.2f} IR:{info_relevance_score:.2f}" ] } except Exception as e: logger.error(f"[FlexibleParallelEvaluators-LCEL] Error: {e}") return { "image_quality": 0.5, "information_relevance": 0.5, "evaluation_results": {"error": str(e)}, "processing_status": "evaluated", "agent_logs": state["agent_logs"] + [ f"FlexibleParallelEvaluators-LCEL: Evaluation failed - {str(e)}" ] } def flexible_supervisor_agent_lcel(state): """UPDATED: Flexible Supervisor with relaxed standards""" image_quality = state.get("image_quality", 0.0) info_relevance = state.get("information_relevance", 0.0) extraction_attempts = state.get("extraction_attempts", 0) max_attempts = self.supervision_config["max_attempts"] # Now 2 instead of 3 doc_type = state.get("document_type", "") document_expectations = state.get("document_expectations", "") # Calculate overall quality with flexible weighting overall_quality = ( image_quality * self.supervision_config["technical_quality_weight"] + info_relevance * self.supervision_config["business_logic_weight"] ) evaluation_results = state.get("evaluation_results", {}) # Early acceptance logic (more permissive) early_accept_threshold = self.supervision_config["early_acceptance_threshold"] min_acceptable = self.supervision_config["minimum_acceptable_threshold"] # Check for early acceptance if overall_quality >= early_accept_threshold: logger.info(f"[FlexibleSupervisor] Early acceptance - quality {overall_quality:.2f} > {early_accept_threshold}") supervisor_feedback = ( f"Flexible Supervisor: Early acceptance. 
Quality score: {overall_quality:.2f} exceeds threshold {early_accept_threshold}. " f"Efficient processing completed in {extraction_attempts + 1} attempt(s)." ) final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_successfully") return { "business_logic_score": overall_quality, "processing_status": "completed_successfully", "supervisor_feedback": supervisor_feedback, "final_result": final_result, "agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"] } # Check minimum acceptability to avoid excessive re-processing if overall_quality >= min_acceptable and extraction_attempts >= 1: logger.info(f"[FlexibleSupervisor] Minimum acceptable quality reached - {overall_quality:.2f} > {min_acceptable}") supervisor_feedback = ( f"Flexible Supervisor: Acceptable quality reached. Score: {overall_quality:.2f}. " f"Avoiding excessive re-processing for efficiency." ) final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_successfully") return { "business_logic_score": overall_quality, "processing_status": "completed_successfully", "supervisor_feedback": supervisor_feedback, "final_result": final_result, "agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"] } # Use AI for decision only if not caught by flexible rules above try: chain_input = { "document_type": doc_type, "image_quality": image_quality, "info_relevance": info_relevance, "overall_quality": overall_quality, "image_reasoning": evaluation_results.get("image_quality_reasoning", "No reasoning"), "relevance_reasoning": evaluation_results.get("info_relevance_reasoning", "No reasoning"), "current_attempt": extraction_attempts + 1, "max_attempts": max_attempts, "document_expectations": document_expectations } supervision = self.flexible_supervisor_chain.invoke(chain_input) decision = supervision.get("decision", "final_accept") ai_quality_score = 
float(supervision.get("overall_quality_score", overall_quality)) is_acceptable = supervision.get("is_acceptable", True) # Bias toward acceptance analysis = supervision.get("efficiency_analysis", "") logger.info(f"[FlexibleSupervisor-LCEL] AI Decision: {decision}") logger.info(f"[FlexibleSupervisor-LCEL] AI Quality: {ai_quality_score:.2f}") # Apply flexible decision logic if decision == "re_extract" and extraction_attempts < max_attempts and not is_acceptable: # Re-extraction (more restrictive now) supervisor_feedback = ( f"Flexible Supervisor requesting re-extraction (attempt {extraction_attempts + 2}/{max_attempts}). " f"Efficiency Analysis: {analysis}" ) return { "business_logic_score": ai_quality_score, "extraction_attempts": extraction_attempts + 1, "processing_status": "re_extraction_requested", "supervisor_feedback": supervisor_feedback, "agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"] } else: # Accept results (default behavior) if extraction_attempts >= max_attempts: supervisor_feedback = ( f"Flexible Supervisor: Maximum efficient attempts reached ({max_attempts}). " f"Final quality: {ai_quality_score:.2f}. {analysis}" ) processing_status = "completed_efficiently" else: supervisor_feedback = ( f"Flexible Supervisor accepts results. Quality: {ai_quality_score:.2f}. {analysis}" ) processing_status = "completed_successfully" final_result = self._create_final_result(state, ai_quality_score, supervisor_feedback, processing_status) return { "business_logic_score": ai_quality_score, "processing_status": processing_status, "supervisor_feedback": supervisor_feedback, "final_result": final_result, "agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: {supervisor_feedback}"] } except Exception as e: logger.error(f"[FlexibleSupervisor-LCEL] Error: {e}") # Fallback to acceptance (permissive approach) supervisor_feedback = f"Flexible Supervisor: AI error, accepting current results. 
Quality: {overall_quality:.2f}" final_result = self._create_final_result(state, overall_quality, supervisor_feedback, "completed_with_issues") return { "business_logic_score": overall_quality, "processing_status": "completed_with_issues", "supervisor_feedback": supervisor_feedback, "final_result": final_result, "agent_logs": state["agent_logs"] + [f"FlexibleSupervisor-LCEL: Error - {str(e)}"] } def workflow_router(state): """Updated workflow routing with expectation agent""" status = state.get("processing_status", "") if status == "error" or status == "blocked": return "finalize_error" elif status == "type_detected": return "format_converter_multi_page" elif status == "ready_for_ocr": return "guardrail_agent_lcel" elif status == "validated": return "ocr_extractor_multi_page" elif status in ["extracted", "extracted_partial"]: return "document_expectation_agent_lcel" elif status in ["expectations_ready", "expectations_skipped"]: return "new_parallel_evaluators_lcel" elif status == "evaluated": return "flexible_supervisor_agent_lcel" elif status == "re_extraction_requested": return "ocr_extractor_multi_page" elif status in ["completed_successfully", "completed_with_issues", "completed_efficiently"]: return "finalize_success" else: return "flexible_supervisor_agent_lcel" def finalize_error(state): """Finalize with error""" error_result = { "error": True, "processing_status": state.get("processing_status", "error"), "agent_logs": state.get("agent_logs", []), "supervisor_feedback": "New-LCEL-Workflow: Processing failed", "new_indicators": True } return { "final_result": error_result, "processing_status": "error" } def finalize_success(state): """Finalize successful processing""" return { "processing_status": "finalized" } # Build the flexible workflow graph workflow = StateGraph(OCRAgentState) # Add flexible agent nodes workflow.add_node("document_type_detector_lcel", document_type_detector_lcel) workflow.add_node("format_converter_multi_page", 
format_converter_multi_page) workflow.add_node("guardrail_agent_lcel", guardrail_agent_lcel) workflow.add_node("ocr_extractor_multi_page", ocr_extractor_multi_page) workflow.add_node("document_expectation_agent_lcel", document_expectation_agent_lcel) workflow.add_node("new_parallel_evaluators_lcel", new_parallel_evaluators_lcel) workflow.add_node("flexible_supervisor_agent_lcel", flexible_supervisor_agent_lcel) workflow.add_node("finalize_error", finalize_error) workflow.add_node("finalize_success", finalize_success) # Define flexible workflow edges workflow.add_edge(START, "document_type_detector_lcel") workflow.add_conditional_edges("document_type_detector_lcel", workflow_router) workflow.add_conditional_edges("format_converter_multi_page", workflow_router) workflow.add_conditional_edges("guardrail_agent_lcel", workflow_router) workflow.add_conditional_edges("ocr_extractor_multi_page", workflow_router) workflow.add_conditional_edges("document_expectation_agent_lcel", workflow_router) workflow.add_conditional_edges("new_parallel_evaluators_lcel", workflow_router) workflow.add_conditional_edges("flexible_supervisor_agent_lcel", workflow_router) workflow.add_edge("finalize_error", END) workflow.add_edge("finalize_success", END) return workflow.compile() def _create_final_result(self, state, quality_score, feedback, status): """Helper to create final result structure""" return { "document_type": state.get("document_type", "Unknown"), "image_quality": state.get("image_quality", 0.0), "information_relevance": state.get("information_relevance", 0.0), "business_logic_score": quality_score, "total_pages": state.get("total_pages", 1), "extracted_data": state.get("extracted_data", {}), "full_text": state.get("full_text", ""), "extraction_attempts": state.get("extraction_attempts", 0) + 1, "supervisor_feedback": feedback, "document_expectations": state.get("document_expectations", ""), "flexible_supervision": True, "ai_analysis": state.get("evaluation_results", {}), 
"source_file": state.get("file_path", ""), "processing_logs": state.get("agent_logs", []), "processing_status": status } def process_document(self, file_path: str) -> Dict[str, Any]: """ Process document with flexible supervision and dynamic expectations """ try: # Initialize state with flexible configuration initial_state = { "file_path": file_path, "file_type": "", "converted_image_paths": [], "total_pages": 1, "document_type": "", "image_quality": 0.0, "information_relevance": 0.0, "extracted_data": {}, "full_text": "", "business_logic_score": 0.0, "extraction_attempts": 0, "max_attempts": self.supervision_config["max_attempts"], # Now 2 "document_expectations": "", "evaluation_results": {}, "is_valid_ocr_task": False, "supervisor_feedback": "", "processing_status": "", "agent_logs": [], "final_result": {} } logger.info(f"[Flexible-LCEL-OCRWorkflow] Starting processing with flexible supervision: {file_path}") # Execute the flexible workflow result = self.workflow_graph.invoke(initial_state) logger.info("[Flexible-LCEL-OCRWorkflow] Processing completed with flexible supervision") return result.get("final_result", {"error": "No result generated"}) except Exception as e: logger.error(f"[Flexible-LCEL-OCRWorkflow] Processing failed: {e}") return { "error": True, "message": f"Flexible workflow execution failed: {str(e)}", "processing_status": "workflow_error", "flexible_supervision": True } # Updated factory functions def create_flexible_lcel_ocr_workflow(anthropic_api_key: str = None, openai_api_key: str = None) -> NewLCELIntelligentOCRWorkflow: """Factory function for flexible LCEL-optimized OCR workflow""" return NewLCELIntelligentOCRWorkflow( anthropic_api_key=anthropic_api_key, openai_api_key=openai_api_key ) def analyze_document_with_flexible_supervision(file_path: str, anthropic_api_key: str = None, openai_api_key: str = None) -> Dict[str, Any]: """Document analysis with flexible supervision and dynamic expectations""" workflow = 
# Updated factory functions
def create_flexible_lcel_ocr_workflow(anthropic_api_key: str = None, openai_api_key: str = None) -> NewLCELIntelligentOCRWorkflow:
    """Build a NewLCELIntelligentOCRWorkflow from the two API keys.

    Both keys are forwarded by keyword exactly as received (``None`` included).
    """
    credentials = {
        "anthropic_api_key": anthropic_api_key,
        "openai_api_key": openai_api_key,
    }
    return NewLCELIntelligentOCRWorkflow(**credentials)


def analyze_document_with_flexible_supervision(file_path: str, anthropic_api_key: str = None, openai_api_key: str = None) -> Dict[str, Any]:
    """Create a workflow through the factory and process a single document.

    Returns whatever the workflow's ``process_document(file_path)`` returns.
    """
    return create_flexible_lcel_ocr_workflow(
        anthropic_api_key, openai_api_key
    ).process_document(file_path)
# Enhanced display for flexible supervision
def _preview_items(items, limit):
    """Join the first ``limit`` items with ", ", adding "..." when more exist."""
    shown = ', '.join(str(item) for item in items[:limit])
    return f"{shown}{'...' if len(items) > limit else ''}"


def display_flexible_supervision_results(result: Dict[str, Any], max_attempts: int = 2) -> None:
    """Print a human-readable report of a workflow result to stdout.

    Args:
        result: Result dict; every field is read with ``.get`` and a default.
            A truthy ``"error"`` entry prints only the error message.
        max_attempts: Denominator shown on the attempts line. Defaults to 2,
            the value this function previously hard-coded.

    Returns:
        None. All output goes through ``print``.
    """
    if result.get("error"):
        print(f"❌ Error: {result.get('message', 'Unknown error')}")
        return

    print("\n" + "=" * 80)
    print("🚀 FLEXIBLE SUPERVISION & DYNAMIC EXPECTATIONS OCR/LAD/RAD")
    print("=" * 80)

    # Flexibility indicator
    if result.get("flexible_supervision"):
        print("⚡ FLEXIBLE SUPERVISION: Active (Efficient processing, relaxed standards)")

    # Document classification
    doc_type = result.get('document_type', 'Unknown')
    total_pages = result.get('total_pages', 1)
    print(f"\n🏷️ DOCUMENT: {doc_type} ({total_pages} page{'s' if total_pages > 1 else ''})")

    # Quality indicators
    print("\n🧠 FLEXIBLE QUALITY INDICATORS:")
    print(f" • Image Quality: {result.get('image_quality', 0):.2%}")
    print(f" • Information Relevance: {result.get('information_relevance', 0):.2%}")
    print(f" • Business Logic Score: {result.get('business_logic_score', 0):.2%}")

    # Efficiency metrics
    attempts = result.get('extraction_attempts', 1)
    print(f" • Efficient Processing: {attempts}/{max_attempts} attempts (optimized)")

    # Dynamic expectations
    expectations = result.get('document_expectations', '')
    if expectations and expectations != "No specific expectations - document type unknown":
        print("\n🎯 DYNAMIC EXPECTATIONS:")
        try:
            exp_data = json.loads(expectations)
            critical_fields = exp_data.get('critical_fields', [])
            important_fields = exp_data.get('important_fields', [])
            if critical_fields:
                print(f" • Critical Fields: {_preview_items(critical_fields, 3)}")
            if important_fields:
                print(f" • Important Fields: {_preview_items(important_fields, 3)}")
        except (ValueError, TypeError, AttributeError, KeyError):
            # Best-effort: expectations that are not a JSON object of lists
            # get a generic line. ValueError covers json.JSONDecodeError.
            print(f" • Expectations generated for {doc_type}")

    # AI Analysis
    ai_analysis = result.get('ai_analysis', {})
    if ai_analysis and not ai_analysis.get('error'):
        print("\n🔄 DYNAMIC AI EVALUATIONS:")
        # Coerce to str so a None or non-string value cannot break slicing.
        image_reasoning = str(ai_analysis.get('image_quality_reasoning', '') or '')[:100]
        relevance_reasoning = str(ai_analysis.get('info_relevance_reasoning', '') or '')[:100]
        if image_reasoning:
            print(f" 📷 Image Quality: {image_reasoning}...")
        if relevance_reasoning:
            print(f" 📊 Info Relevance: {relevance_reasoning}...")

    # Extracted data
    extracted_data = result.get('extracted_data', {})
    if extracted_data:
        print("\n📊 EXTRACTED DATA:")

        key_fields = extracted_data.get('key_fields', {})
        if key_fields:
            print(" 🔑 Key Fields:")
            for field, value in key_fields.items():
                print(f" • {field}: {value}")

        dates = extracted_data.get('dates', [])
        if dates:
            print(f" 📅 Dates: {_preview_items(dates, 5)}")

        amounts = extracted_data.get('amounts', [])
        if amounts:
            # str() keeps numeric amounts from raising in join().
            print(f" 💰 Amounts: {', '.join(str(amount) for amount in amounts)}")

        entities = extracted_data.get('entities', {})
        if entities:
            print(" 👥 Entities:")
            for entity_type, entity_list in entities.items():
                if entity_list:
                    print(f" • {entity_type}: {_preview_items(entity_list, 3)}")

    # Flexible Supervisor feedback
    feedback = result.get('supervisor_feedback', '')
    if feedback:
        print("\n🎯 FLEXIBLE SUPERVISOR DECISION:")
        print(f" {feedback}")

    # Processing efficiency summary
    logs = result.get('processing_logs', [])
    keywords = ('Flexible', 'Early', 'Efficient', 'Dynamic')
    efficiency_logs = [log for log in logs if any(keyword in log for keyword in keywords)]
    if efficiency_logs:
        print("\n⚡ EFFICIENCY SUMMARY:")
        for log in efficiency_logs[-2:]:  # Show last 2 efficiency logs
            print(f" • {log}")
# Banner printed when this module is executed or imported. The text lines are
# emitted in order, one per output line, exactly as separate print() calls would.
_BANNER_LINES = (
    "🚀 Flexible Supervision & Dynamic Expectations OCR Workflow Ready!",
    "=" * 70,
    "\n⚡ FLEXIBILITY FEATURES:",
    "• Dynamic Document Expectations (AI-generated rules)",
    "• Flexible Supervision (efficient processing)",
    "• Reduced max attempts (2 instead of 3)",
    "• Early acceptance logic (60% threshold)",
    "• Minimum acceptable threshold (40%)",
    "• Bias toward acceptance for efficiency",
    "\n📋 Available functions:",
    "• create_flexible_lcel_ocr_workflow(anthropic_key, openai_key)",
    "• analyze_document_with_flexible_supervision(file_path, anthropic_key, openai_key)",
    "• display_flexible_supervision_results(result)",
    "\n💡 Example usage:",
    "result = analyze_document_with_flexible_supervision('data/doc.pdf', 'claude_key', 'openai_key')",
    "display_flexible_supervision_results(result)",
    "\n🔄 Dynamic Expectations + Efficient Processing = Better User Experience!",
)
print(*_BANNER_LINES, sep="\n")