""" True End-to-End Agent Orchestrator =================================== Autonomous agent that: 1. Decides which tools to use based on document analysis 2. Validates its own output 3. Self-corrects when confidence is low 4. Learns from patterns """ import json import sys import logging from pathlib import Path from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from enum import Enum # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(sys.stdout)], force=True ) logger = logging.getLogger(__name__) class AgentDecision(Enum): """Agent's possible decisions""" EXTRACT_TEXT = "extract_text" EXTRACT_TABLES = "extract_tables" RUN_NER = "run_ner" USE_GEMINI = "use_gemini" USE_REGEX = "use_regex" VALIDATE = "validate" RETRY = "retry" COMPLETE = "complete" HUMAN_REVIEW = "human_review" @dataclass class AgentState: """Agent's internal state""" doc_id: str file_path: Path # Extracted data raw_text: Optional[str] = None tables: Optional[List] = None entities: Optional[List] = None entity_map: Optional[Dict] = None # Mapped fields fields: Optional[Dict] = None confidence_map: Optional[Dict] = None # Decision tracking attempts: int = 0 max_attempts: int = 3 history: List[str] = None errors: List[str] = None def __post_init__(self): if self.history is None: self.history = [] if self.errors is None: self.errors = [] class InvoiceAgent: """ Autonomous agent that processes invoices with self-correction. """ def __init__(self, text_extractor, table_extractor, ner_extractor, gemini_mapper): """ Args: text_extractor: Function(file_path) -> (success, text, error) table_extractor: Function(file_path) -> (success, tables, error) ner_extractor: Function(text) -> (success, entities, entity_map, error) gemini_mapper: Function(text, entities, entity_map, tables) -> (success, fields, error) """ self.text_extractor = text_extractor self.table_extractor = table_extractor self.ner_extractor = ner_extractor self.gemini_mapper = gemini_mapper # Minimum confidence thresholds self.MIN_CONFIDENCE = { 'cust_number': 0.6, 'posting_date': 0.7, 'total_open_amount': 0.7, 'cust_payment_terms': 0.5 } def process(self, state: AgentState) -> AgentState: """ Main agent loop - autonomous decision-making and execution. """ logger.info("=" * 70) logger.info(f"**** AGENT STARTING: {state.file_path.name}") logger.info("=" * 70) while state.attempts < state.max_attempts: state.attempts += 1 logger.info(f"\n**** ATTEMPT {state.attempts}/{state.max_attempts}") # Step 1: Decide next action decision = self._decide_next_action(state) logger.info(f"**** DECISION: {decision.value}") state.history.append(decision.value) # Step 2: Execute action success = self._execute_action(decision, state) if not success: logger.warning(f"**** Action {decision.value} failed") continue # Step 3: Check if we're done if decision == AgentDecision.COMPLETE: logger.info("**** AGENT COMPLETE") break if decision == AgentDecision.HUMAN_REVIEW: logger.info("**** AGENT REQUESTING HUMAN REVIEW") break logger.info("=" * 70) logger.info(f"**** Final confidence: {self._calculate_overall_confidence(state):.2f}") logger.info(f"**** Actions taken: {' → '.join(state.history)}") logger.info("=" * 70) return state def _decide_next_action(self, state: AgentState) -> AgentDecision: """ Agent's brain - decides what to do next based on current state. """ # 1. If no text, extract it if state.raw_text is None: return AgentDecision.EXTRACT_TEXT # 2. If text exists but no entities, run NER if state.entities is None: return AgentDecision.RUN_NER # 3. If no fields mapped yet, try Gemini first if state.fields is None: return AgentDecision.USE_GEMINI # 4. If fields exist, validate them if not self._is_validated(state): return AgentDecision.VALIDATE # 5. Check confidence - retry if low overall_confidence = self._calculate_overall_confidence(state) if overall_confidence < 0.6 and state.attempts < state.max_attempts: # Try alternative approach if 'use_gemini' in state.history and 'use_regex' not in state.history: return AgentDecision.USE_REGEX elif 'extract_tables' not in state.history: return AgentDecision.EXTRACT_TABLES else: return AgentDecision.RETRY # 6. If still low confidence, request human review if overall_confidence < 0.5: return AgentDecision.HUMAN_REVIEW # 7. Otherwise, we're done! return AgentDecision.COMPLETE def _execute_action(self, decision: AgentDecision, state: AgentState) -> bool: """Execute the decided action.""" try: if decision == AgentDecision.EXTRACT_TEXT: return self._extract_text(state) elif decision == AgentDecision.EXTRACT_TABLES: return self._extract_tables(state) elif decision == AgentDecision.RUN_NER: return self._run_ner(state) elif decision == AgentDecision.USE_GEMINI: return self._use_gemini(state) elif decision == AgentDecision.USE_REGEX: return self._use_regex(state) elif decision == AgentDecision.VALIDATE: return self._validate_fields(state) elif decision == AgentDecision.RETRY: # Clear fields and try again with different approach state.fields = None state.confidence_map = None return True elif decision in [AgentDecision.COMPLETE, AgentDecision.HUMAN_REVIEW]: return True return False except Exception as e: logger.error(f"**** Action failed: {e}") state.errors.append(str(e)) return False def _extract_text(self, state: AgentState) -> bool: """Extract text from document.""" logger.info("**** Extracting text...") success, text, error = self.text_extractor(state.file_path) if success and text and len(text.strip()) > 10: state.raw_text = text logger.info(f"**** Extracted {len(text)} characters") return True state.errors.append(f"Text extraction failed: {error}") return False def _extract_tables(self, state: AgentState) -> bool: """Extract tables from document.""" logger.info("**** Extracting tables...") success, tables, error = self.table_extractor(state.file_path) if success: state.tables = tables logger.info(f"**** Extracted {len(tables)} tables") return True logger.warning(f"**** Table extraction failed: {error}") state.tables = [] return True # Non-critical, continue def _run_ner(self, state: AgentState) -> bool: """Run Named Entity Recognition.""" logger.info("**** Running NER...") success, entities, entity_map, error = self.ner_extractor(state.raw_text) if success: state.entities = entities state.entity_map = entity_map logger.info(f"**** Found {len(entities)} entities") return True logger.warning(f"**** NER failed: {error}") state.entities = [] state.entity_map = {} return True # Non-critical, continue def _use_gemini(self, state: AgentState) -> bool: """Use Gemini for intelligent mapping.""" logger.info("**** Using Gemini mapping...") success, result, error = self.gemini_mapper( state.raw_text, state.entities or [], state.entity_map or {}, state.tables or [] ) if success and result: state.fields = { 'cust_number': result.get('customer_name', 'UNKNOWN')[:20], 'posting_date': result.get('date', '2024-01-01'), 'total_open_amount': float(result.get('total_amount', 0.0)), 'business_code': 'U001', 'cust_payment_terms': result.get('payment_terms', 'NAH4')[:10] } # High confidence from Gemini state.confidence_map = { 'cust_number': 0.9, 'posting_date': 0.9, 'total_open_amount': 0.9, 'business_code': 0.3, 'cust_payment_terms': 0.8 } logger.info(f"**** Gemini mapped: {state.fields}") return True logger.warning(f"**** Gemini failed: {error}") state.errors.append(f"Gemini mapping failed: {error}") return False def _use_regex(self, state: AgentState) -> bool: """Fallback regex-based extraction.""" logger.info("**** Using regex fallback...") from backend.app.api.ingest import map_with_regex fields, confidence = map_with_regex(state.raw_text, state.entities or []) state.fields = fields state.confidence_map = confidence logger.info(f"**** Regex mapped: {fields}") return True def _validate_fields(self, state: AgentState) -> bool: """ Validate extracted fields using business rules. Agent learns if data makes sense. """ logger.info("✓ Validating fields...") if not state.fields: return False validation_results = {} # 1. Customer number shouldn't be empty or generic cust = state.fields.get('cust_number', '') if cust and cust != 'UNKNOWN' and len(cust) > 2: validation_results['cust_number'] = True else: validation_results['cust_number'] = False logger.warning("**** Customer number looks invalid") # 2. Date should be reasonable (not default) date = state.fields.get('posting_date', '') if date and date != '2024-01-01': validation_results['posting_date'] = True else: validation_results['posting_date'] = False logger.warning("**** Date looks like default value") # 3. Amount should be > 0 amount = state.fields.get('total_open_amount', 0.0) if amount > 0: validation_results['total_open_amount'] = True else: validation_results['total_open_amount'] = False logger.warning("**** Amount is zero or missing") # Adjust confidence based on validation for field, is_valid in validation_results.items(): if not is_valid and state.confidence_map: state.confidence_map[field] *= 0.5 # Reduce confidence # Mark as validated state.history.append('validated') success_count = sum(validation_results.values()) logger.info(f"✓ Validation: {success_count}/{len(validation_results)} checks passed") return success_count >= 2 # At least 2 fields should be valid def _is_validated(self, state: AgentState) -> bool: """Check if validation has been performed.""" return 'validated' in state.history def _calculate_overall_confidence(self, state: AgentState) -> float: """Calculate overall confidence score.""" if not state.confidence_map: return 0.0 # Weighted average (important fields have more weight) weights = { 'cust_number': 0.3, 'posting_date': 0.2, 'total_open_amount': 0.3, 'cust_payment_terms': 0.1, 'business_code': 0.1 } total_confidence = 0.0 total_weight = 0.0 for field, weight in weights.items(): if field in state.confidence_map: total_confidence += state.confidence_map[field] * weight total_weight += weight return total_confidence / total_weight if total_weight > 0 else 0.0 # ============================================== # Integration with existing code # ============================================== def create_agent(text_extractor_fn, table_extractor_fn, ner_fn, gemini_fn): """ Factory function to create agent with your existing functions. Usage: from backend.app.api.ingest import ( call_text_extractor, call_table_extractor, call_ner, map_with_gemini ) agent = create_agent( call_text_extractor, call_table_extractor, call_ner, map_with_gemini ) state = AgentState(doc_id="doc123", file_path=Path("invoice.pdf")) result_state = agent.process(state) """ return InvoiceAgent(text_extractor_fn, table_extractor_fn, ner_fn, gemini_fn) def run_agent_pipeline(job_id: str, doc_id: str, file_path: Path): """ Replace your existing process_document() with this agentic version. """ from backend.app.api.ingest import ( call_text_extractor, call_table_extractor, call_ner, map_with_gemini, save_extraction, save_invoice_fields, update_job_status ) try: update_job_status(job_id, 'processing') # Create agent agent = create_agent( call_text_extractor, call_table_extractor, call_ner, map_with_gemini ) # Initialize state state = AgentState(doc_id=doc_id, file_path=file_path) # Let agent decide and execute autonomously result_state = agent.process(state) # Save results if result_state.fields: save_extraction( doc_id, result_state.raw_text, result_state.tables or [], result_state.entities or [], { 'method': 'autonomous_agent', 'attempts': result_state.attempts, 'actions': result_state.history, 'confidence': agent._calculate_overall_confidence(result_state) }, None ) save_invoice_fields( doc_id, result_state.fields, result_state.confidence_map or {} ) # Check if needs human review if AgentDecision.HUMAN_REVIEW.value in result_state.history: update_job_status(job_id, 'needs_review') else: update_job_status(job_id, 'completed') logger.info(f"**** Agent completed with {len(result_state.history)} actions") else: update_job_status(job_id, 'failed', 'Agent could not extract fields') except Exception as e: logger.error(f"**** Agent failed: {e}") import traceback traceback.print_exc() update_job_status(job_id, 'failed', str(e))