"""Vendor document compliance scoring service.

Exposes a small Flask API that extracts text from an uploaded PDF, scores it
against a fixed list of compliance requirements using sentence embeddings,
and stores the resulting scorecard in Salesforce.
"""

import logging
import os
import re
from datetime import datetime
from io import BytesIO
from typing import List, Dict, Optional

import numpy as np
import torch
from flask import Flask, request, jsonify
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from simple_salesforce import Salesforce

# Initialize Flask app
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# A requirement counts as "matched" when its best chunk similarity is strictly
# greater than this value.
SIMILARITY_THRESHOLD = 0.65


class DocumentProcessor:
    """Extract text from PDFs and score it against compliance requirements."""

    def __init__(self):
        """Load the embedding model and pre-compute requirement embeddings.

        Raises:
            RuntimeError: If the NumPy sanity check fails.
            Exception: Any error raised while loading the model or encoding
                the requirement texts is logged and re-raised.
        """
        # Verify numpy is properly installed
        self._verify_numpy()

        # Load lightweight sentence transformer model
        self.model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')

        # Define compliance criteria (customize these)
        self.compliance_requirements = {
            'insurance': [
                "proof of insurance coverage",
                "liability limits documentation",
                "policy effective dates",
            ],
            'financial': [
                "audited financial statements",
                "tax identification number",
                "bank references",
            ],
            'certifications': [
                "industry certifications",
                "safety compliance",
                "quality standards",
            ],
        }

        # Pre-compute requirement embeddings once so that each request only
        # has to encode the document itself.
        self.requirement_embeddings = {}
        for category, requirements in self.compliance_requirements.items():
            try:
                embeddings = self.model.encode(requirements, convert_to_numpy=True)
                self.requirement_embeddings[category] = embeddings
            except Exception as e:
                logging.error("Error encoding requirements for %s: %s", category, e)
                raise

    def _verify_numpy(self):
        """Verify numpy is working properly.

        Raises:
            RuntimeError: If a trivial array computation fails or returns an
                unexpected value.
        """
        try:
            total = int(np.array([1, 2, 3]).sum())
            # Explicit check instead of `assert`, which is removed under -O.
            if total != 6:
                raise ValueError(f"unexpected array sum: {total}")
        except Exception as e:
            logging.error("NumPy verification failed: %s", e)
            raise RuntimeError("NumPy is not functioning properly") from e

    def extract_text(self, pdf_bytes: bytes) -> str:
        """Extract text from PDF document.

        Args:
            pdf_bytes: Raw bytes of the PDF file.

        Returns:
            The text of all pages joined with single spaces and stripped of
            leading/trailing whitespace. Pages that yield no text contribute
            an empty string.

        Raises:
            RuntimeError: If the PDF cannot be read or parsed.
        """
        try:
            with BytesIO(pdf_bytes) as pdf_file:
                reader = PdfReader(pdf_file)
                text = " ".join(page.extract_text() or "" for page in reader.pages)
                return text.strip()
        except Exception as e:
            logging.error("PDF extraction error: %s", e)
            raise RuntimeError("Failed to extract text from PDF") from e

    def score_document(self, text: str, threshold: float = SIMILARITY_THRESHOLD) -> Dict:
        """Score document against compliance requirements.

        The document is split into chunks, each chunk is embedded, and for
        every requirement the highest inner product against any chunk is
        compared with ``threshold``.

        Args:
            text: Plain text of the document.
            threshold: A requirement is matched when its best similarity is
                strictly greater than this value.

        Returns:
            A dict with ``'score'`` (0-5 scale, one decimal) and
            ``'categories'`` mapping each category to its ``'coverage'``,
            ``'matched_requirements'`` and ``'missing_requirements'``.
            On failure, or for empty/whitespace-only text, the dict carries
            an ``'error'`` message, a score of 0 and empty categories.
        """
        # Whitespace-only text would produce zero chunks and make the
        # encoder/similarity step fail, so treat it like an empty document.
        if not text or not text.strip():
            return {'error': 'Empty document text', 'score': 0, 'categories': {}}

        try:
            # Split document into meaningful chunks (not just sentences)
            chunks = self._split_into_chunks(text)
            chunk_embeddings = self.model.encode(chunks, convert_to_numpy=True)

            results = {'categories': {}, 'score': 0}
            total_matches = 0
            total_possible = 0

            for category, req_embeddings in self.requirement_embeddings.items():
                requirements = self.compliance_requirements[category]

                # Rows are document chunks, columns are requirements.
                # NOTE(review): the inner product equals cosine similarity
                # only if the model emits unit-length vectors - confirm.
                similarity_matrix = np.inner(chunk_embeddings, req_embeddings)
                max_similarities = np.max(similarity_matrix, axis=0)

                # Plain Python bools/ints keep the result JSON-friendly.
                is_matched = [bool(score > threshold) for score in max_similarities]
                matches = sum(is_matched)
                possible = len(req_embeddings)

                results['categories'][category] = {
                    'coverage': float(matches / possible),
                    'matched_requirements': [
                        req for req, hit in zip(requirements, is_matched) if hit
                    ],
                    'missing_requirements': [
                        req for req, hit in zip(requirements, is_matched) if not hit
                    ],
                }

                total_matches += matches
                total_possible += possible

            # Calculate overall score (0-5 scale)
            if total_possible > 0:
                results['score'] = min(
                    5.0, float(round(5 * total_matches / total_possible, 1))
                )

            return results
        except Exception as e:
            logging.error("Scoring error: %s", e)
            return {'error': str(e), 'score': 0, 'categories': {}}

    def _split_into_chunks(self, text: str, chunk_size: int = 500) -> List[str]:
        """Split text into chunks of approximately chunk_size characters.

        Words (whitespace-separated) are never split; a single word longer
        than ``chunk_size`` becomes a chunk of its own.

        Args:
            text: Text to split.
            chunk_size: Soft upper bound on chunk length, counting one
                separator character per word.

        Returns:
            List of chunks with words joined by single spaces; empty when
            ``text`` contains no words.
        """
        chunks = []
        current_chunk = []
        current_length = 0

        for word in text.split():
            # Close the current chunk before it would exceed the budget.
            if current_length + len(word) + 1 > chunk_size and current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_length = 0
            current_chunk.append(word)
            current_length += len(word) + 1

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks


