# app.py — contract upload, summarization, and Salesforce storage pipeline.
# (Hugging Face upload metadata removed: author anuradhakoppala, rev 6b062d5.)
import os
import time
import logging
import re
from datetime import datetime, timedelta
from dotenv import load_dotenv
from cryptography.fernet import Fernet
from simple_salesforce import Salesforce
from transformers import pipeline
from PIL import Image
import pytesseract
import pandas as pd
from docx import Document
import PyPDF2
import gradio as gr
from pdf2image import convert_from_path
import tempfile
from pytz import timezone
import shutil
import unicodedata
import asyncio
import torch
# Global variables for caching.
# Process-wide singletons populated lazily by init_salesforce()/init_models()
# under _lock, so concurrent requests share one client/model instance.
_sf = None          # cached simple_salesforce.Salesforce client
_summarizer = None  # cached transformers summarization pipeline
_fernet = None      # Fernet cipher; rebound below from ENCRYPTION_KEY
# NOTE(review): this Lock is created at import time, outside any running event
# loop — fine on Python 3.10+ where Lock no longer binds a loop at
# construction; confirm if older interpreters must be supported.
_lock = asyncio.Lock()
# Setup logging: write to a file under the system temp dir (presumably because
# the app directory is read-only in the deployment environment — confirm).
log_file = os.path.join(tempfile.gettempdir(), 'app.log')
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler(log_file)]
)
logger = logging.getLogger(__name__)
# Check critical dependencies
def check_dependencies():
try:
tesseract_path = shutil.which('tesseract')
if not tesseract_path:
logger.warning("Tesseract not found. OCR unavailable.")
return ["Tesseract"], []
pytesseract.pytesseract.tesseract_cmd = tesseract_path
poppler_path = shutil.which('pdfinfo')
if not poppler_path:
logger.warning("Poppler not found.")
return ["Poppler"], []
return [], []
except Exception as e:
logger.error(f"Dependency check failed: {str(e)}")
return ["Tesseract", "Poppler"], []
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()
# Configuration that must be present before the app can start.
required_env_vars = [
    'ENCRYPTION_KEY', 'SALESFORCE_USERNAME', 'SALESFORCE_PASSWORD',
    'SALESFORCE_SECURITY_TOKEN', 'SALESFORCE_DOMAIN'
]
env = {var: os.getenv(var) for var in required_env_vars}
# Fail fast at import time if anything is missing.
if missing := [k for k in required_env_vars if not env[k]]:
    raise ValueError(f"Missing env vars: {', '.join(missing)}")
# Setup encryption. ENCRYPTION_KEY must be a urlsafe-base64-encoded 32-byte
# Fernet key, or Fernet() raises and we abort startup with a clear message.
try:
    _fernet = Fernet(env['ENCRYPTION_KEY'].encode())
except Exception as e:
    raise ValueError(f"Invalid encryption key: {e}")
