Contract / app.py
prudhviLatha's picture
Create app.py
04d8f7e verified
import os
import time
import logging
from logging.handlers import RotatingFileHandler
import re
from datetime import datetime
from dotenv import load_dotenv
from cryptography.fernet import Fernet
from simple_salesforce import Salesforce
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
from sentence_transformers import SentenceTransformer, util
from PIL import Image
import pytesseract
import pandas as pd
from docx import Document
import PyPDF2
import gradio as gr
from pdf2image import convert_from_path
import tempfile
from pytz import timezone
import shutil
# Setup logging with rotation
log_file = os.path.join(tempfile.gettempdir(), 'app.log')
handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
handler,
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Check dependencies at startup
def check_dependencies():
missing_deps = []
try:
import pytesseract, pandas, openpyxl, xlrd, docx, PyPDF2, pdf2image
# Check Tesseract
try:
tesseract_path = shutil.which('tesseract')
if tesseract_path:
pytesseract.pytesseract.tesseract_cmd = tesseract_path
tesseract_version = pytesseract.get_tesseract_version()
logger.info(f"Tesseract found at {tesseract_path}, version: {tesseract_version}")
else:
logger.warning("Tesseract not found in PATH. Install with 'sudo apt install tesseract-ocr'. OCR-dependent files (JPEG, PNG, scanned PDFs) will not be processed.")
missing_deps.append("Tesseract")
except Exception as e:
logger.warning(f"Tesseract unavailable: {str(e)}. Install with 'sudo apt install tesseract-ocr'. OCR-dependent files (JPEG, PNG, scanned PDFs) will not be processed.")
missing_deps.append("Tesseract")
# Check Poppler
try:
poppler_path = shutil.which('pdfinfo')
if poppler_path:
logger.info(f"Poppler found at {poppler_path}")
else:
logger.warning("Poppler not found in PATH. Install with 'sudo apt install poppler-utils'. Scanned PDFs will fail.")
missing_deps.append("Poppler")
except Exception as e:
logger.warning(f"Poppler unavailable: {str(e)}. Install with 'sudo apt install poppler-utils'. Scanned PDFs will fail.")
missing_deps.append("Poppler")
logger.info("Required Python packages installed")
except ImportError as e:
logger.error(f"Missing Python dependency: {str(e)}. Install via pip.")
raise ImportError(f"Missing Python dependency: {str(e)}")
return missing_deps
missing_deps = check_dependencies()
# Load environment variables
load_dotenv()
required_env_vars = [
'ENCRYPTION_KEY', 'SALESFORCE_USERNAME', 'SALESFORCE_PASSWORD',
'SALESFORCE_SECURITY_TOKEN', 'SALESFORCE_DOMAIN'
]
env = {var: os.getenv(var) for var in required_env_vars}
if missing := [k for k in required_env_vars if not env[k]]:
logger.error(f"Missing environment variables: {', '.join(missing)}")
raise ValueError(f"Missing environment variables: {', '.join(missing)}")
# Setup encryption
try:
fernet = Fernet(env['ENCRYPTION_KEY'].encode())
except Exception as e:
logger.error(f"Invalid encryption key: {e}")
raise ValueError(f"Invalid encryption key: {e}")
# Salesforce connection retry
def init_salesforce(max_retries=3, delay=3):
for attempt in range(max_retries):
try:
sf = Salesforce(
username=env['SALESFORCE_USERNAME'],
password=env['SALESFORCE_PASSWORD'],
security_token=env['SALESFORCE_SECURITY_TOKEN'],
domain=env['SALESFORCE_DOMAIN'],
version='58.0'
)
logger.info("Connected to Salesforce")
return sf
except Exception as e:
logger.error(f"Salesforce connection attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
time.sleep(delay)
logger.error("Salesforce connection failed after retries")
raise ValueError("Salesforce connection failed after retries")
# Initialize models
def init_models():
try:
summarizer = pipeline(
"summarization",
model=AutoModelForSeq2SeqLM.from_pretrained("t5-base"),
tokenizer=AutoTokenizer.from_pretrained("t5-base")
)
sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
logger.info("NLP models initialized successfully")
return summarizer, sentence_model
except Exception as e:
logger.error(f"Model initialization failed: {str(e)}")
raise
# Clean text for better processing
def clean_text(text):
try:
if not text:
return ""
text = re.sub(r'\s+', ' ', text.strip()) # Normalize whitespace
text = re.sub(r'[^\x00-\x7F]+', ' ', text) # Remove non-ASCII
text = re.sub(r'\b\d+\b(?!\s*,\s*\d{4})', ' ', text) # Remove standalone numbers
return text
except Exception as e:
logger.error(f"Text cleaning failed: {str(e)}")
return ""
# Validate file readability
def validate_file(file_path):
try:
ext = os.path.splitext(file_path)[1].lower()
if ext not in ['.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx']:
return False, f"Unsupported file type: {ext}"
if not os.path.exists(file_path):
return False, f"File not found: {file_path}"
if os.path.getsize(file_path) == 0:
return False, f"File is empty: {file_path}"
return True, None
except Exception as e:
logger.error(f"File validation failed for {file_path}: {str(e)}")
return False, f"File validation failed: {str(e)}"
# Extract text from file
def extract_text(file_path):
is_valid, error = validate_file(file_path)
if not is_valid:
logger.error(error)
return None, error
ext = os.path.splitext(file_path)[1].lower()
try:
logger.debug(f"Extracting text from {file_path} (type: {ext})")
if ext == '.pdf':
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
text = "".join([p.extract_text() or "" for p in pdf_reader.pages])
if not text or len(text.strip()) < 50:
logger.warning(f"PDF text extraction failed or too short, attempting OCR")
if 'Tesseract' in missing_deps or 'Poppler' in missing_deps:
return None, "OCR unavailable: Tesseract or Poppler not installed. Install with 'sudo apt install tesseract-ocr poppler-utils'."
try:
images = convert_from_path(file_path)
text = ""
for i, img in enumerate(images):
logger.debug(f"Processing page {i+1} for OCR")
img = img.convert('L') # Convert to grayscale
img = img.resize((img.width // 2, img.height // 2)) # Optimize size
text += pytesseract.image_to_string(img, config='--psm 6') + "\n"
except Exception as ocr_err:
logger.error(f"OCR failed: {str(ocr_err)}")
return None, f"OCR failed for {file_path}: {str(ocr_err)}"
elif ext == '.docx':
doc = Document(file_path)
text = "\n".join([p.text for p in doc.paragraphs if p.text])
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
text += "\n" + cell.text
elif ext in ['.png', '.jpg', '.jpeg']:
if 'Tesseract' in missing_deps:
return None, "OCR unavailable: Tesseract not installed. Install with 'sudo apt install tesseract-ocr'."
try:
img = Image.open(file_path).convert('L')
img = img.resize((img.width // 2, img.height // 2)) # Optimize size
text = pytesseract.image_to_string(img, config='--psm 6')
except Exception as ocr_err:
logger.error(f"OCR failed for {file_path}: {str(ocr_err)}")
return None, f"OCR failed for {file_path}: {str(ocr_err)}"
elif ext in ['.csv', '.xls', '.xlsx']:
try:
df = pd.read_excel(file_path) if ext in ['.xls', '.xlsx'] else pd.read_csv(file_path)
logger.debug(f"Excel/CSV columns: {df.columns.tolist()}")
text = df.to_string(index=False)
except Exception as e:
logger.error(f"Excel/CSV processing failed for {file_path}: {str(e)}")
return None, f"Excel/CSV processing failed: {str(e)}"
text = clean_text(text)
if not text or len(text) < 50:
logger.error(f"Extracted text is empty or too short: {len(text)} characters")
return None, f"Text extraction failed: No valid text extracted from {file_path}"
logger.debug(f"Extracted text length: {len(text)} characters")
return text, None
except Exception as e:
logger.error(f"Text extraction failed for {file_path}: {str(e)}")
return None, f"Text extraction failed: {str(e)}"
# Parse dates with IST timezone
def parse_dates(text):
ist = timezone('Asia/Kolkata')
current_date = datetime.now(ist).replace(hour=18, minute=33, second=0, microsecond=0) # 06:33 PM IST, June 26, 2025
try:
date_patterns = [
r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+\d{1,2}(?:-|\s*,?\s*)\d{4}\b',
r'\b\d{1,2}/\d{1,2}/\d{4}\b',
r'\b\d{4}-\d{2}-\d{2}\b',
r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s*\d{4}\b',
r'\b\d{4}\b'
]
dates = []
unparseable_dates = []
for pattern in date_patterns:
found_dates = re.findall(pattern, text, re.IGNORECASE)
dates.extend(found_dates)
if found_dates:
logger.debug(f"Found dates with pattern {pattern}: {found_dates}")
parsed_dates = []
for date in dates:
try:
if '/' in date:
parsed = datetime.strptime(date, '%m/%d/%Y').replace(tzinfo=ist)
elif '-' in date and len(date.split('-')) == 3:
parsed = datetime.strptime(date, '%Y-%m-%d').replace(tzinfo=ist)
elif re.match(r'\b\w+\s+\d{4}\b', date):
month_year = re.sub(r'\s+', ' ', date.strip())
parsed = datetime.strptime(month_year, '%b %Y').replace(day=1, tzinfo=ist)
elif ',' in date:
parsed = datetime.strptime(date, '%B %d, %Y').replace(tzinfo=ist)
else:
parsed = datetime.strptime(date, '%Y').replace(month=1, day=1, tzinfo=ist)
parsed_dates.append(parsed.strftime('%Y-%m-%d'))
except Exception as e:
logger.debug(f"Unparseable date '{date}': {str(e)}")
unparseable_dates.append(date)
if unparseable_dates:
logger.warning(f"Found {len(unparseable_dates)} unparseable dates: {unparseable_dates}")
if not parsed_dates:
logger.warning("No valid dates extracted, using current date")
parsed_dates.append(current_date.strftime('%Y-%m-%d'))
while len(parsed_dates) < 2:
parsed_dates.append(parsed_dates[0] if parsed_dates else current_date.strftime('%Y-%m-%d'))
logger.debug(f"Extracted {len(parsed_dates)} valid dates: {parsed_dates}")
return parsed_dates[:2]
except Exception as e:
logger.error(f"Date parsing failed: {str(e)}")
return [current_date.strftime('%Y-%m-%d'), current_date.strftime('%Y-%m-%d')]
# Summarize contract
def summarize_contract(text, summarizer, sentence_model):
aspects = ["parties", "payment terms", "obligations", "termination clauses"]
try:
if not text or len(text.strip()) < 50:
logger.error("Input text is empty or too short")
return {
"full_summary": "No summary generated due to insufficient text",
"aspect_summaries": {asp: "Not extracted" for asp in aspects},
"dates": parse_dates(text)
}, None
text = clean_text(text)[:4096]
try:
summary_result = summarizer(f"summarize: {text}", max_length=150, min_length=50, do_sample=False)[0]['summary_text']
if summary_result.strip() == text.strip()[:len(summary_result)]:
logger.warning("Summary identical to input, generating fallback")
summary_result = f"Summary: {text[:150]}..." if len(text) > 150 else text
logger.debug(f"Generated summary: {summary_result[:50]}...")
full_summary = summary_result
except Exception as e:
logger.error(f"Summarizer failed: {str(e)}")
full_summary = f"Summary failed: {text[:150]}..." if len(text) > 150 else text
aspect_summaries = {}
aspect_synonyms = {
"parties": ["contractor", "client", "party", "signatory", "entity"],
"payment terms": ["payment", "compensation", "fees", "billing", "invoicing"],
"obligations": ["duties", "responsibilities", "obligations", "commitments"],
"termination clauses": ["termination", "cancellation", "end of contract", "exit"]
}
if aspects:
sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip() and len(s.strip()) > 10]
if sentences:
logger.debug(f"Extracted {len(sentences)} sentences for aspect summarization")
emb = sentence_model.encode(sentences, convert_to_tensor=True)
for asp in aspects:
asp_texts = [asp] + aspect_synonyms.get(asp, [])
asp_embs = sentence_model.encode(asp_texts, convert_to_tensor=True)
sims = util.cos_sim(asp_embs, emb).max(dim=0).values
top = sims.argsort(descending=True)[:5]
asp_text = ". ".join([sentences[i] for i in top if sims[i] > 0.05])
if asp_text:
aspect_summaries[asp] = asp_text[:200]
logger.debug(f"Aspect '{asp}' matched {len([i for i in top if sims[i] > 0.05])} sentences")
else:
logger.warning(f"No sentences matched aspect '{asp}'")
aspect_summaries[asp] = "Not extracted"
else:
logger.warning("No valid sentences for aspect summarization")
for asp in aspects:
aspect_summaries[asp] = "Not extracted"
return {
"full_summary": full_summary,
"aspect_summaries": aspect_summaries,
"dates": parse_dates(text)
}, None
except Exception as e:
logger.error(f"Summarization failed: {str(e)}")
return {
"full_summary": f"Summary generation failed: {text[:150]}..." if len(text) > 150 else text,
"aspect_summaries": {asp: "Not extracted" for asp in aspects},
"dates": parse_dates(text)
}, None
# Create Contract Document record
def create_contract_document(sf, file_name, file_url=None):
ist = timezone('Asia/Kolkata')
current_time = datetime.now(ist).replace(hour=18, minute=33, second=0, microsecond=0) # 06:33 PM IST, June 26, 2025
try:
escaped_file_name = file_name.replace("'", "\\'")
today_datetime = current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
query_datetime = f"SELECT Id, Upload_Date__c FROM Contract_Document__c WHERE Name = '{escaped_file_name}' AND Upload_Date__c = {today_datetime} LIMIT 1"
logger.debug(f"Executing SOQL query (dateTime): {query_datetime}")
try:
result = sf.query(query_datetime)
if result['totalSize'] > 0:
doc_id = result['records'][0]['Id']
logger.info(f"Contract Document exists for {file_name} on {today_datetime}, ID {doc_id}")
return doc_id, None
except Exception as e:
logger.warning(f"dateTime query failed: {str(e)}. Trying Date format.")
today_date = current_time.strftime('%Y-%m-%d')
query_date = f"SELECT Id, Upload_Date__c FROM Contract_Document__c WHERE Name = '{escaped_file_name}' AND Upload_Date__c = '{today_date}' LIMIT 1"
logger.debug(f"Executing SOQL query (Date): {query_date}")
result = sf.query(query_date)
if result['totalSize'] > 0:
doc_id = result['records'][0]['Id']
logger.info(f"Contract Document exists for {file_name} on {today_date}, ID {doc_id}")
return doc_id, None
record = {
'Name': file_name,
'Document_URL__c': file_url or '',
'Upload_Date__c': today_datetime,
'Status__c': 'Uploaded'
}
result = sf.Contract_Document__c.create(record)
logger.info(f"Created Contract Document for {file_name} with ID {result['id']}")
return result['id'], None
except Exception as e:
logger.error(f"Failed to create Contract Document for {file_name}: {str(e)}")
return None, f"Failed to create Contract Document: {str(e)}"
# Store summary in Salesforce
def store_in_salesforce(sf, summary_data, file_name, contract_document_id):
try:
query = f"SELECT Id FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_document_id}' LIMIT 1"
logger.debug(f"Executing SOQL query: {query}")
result = sf.query(query)
if result['totalSize'] > 0:
logger.info(f"Summary exists for Contract Document ID {contract_document_id}, ID {result['records'][0]['Id']}")
return {'id': result['records'][0]['Id']}, None
encrypted_summary = fernet.encrypt(summary_data['full_summary'].encode()).decode()
def truncate(text, length=2000):
return text[:length] if text else 'Not extracted'
record = {
'Name': file_name,
'Contract_Document__c': contract_document_id,
'Parties__c': truncate(summary_data['aspect_summaries'].get('parties', 'Not extracted')),
'Payment_Terms__c': truncate(summary_data['aspect_summaries'].get('payment terms', 'Not extracted')),
'Obligations__c': truncate(summary_data['aspect_summaries'].get('obligations', 'Not extracted')),
'Termination_Clause__c': truncate(summary_data['aspect_summaries'].get('termination clauses', 'Not extracted')),
'Custom_Field_1__c': encrypted_summary,
'Validation_Status__c': 'Pending',
'Start_Date__c': summary_data['dates'][0][:10] if summary_data['dates'] and len(summary_data['dates']) > 0 else None,
'End_Date__c': summary_data['dates'][1][:10] if summary_data['dates'] and len(summary_data['dates']) > 1 else summary_data['dates'][0][:10] if summary_data['dates'] else None,
}
logger.debug(f"Record to be created: {record}")
if not any(record.get(field) not in ['', 'Not extracted'] for field in ['Parties__c', 'Payment_Terms__c', 'Obligations__c', 'Termination_Clause__c']):
logger.warning(f"No valid aspects extracted for {file_name}, storing with full summary only")
result = sf.Contract_Summary__c.create(record)
logger.info(f"Stored summary for {file_name} with ID {result['id']}")
return result, None
except Exception as e:
logger.error(f"Failed to store summary for {file_name}: {str(e)}")
return None, f"Failed to store in Salesforce: {str(e)}. Check {log_file}"
# Generate CSV report
def generate_report(sf, output_file, contract_document_id):
try:
query = (
f"SELECT Id, Name, Parties__c, Payment_Terms__c, Obligations__c, Termination_Clause__c, Custom_Field_1__c, "
f"Validation_Status__c, Start_Date__c, End_Date__c "
f"FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_document_id}' LIMIT 1"
)
logger.debug(f"Executing SOQL query: {query}")
results = sf.query(query)['records']
logger.info(f"Retrieved {len(results)} records for Contract_Document__c ID {contract_document_id}")
rows = []
for r in results:
try:
decrypted_summary = fernet.decrypt(r.get('Custom_Field_1__c', '').encode()).decode() if r.get('Custom_Field_1__c') else 'Not extracted'
except Exception as e:
logger.error(f"Decryption failed for record {r.get('Id', 'unknown')}: {str(e)}")
decrypted_summary = 'Decryption failed'
row = {
'Contract_Name': r.get('Name', 'Not extracted'),
'Parties': r.get('Parties__c', 'Not extracted')[:50],
'Payment_Terms': r.get('Payment_Terms__c', 'Not extracted')[:50],
'Obligations': r.get('Obligations__c', 'Not extracted')[:50],
'Termination_Clause': r.get('Termination_Clause__c', 'Not extracted')[:50],
'Full_Summary': decrypted_summary[:100],
'Validation_Status': r.get('Validation_Status__c', 'Not extracted'),
'Start_Date': r.get('Start_Date__c', 'Not extracted'),
'End_Date': r.get('End_Date__c', 'Not extracted'),
}
rows.append(row)
if not rows:
logger.warning(f"No summary found for Contract_Document__c ID {contract_document_id}")
return pd.DataFrame(columns=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date']), None
df = pd.DataFrame(rows)
logger.info(f"Generated DataFrame with {len(df)} record(s) for {contract_document_id}")
df.to_csv(output_file, index=False, encoding='utf-8')
logger.info(f"Saved report to {output_file}")
return df, output_file
except Exception as e:
logger.error(f"Report generation failed: {str(e)}")
return pd.DataFrame(columns=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date']), None
# Gradio interface function
def gradio_process(file, progress=gr.Progress()):
try:
if not file:
logger.error("No file uploaded")
return "Error: No file uploaded.", pd.DataFrame(), None
file_path = file.name if hasattr(file, 'name') else file
file_name = os.path.basename(file_path)
progress(0.1, desc="Validating file...")
is_valid, error = validate_file(file_path)
if not is_valid:
logger.error(error)
return f"Error: {error}", pd.DataFrame(), None
progress(0.2, desc="Extracting text...")
text, error = extract_text(file_path)
if error:
logger.error(f"Text extraction failed: {error}")
return f"Error extracting text from {file_name}: {error}. Check {log_file}", pd.DataFrame(), None
progress(0.4, desc="Initializing Salesforce and models...")
sf = init_salesforce()
summarizer, sentence_model = init_models()
progress(0.6, desc="Summarizing contract...")
summary_data, err = summarize_contract(text, summarizer, sentence_model)
if err:
logger.error(f"Summarization failed: {err}")
return f"Error summarizing {file_name}: {err}. Check {log_file}", pd.DataFrame(), None
progress(0.8, desc="Storing data in Salesforce...")
contract_doc_id, err = create_contract_document(sf, file_name)
if err:
logger.error(f"Contract document creation failed: {err}")
return f"Error creating Contract Document for {file_name}: {err}. Check {log_file}", pd.DataFrame(), None
result, err = store_in_salesforce(sf, summary_data, file_name, contract_doc_id)
if err:
logger.error(f"Salesforce storage failed: {err}")
return f"Error storing summary for {file_name}: {err}. Check {log_file}", pd.DataFrame(), None
progress(0.9, desc="Generating report...")
csv_path = os.path.join(tempfile.gettempdir(), f"contract_summary_{file_name}.csv")
report_df, csv_path = generate_report(sf, csv_path, contract_doc_id)
if report_df.empty:
logger.warning(f"No valid report data generated for {file_name}")
return f"Success! Summary stored for {file_name} with ID {result['id']}. No report data.", pd.DataFrame(), None
progress(1.0, desc="Complete!")
return (
f"Success! Summary stored for {file_name} with ID {result['id']}. Report generated.",
report_df,
csv_path
)
except Exception as e:
logger.error(f"Processing error for {file_name if 'file_name' in locals() else 'unknown file'}: {str(e)}")
return f"Error processing {file_name if 'file_name' in locals() else 'file'}: {str(e)}. Check {log_file}", pd.DataFrame(), None
# Gradio UI setup
with gr.Blocks(title="AI-Powered Contract Summarizer with Salesforce Integration") as iface:
gr.Markdown("AI Contract Summarizer")
with gr.Row():
file_input = gr.File(label="Upload Contract File (PDF, DOCX, PNG, JPG, CSV, XLS/XLSX)")
submit_btn = gr.Button("Submit", elem_classes=["bg-orange-500"])
result_output = gr.Textbox(label="Result", lines=5)
report_output = gr.DataFrame(label="Contract Summary Report", headers=['Contract_Name', 'Parties', 'Payment_Terms', 'Obligations', 'Termination_Clause', 'Full_Summary', 'Validation_Status', 'Start_Date', 'End_Date'], interactive=False)
csv_output = gr.File(label="Download CSV Report")
submit_btn.click(
fn=gradio_process,
inputs=[file_input],
outputs=[result_output, report_output, csv_output]
)
if __name__ == "__main__":
logger.info(f"Starting Gradio interface. Logs saved to {log_file}")
if missing_deps:
logger.warning(f"Application running with limited functionality due to missing dependencies: {', '.join(missing_deps)}")
iface.launch()