| import os |
| import time |
| import logging |
| import re |
| from datetime import datetime, timedelta |
| from dotenv import load_dotenv |
| from cryptography.fernet import Fernet |
| from simple_salesforce import Salesforce |
| from transformers import pipeline |
| from PIL import Image |
| import pytesseract |
| import pandas as pd |
| from docx import Document |
| import PyPDF2 |
| import gradio as gr |
| from pdf2image import convert_from_path |
| import tempfile |
| from pytz import timezone |
| import shutil |
| import unicodedata |
| import asyncio |
| import torch |
|
|
| |
# Lazily-initialized singletons shared across requests; creation is guarded
# by _lock so concurrent Gradio handlers don't race during first-time setup.
_sf = None          # simple_salesforce.Salesforce client (built in init_salesforce)
_summarizer = None  # transformers summarization pipeline (built in init_models)
_fernet = None      # Fernet cipher for encrypting stored summaries (built below)
_lock = asyncio.Lock()  # serializes lazy initialization of the globals above
|
|
| |
# Log to a file in the system temp directory (writable even on sandboxed
# hosts); note no console handler is attached, so output goes only to the file.
log_file = os.path.join(tempfile.gettempdir(), 'app.log')
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler(log_file)]
)
logger = logging.getLogger(__name__)
|
|
| |
def check_dependencies():
    """Probe the external OCR toolchain and report what is missing.

    Checks for the Tesseract binary (needed by pytesseract) and Poppler's
    ``pdfinfo`` (needed by pdf2image). Both tools are probed so the caller
    receives the complete list of missing dependencies in one pass; the
    original returned early on a missing Tesseract and so masked a missing
    Poppler.

    Returns:
        tuple[list[str], list[str]]: (missing dependency names, warnings).
        The second element is currently always empty and is kept only for
        interface compatibility with existing callers.
    """
    missing = []
    try:
        tesseract_path = shutil.which('tesseract')
        if tesseract_path:
            # Point pytesseract at the discovered binary explicitly.
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
        else:
            logger.warning("Tesseract not found. OCR unavailable.")
            missing.append("Tesseract")
        if not shutil.which('pdfinfo'):
            logger.warning("Poppler not found.")
            missing.append("Poppler")
        return missing, []
    except Exception as e:
        # Conservative fallback: assume both tools are unavailable.
        logger.error(f"Dependency check failed: {str(e)}")
        return ["Tesseract", "Poppler"], []
|
|
| |
# Load configuration from .env and fail fast at import time if any required
# variable is absent, so misconfiguration surfaces before the UI starts.
load_dotenv()
required_env_vars = [
    'ENCRYPTION_KEY', 'SALESFORCE_USERNAME', 'SALESFORCE_PASSWORD',
    'SALESFORCE_SECURITY_TOKEN', 'SALESFORCE_DOMAIN'
]
env = {var: os.getenv(var) for var in required_env_vars}
if missing := [k for k in required_env_vars if not env[k]]:
    raise ValueError(f"Missing env vars: {', '.join(missing)}")
|
|
| |
# Build the Fernet cipher once at import time; a malformed ENCRYPTION_KEY is
# a fatal configuration error (must be a urlsafe-base64, 32-byte key).
try:
    _fernet = Fernet(env['ENCRYPTION_KEY'].encode())
except Exception as e:
    raise ValueError(f"Invalid encryption key: {e}")