# Salesforce connection (async)
async def init_salesforce(max_retries=2, initial_delay=1):
    """Return the cached Salesforce client, creating it on first use.

    The blocking ``Salesforce(...)`` login runs in the default thread-pool
    executor so the event loop is not blocked. Failed attempts back off
    exponentially.

    Args:
        max_retries: total connection attempts before giving up.
        initial_delay: base backoff in seconds; doubles after each failure.

    Returns:
        The shared ``Salesforce`` instance (also stored in module-global
        ``_sf``).

    Raises:
        ValueError: if every attempt fails.
    """
    global _sf
    async with _lock:
        if _sf is not None:
            return _sf
        # Fix: get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() here has been deprecated since Python 3.10.
        loop = asyncio.get_running_loop()
        for attempt in range(max_retries):
            try:
                _sf = await loop.run_in_executor(
                    None,
                    lambda: Salesforce(
                        username=env['SALESFORCE_USERNAME'],
                        password=env['SALESFORCE_PASSWORD'],
                        security_token=env['SALESFORCE_SECURITY_TOKEN'],
                        domain=env['SALESFORCE_DOMAIN'],
                        version='58.0'
                    )
                )
                logger.info("Salesforce connection established")
                return _sf
            except Exception as e:
                logger.error(f"Salesforce connection attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(initial_delay * (2 ** attempt))
        raise ValueError("Salesforce connection failed after retries")
# Initialize lightweight models (lazy loading with caching)
async def init_models():
    """Lazily build and cache the t5-small summarization pipeline.

    Returns the shared pipeline; re-raises any initialization failure after
    logging it. Guarded by the module lock so the model loads at most once.
    """
    global _summarizer
    async with _lock:
        if _summarizer is not None:
            return _summarizer
        try:
            device_index = 0 if torch.cuda.is_available() else -1
            _summarizer = pipeline(
                "summarization",
                model="t5-small",
                tokenizer="t5-small",
                framework="pt",
                device=device_index
            )
        except Exception as e:
            logger.error(f"Summarizer init failed: {str(e)}")
            raise
        return _summarizer
# Preprocess image for OCR (optimized)
def preprocess_image(image):
    """Grayscale and 2x-upscale an image to improve Tesseract accuracy.

    Falls back to plain grayscale conversion if the resize fails.
    """
    grayscale = image.convert('L')
    try:
        doubled = (image.width * 2, image.height * 2)
        return grayscale.resize(doubled, Image.LANCZOS)
    except Exception as e:
        logger.error(f"Image preprocess failed: {str(e)}")
        return image.convert('L')
# Clean text (optimized)
def clean_text(text):
    """Normalize text for summarization.

    Applies NFKC unicode normalization, collapses all whitespace runs to a
    single space, and caps the result at 512 characters. Falsy input (None,
    empty string) and any processing failure yield "".
    """
    if not text:
        return ""
    try:
        normalized = unicodedata.normalize('NFKC', text)
        collapsed = re.sub(r'\s+', ' ', normalized.strip())
        return collapsed[:512]
    except Exception as e:
        logger.error(f"Text cleaning failed: {str(e)}")
        return ""
# Validate file
def validate_file(file_path):
    """Check that a path has a supported extension and is a non-empty file.

    Returns:
        (True, None) when valid, otherwise (False, reason_string).
    """
    supported = {'.pdf', '.docx', '.png', '.jpg', '.jpeg', '.csv', '.xls', '.xlsx'}
    extension = os.path.splitext(file_path)[1].lower()
    if extension not in supported:
        return False, f"Unsupported file type: {extension}"
    if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
        return False, f"File not found or empty: {file_path}"
    return True, None
# Extract text (async)
async def extract_text_async(file_path):
    """Extract text from a contract file for downstream summarization.

    Supported inputs: PDF (text layer via PyPDF2, falling back to Tesseract
    OCR of rendered pages), DOCX, PNG/JPG images (OCR), and CSV/Excel
    (cells joined as one string).

    Returns:
        (text, None) on success, or (None, error_message) on failure.
        Cleaned text shorter than 50 characters is treated as a failure.
    """
    is_valid, error = validate_file(file_path)
    if not is_valid:
        return None, error
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == '.pdf':
            with open(file_path, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                # Only the first two pages are read to bound latency.
                text = "".join([p.extract_text() or "" for p in pdf_reader.pages[:2]])
            # A near-empty text layer usually means a scanned PDF — fall back
            # to rendering pages and OCR-ing them.
            if not text or len(text.strip()) < 50:
                images = convert_from_path(file_path, dpi=150, first_page=1, last_page=2, thread_count=4)
                text = "".join(pytesseract.image_to_string(preprocess_image(img), config='--psm 6') for img in images)
            logger.info(f"Extracted text: {text[:100]}...")
        elif ext == '.docx':
            doc = Document(file_path)
            # First 50 non-empty paragraphs only.
            text = "\n".join([p.text for p in doc.paragraphs if p.text.strip()][:50])
        elif ext in ['.png', '.jpg', '.jpeg']:
            img = Image.open(file_path)
            img = preprocess_image(img)
            # --psm 6: assume a single uniform block of text.
            text = pytesseract.image_to_string(img, config='--psm 6')
        elif ext in ['.csv', '.xls', '.xlsx']:
            df = pd.read_csv(file_path) if ext == '.csv' else pd.read_excel(file_path)
            # Flatten every cell into one space-separated string, capped early.
            text = " ".join(df.astype(str).values.flatten())[:1000]
        text = clean_text(text)
        if not text or len(text) < 50:
            return None, f"No valid text extracted from {file_path}"
        return text, None
    except Exception as e:
        logger.error(f"Text extraction failed: {str(e)}")
        return None, f"Text extraction failed: {str(e)}"
# Parse dates
def parse_dates(text):
    """Heuristically extract a contract start and end date from free text.

    Strategy:
      1. Collect all recognizable dates (ISO ``YYYY-MM-DD``, US ``m/d/Y``,
         and ``1 January 2024`` forms).
      2. Look for an explicit term ("term of N years") or end clause
         ("ending on <date>").
      3. start = earliest date found (or today in IST);
         end = explicit end date, else start + N*365 days, else the latest
         date found, else start + 365 days.

    Returns:
        ``(start_date, end_date)`` as ``'YYYY-MM-DD'`` strings; falls back
        to (today, today) on any unexpected failure.
    """
    # IST is a fixed UTC+05:30 offset with no DST, so a static stdlib offset
    # is equivalent to pytz's Asia/Kolkata here (we only format "now").
    from datetime import timezone as _dt_timezone
    ist = _dt_timezone(timedelta(hours=5, minutes=30))
    current_date = datetime.now(ist).strftime('%Y-%m-%d')
    log = logging.getLogger(__name__)

    def _to_iso(raw):
        """Normalize one matched date string to YYYY-MM-DD; None if unparseable."""
        try:
            if '/' in raw:
                # NOTE(review): assumes US month/day order for slash dates —
                # confirm against the documents actually ingested.
                return datetime.strptime(raw, '%m/%d/%Y').strftime('%Y-%m-%d')
            if '-' in raw:
                return datetime.strptime(raw, '%Y-%m-%d').strftime('%Y-%m-%d')
            return datetime.strptime(raw, '%d %B %Y').strftime('%Y-%m-%d')
        except ValueError:
            # Narrowed from the original bare `except:`; strptime raises
            # ValueError on format mismatch.
            return None

    try:
        date_patterns = [
            r'\b\d{4}-\d{2}-\d{2}\b',
            r'\b\d{1,2}/\d{1,2}/\d{4}\b',
            r'\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4}\b',
        ]
        term_patterns = [
            r'(?:term|duration|period)\s*(?:of|for)\s*(\d+)\s*(?:year|years)',
            r'(?:end|ending|expires|expiration)\s*(?:on|at)?\s*(\d{4}-\d{2}-\d{2}|\d{1,2}/\d{1,2}/\d{4}|\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{4})'
        ]
        dates = []
        for pattern in date_patterns:
            dates.extend(re.findall(pattern, text, re.IGNORECASE))
        parsed_dates = [iso for iso in (_to_iso(d) for d in dates) if iso]
        term_years = None
        explicit_end_date = None
        for pattern in term_patterns:
            for match in re.findall(pattern, text, re.IGNORECASE):
                if match.isdigit():
                    term_years = int(match)
                else:
                    # Keep the previous value if this candidate fails to parse.
                    explicit_end_date = _to_iso(match) or explicit_end_date
        parsed_dates = sorted(set(parsed_dates))
        start_date = parsed_dates[0] if parsed_dates else current_date
        if explicit_end_date:
            end_date = explicit_end_date
        elif term_years:
            start_dt = datetime.strptime(start_date, '%Y-%m-%d')
            # 365-day years, as in the original logic; drifts over leap years.
            end_date = (start_dt + timedelta(days=term_years * 365)).strftime('%Y-%m-%d')
        elif len(parsed_dates) > 1:
            end_date = parsed_dates[-1]
        else:
            start_dt = datetime.strptime(start_date, '%Y-%m-%d')
            end_date = (start_dt + timedelta(days=365)).strftime('%Y-%m-%d')
        log.info(f"Parsed dates - Start: {start_date}, End: {end_date}")
        return start_date, end_date
    except Exception as e:
        log.error(f"Date parsing failed: {str(e)}")
        return current_date, current_date
# Summarize contract (async)
async def summarize_contract_async(text, summarizer, file_name):
    """Summarize contract text and extract key aspects.

    Args:
        text: contract text (may be empty or very short).
        summarizer: transformers summarization pipeline from init_models().
        file_name: original upload name (not used in the current body).

    Returns:
        (summary_dict, None) — the error slot is always None; every failure
        degrades gracefully instead. summary_dict keys: 'full_summary',
        'aspect_summaries' (parties / payment terms / obligations /
        termination clauses), 'start_date', 'end_date'.
    """
    aspects = ["parties", "payment terms", "obligations", "termination clauses"]
    try:
        # Too little text to summarize: placeholders dated today (IST).
        if not text or len(text.strip()) < 50:
            ist = timezone('Asia/Kolkata')
            current_date = datetime.now(ist).strftime('%Y-%m-%d')
            return {
                "full_summary": "No summary due to insufficient text",
                "aspect_summaries": {asp: "Not extracted" for asp in aspects},
                "start_date": current_date,
                "end_date": current_date
            }, None
        # Clip input: t5-small has a small context window and this bounds latency.
        text = clean_text(text)[:512]
        try:
            # "summarize:" is the T5 task prefix.
            prompt = f"summarize: Create a concise agreement summary including parties and obligations: {text}"
            summary_result = summarizer(
                prompt,
                max_length=80,
                min_length=30,
                do_sample=False,
                num_beams=4
            )[0]['summary_text']
            full_summary = clean_text(summary_result)
            logger.info(f"Generated summary: {full_summary}")
        except Exception as e:
            # Model failure degrades to a truncated excerpt of the input.
            logger.error(f"Summarizer failed: {str(e)}")
            full_summary = text[:60] + "..." if len(text) > 60 else text
        # Regex-based aspect extraction; each aspect falls back to 'Not extracted'.
        aspect_summaries = {}
        for asp in aspects:
            if asp == "parties":
                match = re.search(r'(?:parties|between)\s+([A-Za-z\s&]+?)(?:\sand|\,|\.)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1).strip()[:100] if match else "Not extracted"
            elif asp == "payment terms":
                match = re.search(r'(?:payment|terms)\s+([\d,.]+\s*(?:EUR|USD|INR)\s*(?:monthly|annually|quarterly))', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1)[:100] if match else "Not extracted"
            elif asp == "obligations":
                # NOTE(review): `\by` in the terminator matches a literal 'y' at
                # a word start — possibly a typo for r'\bby\b' or '\n'; confirm
                # intent before changing the pattern.
                match = re.search(r'(?:obligations|services|duties)\s+(.+?)(?:\by|\,|\.)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1).strip()[:100] if match else "Not extracted"
            elif asp == "termination clauses":
                match = re.search(r'(?:termination|notice)\s+(\d+\s*days\'?\s*notice)', text, re.IGNORECASE)
                aspect_summaries[asp] = match.group(1)[:100] if match else "Not extracted"
        # Prefer a templated summary when both parties and obligations matched.
        parties = aspect_summaries.get("parties", "Not extracted")
        obligations = aspect_summaries.get("obligations", "Not extracted")
        if parties != "Not extracted" and obligations != "Not extracted":
            full_summary = f"Logistics agreement between {parties} for {obligations}..."
        else:
            full_summary = full_summary if full_summary else text[:60] + "..."
        logger.info(f"Final summary: {full_summary}")
        start_date, end_date = parse_dates(text)
        return {
            "full_summary": full_summary,
            "aspect_summaries": aspect_summaries,
            "start_date": start_date,
            "end_date": end_date
        }, None
    except Exception as e:
        # Catch-all: return a truncated excerpt with today's date rather than fail.
        logger.error(f"Summarization failed: {str(e)}")
        ist = timezone('Asia/Kolkata')
        current_date = datetime.now(ist).strftime('%Y-%m-%d')
        return {
            "full_summary": text[:60] + "..." if len(text) > 60 else text,
            "aspect_summaries": {asp: "Not extracted" for asp in aspects},
            "start_date": current_date,
            "end_date": current_date
        }, None
# Create Contract Document (async)
async def create_contract_document(sf, file_name):
    """Find or create the Contract_Document__c record for an upload.

    Returns:
        (record_id, None) on success — reusing an existing record when one
        with the same Name exists — or (None, error_message) on failure.
    """
    now_ist = datetime.now(timezone('Asia/Kolkata')).strftime('%Y-%m-%dT%H:%M:%SZ')
    try:
        # Escape single quotes so the file name cannot break out of the SOQL literal.
        safe_name = file_name.replace("'", "\\'")
        soql = f"SELECT Id FROM Contract_Document__c WHERE Name = '{safe_name}' LIMIT 1"
        loop = asyncio.get_event_loop()
        found = await loop.run_in_executor(None, sf.query, soql)
        if found['totalSize'] > 0:
            return found['records'][0]['Id'], None
        payload = {
            'Name': file_name,
            'Document_URL__c': '',
            'Upload_Date__c': now_ist,
            'Status__c': 'Uploaded'
        }
        created = await loop.run_in_executor(None, sf.Contract_Document__c.create, payload)
        return created['id'], None
    except Exception as e:
        logger.error(f"Contract document creation failed: {str(e)}")
        return None, f"Contract document creation failed: {str(e)}"
# Store summary in Salesforce (async)
async def store_in_salesforce(sf, summary_data, file_name, contract_doc_id):
    """Persist a Contract_Summary__c for a document, encrypting the summary.

    Skips creation (returning the existing id) when a summary already exists
    for the given Contract_Document__c.

    Returns:
        (create_result_or_id_dict, None) on success, (None, error) on failure.
    """
    try:
        if not contract_doc_id:
            return None, "Contract document ID is missing"
        loop = asyncio.get_event_loop()
        soql = f"SELECT Id FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_doc_id}' LIMIT 1"
        existing = await loop.run_in_executor(None, sf.query, soql)
        if existing['totalSize'] > 0:
            return {'id': existing['records'][0]['Id']}, None
        # Only the full summary is stored encrypted; aspect fields are plain text.
        cipher_text = _fernet.encrypt(summary_data['full_summary'].encode()).decode()
        aspect_values = summary_data['aspect_summaries']

        def clip(value, limit=100):
            # Salesforce field length guard; empty values become a placeholder.
            return value[:limit] if value else 'Not extracted'

        payload = {
            'Name': file_name,
            'Contract_Document__c': contract_doc_id,
            'Parties__c': clip(aspect_values.get('parties', 'Not extracted')),
            'Payment_Terms__c': clip(aspect_values.get('payment terms', 'Not extracted')),
            'Obligations__c': clip(aspect_values.get('obligations', 'Not extracted')),
            'Termination_Clause__c': clip(aspect_values.get('termination clauses', 'Not extracted')),
            'Custom_Field_1__c': cipher_text,
            'Validation_Status__c': 'Pending',
            'Start_Date__c': summary_data['start_date'][:10],
            'End_Date__c': summary_data['end_date'][:10],
        }
        created = await loop.run_in_executor(None, sf.Contract_Summary__c.create, payload)
        return created, None
    except Exception as e:
        logger.error(f"Store summary failed: {str(e)}")
        return None, f"Store summary failed: {str(e)}"
# Generate CSV report (async)
async def generate_report(sf, output_file, contract_doc_id):
    """Export the Contract_Summary__c for a document to a Field/Value CSV.

    Args:
        sf: connected Salesforce client.
        output_file: destination CSV path.
        contract_doc_id: Contract_Document__c record id.

    Returns:
        (dataframe, output_file) on success, or
        (empty dataframe, error_message_string) on failure.
    """
    try:
        if not contract_doc_id:
            return pd.DataFrame(columns=['Field', 'Value']), "Contract document ID is missing"
        query = (
            f"SELECT Id, Name, Parties__c, Payment_Terms__c, Obligations__c, Termination_Clause__c, Custom_Field_1__c, "
            f"Validation_Status__c, Start_Date__c, End_Date__c "
            f"FROM Contract_Summary__c WHERE Contract_Document__c = '{contract_doc_id}' LIMIT 1"
        )
        loop = asyncio.get_running_loop()
        results = (await loop.run_in_executor(None, sf.query, query))['records']

        def _field(record, key, limit=100):
            # Fix: Salesforce returns an explicit None for empty fields, so
            # dict.get's default alone is not enough — the original
            # `r.get(key, 'Not extracted')[:100]` raised TypeError on
            # None[:100]. Coalesce None before slicing.
            return (record.get(key) or 'Not extracted')[:limit]

        rows = []
        for r in results:
            encrypted = r.get('Custom_Field_1__c')
            decrypted_summary = _fernet.decrypt(encrypted.encode()).decode() if encrypted else 'Not extracted'
            rows.extend([
                ('Contract Name', r.get('Name') or 'Not extracted'),
                ('Parties', _field(r, 'Parties__c')),
                ('Payment Terms', _field(r, 'Payment_Terms__c')),
                ('Obligations', _field(r, 'Obligations__c')),
                ('Termination Clause', _field(r, 'Termination_Clause__c')),
                ('Full Summary', decrypted_summary[:100]),
                ('Validation Status', r.get('Validation_Status__c') or 'Not extracted'),
                ('Start Date', r.get('Start_Date__c') or 'Not extracted'),
                ('End Date', r.get('End_Date__c') or 'Not extracted'),
            ])
        df = pd.DataFrame(rows, columns=['Field', 'Value']) if rows else pd.DataFrame(columns=['Field', 'Value'])
        df.to_csv(output_file, index=False, encoding='utf-8')
        return df, output_file
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return pd.DataFrame(columns=['Field', 'Value']), f"Report generation failed: {str(e)}"
# Gradio interface function (async)
async def gradio_process_async(file, progress=gr.Progress()):
    """End-to-end pipeline behind the Submit button.

    Validates the upload, extracts and summarizes text, stores the result in
    Salesforce, and produces a downloadable CSV report.

    Returns:
        (status_message, report_dataframe, csv_path_or_None) matching the
        three Gradio outputs.
    """
    empty_df = pd.DataFrame(columns=['Field', 'Value'])
    try:
        if not file:
            return "No file uploaded.", empty_df, None
        # gr.File may hand us a tempfile-like object or a plain path string.
        file_path = file.name if hasattr(file, 'name') else file
        file_name = os.path.basename(file_path)
        progress(0.1, desc="Validating...")
        is_valid, error = validate_file(file_path)
        if not is_valid:
            return f"Error: {error}", empty_df, None
        progress(0.2, desc="Extracting text...")
        text, error = await extract_text_async(file_path)
        if error:
            return f"Error extracting text: {error}", empty_df, None
        progress(0.4, desc="Initializing...")
        sf = await init_salesforce()
        summarizer = await init_models()
        progress(0.6, desc="Summarizing...")
        summary_data, err = await summarize_contract_async(text, summarizer, file_name)
        if err:
            return f"Error summarizing: {err}", empty_df, None
        progress(0.8, desc="Storing in Salesforce...")
        contract_doc_id, err = await create_contract_document(sf, file_name)
        if err or not contract_doc_id:
            return f"Error creating document: {err or 'No contract document ID returned'}", empty_df, None
        result, err = await store_in_salesforce(sf, summary_data, file_name, contract_doc_id)
        if err:
            return f"Error storing summary: {err}", empty_df, None
        progress(0.9, desc="Generating report...")
        csv_path = os.path.join(tempfile.gettempdir(), f"contract_summary_{file_name}.csv")
        report_df, report_result = await generate_report(sf, csv_path, contract_doc_id)
        # Fix: generate_report returns the CSV path on success and an error
        # *string* on failure. The old check (`if not csv_path`) could never
        # fire because an error string is truthy, and its message reported a
        # stale `err` from the previous step. Compare against the expected
        # path instead.
        if report_result != csv_path:
            return f"Error generating report: {report_result}", empty_df, None
        progress(1.0, desc="Complete!")
        return (
            f"Success! Summary stored for {file_name}.",
            report_df,
            csv_path
        )
    except Exception as e:
        # file_name may not be bound yet if the failure happened early.
        logger.error(f"Processing error for {file_name if 'file_name' in locals() else 'file'}: {str(e)}")
        return f"Error processing {file_name if 'file_name' in locals() else 'file'}: {str(e)}", empty_df, None
# Gradio UI setup
# Gradio UI setup: one upload control feeding the async processing pipeline,
# with a status textbox, a Field/Value report table, and a CSV download link.
with gr.Blocks() as iface:
    file_input = gr.File(label="Upload Contract (PDF, DOCX, PNG, JPG, CSV, XLS/XLSX)")
    submit_btn = gr.Button("Submit")
    result_output = gr.Textbox(label="Result", lines=3)
    report_output = gr.DataFrame(label="Summary Report", headers=['Field', 'Value'], interactive=False)
    csv_output = gr.File(label="Download CSV")
    # Gradio drives the async handler itself; the three outputs map to the
    # (message, dataframe, csv_path) tuple returned by gradio_process_async.
    submit_btn.click(
        fn=gradio_process_async,
        inputs=[file_input],
        outputs=[result_output, report_output, csv_output]
    )
if __name__ == "__main__":
    logger.info("Application startup")
    # Missing OCR/PDF binaries are logged but not fatal: non-OCR paths
    # (DOCX, CSV/Excel, text-layer PDFs) still work without them.
    missing_deps, _ = check_dependencies()
    if missing_deps:
        logger.warning(f"Missing dependencies: {', '.join(missing_deps)}")
    iface.launch()