import os import time import logging from logging.handlers import RotatingFileHandler import re from datetime import datetime from dotenv import load_dotenv from cryptography.fernet import Fernet from simple_salesforce import Salesforce from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM from sentence_transformers import SentenceTransformer, util from PIL import Image import pytesseract import pandas as pd from docx import Document import PyPDF2 import gradio as gr from pdf2image import convert_from_path import tempfile from pytz import timezone import shutil # Setup logging with rotation log_file = os.path.join(tempfile.gettempdir(), 'app.log') handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ handler, logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Check dependencies at startup def check_dependencies(): missing_deps = [] try: import pytesseract, pandas, openpyxl, xlrd, docx, PyPDF2, pdf2image # Check Tesseract try: tesseract_path = shutil.which('tesseract') if tesseract_path: pytesseract.pytesseract.tesseract_cmd = tesseract_path tesseract_version = pytesseract.get_tesseract_version() logger.info(f"Tesseract found at {tesseract_path}, version: {tesseract_version}") else: logger.warning("Tesseract not found in PATH. Install with 'sudo apt install tesseract-ocr'. OCR-dependent files (JPEG, PNG, scanned PDFs) will not be processed.") missing_deps.append("Tesseract") except Exception as e: logger.warning(f"Tesseract unavailable: {str(e)}. Install with 'sudo apt install tesseract-ocr'. OCR-dependent files (JPEG, PNG, scanned PDFs) will not be processed.") missing_deps.append("Tesseract") # Check Poppler try: poppler_path = shutil.which('pdfinfo') if poppler_path: logger.info(f"Poppler found at {poppler_path}") else: logger.warning("Poppler not found in PATH. Install with 'sudo apt install poppler-utils'. Scanned PDFs will fail.") missing_deps.append("Poppler") except Exception as e: logger.warning(f"Poppler unavailable: {str(e)}. Install with 'sudo apt install poppler-utils'. Scanned PDFs will fail.") missing_deps.append("Poppler") logger.info("Required Python packages installed") except ImportError as e: logger.error(f"Missing Python dependency: {str(e)}. Install via pip.") raise ImportError(f"Missing Python dependency: {str(e)}") return missing_deps missing_deps = check_dependencies() # Load environment variables load_dotenv() required_env_vars = [ 'ENCRYPTION_KEY', 'SALESFORCE_USERNAME', 'SALESFORCE_PASSWORD', 'SALESFORCE_SECURITY_TOKEN', 'SALESFORCE_DOMAIN' ] env = {var: os.getenv(var) for var in required_env_vars} if missing := [k for k in required_env_vars if not env[k]]: logger.error(f"Missing environment variables: {', '.join(missing)}") raise ValueError(f"Missing environment variables: {', '.join(missing)}") # Setup encryption try: fernet = Fernet(env['ENCRYPTION_KEY'].encode()) except Exception as e: logger.error(f"Invalid encryption key: {e}") raise ValueError(f"Invalid encryption key: {e}") # Salesforce connection retry def init_salesforce(max_retries=3, delay=3): for attempt in range(max_retries): try: sf = Salesforce( username=env['SALESFORCE_USERNAME'], password=env['SALESFORCE_PASSWORD'], security_token=env['SALESFORCE_SECURITY_TOKEN'], domain=env['SALESFORCE_DOMAIN'], version='58.0' ) logger.info("Connected to Salesforce") return sf except Exception as e: logger.error(f"Salesforce connection attempt {attempt + 1} failed: {str(e)}") if attempt < max_retries - 1: time.sleep(delay) logger.error("Salesforce connection failed after retries") raise ValueError("Salesforce connection failed after retries") # Initialize models def init_models(): try: summarizer = pipeline( "summarization", model=AutoModelForSeq2SeqLM.from_pretrained("t5-base"), tokenizer=AutoTokenizer.from_pretrained("t5-base") ) sentence_model = SentenceTransformer('all-MiniLM-L6-v2') logger.info("NLP models initialized successfully") return summarizer, sentence_model except Exception as e: logger.error(f"Model initialization failed: {str(e)}") raise # Clean text for better processing def clean_text(text): try: if not text: return "" text = re.sub(r'\s+', ' ', text.strip()) # Normalize whitespace text = re.sub(r'[^\x00-\x7F]+', ' ', text) # Remove non-ASCII text = re.sub(r'\b\d+\b(?!\s*,\s*\d{4})', ' ', text) # Remove standalone numbers return text except Exception as e: logger.error(f"Text cleaning failed: {str(e)}") return "" # Validate file readability def validate_file(file_path): try: ext = os.path.splitext(file_path)[1].lower() if ext not in ['.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx']: return False, f"Unsupported file type: {ext}" if not os.path.exists(file_path): return False, f"File not found: {file_path}" if os.path.getsize(file_path) == 0: return False, f"File is empty: {file_path}" return True, None except Exception as e: logger.error(f"File validation failed for {file_path}: {str(e)}") return False, f"File validation failed: {str(e)}" # Extract text from file def extract_text(file_path): is_valid, error = validate_file(file_path) if not is_valid: logger.error(error) return None, error ext = os.path.splitext(file_path)[1].lower() try: logger.debug(f"Extracting text from {file_path} (type: {ext})") if ext == '.pdf': with open(file_path, 'rb') as f: pdf_reader = PyPDF2.PdfReader(f) text = "".join([p.extract_text() or "" for p in pdf_reader.pages]) if not text or len(text.strip()) < 50: logger.warning(f"PDF text extraction failed or too short, attempting OCR") if 'Tesseract' in missing_deps or 'Poppler' in missing_deps: return None, "OCR unavailable: Tesseract or Poppler not installed. Install with 'sudo apt install tesseract-ocr poppler-utils'." try: images = convert_from_path(file_path) text = "" for i, img in enumerate(images): logger.debug(f"Processing page {i+1} for OCR") img = img.convert('L') # Convert to grayscale img = img.resize((img.width // 2, img.height // 2)) # Optimize size text += pytesseract.image_to_string(img, config='--psm 6') + "\n" except Exception as ocr_err: logger.error(f"OCR failed: {str(ocr_err)}") return None, f"OCR failed for {file_path}: {str(ocr_err)}" elif ext == '.docx': doc = Document(file_path) text = "\n".join([p.text for p in doc.paragraphs if p.text]) for table in doc.tables: for row in table.rows: for cell in row.cells: text += "\n" + cell.text elif ext in ['.png', '.jpg', '.jpeg']: if 'Tesseract' in missing_deps: return None, "OCR unavailable: Tesseract not installed. Install with 'sudo apt install tesseract-ocr'." try: img = Image.open(file_path).convert('L') img = img.resize((img.width // 2, img.height // 2)) # Optimize size text = pytesseract.image_to_string(img, config='--psm 6') except Exception as ocr_err: logger.error(f"OCR failed for {file_path}: {str(ocr_err)}") return None, f"OCR failed for {file_path}: {str(ocr_err)}" elif ext in ['.csv', '.xls', '.xlsx']: try: df = pd.read_excel(file_path) if ext in ['.xls', '.xlsx'] else pd.read_csv(file_path) logger.debug(f"Excel/CSV columns: {df.columns.tolist()}") text = df.to_string(index=False) except Exception as e: logger.error(f"Excel/CSV processing failed for {file_path}: {str(e)}") return None, f"Excel/CSV processing failed: {str(e)}" text = clean_text(text) if not text or len(text) < 50: logger.error(f"Extracted text is empty or too short: {len(text)} characters") return None, f"Text extraction failed: No valid text extracted from {file_path}" logger.debug(f"Extracted text length: {len(text)} characters") return text, None except Exception as e: logger.error(f"Text extraction failed for {file_path}: {str(e)}") return None, f"Text extraction failed: {str(e)}" # Parse dates with IST timezone def parse_dates(text): ist = timezone('Asia/Kolkata') current_date = datetime.now(ist).replace(hour=18, minute=33, second=0, microsecond=0) # 06:33 PM IST, June 26, 2025 try: date_patterns = [ r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+\d{1,2}(?:-|\s*,?\s*)\d{4}\b', r'\b\d{1,2}/\d{1,2}/\d{4}\b', r'\b\d{4}-\d{2}-\d{2}\b', r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s*\d{4}\b', r'\b\d{4}\b' ] dates = [] unparseable_dates = [] for pattern in date_patterns: found_dates = re.findall(pattern, text, re.IGNORECASE) dates.extend(found_dates) if found_dates: logger.debug(f"Found dates with pattern {pattern}: {found_dates}") parsed_dates = [] for date in dates: try: if '/' in date: parsed = datetime.strptime(date, '%m/%d/%Y').replace(tzinfo=ist) elif '-' in date and len(date.split('-')) == 3: parsed = datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=ist) elif re.match(r'\b\w+\s+\d{4}\b', date): month_year = re.sub(r'\s+', ' ', date.strip()) parsed = datetime.strptime(month_year, '%b %Y').replace(day=1, tzinfo=ist) elif ',' in date: parsed = datetime.strptime(date, '%B %d, %Y').replace(tzinfo=ist) else: parsed = datetime.strptime(date, '%Y').replace(month=1, day=1, tzinfo=ist) parsed_dates.append(parsed.strftime('%Y-%m-%d')) except Exception as e: logger.debug(f"Unparseable date '{date}': {str(e)}") unparseable_dates.append(date) if unparseable_dates: logger.warning(f"Found {len(unparseable_dates)} unparseable dates: {unparseable_dates}") if not parsed_dates: logger.warning("No valid dates extracted, using current date") parsed_dates.append(current_date.strftime('%Y-%m-%d')) while len(parsed_dates) < 2: parsed_dates.append(parsed_dates[0] if parsed_dates else current_date.strftime('%Y-%m-%d')) logger.debug(f"Extracted {len(parsed_dates)} valid dates: {parsed_dates}") return parsed_dates[:2] except Exception as e: logger.error(f"Date parsing failed: {str(e)}") return [current_date.strftime('%Y-%m-%d'), current_date.strftime('%Y-%m-%d')] # Summarize contract def summarize_contract(text, summarizer, sentence_model): aspects = ["parties", "payment terms", "obligations", "termination clauses"] try: if not text or len(text.strip()) < 50: logger.error("Input text is empty or too short") return { "full_summary": "No summary generated due to insufficient text", "aspect_summaries": {asp: "Not extracted" for asp in aspects}, "dates": parse_dates(text) }, None text = clean_text(text)[:4096] try: summary_result = summarizer(f"summarize: {text}", max_length=150, min_length=50, do_sample=False)[0]['summary_text'] if summary_result.strip() == text.strip()[:len(summary_result)]: logger.warning("Summary identical to input, generating fallback") summary_result = f"Summary: {text[:150]}..." if len(text) > 150 else text logger.debug(f"Generated summary: {summary_result[:50]}...") full_summary = summary_result except Exception as e: logger.error(f"Summarizer failed: {str(e)}") full_summary = f"Summary failed: {text[:150]}..." if len(text) > 150 else text aspect_summaries = {} aspect_synonyms = { "parties": ["contractor", "client", "party", "signatory", "entity"], "payment terms": ["payment", "compensation", "fees", "billing", "invoicing"], "obligations": ["duties", "responsibilities", "obligations", "commitments"], "termination clauses": ["termination", "cancellation", "end of contract", "exit"] } if aspects: sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip() and len(s.strip()) > 10] if sentences: logger.debug(f"Extracted {len(sentences)} sentences for aspect summarization") emb = sentence_model.encode(sentences, convert_to_tensor=True) for asp in aspects: asp_texts = [asp] + aspect_synonyms.get(asp, []) asp_embs = sentence_model.encode(asp_texts, convert_to_tensor=True) sims = util.cos_sim(asp_embs, emb).max(dim=0).values top = sims.argsort(descending=True)[:5] asp_text = ". ".join([sentences[i] for i in top if sims[i] > 0.05]) if asp_text: aspect_summaries[asp] = asp_text[:200] logger.debug(f"Aspect '{asp}' matched {len([i for i in top if sims[i] > 0.05])} sentences") else: logger.warning(f"No sentences matched aspect '{asp}'") aspect_summaries[asp] = "Not extracted" else: logger.warning("No valid sentences for aspect summarization") for asp in aspects: aspect_summaries[asp] = "Not extracted" return { "full_summary": full_summary, "aspect_summaries": aspect_summaries, "dates": parse_dates(text) }, None except Exception as e: logger.error(f"Summarization failed: {str(e)}") return { "full_summary": f"Summary generation failed: {text[:150]}..." if len(text) > 150 else text, "aspect_summaries": {asp: "Not extracted" for asp in aspects}, "dates": parse_dates(text) }, None # Create Contract Document record def create_contract_document(sf, file_name, file_url=None): ist = timezone('Asia/Kolkata') current_time = datetime.now(ist).replace(hour=18, minute=33, second=0, microsecond=0) # 06:33 PM IST, June 26, 2025 try: escaped_file_name = file_name.replace("'", "\\'") today_datetime = current_time.strftime('%Y-%m-%dT%H:%M:%SZ') query_datetime = f"SELECT Id, Upload_Date__c FROM Contract_Document__c WHERE Name = '{escaped_file_name}' AND Upload_Date__c = {today_datetime} LIMIT 1" logger.debug(f"Executing SOQL query (dateTime): {query_datetime}") try: result = sf.query(query_datetime) if result['totalSize'] > 0: doc_id = result['records'][0]['Id'] logger.info(f"Contract Document exists for {file_name} on {today_datetime}, ID {doc_id}") return doc_id, None except Exception as e: logger.warning(f"dateTime query failed: {str(e)}. Trying Date format.") today_date = current_time.strftime('%Y-%m-%d') query_date = f"SELECT Id, Upload_Date__c FROM Contract_Document__c WHERE Name = '{escaped_file_name}' AND Upload_Date__c = '{today_date}' LIMIT 1" logger.debug(f"Executing SOQL query (Date): {query_date}") result = sf.query(query_date) if result['totalSize'] > 0: doc_id = result['records'][0]['Id'] logger.info(f"Contract Document exists for {file_name} on {today_date}, ID {doc_id}") return doc_id, None record = { 'Name': file_name, 'Document_URL__c': file_url or '', 'Upload_Date__c': today_datetime, 'Status__c': 'Uploaded' } result = sf.Contract_Document__c.create(record) logger.info(f"Created Contract Document for {file_name} with ID {result['id']}") return result['id'], None except Exception as e: logger.error(f"Failed to create Contract Document for {file_name}: {str(e)}") return None, f"Failed to create Contract Document: {str(e)}" # Store summary in Salesforce def store_in_salesforce(sf, summary_data, file_name, contract_document_id): try: query = f"SELECT Id FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_document_id}' LIMIT 1" logger.debug(f"Executing SOQL query: {query}") result = sf.query(query) if result['totalSize'] > 0: logger.info(f"Summary exists for Contract Document ID {contract_document_id}, ID {result['records'][0]['Id']}") return {'id': result['records'][0]['Id']}, None encrypted_summary = fernet.encrypt(summary_data['full_summary'].encode()).decode() def truncate(text, length=2000): return text[:length] if text else 'Not extracted' record = { 'Name': file_name, 'Contract_Document__c': contract_document_id, 'Parties__c': truncate(summary_data['aspect_summaries'].get('parties', 'Not extracted')), 'Payment_Terms__c': truncate(summary_data['aspect_summaries'].get('payment terms', 'Not extracted')), 'Obligations__c': truncate(summary_data['aspect_summaries'].get('obligations', 'Not extracted')), 'Termination_Clause__c': truncate(summary_data['aspect_summaries'].get('termination clauses', 'Not extracted')), 'Custom_Field_1__c': encrypted_summary, 'Validation_Status__c': 'Pending', 'Start_Date__c': summary_data['dates'][0][:10] if summary_data['dates'] and len(summary_data['dates']) > 0 else None, 'End_Date__c': summary_data['dates'][1][:10] if summary_data['dates'] and len(summary_data['dates']) > 1 else summary_data['dates'][0][:10] if summary_data['dates'] else None, } logger.debug(f"Record to be created: {record}") if not any(record.get(field) not in ['', 'Not extracted'] for field in ['Parties__c', 'Payment_Terms__c', 'Obligations__c', 'Termination_Clause__c']): logger.warning(f"No valid aspects extracted for {file_name}, storing with full summary only") result = sf.Contract_Summary__c.create(record) logger.info(f"Stored summary for {file_name} with ID {result['id']}") return result, None except Exception as e: logger.error(f"Failed to store summary for {file_name}: {str(e)}") return None, f"Failed to store in Salesforce: {str(e)}. Check {log_file}" # Generate CSV report def generate_report(sf, output_file, contract_document_id): try: query = ( f"SELECT Id, Name, Parties__c, Payment_Terms__c, Obligations__c, Termination_Clause__c, Custom_Field_1__c, " f"Validation_Status__c, Start_Date__c, End_Date__c " f"FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_document_id}' LIMIT 1" ) logger.debug(f"Executing SOQL query: {query}") results = sf.query(query)['records'] logger.info(f"Retrieved {len(results)} records for Contract_Document__c ID {contract_document_id}") rows = [] for r in results: try: decrypted_summary = fernet.decrypt(r.get('Custom_Field_1__c', '').encode()).decode() if r.get('Custom_Field_1__c') else 'Not extracted' except Exception as e: logger.error(f"Decryption failed for record {r.get('Id', 'unknown')}: {str(e)}") decrypted_summary = 'Decryption failed' row = { 'Contract_Name': r.get('Name', 'Not extracted'), 'Parties': r.get('Parties__c', 'Not extracted')[:50], 'Payment_Terms': r.get('Payment_Terms__c', 'Not extracted')[:50], 'Obligations': r.get('Obligations__c', 'Not extracted')[:50], 'Termination_Clause': r.get('Termination_Clause__c', 'Not extracted')[:50], 'Full_Summary': decrypted_summary[:100], 'Validation_Status': r.get('Validation_Status__c', 'Not extracted'), 'Start_Date': r.get('Start_Date__c', 'Not extracted'), 'End_Date': r.get('End_Date__c', 'Not extracted'), } rows.append(row) if not rows: logger.warning(f"No summary found for Contract_Document__c ID {contract_document_id}") return pd.DataFrame(columns=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date']), None df = pd.DataFrame(rows) logger.info(f"Generated DataFrame with {len(df)} record(s) for {contract_document_id}") df.to_csv(output_file, index=False, encoding='utf-8') logger.info(f"Saved report to {output_file}") return df, output_file except Exception as e: logger.error(f"Report generation failed: {str(e)}") return pd.DataFrame(columns=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date']), None # Gradio interface function def gradio_process(file, progress=gr.Progress()): try: if not file: logger.error("No file uploaded") return "Error: No file uploaded.", pd.DataFrame(), None file_path = file.name if hasattr(file, 'name') else file file_name = os.path.basename(file_path) progress(0.1, desc="Validating file...") is_valid, error = validate_file(file_path) if not is_valid: logger.error(error) return f"Error: {error}", pd.DataFrame(), None progress(0.2, desc="Extracting text...") text, error = extract_text(file_path) if error: logger.error(f"Text extraction failed: {error}") return f"Error extracting text from {file_name}: {error}. Check {log_file}", pd.DataFrame(), None progress(0.4, desc="Initializing Salesforce and models...") sf = init_salesforce() summarizer, sentence_model = init_models() progress(0.6, desc="Summarizing contract...") summary_data, err = summarize_contract(text, summarizer, sentence_model) if err: logger.error(f"Summarization failed: {err}") return f"Error summarizing {file_name}: {err}. Check {log_file}", pd.DataFrame(), None progress(0.8, desc="Storing data in Salesforce...") contract_doc_id, err = create_contract_document(sf, file_name) if err: logger.error(f"Contract document creation failed: {err}") return f"Error creating Contract Document for {file_name}: {err}. Check {log_file}", pd.DataFrame(), None result, err = store_in_salesforce(sf, summary_data, file_name, contract_doc_id) if err: logger.error(f"Salesforce storage failed: {err}") return f"Error storing summary for {file_name}: {err}. Check {log_file}", pd.DataFrame(), None progress(0.9, desc="Generating report...") csv_path = os.path.join(tempfile.gettempdir(), f"contract_summary_{file_name}.csv") report_df, csv_path = generate_report(sf, csv_path, contract_doc_id) if report_df.empty: logger.warning(f"No valid report data generated for {file_name}") return f"Success! Summary stored for {file_name} with ID {result['id']}. No report data.", pd.DataFrame(), None progress(1.0, desc="Complete!") return ( f"Success! Summary stored for {file_name} with ID {result['id']}. Report generated.", report_df, csv_path ) except Exception as e: logger.error(f"Processing error for {file_name if 'file_name' in locals() else 'unknown file'}: {str(e)}") return f"Error processing {file_name if 'file_name' in locals() else 'file'}: {str(e)}. Check {log_file}", pd.DataFrame(), None # Gradio UI setup with gr.Blocks(title="AI-Powered Contract Summarizer with Salesforce Integration") as iface: gr.Markdown("AI Contract Summarizer") with gr.Row(): file_input = gr.File(label="Upload Contract File (PDF, DOCX, PNG, JPG, CSV, XLS/XLSX)") submit_btn = gr.Button("Submit", elem_classes=["bg-orange-500"]) result_output = gr.Textbox(label="Result", lines=5) report_output = gr.DataFrame(label="Contract Summary Report", headers=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date'], interactive=False) csv_output = gr.File(label="Download CSV Report") submit_btn.click( fn=gradio_process, inputs=[file_input], outputs=[result_output, report_output, csv_output] ) if __name__ == "__main__": logger.info(f"Starting Gradio interface. Logs saved to {log_file}") if missing_deps: logger.warning(f"Application running with limited functionality due to missing dependencies: {', '.join(missing_deps)}") iface.launch()