|
|
| |
async def init_salesforce(max_retries=2, initial_delay=1):
    """Lazily create and cache the shared Salesforce connection.

    The blocking simple_salesforce login runs in the default executor so the
    event loop stays responsive. Failed attempts retry with exponential
    backoff (initial_delay * 2**attempt seconds).

    Args:
        max_retries: Total number of connection attempts before giving up.
        initial_delay: Base delay in seconds for the backoff schedule.

    Returns:
        The cached Salesforce client.

    Raises:
        ValueError: If every connection attempt fails.
    """
    global _sf
    async with _lock:
        if _sf is not None:
            return _sf
        # asyncio.get_event_loop() is deprecated inside coroutines (3.10+);
        # get_running_loop() is the supported way to reach the current loop.
        loop = asyncio.get_running_loop()
        for attempt in range(max_retries):
            try:
                _sf = await loop.run_in_executor(
                    None,
                    lambda: Salesforce(
                        username=env['SALESFORCE_USERNAME'],
                        password=env['SALESFORCE_PASSWORD'],
                        security_token=env['SALESFORCE_SECURITY_TOKEN'],
                        domain=env['SALESFORCE_DOMAIN'],
                        version='58.0'
                    )
                )
                logger.info("Salesforce connection established")
                return _sf
            except Exception as e:
                logger.error(f"Salesforce connection attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(initial_delay * (2 ** attempt))
        raise ValueError("Salesforce connection failed after retries")
|
|
| |
async def init_models():
    """Return the shared t5-small summarization pipeline, building it on
    first use (GPU device 0 when CUDA is available, otherwise CPU).

    Raises:
        Exception: Propagates any pipeline construction failure after logging.
    """
    global _summarizer
    async with _lock:
        # Fast path: pipeline already built by an earlier call.
        if _summarizer is not None:
            return _summarizer
        device_index = 0 if torch.cuda.is_available() else -1
        try:
            _summarizer = pipeline(
                "summarization",
                model="t5-small",
                tokenizer="t5-small",
                framework="pt",
                device=device_index
            )
        except Exception as e:
            logger.error(f"Summarizer init failed: {str(e)}")
            raise
        return _summarizer
|
|
| |
def preprocess_image(image):
    """Prepare an image for OCR: convert to grayscale and upscale 2x with
    LANCZOS resampling. On any failure, fall back to plain grayscale."""
    try:
        target_size = (image.width * 2, image.height * 2)
        return image.convert('L').resize(target_size, Image.LANCZOS)
    except Exception as e:
        logger.error(f"Image preprocess failed: {str(e)}")
        return image.convert('L')
|
|
| |
def clean_text(text):
    """Normalize raw extracted text for downstream processing.

    Applies NFKC unicode folding, collapses every run of whitespace to a
    single space, and caps the result at 512 characters. Returns "" for
    falsy input or on any processing error.
    """
    try:
        if not text:
            return ""
        normalized = unicodedata.normalize('NFKC', text)
        collapsed = re.sub(r'\s+', ' ', normalized.strip())
        return collapsed[:512]
    except Exception as e:
        logger.error(f"Text cleaning failed: {str(e)}")
        return ""
|
|
| |
def validate_file(file_path):
    """Pre-flight check for an uploaded file.

    Verifies the extension is one of the supported document/image/tabular
    formats and that the file exists on disk with non-zero size.

    Returns:
        tuple: (True, None) when usable, otherwise (False, reason string).
    """
    supported = {'.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx'}
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in supported:
        return False, f"Unsupported file type: {ext}"
    file_missing = not os.path.exists(file_path)
    if file_missing or os.path.getsize(file_path) == 0:
        return False, f"File not found or empty: {file_path}"
    return True, None
|
|
| |
async def extract_text_async(file_path):
    """Extract raw text from an uploaded document.

    Supports PDF (native text layer with an OCR fallback for scans), DOCX,
    PNG/JPG images (OCR), and CSV/Excel. Only a bounded prefix of each
    document is read (first 2 PDF pages, 50 DOCX paragraphs, 1000 chars of
    flattened tabular data) to keep processing fast.

    Returns:
        tuple: (cleaned text, None) on success, or (None, error message).
    """
    is_valid, error = validate_file(file_path)
    if not is_valid:
        return None, error
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                text = "".join([p.extract_text() or "" for p in pdf_reader.pages[:2]])
            # Heuristic: under 50 chars of native text suggests a scanned
            # PDF, so rasterize the first two pages and OCR them instead.
            if not text or len(text.strip()) < 50:
                images = convert_from_path(file_path, dpi=150, first_page=1, last_page=2, thread_count=4)
                # --psm 6: treat each page as a single uniform text block.
                text = "".join(pytesseract.image_to_string(preprocess_image(img), config='--psm 6') for img in images)
            logger.info(f"Extracted text: {text[:100]}...")
        elif ext == '.docx':
            doc = Document(file_path)
            # Keep only the first 50 non-empty paragraphs.
            text = "\n".join([p.text for p in doc.paragraphs if p.text.strip()][:50])
        elif ext in ['.png', '.jpg', '.jpeg']:
            img = Image.open(file_path)
            img = preprocess_image(img)
            text = pytesseract.image_to_string(img, config='--psm 6')
        elif ext in ['.csv', '.xls', '.xlsx']:
            df = pd.read_csv(file_path) if ext == '.csv' else pd.read_excel(file_path)
            # Flatten every cell into one space-separated string, capped at
            # 1000 chars (clean_text below trims further to 512).
            text = " ".join(df.astype(str).values.flatten())[:1000]
        text = clean_text(text)
        # Fewer than 50 usable characters is treated as a failed extraction.
        if not text or len(text) < 50:
            return None, f"No valid text extracted from {file_path}"
        return text, None
    except Exception as e:
        logger.error(f"Text extraction failed: {str(e)}")
        return None, f"Text extraction failed: {str(e)}"
|
|
| |
def parse_dates(text):
    """Extract contract start and end dates from free text.

    Scans for ISO (YYYY-MM-DD), US (MM/DD/YYYY), and long-form
    ("1 January 2024") dates, plus explicit term lengths ("term of 3 years")
    and end-date phrases. End-date resolution order: explicit end date >
    start + term years > latest found date > start + 365 days.

    Returns:
        tuple[str, str]: (start, end) as 'YYYY-MM-DD'. Falls back to today's
        date (IST) for anything that cannot be determined.
    """
    ist = timezone('Asia/Kolkata')
    current_date = datetime.now(ist).strftime('%Y-%m-%d')

    def _to_iso(raw):
        # Normalize a matched date string to ISO form; raises ValueError if
        # the string doesn't fit any of the three recognized formats.
        if '/' in raw:
            return datetime.strptime(raw, '%m/%d/%Y').strftime('%Y-%m-%d')
        if '-' in raw:
            return datetime.strptime(raw, '%Y-%m-%d').strftime('%Y-%m-%d')
        return datetime.strptime(raw, '%d %B %Y').strftime('%Y-%m-%d')

    try:
        date_patterns = [
            r'\b\d{4}-\d{2}-\d{2}\b',
            r'\b\d{1,2}/\d{1,2}/\d{4}\b',
            r'\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b',
        ]
        term_patterns = [
            r'(?:term|duration|period)\s*(?:of|for)\s*(\d+)\s*(?:year|years)',
            r'(?:end|ending|expires|expiration)\s*(?:on|at)?\s*(\d{4}-\d{2}-\d{2}|\d{1,2}/\d{1,2}/\d{4}|\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})'
        ]

        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, text, re.IGNORECASE))

        parsed_dates = []
        for date in dates:
            try:
                parsed_dates.append(_to_iso(date))
            except ValueError:
                # Matched the shape but isn't a real date (e.g. 13/45/2020);
                # narrowed from the original bare except.
                continue

        term_years = None
        explicit_end_date = None
        for pattern in term_patterns:
            for match in re.findall(pattern, text, re.IGNORECASE):
                if match.isdigit():
                    term_years = int(match)
                else:
                    try:
                        explicit_end_date = _to_iso(match)
                    except ValueError:
                        continue

        parsed_dates = sorted(set(parsed_dates))
        start_date = parsed_dates[0] if parsed_dates else current_date

        if explicit_end_date:
            end_date = explicit_end_date
        elif term_years:
            start_dt = datetime.strptime(start_date, '%Y-%m-%d')
            # NOTE: 365-day years ignore leap days; acceptable drift here.
            end_date = (start_dt + timedelta(days=term_years * 365)).strftime('%Y-%m-%d')
        elif len(parsed_dates) > 1:
            end_date = parsed_dates[-1]
        else:
            # No term and only one (or no) date found: default to one year.
            start_dt = datetime.strptime(start_date, '%Y-%m-%d')
            end_date = (start_dt + timedelta(days=365)).strftime('%Y-%m-%d')

        logger.info(f"Parsed dates - Start: {start_date}, End: {end_date}")
        return start_date, end_date
    except Exception as e:
        logger.error(f"Date parsing failed: {str(e)}")
        return current_date, current_date
