|
|
import logging
import os
import re
from datetime import datetime, timezone
from io import BytesIO
from typing import List, Dict, Optional

import numpy as np
import torch
from flask import Flask, request, jsonify
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
from simple_salesforce import Salesforce
|
|
|
|
|
|
|
|
# Configure the root logger at INFO level; the rest of this module logs
# through the module-level ``logging`` functions.
logging.basicConfig(level=logging.INFO)

# Flask application object that the route decorators below register against.
app = Flask(__name__)
|
|
|
|
|
class DocumentProcessor:
    """Score PDF document text against a fixed list of compliance requirements.

    On construction the 'all-MiniLM-L6-v2' sentence-transformers model is
    loaded on CPU and every requirement phrase is embedded once. Documents
    are later split into chunks, embedded, and compared to those requirement
    embeddings by cosine similarity.
    """

    # Minimum cosine similarity between a document chunk and a requirement
    # phrase for the requirement to count as matched. Can be overridden per
    # instance through the constructor.
    similarity_threshold: float = 0.65

    def __init__(self, similarity_threshold: Optional[float] = None):
        """Load the embedding model and pre-compute requirement embeddings.

        Args:
            similarity_threshold: Optional override for the match threshold;
                when ``None`` the class default (0.65) is used.

        Raises:
            RuntimeError: If the NumPy sanity check fails.
            Exception: Whatever the model raises while loading or encoding
                the requirement phrases (logged, then re-raised).
        """
        self._verify_numpy()

        if similarity_threshold is not None:
            self.similarity_threshold = similarity_threshold

        self.model = SentenceTransformer('all-MiniLM-L6-v2', device='cpu')

        # Requirement phrases per category; list order is significant because
        # embeddings are matched back to phrases by index.
        self.compliance_requirements = {
            'insurance': [
                "proof of insurance coverage",
                "liability limits documentation",
                "policy effective dates"
            ],
            'financial': [
                "audited financial statements",
                "tax identification number",
                "bank references"
            ],
            'certifications': [
                "industry certifications",
                "safety compliance",
                "quality standards"
            ]
        }

        self.requirement_embeddings = {}
        for category, requirements in self.compliance_requirements.items():
            try:
                embeddings = self.model.encode(requirements, convert_to_numpy=True)
                self.requirement_embeddings[category] = embeddings
            except Exception as e:
                logging.error(f"Error encoding requirements for {category}: {str(e)}")
                raise

    def _verify_numpy(self):
        """Run a trivial NumPy computation and fail loudly if it misbehaves.

        Raises:
            RuntimeError: If building or summing a small array fails or
                yields an unexpected result.
        """
        try:
            test_array = np.array([1, 2, 3])
            # Explicit check instead of ``assert`` so it still runs under -O.
            if test_array.sum() != 6:
                raise ValueError("np.array([1, 2, 3]).sum() did not equal 6")
        except Exception as e:
            logging.error(f"NumPy verification failed: {str(e)}")
            raise RuntimeError("NumPy is not functioning properly") from e

    @staticmethod
    def _normalize_rows(matrix):
        """Return ``matrix`` as a 2-D float array with unit-length rows.

        All-zero rows are left as zeros rather than dividing by zero.
        """
        rows = np.atleast_2d(np.asarray(matrix, dtype=float))
        norms = np.linalg.norm(rows, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return rows / norms

    def extract_text(self, pdf_bytes: bytes) -> str:
        """Extract the text of every page of a PDF and join it with spaces.

        Args:
            pdf_bytes: Raw bytes of the PDF document.

        Returns:
            The concatenated page text with surrounding whitespace stripped.
            Pages for which no text is returned contribute an empty string.

        Raises:
            RuntimeError: If the PDF cannot be read or parsed.
        """
        try:
            with BytesIO(pdf_bytes) as pdf_file:
                reader = PdfReader(pdf_file)
                text = " ".join(page.extract_text() or "" for page in reader.pages)
                return text.strip()
        except Exception as e:
            logging.error(f"PDF extraction error: {str(e)}")
            raise RuntimeError("Failed to extract text from PDF") from e

    def score_document(self, text: str) -> Dict:
        """Score document text against the compliance requirements.

        Args:
            text: Plain text of the document.

        Returns:
            A dict with ``'score'`` (0 to 5, rounded to one decimal) and
            ``'categories'``, mapping each category to its ``'coverage'``
            fraction, ``'matched_requirements'`` and
            ``'missing_requirements'``. If the text is empty or only
            whitespace, or scoring fails, the dict instead has an ``'error'``
            message, a score of 0 and empty categories. This method does not
            raise; failures are logged and reported in the result.
        """
        # Whitespace-only text yields no chunks, so treat it like empty text.
        if not text or not text.strip():
            return {'error': 'Empty document text', 'score': 0, 'categories': {}}

        try:
            chunks = self._split_into_chunks(text)
            # Unit-length rows make the inner product below a cosine
            # similarity whether or not the model normalises its output.
            chunk_embeddings = self._normalize_rows(
                self.model.encode(chunks, convert_to_numpy=True)
            )
            threshold = self.similarity_threshold

            results = {'categories': {}, 'score': 0}
            total_matches = 0
            total_possible = 0

            for category, req_embeddings in self.requirement_embeddings.items():
                requirements = self.compliance_requirements[category]

                # Rows are chunks, columns are requirements; the column-wise
                # maximum is the best-matching chunk for each requirement.
                similarity_matrix = np.inner(
                    chunk_embeddings, self._normalize_rows(req_embeddings)
                )
                max_similarities = np.max(similarity_matrix, axis=0)

                # Decide each requirement once so the matched and missing
                # lists can never disagree with the match count.
                is_matched = [bool(score > threshold) for score in max_similarities]
                matches = sum(is_matched)
                coverage = matches / len(req_embeddings)

                results['categories'][category] = {
                    'coverage': float(coverage),
                    'matched_requirements': [
                        req for req, hit in zip(requirements, is_matched) if hit
                    ],
                    'missing_requirements': [
                        req for req, hit in zip(requirements, is_matched) if not hit
                    ]
                }
                total_matches += matches
                total_possible += len(req_embeddings)

            if total_possible > 0:
                results['score'] = min(5.0, round(5 * total_matches / total_possible, 1))
            return results

        except Exception as e:
            logging.error(f"Scoring error: {str(e)}")
            return {'error': str(e), 'score': 0, 'categories': {}}

    def _split_into_chunks(self, text: str, chunk_size: int = 500) -> List[str]:
        """Split text on whitespace into chunks of roughly ``chunk_size`` characters.

        Words are never broken: a chunk is closed when adding the next word
        (plus one separator character) would exceed ``chunk_size``, so a
        single word longer than ``chunk_size`` becomes its own chunk.

        Args:
            text: Text to split.
            chunk_size: Approximate maximum chunk length in characters.

        Returns:
            The chunks in document order; an empty list if ``text`` has no words.
        """
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            if current_length + len(word) + 1 > chunk_size and current_chunk:
                chunks.append(" ".join(current_chunk))
                current_chunk = []
                current_length = 0
            current_chunk.append(word)
            current_length += len(word) + 1

        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks
|
|
|
|
|
class SalesforceHandler:
    """Create vendor scorecard records through a simple_salesforce session."""

    def __init__(self, username: Optional[str] = None,
                 password: Optional[str] = None,
                 security_token: Optional[str] = None,
                 domain: Optional[str] = None):
        """Open the Salesforce session.

        Each credential is taken from the matching argument if given,
        otherwise from the environment variables ``SALESFORCE_USERNAME``,
        ``SALESFORCE_PASSWORD``, ``SALESFORCE_SECURITY_TOKEN`` and
        ``SALESFORCE_DOMAIN``, and finally from the placeholder values this
        class has always used.

        Raises:
            Exception: Whatever ``Salesforce(...)`` raises when the connection
                cannot be established (logged, then re-raised).
        """
        try:
            self.sf = Salesforce(
                username=username or os.environ.get('SALESFORCE_USERNAME', 'your_username'),
                password=password or os.environ.get('SALESFORCE_PASSWORD', 'your_password'),
                security_token=(
                    security_token
                    or os.environ.get('SALESFORCE_SECURITY_TOKEN', 'your_token')
                ),
                domain=domain or os.environ.get('SALESFORCE_DOMAIN', 'login')
            )
        except Exception as e:
            logging.error(f"Salesforce connection error: {str(e)}")
            raise

    def create_scorecard(self, vendor_id: str, results: Dict) -> Dict:
        """Create a ``Vendor_Scorecard__c`` record from evaluation results.

        Args:
            vendor_id: Value stored in ``Vendor_Name__c``.
            results: Evaluation result dict; ``'score'``, ``'categories'``
                and ``'error'`` are read when present.

        Returns:
            ``{'success': True, 'id': <record id>}`` on success, otherwise
            ``{'success': False, 'error': <message>}``. This method does not
            raise; failures are logged and reported in the result.
        """
        try:
            record = {
                'Vendor_Name__c': vendor_id,
                'Score__c': results.get('score', 0),
                # Timezone-aware UTC timestamp, so the stored value does not
                # depend on the server's local timezone.
                'Evaluation_Date__c': datetime.now(timezone.utc).isoformat(),
                'Status__c': 'Evaluated',
                'Details__c': self._format_details(results),
                'Error__c': results.get('error', '')
            }

            response = self.sf.Vendor_Scorecard__c.create(record)
            return {'success': True, 'id': response['id']}
        except Exception as e:
            logging.error(f"Salesforce create error: {str(e)}")
            return {'success': False, 'error': str(e)}

    def _format_details(self, results: Dict) -> str:
        """Render evaluation results as plain text for the ``Details__c`` field.

        Args:
            results: Evaluation result dict.

        Returns:
            ``"Error: <message>"`` when ``results`` carries an ``'error'``
            key; otherwise one block per category listing coverage, matched
            and missing requirements, or a fixed fallback sentence when there
            are no categories.
        """
        if 'error' in results:
            return f"Error: {results['error']}"

        details = []
        for category, data in results.get('categories', {}).items():
            # ``or ['None']`` also covers a present-but-empty list, which a
            # ``dict.get`` default alone would render as a blank entry.
            matched = data.get('matched_requirements') or ['None']
            missing = data.get('missing_requirements') or ['None']
            details.append(
                f"{category.upper()}:\n"
                f"Coverage: {data.get('coverage', 0):.0%}\n"
                f"Matched: {', '.join(matched)}\n"
                f"Missing: {', '.join(missing)}\n"
            )
        return "\n".join(details) if details else "No evaluation details available"
|
|
|
|
|
|
|
|
# Build both service singletons at import time. Initialisation is
# all-or-nothing: if either constructor raises, both names end up as None
# and the request handlers report the service as unavailable.
try:
    processor, sf_handler = DocumentProcessor(), SalesforceHandler()
except Exception as init_error:
    logging.error(f"Initialization failed: {str(init_error)}")
    processor = sf_handler = None
|
|
|
|
|
@app.route('/api/evaluate', methods=['POST'])
def evaluate_document():
    """Evaluate an uploaded PDF and record the result in Salesforce.

    Expects a multipart form with a ``file`` part and an optional
    ``vendor_id`` field (defaults to ``'UNKNOWN'``).

    Returns:
        200 with the score, Salesforce record id, per-category evaluation and
        any scoring error message on success;
        400 when no file, an unnamed file, or an empty file was uploaded;
        500 when the services failed to initialise, the Salesforce write
        failed, or processing raised.
    """
    if not processor or not sf_handler:
        return jsonify({'error': 'Service initialization failed'}), 500

    # A file part submitted with an empty filename means that no file was
    # actually selected, so treat it the same as a missing part.
    uploaded_file = request.files.get('file')
    if uploaded_file is None or not uploaded_file.filename:
        return jsonify({'error': 'No file provided'}), 400

    vendor_id = request.form.get('vendor_id', 'UNKNOWN')

    try:
        pdf_bytes = uploaded_file.read()
        # Reject a zero-byte upload as a client error instead of letting the
        # PDF parser fail and surface as a 500.
        if not pdf_bytes:
            return jsonify({'error': 'Empty file provided'}), 400

        text = processor.extract_text(pdf_bytes)
        results = processor.score_document(text)

        # The scorecard is written even when scoring reported an error, so
        # the failed evaluation is still recorded.
        sf_result = sf_handler.create_scorecard(vendor_id, results)

        if not sf_result['success']:
            return jsonify({
                'error': f"Salesforce error: {sf_result.get('error', 'Unknown error')}",
                'results': results
            }), 500

        return jsonify({
            'success': True,
            'score': results.get('score', 0),
            'salesforce_id': sf_result.get('id'),
            'evaluation': results.get('categories', {}),
            'error': results.get('error', '')
        })

    except Exception as e:
        # logging.exception keeps the original message and adds the traceback.
        logging.exception(f"Processing error: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Report service health as JSON.

    Returns:
        A JSON object with ``status`` ('healthy' only when both the document
        processor and the Salesforce handler initialised), ``torch_available``,
        ``numpy_version`` and ``numpy_working``.
    """
    status = {
        'status': 'healthy' if processor and sf_handler else 'unhealthy',
        # NOTE: despite the key name, this reports whether CUDA is available
        # to torch. The key is kept for existing consumers.
        'torch_available': bool(torch.cuda.is_available()),
        'numpy_version': np.__version__,
        'numpy_working': False
    }

    try:
        test_array = np.array([1, 2, 3])
        # The comparison yields a numpy.bool_, which jsonify cannot
        # serialise; convert it to a built-in bool.
        status['numpy_working'] = bool(test_array.sum() == 6)
    except Exception as e:
        logging.error(f"Health check numpy test failed: {str(e)}")

    return jsonify(status)
|
|
|
|
|
# Script entry point: only when this file is executed directly (not imported),
# start Flask's built-in server on host 0.0.0.0, port 5000.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)