class SalesforceHandler:
    """Thin wrapper that writes vendor scorecards to Salesforce."""

    def __init__(self):
        """Open the Salesforce session.

        Credentials are read from the ``SALESFORCE_USERNAME``,
        ``SALESFORCE_PASSWORD``, ``SALESFORCE_SECURITY_TOKEN`` and
        ``SALESFORCE_DOMAIN`` environment variables, falling back to the
        placeholder values when a variable is not set.

        Raises:
            Exception: Any connection/authentication error is logged and
                re-raised.
        """
        try:
            self.sf = Salesforce(
                username=os.environ.get('SALESFORCE_USERNAME', 'your_username'),
                password=os.environ.get('SALESFORCE_PASSWORD', 'your_password'),
                security_token=os.environ.get('SALESFORCE_SECURITY_TOKEN', 'your_token'),
                # 'login' for production, 'test' for sandbox
                domain=os.environ.get('SALESFORCE_DOMAIN', 'login'),
            )
        except Exception as e:
            logging.error("Salesforce connection error: %s", e)
            raise

    def create_scorecard(self, vendor_id: str, results: Dict) -> Dict:
        """Create vendor scorecard in Salesforce.

        Args:
            vendor_id: Value stored in the ``Vendor_Name__c`` field.
            results: Result dict as produced by
                ``DocumentProcessor.score_document``.

        Returns:
            ``{'success': True, 'id': <record id>}`` on success, otherwise
            ``{'success': False, 'error': <message>}``. Never raises.
        """
        try:
            record = {
                'Vendor_Name__c': vendor_id,
                'Score__c': results.get('score', 0),
                # NOTE(review): naive local timestamp - confirm the timezone
                # the Salesforce field is expected to hold.
                'Evaluation_Date__c': datetime.now().isoformat(),
                'Status__c': 'Evaluated',
                'Details__c': self._format_details(results),
                'Error__c': results.get('error', ''),
            }
            response = self.sf.Vendor_Scorecard__c.create(record)
            return {'success': True, 'id': response['id']}
        except Exception as e:
            logging.error("Salesforce create error: %s", e)
            return {'success': False, 'error': str(e)}

    def _format_details(self, results: Dict) -> str:
        """Format evaluation details for Salesforce.

        Args:
            results: Result dict with optional ``'error'`` and
                ``'categories'`` keys.

        Returns:
            ``"Error: ..."`` when the results carry an error; otherwise one
            paragraph per category listing coverage, matched and missing
            requirements, or a fallback message when there are no categories.
        """
        if 'error' in results:
            return f"Error: {results['error']}"

        details = []
        for category, data in results.get('categories', {}).items():
            # `or ['None']` also covers the common case of a present but
            # empty list, which a .get() default alone would render blank.
            matched = data.get('matched_requirements') or ['None']
            missing = data.get('missing_requirements') or ['None']
            details.append(
                f"{category.upper()}:\n"
                f"Coverage: {data.get('coverage', 0):.0%}\n"
                f"Matched: {', '.join(matched)}\n"
                f"Missing: {', '.join(missing)}\n"
            )

        return "\n".join(details) if details else "No evaluation details available"