|
|
| |
async def summarize_contract_async(text, summarizer, file_name):
    """Summarize contract text and pull out key aspects via regex.

    Produces a short model-generated summary plus regex-extracted "parties",
    "payment terms", "obligations", and "termination clauses" snippets, and
    parses start/end dates. Never raises: on any failure it degrades to a
    truncated echo of the input with today's date (IST) for both dates.

    Args:
        text: Cleaned contract text (needs >= 50 chars to be summarized).
        summarizer: transformers summarization pipeline (see init_models).
        file_name: Unused in this function; kept for caller compatibility.

    Returns:
        tuple: (summary dict with keys full_summary / aspect_summaries /
        start_date / end_date, None). The second element is always None.
    """
    aspects = ["parties", "payment terms", "obligations", "termination clauses"]
    try:
        if not text or len(text.strip()) < 50:
            ist = timezone('Asia/Kolkata')
            current_date = datetime.now(ist).strftime('%Y-%m-%d')
            return {
                "full_summary": "No summary due to insufficient text",
                "aspect_summaries": {asp: "Not extracted" for asp in aspects},
                "start_date": current_date,
                "end_date": current_date
            }, None
        # Cap the model input at 512 chars (t5-small context budget).
        text = clean_text(text)[:512]
        try:
            prompt = f"summarize: Create a concise agreement summary including parties and obligations: {text}"
            summary_result = summarizer(
                prompt,
                max_length=80,
                min_length=30,
                do_sample=False,
                num_beams=4
            )[0]['summary_text']
            full_summary = clean_text(summary_result)
            logger.info(f"Generated summary: {full_summary}")
        except Exception as e:
            # Model failure is non-fatal: fall back to echoing the text.
            logger.error(f"Summarizer failed: {str(e)}")
            full_summary = text[:60] + "..." if len(text) > 60 else text

        # Lightweight regex extraction per aspect; each value capped at 100.
        aspect_summaries = {}
        for asp in aspects:
            if asp == "parties":
                match = re.search(r'(?:parties|between)\s+([A-Za-z\s&]+?)(?:\sand|\,|\.)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1).strip()[:100] if match else "Not extracted"
            elif asp == "payment terms":
                match = re.search(r'(?:payment|terms)\s+([\d,.]+\s*(?:EUR|USD|INR)\s*(?:monthly|annually|quarterly))', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1)[:100] if match else "Not extracted"
            elif asp == "obligations":
                # NOTE(review): the terminator '(?:\by|\,|\.)' matches a word
                # boundary followed by 'y', a comma, or a period — possibly
                # intended to be r'\bby\b'; confirm before changing.
                match = re.search(r'(?:obligations|services|duties)\s+(.+?)(?:\by|\,|\.)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1).strip()[:100] if match else "Not extracted"
            elif asp == "termination clauses":
                match = re.search(r'(?:termination|notice)\s+(\d+\s*days\'?\s*notice)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1)[:100] if match else "Not extracted"

        # Prefer a templated summary when both parties and obligations were
        # extracted; otherwise keep the model output (or the echo fallback).
        parties = aspect_summaries.get("parties", "Not extracted")
        obligations = aspect_summaries.get("obligations", "Not extracted")
        if parties != "Not extracted" and obligations != "Not extracted":
            full_summary = f"Logistics agreement between {parties} for {obligations}..."
        else:
            full_summary = full_summary if full_summary else text[:60] + "..."
        logger.info(f"Final summary: {full_summary}")

        start_date, end_date = parse_dates(text)
        return {
            "full_summary": full_summary,
            "aspect_summaries": aspect_summaries,
            "start_date": start_date,
            "end_date": end_date
        }, None
    except Exception as e:
        # Catch-all so the pipeline never aborts on a summarization error.
        logger.error(f"Summarization failed: {str(e)}")
        ist = timezone('Asia/Kolkata')
        current_date = datetime.now(ist).strftime('%Y-%m-%d')
        return {
            "full_summary": text[:60] + "..." if len(text) > 60 else text,
            "aspect_summaries": {asp: "Not extracted" for asp in aspects},
            "start_date": current_date,
            "end_date": current_date
        }, None
|
|
| |
async def create_contract_document(sf, file_name):
    """Find or create the Contract_Document__c record for an uploaded file.

    Idempotent: an existing record with the same Name is reused rather than
    duplicated. Blocking Salesforce calls run in the default executor.

    Args:
        sf: Connected simple_salesforce client.
        file_name: Uploaded file's base name, used as the record Name.

    Returns:
        tuple: (record Id, None) on success, or (None, error message).
    """
    ist = timezone('Asia/Kolkata')
    current_time = datetime.now(ist).strftime('%Y-%m-%dT%H:%M:%SZ')
    try:
        # get_running_loop() replaces get_event_loop(), which is deprecated
        # inside coroutines since Python 3.10.
        loop = asyncio.get_running_loop()
        # Escape single quotes so the SOQL string literal stays well-formed.
        escaped_file_name = file_name.replace("'", "\\'")
        query = f"SELECT Id FROM Contract_Document__c WHERE Name = '{escaped_file_name}' LIMIT 1"
        result = await loop.run_in_executor(None, sf.query, query)
        if result['totalSize'] > 0:
            return result['records'][0]['Id'], None
        record = {
            'Name': file_name,
            'Document_URL__c': '',
            'Upload_Date__c': current_time,
            'Status__c': 'Uploaded'
        }
        result = await loop.run_in_executor(None, sf.Contract_Document__c.create, record)
        return result['id'], None
    except Exception as e:
        logger.error(f"Contract document creation failed: {str(e)}")
        return None, f"Contract document creation failed: {str(e)}"
|
|
| |
async def store_in_salesforce(sf, summary_data, file_name, contract_doc_id):
    """Persist a summary as a Contract_Summary__c linked to its document.

    Idempotent: if a summary already exists for the document, its Id is
    returned instead of creating a duplicate. The full summary is encrypted
    with the module Fernet key before storage.

    Args:
        sf: Connected simple_salesforce client.
        summary_data: Dict from summarize_contract_async (full_summary,
            aspect_summaries, start_date, end_date).
        file_name: Record Name.
        contract_doc_id: Parent Contract_Document__c Id.

    Returns:
        tuple: (create/lookup result dict, None) on success, or
        (None, error message).
    """
    try:
        if not contract_doc_id:
            return None, "Contract document ID is missing"
        # get_running_loop() replaces get_event_loop(), which is deprecated
        # inside coroutines since Python 3.10.
        loop = asyncio.get_running_loop()
        # contract_doc_id is a Salesforce-generated Id (alphanumeric), so
        # interpolating it into SOQL directly is safe.
        query = f"SELECT Id FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_doc_id}' LIMIT 1"
        result = await loop.run_in_executor(None, sf.query, query)
        if result['totalSize'] > 0:
            return {'id': result['records'][0]['Id']}, None
        encrypted_summary = _fernet.encrypt(summary_data['full_summary'].encode()).decode()

        def truncate(text, length=100):
            # The custom text fields below are limited to 100 characters.
            return text[:length] if text else 'Not extracted'

        record = {
            'Name': file_name,
            'Contract_Document__c': contract_doc_id,
            'Parties__c': truncate(summary_data['aspect_summaries'].get('parties', 'Not extracted')),
            'Payment_Terms__c': truncate(summary_data['aspect_summaries'].get('payment terms', 'Not extracted')),
            'Obligations__c': truncate(summary_data['aspect_summaries'].get('obligations', 'Not extracted')),
            'Termination_Clause__c': truncate(summary_data['aspect_summaries'].get('termination clauses', 'Not extracted')),
            'Custom_Field_1__c': encrypted_summary,
            'Validation_Status__c': 'Pending',
            'Start_Date__c': summary_data['start_date'][:10],
            'End_Date__c': summary_data['end_date'][:10],
        }
        result = await loop.run_in_executor(None, sf.Contract_Summary__c.create, record)
        return result, None
    except Exception as e:
        logger.error(f"Store summary failed: {str(e)}")
        return None, f"Store summary failed: {str(e)}"
|
|
| |
async def generate_report(sf, output_file, contract_doc_id):
    """Export the stored summary for a document to a two-column CSV report.

    Queries the Contract_Summary__c record for the document, decrypts the
    full summary, and writes (Field, Value) rows to output_file.

    Args:
        sf: Connected simple_salesforce client.
        output_file: Destination CSV path.
        contract_doc_id: Parent Contract_Document__c Id.

    Returns:
        tuple: (DataFrame, output_file path) on success, or
        (empty DataFrame, error message string) on failure.
    """
    try:
        if not contract_doc_id:
            return pd.DataFrame(columns=['Field', 'Value']), "Contract document ID is missing"
        query = (
            f"SELECT Id, Name, Parties__c, Payment_Terms__c, Obligations__c, Termination_Clause__c, Custom_Field_1__c, "
            f"Validation_Status__c, Start_Date__c, End_Date__c "
            f"FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_doc_id}' LIMIT 1"
        )
        # get_running_loop() replaces get_event_loop(), which is deprecated
        # inside coroutines since Python 3.10.
        loop = asyncio.get_running_loop()
        results = (await loop.run_in_executor(None, sf.query, query))['records']

        def text_of(record, field):
            # Salesforce returns None for empty fields; dict.get's default
            # doesn't help because the key exists with value None, and the
            # original's None[:100] slice raised TypeError. `or` covers both
            # the missing-key and the None-value cases.
            return record.get(field) or 'Not extracted'

        rows = []
        for r in results:
            encrypted = r.get('Custom_Field_1__c')
            decrypted_summary = _fernet.decrypt(encrypted.encode()).decode() if encrypted else 'Not extracted'
            rows.extend([
                ('Contract Name', text_of(r, 'Name')),
                ('Parties', text_of(r, 'Parties__c')[:100]),
                ('Payment Terms', text_of(r, 'Payment_Terms__c')[:100]),
                ('Obligations', text_of(r, 'Obligations__c')[:100]),
                ('Termination Clause', text_of(r, 'Termination_Clause__c')[:100]),
                ('Full Summary', decrypted_summary[:100]),
                ('Validation Status', text_of(r, 'Validation_Status__c')),
                ('Start Date', text_of(r, 'Start_Date__c')),
                ('End Date', text_of(r, 'End_Date__c')),
            ])
        df = pd.DataFrame(rows, columns=['Field', 'Value']) if rows else pd.DataFrame(columns=['Field', 'Value'])
        df.to_csv(output_file, index=False, encoding='utf-8')
        return df, output_file
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return pd.DataFrame(columns=['Field', 'Value']), f"Report generation failed: {str(e)}"
|
|
| |
async def gradio_process_async(file, progress=gr.Progress()):
    """End-to-end Gradio handler: validate, extract, summarize, store, report.

    Args:
        file: Gradio upload (object with .name, or a plain path string).
        progress: Gradio progress tracker (injected by the framework).

    Returns:
        tuple: (status message, report DataFrame, CSV path or None).
    """
    empty_df = pd.DataFrame(columns=['Field', 'Value'])
    file_name = 'file'  # fallback for error messages before the name is known
    try:
        if not file:
            return "No file uploaded.", empty_df, None
        file_path = file.name if hasattr(file, 'name') else file
        file_name = os.path.basename(file_path)
        progress(0.1, desc="Validating...")
        is_valid, error = validate_file(file_path)
        if not is_valid:
            return f"Error: {error}", empty_df, None
        progress(0.2, desc="Extracting text...")
        text, error = await extract_text_async(file_path)
        if error:
            return f"Error extracting text: {error}", empty_df, None
        progress(0.4, desc="Initializing...")
        sf = await init_salesforce()
        summarizer = await init_models()
        progress(0.6, desc="Summarizing...")
        summary_data, err = await summarize_contract_async(text, summarizer, file_name)
        if err:
            return f"Error summarizing: {err}", empty_df, None
        progress(0.8, desc="Storing in Salesforce...")
        contract_doc_id, err = await create_contract_document(sf, file_name)
        if err or not contract_doc_id:
            return f"Error creating document: {err or 'No contract document ID returned'}", empty_df, None
        result, err = await store_in_salesforce(sf, summary_data, file_name, contract_doc_id)
        if err:
            return f"Error storing summary: {err}", empty_df, None
        progress(0.9, desc="Generating report...")
        csv_path = os.path.join(tempfile.gettempdir(), f"contract_summary_{file_name}.csv")
        # generate_report returns the output path on success and an error
        # message on failure. The original overwrote csv_path with that
        # result and then tested `if not csv_path:`, which can never fire on
        # a non-empty error string — failures leaked through as a bogus
        # download path. Compare against the expected path instead.
        report_df, report_result = await generate_report(sf, csv_path, contract_doc_id)
        if report_result != csv_path:
            return f"Error generating report: {report_result}", empty_df, None
        progress(1.0, desc="Complete!")
        return (
            f"Success! Summary stored for {file_name}.",
            report_df,
            csv_path
        )
    except Exception as e:
        logger.error(f"Processing error for {file_name}: {str(e)}")
        return f"Error processing {file_name}: {str(e)}", empty_df, None
|
|
| |
# --- Gradio UI -------------------------------------------------------------
# Single-page interface: upload a contract, run the async pipeline, display
# the status message and field/value report, and offer the CSV for download.
with gr.Blocks() as iface:
    file_input = gr.File(label="Upload Contract (PDF, DOCX, PNG, JPG, CSV, XLS/XLSX)")
    submit_btn = gr.Button("Submit")
    result_output = gr.Textbox(label="Result", lines=3)
    report_output = gr.DataFrame(label="Summary Report", headers=['Field', 'Value'], interactive=False)
    csv_output = gr.File(label="Download CSV")
    # Gradio supports async event handlers, so the coroutine is wired directly.
    submit_btn.click(
        fn=gradio_process_async,
        inputs=[file_input],
        outputs=[result_output, report_output, csv_output]
    )
|
|
if __name__ == "__main__":
    logger.info("Application startup")
    # Missing OCR tools are logged but not fatal: non-OCR formats still work.
    missing_deps, _ = check_dependencies()
    if missing_deps:
        logger.warning(f"Missing dependencies: {', '.join(missing_deps)}")
    iface.launch()