# Initialize components with error handling. On failure both are left as None
# so the app still starts and the endpoints can report the problem.
try:
    processor = DocumentProcessor()
    sf_handler = SalesforceHandler()
except Exception as e:
    logging.error("Initialization failed: %s", e)
    processor = None
    sf_handler = None


@app.route('/api/evaluate', methods=['POST'])
def evaluate_document():
    """API endpoint for document evaluation.

    Expects a multipart form with a ``file`` part (the PDF) and an optional
    ``vendor_id`` field (defaults to ``'UNKNOWN'``).

    Returns:
        200 with score, Salesforce record id and per-category evaluation;
        400 when no file is supplied; 500 when the service failed to
        initialise, Salesforce rejected the record, or processing raised.
    """
    if processor is None or sf_handler is None:
        return jsonify({'error': 'Service initialization failed'}), 500

    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400

    vendor_id = request.form.get('vendor_id', 'UNKNOWN')
    uploaded = request.files['file']

    try:
        # Process document
        text = processor.extract_text(uploaded.read())
        results = processor.score_document(text)

        # Save to Salesforce
        sf_result = sf_handler.create_scorecard(vendor_id, results)
        if not sf_result['success']:
            return jsonify({
                'error': f"Salesforce error: {sf_result.get('error', 'Unknown error')}",
                'results': results,
            }), 500

        return jsonify({
            'success': True,
            'score': results.get('score', 0),
            'salesforce_id': sf_result.get('id'),
            'evaluation': results.get('categories', {}),
            'error': results.get('error', ''),
        })
    except Exception as e:
        logging.error("Processing error: %s", e)
        return jsonify({'error': str(e)}), 500


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint.

    Returns:
        JSON with the overall ``status``, ``torch_available`` (whether
        ``torch.cuda.is_available()`` is true), the NumPy version and the
        outcome of a trivial NumPy computation.
    """
    components_ready = processor is not None and sf_handler is not None
    status = {
        'status': 'healthy' if components_ready else 'unhealthy',
        # Despite the key name, this reports CUDA availability.
        'torch_available': torch.cuda.is_available(),
        'numpy_version': np.__version__,
        'numpy_working': False,
    }

    try:
        test_array = np.array([1, 2, 3])
        # Coerce to a built-in bool: the comparison yields numpy.bool_,
        # which the JSON encoder cannot serialise.
        status['numpy_working'] = bool(test_array.sum() == 6)
    except Exception as e:
        logging.error("Health check numpy test failed: %s", e)

    return jsonify(status)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)