Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pdfplumber | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from word2number import w2n | |
| import re | |
| from typing import Tuple, List, Dict | |
| from simple_salesforce import Salesforce | |
| import base64 | |
| from io import BytesIO | |
| import uuid | |
| import logging | |
| import textwrap | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.units import inch | |
| import time | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
# Module-level logger shared by every function in this file; the root logger
# is configured at INFO so progress messages reach the hosting console.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# Custom CSS for the Gradio UI, with light (:root) and dark (.dark) palettes.
#
# Each palette also exposes --bg-rgb, the background colour as bare
# "r, g, b" channels. rgba() cannot take a hex custom property, so the
# translucent container background must be built from the channel form;
# rgba(var(--bg-color), ...) is invalid and the browser drops the declaration.
css = """
:root {
    --primary-color: #1e90ff;
    --secondary-color: #4169e1;
    --text-color: #2c3e50;
    --bg-color: #ffffff;
    --bg-rgb: 255, 255, 255;
    --box-bg: #ffffff;
    --border-color: #e0e0e0;
    --success-color: #28a745;
    --warning-color: #ff9800;
    --danger-color: #f44336;
    --info-color: #2196F3;
}
.dark {
    --primary-color: #4a89dc;
    --secondary-color: #3b7dd8;
    --text-color: #f0f0f0;
    --bg-color: #1e1e1e;
    --bg-rgb: 30, 30, 30;
    --box-bg: #2d2d2d;
    --border-color: #444;
    --success-color: #4CAF50;
    --warning-color: #FFC107;
    --danger-color: #F44336;
    --info-color: #2196F3;
}
body {
    background-image: url('https://images.unsplash.com/photo-1604147706283-d7119b5b822c?ixlib=rb-1.2.1&auto=format&fit=crop&w=1920&q=80');
    background-size: cover;
    background-position: center;
    background-attachment: fixed;
    background-repeat: no-repeat;
    min-height: 100vh;
    margin: 0;
    padding: 0;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    color: var(--text-color);
}
.gradio-container {
    background-color: rgba(var(--bg-rgb), 0.97) !important;
    border-radius: 15px;
    padding: 25px;
    margin: 20px auto;
    max-width: 1200px;
    box-shadow: 0 8px 24px rgba(0,0,0,0.12);
    min-height: 90vh;
    border: 1px solid var(--primary-color) !important;
}
.risk-low { color: var(--success-color); font-weight: bold; }
.risk-medium { color: var(--warning-color); font-weight: bold; }
.risk-high { color: var(--danger-color); font-weight: bold; }
.result-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 25px;
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
    border-left: 5px solid var(--primary-color) !important;
    color: var(--text-color);
}
.penalty-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--danger-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.obligation-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--warning-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.delay-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--info-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.combined-risk-container {
    display: flex;
    flex-direction: column;
    gap: 15px;
    margin-bottom: 25px;
    background-color: var(--box-bg);
    padding: 20px;
    border-radius: 10px;
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.risk-row {
    display: flex;
    align-items: center;
    gap: 20px;
    padding: 15px;
    border-radius: 8px;
    background-color: rgba(0,0,0,0.05);
    border: 1px solid var(--border-color) !important;
    transition: all 0.3s ease;
}
.risk-row:hover {
    transform: translateY(-2px);
    box-shadow: 0 4px 12px rgba(0,0,0,0.1);
}
.risk-label {
    width: 150px;
    font-weight: 600;
    font-size: 16px;
    color: var(--text-color) !important;
}
.risk-score {
    width: 120px;
    font-size: 20px;
    text-align: center;
    padding: 8px 12px;
    border-radius: 6px;
}
.warning-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(255, 243, 205, 0.3);
    border-left: 5px solid var(--warning-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.danger-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(248, 215, 218, 0.3);
    border-left: 5px solid var(--danger-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.success-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(212, 237, 218, 0.3);
    border-left: 5px solid var(--success-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.info-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(227, 242, 253, 0.3);
    border-left: 5px solid var(--info-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.section-title {
    font-size: 22px;
    font-weight: 700;
    margin-bottom: 18px;
    color: var(--primary-color) !important;
    display: flex;
    align-items: center;
    gap: 10px;
}
.count-item {
    display: flex;
    justify-content: space-between;
    padding: 12px 0;
    border-bottom: 1px solid var(--border-color) !important;
    transition: all 0.2s ease;
}
.count-item:hover {
    background-color: rgba(0,0,0,0.05);
    transform: translateX(5px);
}
.count-label {
    font-weight: 600;
    color: var(--text-color) !important;
    display: flex;
    align-items: center;
    gap: 8px;
}
.count-value {
    color: var(--secondary-color) !important;
    font-weight: 600;
    font-size: 16px;
}
button {
    background: linear-gradient(135deg, var(--primary-color), var(--secondary-color)) !important;
    border: none !important;
    color: white !important;
    font-weight: 600 !important;
    padding: 12px 24px !important;
    border-radius: 8px !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
}
button:hover {
    background: linear-gradient(135deg, var(--secondary-color), var(--primary-color)) !important;
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 12px rgba(0,0,0,0.15) !important;
}
.upload-area {
    border: 2px dashed var(--primary-color) !important;
    background-color: rgba(240, 248, 255, 0.3) !important;
    border-radius: 10px !important;
    padding: 30px !important;
    transition: all 0.3s ease !important;
}
.upload-area:hover {
    background-color: rgba(224, 255, 255, 0.3) !important;
    border-color: var(--secondary-color) !important;
}
.risk-meter {
    width: 100%;
    height: 20px;
    background: linear-gradient(90deg, var(--success-color), var(--warning-color), var(--danger-color));
    border-radius: 10px;
    margin: 15px 0;
    position: relative;
}
.risk-meter-indicator {
    position: absolute;
    top: -5px;
    width: 3px;
    height: 30px;
    background-color: var(--text-color);
    transform: translateX(-50%);
}
.risk-meter-labels {
    display: flex;
    justify-content: space-between;
    margin-top: 5px;
    font-size: 12px;
    color: var(--text-color);
}
.clause-example {
    background-color: rgba(0,0,0,0.05);
    padding: 15px;
    border-radius: 8px;
    margin-bottom: 10px;
    border-left: 3px solid var(--primary-color);
    font-family: 'Courier New', monospace;
    line-height: 1.5;
    color: var(--text-color);
}
.clause-number {
    font-weight: bold;
    color: var(--primary-color);
    margin-right: 8px;
}
.sentiment-meter {
    width: 100%;
    height: 20px;
    background: linear-gradient(90deg, var(--danger-color), var(--warning-color), var(--success-color));
    border-radius: 10px;
    margin: 15px 0;
}
.sentiment-score {
    height: 100%;
    border-radius: 10px;
    background-color: rgba(255,255,255,0.3);
}
.keyword-match {
    background-color: rgba(255, 255, 0, 0.3);
    padding: 2px 4px;
    border-radius: 3px;
    font-weight: bold;
}
.match-detail {
    margin-top: 5px;
    padding: 8px;
    background-color: rgba(0,0,0,0.05);
    border-radius: 5px;
    font-size: 14px;
}
.match-line {
    font-family: monospace;
    white-space: pre-wrap;
    margin-bottom: 5px;
}
.match-context {
    font-style: italic;
    color: var(--secondary-color);
}
/* Hide elements */
footer, .gradio-footer, .hide, [data-testid="Use via API"], [data-testid="mmsettings"],
#sentiment-analysis, #risk-visualization {
    display: none !important;
    visibility: hidden !important;
    height: 0 !important;
    width: 0 !important;
    padding: 0 !important;
    margin: 0 !important;
}
.file-info {
    margin-top: -15px;
    margin-bottom: 15px;
    color: var(--text-color);
    font-size: 13px;
}
/* Dark mode specific adjustments */
.dark .clause-example {
    background-color: rgba(255,255,255,0.05);
}
.dark .risk-row {
    background-color: rgba(255,255,255,0.05);
}
.dark .count-item:hover {
    background-color: rgba(255,255,255,0.05);
}
.dark .keyword-match {
    background-color: rgba(255, 255, 0, 0.5);
    color: black;
}
.dark .match-detail {
    background-color: rgba(255,255,255,0.05);
}
"""
# Salesforce credentials.
#
# These are read from the environment (e.g. Space secrets) rather than being
# embedded in the source, so that no secret is published with the code. An
# unset variable yields an empty string, which makes the Salesforce login fail
# with an authentication error instead of silently using a baked-in account.
# NOTE(review): the credentials that were previously committed here should be
# treated as compromised and rotated.
SF_USERNAME = os.environ.get("SF_USERNAME", "")
SF_PASSWORD = os.environ.get("SF_PASSWORD", "")
SF_TOKEN = os.environ.get("SF_TOKEN", "")
def authenticate_salesforce() -> Salesforce:
    """Log in to Salesforce using the module-level credentials.

    Returns:
        An authenticated ``Salesforce`` client.

    Raises:
        Exception: if the login fails; the message embeds the text of the
            underlying error.
    """
    login_kwargs = {
        "username": SF_USERNAME,
        "password": SF_PASSWORD,
        "security_token": SF_TOKEN,
    }
    try:
        client = Salesforce(**login_kwargs)
        logger.info("Successfully authenticated with Salesforce")
        return client
    except Exception as e:
        reason = str(e)
        logger.error(f"Failed to authenticate with Salesforce: {reason}")
        raise Exception(f"Salesforce authentication failed: {reason}")
# Lazily-built sentiment pipeline, shared by every call so the model is loaded
# at most once per process instead of once per analysed document.
_SENTIMENT_CLASSIFIER = None


def _get_sentiment_classifier():
    """Return the shared sentiment pipeline, creating it on first use."""
    global _SENTIMENT_CLASSIFIER
    if _SENTIMENT_CLASSIFIER is None:
        # Imported here so the module still loads when transformers is absent;
        # the caller turns that failure into the fallback score.
        from transformers import pipeline
        _SENTIMENT_CLASSIFIER = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
    return _SENTIMENT_CLASSIFIER


def get_hugging_face_sentiment(text: str) -> float:
    """Score the sentiment of ``text`` with a Hugging Face classifier.

    Only the first 512 characters of ``text`` are sent to the model.

    Args:
        text: The text to score.

    Returns:
        A value rounded to two decimals: the classifier's confidence when the
        label is ``POSITIVE``, otherwise one minus that confidence. If the
        model cannot be loaded or run, the fallback ``0.5`` is returned.
    """
    try:
        classifier = _get_sentiment_classifier()
        result = classifier(text[:512])[0]
        score = result['score'] if result['label'] == 'POSITIVE' else 1 - result['score']
        return round(score, 2)
    except Exception as e:
        # Deliberate best-effort: sentiment is one input among several, so a
        # model failure must not abort the whole analysis.
        logger.error(f"Hugging Face sentiment analysis failed: {str(e)}. Using fallback score.")
        return 0.5
def generate_analysis_pdf(analysis_data: Dict) -> BytesIO:
    """Render the analysis results as an in-memory PDF report.

    The report has a header followed by four sections: risk summary, detailed
    metrics, key findings and recommendations. Finding lines are prefixed with
    plain-ASCII markers ("WARNING:", "NOTE:", "OK:") because the report is
    drawn with the built-in Helvetica font, which has no emoji glyphs.

    Args:
        analysis_data: Mapping that must contain ``risk_level``,
            ``risk_score``, ``sentiment_score``, ``penalty_count``,
            ``penalty_values``, ``obligation_count`` and ``delay_count``.
            ``document_name`` is optional and defaults to ``"Unknown"``.

    Returns:
        A ``BytesIO`` rewound to offset 0 that holds the finished PDF.

    Raises:
        Exception: if anything fails while building the report (including a
            missing key); the original error is chained as the cause.
    """
    left_margin = 1 * inch
    detail_indent = 1.2 * inch
    page_top = 10.5 * inch

    try:
        pdf_file = BytesIO()
        c = canvas.Canvas(pdf_file, pagesize=letter)
        # Vertical position of the next body line; the first one sits just
        # below the header rule.
        cursor = {"y": 9.5 * inch}

        def draw_footer():
            """Draw the confidentiality footer on the current page."""
            c.setFont("Helvetica-Oblique", 8)
            c.drawString(left_margin, 0.5 * inch, "Generated by Contract Risk Analyzer - Confidential")

        def put(text, x=left_margin, step=0.2 * inch):
            """Draw one line at the cursor, then move the cursor down."""
            c.drawString(x, cursor["y"], text)
            cursor["y"] -= step

        def heading(text, size, step):
            """Draw a bold heading, then restore the body font."""
            c.setFont("Helvetica-Bold", size)
            put(text, step=step)
            c.setFont("Helvetica", 10)

        def break_page_if_below(limit):
            """Start a new page when the cursor has dropped under ``limit``."""
            if cursor["y"] < limit:
                draw_footer()
                c.showPage()
                # showPage() resets the graphics state, so re-select the font.
                c.setFont("Helvetica", 10)
                cursor["y"] = page_top

        # Header
        c.setFont("Helvetica-Bold", 18)
        c.drawString(left_margin, 10.5 * inch, "Contract Risk Analysis Report")
        c.setFont("Helvetica", 10)
        c.drawString(left_margin, 10.2 * inch, f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        c.drawString(left_margin, 10 * inch, f"Document: {analysis_data.get('document_name', 'Unknown')}")
        c.line(left_margin, 9.8 * inch, 7.5 * inch, 9.8 * inch)

        # 1. Risk summary
        heading("1. Risk Summary", 14, 0.3 * inch)
        risk_level = analysis_data['risk_level']
        risk_color = {
            "Low": "#4CAF50",
            "Medium": "#FF9800",
            "High": "#F44336",
        }.get(risk_level, "#000000")
        c.setFillColor(risk_color)
        heading(f"Overall Risk Level: {risk_level}", 12, 0.25 * inch)
        c.setFillColor("black")
        put(f"Risk Score: {analysis_data['risk_score']:.1f}/100", step=0.25 * inch)
        risk_explanations = {
            "Low": "The contract appears to be low risk with favorable terms. Standard review recommended.",
            "Medium": "The contract has moderate risk factors. Careful review of flagged clauses advised.",
            "High": "The contract contains high-risk elements! Immediate legal review required.",
        }
        put("Assessment:")
        for line in textwrap.wrap(risk_explanations.get(risk_level, ""), width=80):
            put(line, x=detail_indent)

        # 2. Detailed metrics
        cursor["y"] -= 0.3 * inch
        heading("2. Detailed Metrics", 14, 0.3 * inch)

        heading("Sentiment Analysis:", 12, 0.2 * inch)
        sentiment_score = analysis_data['sentiment_score']
        if sentiment_score > 0.6:
            sentiment_text = "Positive (favorable language)"
        elif sentiment_score < 0.4:
            sentiment_text = "Negative (adversarial language)"
        else:
            sentiment_text = "Neutral (balanced language)"
        put(f"Score: {sentiment_score:.2f} - {sentiment_text}", x=detail_indent)
        put("Interpretation: Measures the overall tone of the contract language.",
            x=detail_indent, step=0.25 * inch)

        heading("Penalty Analysis:", 12, 0.2 * inch)
        put(f"Total penalty clauses found: {analysis_data['penalty_count']}", x=detail_indent)
        penalty_values = analysis_data['penalty_values']
        if penalty_values:
            put(f"Highest penalty amount: ${max(penalty_values):,.2f}", x=detail_indent)
            put(f"Average penalty amount: ${sum(penalty_values) / len(penalty_values):,.2f}", x=detail_indent)
        put("Interpretation: Penalties are financial consequences for non-compliance.",
            x=detail_indent, step=0.25 * inch)

        heading("Obligation Analysis:", 12, 0.2 * inch)
        put(f"Total obligation clauses found: {analysis_data['obligation_count']}", x=detail_indent)
        put("Interpretation: Obligations are requirements that must be fulfilled.",
            x=detail_indent, step=0.25 * inch)

        heading("Delay Analysis:", 12, 0.2 * inch)
        put(f"Total delay clauses found: {analysis_data['delay_count']}", x=detail_indent)
        put("Interpretation: Delay clauses specify timelines and consequences for delays.",
            x=detail_indent, step=0.3 * inch)

        # 3. Key findings
        break_page_if_below(2 * inch)
        heading("3. Key Findings", 14, 0.3 * inch)
        findings = []
        if analysis_data['risk_level'] == "High":
            findings.append("WARNING: High-risk contract requiring immediate legal review")
        if analysis_data['penalty_count'] > 5:
            findings.append(f"WARNING: High number of penalty clauses ({analysis_data['penalty_count']})")
        if analysis_data['obligation_count'] > 10:
            findings.append(f"NOTE: Numerous obligations ({analysis_data['obligation_count']}) that may require tracking")
        if analysis_data['sentiment_score'] < 0.4:
            findings.append("NOTE: Contract language appears adversarial (low sentiment score)")
        if not findings:
            findings.append("OK: No major red flags detected in initial analysis")
        for finding in findings:
            break_page_if_below(1 * inch)
            put(finding, step=0.25 * inch)

        # 4. Recommendations
        cursor["y"] -= 0.3 * inch
        break_page_if_below(2 * inch)
        heading("4. Recommendations", 14, 0.3 * inch)
        # U+2022 (bullet) is part of the standard font encoding, unlike emoji.
        bullet = "\u2022"
        recommendations = []
        if analysis_data['risk_level'] == "High":
            recommendations.append(f"{bullet} Engage legal counsel for comprehensive review")
            recommendations.append(f"{bullet} Negotiate penalty clauses and liability terms")
        if analysis_data['penalty_count'] > 0:
            recommendations.append(f"{bullet} Review all penalty clauses for fairness and applicability")
        if analysis_data['obligation_count'] > 10:
            recommendations.append(f"{bullet} Create an obligation tracking system")
        if analysis_data['sentiment_score'] < 0.4:
            recommendations.append(f"{bullet} Consider negotiating more balanced language")
        if not recommendations:
            recommendations.append(f"{bullet} Standard contract review process sufficient")
        for rec in recommendations:
            break_page_if_below(1 * inch)
            put(rec, step=0.25 * inch)

        draw_footer()
        c.save()
        pdf_file.seek(0)
        logger.info("PDF report generated successfully")
        return pdf_file
    except Exception as e:
        logger.error(f"Error generating PDF report: {str(e)}")
        raise Exception(f"PDF generation failed: {str(e)}") from e
def save_to_salesforce(sf: Salesforce, data: Dict) -> str:
    """Create a ``Custom_Risk_Analysis__c`` record from the analysis results.

    Args:
        sf: Authenticated Salesforce client.
        data: Analysis results keyed by ``sentiment_score``, ``risk_score``,
            ``risk_level``, ``record_id``, ``penalty_examples``,
            ``penalty_details``, ``penalty_amounts``, ``obligation_details``
            and ``delay_details``.

    Returns:
        The ``id`` reported by Salesforce for the created record.

    Raises:
        Exception: if building or creating the record fails; the message
            embeds the text of the underlying error.
    """
    # Text fields are cut to 131072 characters before upload.
    text_limit = 131072
    text_fields = (
        ('Penalty_Examples__c', 'penalty_examples'),
        ('Penalty_Details__c', 'penalty_details'),
        ('Penalty_Amounts__c', 'penalty_amounts'),
        ('Obligation_Details__c', 'obligation_details'),
        ('Delay_Details__c', 'delay_details'),
    )
    try:
        record = {
            'Sentiment_Score__c': data['sentiment_score'],
            'Risk_Score__c': data['risk_score'],
            'Risk_Level__c': data['risk_level'],
            'Record_Id__c': data['record_id'],
        }
        for field_name, data_key in text_fields:
            record[field_name] = data[data_key][:text_limit]
        created = sf.Custom_Risk_Analysis__c.create(record)
        logger.info(f"Successfully created Salesforce record: {created['id']}")
        return created['id']
    except Exception as e:
        reason = str(e)
        logger.error(f"Failed to save to Salesforce: {reason}")
        raise Exception(f"Salesforce record creation failed: {reason}")
def extract_text_from_pdf(pdf_path: str) -> str:
    """Return the text of every page of a PDF, read with pdfplumber.

    Args:
        pdf_path: Path of the PDF file to open.

    Returns:
        The text of each page followed by a newline, in page order. Pages for
        which pdfplumber yields no text contribute nothing.

    Raises:
        Exception: if the file cannot be opened or read; the message embeds
            the text of the underlying error.
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
        # Each non-empty page is terminated with a newline so pages stay
        # separated in the combined text.
        return "".join(f"{page_text}\n" for page_text in page_texts if page_text)
    except Exception as e:
        reason = str(e)
        logger.error(f"PDF text extraction failed: {reason}")
        raise Exception(f"PDF text extraction failed: {reason}")
def find_keyword_matches(text: str, keywords: List[str]) -> Dict[str, List[Dict[str, str]]]:
    """Locate whole-word, case-insensitive keyword matches line by line.

    Args:
        text: Document text; it is split into lines on ``"\\n"``.
        keywords: Keywords or phrases to search for.

    Returns:
        A dict with one entry per keyword (an empty list when nothing
        matched). Each match is a dict with:

        * ``line_number``: 1-based line index (an ``int``).
        * ``full_line``: the matching line, stripped, as raw text.
        * ``context``: an HTML fragment showing up to 20 characters either
          side of the match, with the match wrapped in
          ``<span class='keyword-match'>``. The surrounding text and the
          match itself are HTML-escaped.
        * ``match``: the matched text exactly as it appears in the line.
    """
    # Local import keeps this function usable without touching file imports.
    from html import escape

    context_radius = 20
    lines = text.split('\n')
    matches = {}
    for keyword in keywords:
        pattern = re.compile(r'\b' + re.escape(keyword) + r'\b', flags=re.IGNORECASE)
        keyword_matches = []
        for line_num, line in enumerate(lines, 1):
            for match in pattern.finditer(line):
                before = line[max(0, match.start() - context_radius):match.start()]
                after = line[match.end():match.end() + context_radius]
                # The document text is untrusted, so every piece is escaped
                # before being combined with our own markup.
                highlighted_context = (
                    escape(before)
                    + f"<span class='keyword-match'>{escape(match.group())}</span>"
                    + escape(after)
                )
                keyword_matches.append({
                    'line_number': line_num,
                    'full_line': line.strip(),
                    'context': highlighted_context,
                    'match': match.group(),
                })
        matches[keyword] = keyword_matches
    return matches
def count_keywords_with_details(text: str, keywords: List[str]) -> Dict[str, Dict]:
    """Summarise keyword hits as a count plus the individual matches.

    Args:
        text: Document text to search.
        keywords: Keywords or phrases to look for.

    Returns:
        A dict mapping each keyword to ``{'count': <number of matches>,
        'matches': <list of match dicts from find_keyword_matches>}``.
    """
    hits_by_keyword = find_keyword_matches(text, keywords)
    per_keyword = ((kw, hits_by_keyword.get(kw, [])) for kw in keywords)
    return {kw: {'count': len(hits), 'matches': hits} for kw, hits in per_keyword}
def find_penalty_values(text: str) -> List[float]:
    """Extract monetary and percentage figures mentioned in the text.

    Four forms are recognised, all case-insensitively: ``$1,000.50``,
    ``USD 200``, ``5%`` / ``5 percent`` and spelled-out amounts such as
    ``five thousand dollars``. Percentages are returned as their bare number.

    Args:
        text: Document text to scan.

    Returns:
        The parsed values, grouped by form in the order listed above and in
        order of appearance within each form. Matches that cannot be parsed
        are skipped.
    """
    numeric_patterns = [
        r'\$\s*[\d,]+(?:\.\d+)?',
        r'(?:USD|usd)\s*[\d,]+(?:\.\d+)?',
        r'\d+\s*(?:percent|%)',
    ]
    worded_pattern = r'(?:\b[a-z]+\s*)+dollars'

    penalties = []
    for pattern in numeric_patterns:
        for match in re.finditer(pattern, text, flags=re.IGNORECASE):
            digits = re.sub(r'[^\d.]', '', match.group())
            try:
                penalties.append(float(digits))
            except ValueError:
                # e.g. "$," matches the pattern but leaves no digits.
                continue

    for match in re.finditer(worded_pattern, text, flags=re.IGNORECASE):
        # Keep only the words before the first "dollars". The split ignores
        # case because the match itself does.
        amount_words = re.split(r'dollars', match.group(), maxsplit=1, flags=re.IGNORECASE)[0].strip()
        try:
            # Every spelled-out phrase goes to word2number, so the words it
            # understands (e.g. "twenty", "fifteen") are all recognised.
            penalties.append(w2n.word_to_num(amount_words))
        except Exception:
            # Best-effort: phrases with no usable number words are ignored.
            # Unlike a bare except, KeyboardInterrupt/SystemExit propagate.
            continue
    return penalties
def calculate_risk_score(penalty_count: int, penalty_values: List[float], obligation_count: int, delay_count: int) -> Tuple[float, str]:
    """Combine clause counts and penalty amounts into a 0-100 risk score.

    Points are awarded as follows and the total is capped at 100:

    * 5 per penalty clause, at most 30.
    * By average penalty amount (only when any amounts were found):
      40 above 1,000,000; 25 above 100,000; 15 above 10,000; otherwise 5.
    * 2 per obligation clause, at most 20.
    * 10 per delay clause, at most 30.

    Returns:
        ``(score, level)`` where level is ``"Low"`` below 30, ``"Medium"``
        below 70 and ``"High"`` otherwise.
    """
    # (threshold, points), checked from the largest threshold down.
    amount_tiers = ((1000000, 40), (100000, 25), (10000, 15))

    clause_points = min(penalty_count * 5, 30)

    amount_points = 0
    if penalty_values:
        average = sum(penalty_values) / len(penalty_values)
        amount_points = next(
            (points for threshold, points in amount_tiers if average > threshold),
            5,
        )

    obligation_points = min(obligation_count * 2, 20)
    delay_points = min(delay_count * 10, 30)

    total = min(clause_points + amount_points + obligation_points + delay_points, 100)

    if total >= 70:
        return total, "High"
    if total >= 30:
        return total, "Medium"
    return total, "Low"
def generate_risk_meter(risk_score: float) -> str:
    """Build the HTML for the horizontal risk meter.

    The indicator is placed at ``risk_score`` percent from the left, clamped
    to 0-100 so an out-of-range score cannot push it outside the bar. The
    band labels mirror the thresholds used by ``calculate_risk_score``
    (below 30 is Low, below 70 is Medium, otherwise High).

    Args:
        risk_score: Score on a 0-100 scale.

    Returns:
        An HTML fragment with the meter bar, its indicator and the labels.
    """
    position = max(0, min(100, risk_score))
    return f"""
<div class="risk-meter">
    <div class="risk-meter-indicator" style="left: {position}%"></div>
</div>
<div class="risk-meter-labels">
    <span>Low (0-29)</span>
    <span>Medium (30-69)</span>
    <span>High (70-100)</span>
</div>
"""
def generate_sentiment_meter(sentiment_score: float) -> str:
    """Build the HTML for the sentiment bar.

    Args:
        sentiment_score: Sentiment value; it is multiplied by 100 and used as
            the percentage width of the inner bar.

    Returns:
        An HTML fragment with the bar followed by the Negative / Neutral /
        Positive legend.
    """
    bar = (
        '<div class="sentiment-meter">\n'
        f'<div class="sentiment-score" style="width: {sentiment_score * 100}%"></div>\n'
        '</div>\n'
    )
    legend = (
        '<div style="display: flex; justify-content: space-between; margin-top: 5px;">\n'
        '<span>Negative</span>\n'
        '<span>Neutral</span>\n'
        '<span>Positive</span>\n'
        '</div>\n'
    )
    return "\n" + bar + legend
def generate_heatmap(risk_level: str):
    """Draw a gradient banner labelled with the risk level.

    The banner uses the Blues colormap for ``"Low"``, Oranges for
    ``"Medium"`` and Reds for anything else. The label is black for Low and
    white otherwise.

    Args:
        risk_level: Risk level name shown in the banner.

    Returns:
        The matplotlib figure containing the banner.

    Raises:
        Exception: if drawing fails; the original error is chained as the
            cause and any partially built figure is closed first.
    """
    colormaps = {"Low": plt.cm.Blues, "Medium": plt.cm.Oranges}
    fig = None
    try:
        fig, ax = plt.subplots(figsize=(8, 2))
        cmap = colormaps.get(risk_level, plt.cm.Reds)
        # Two identical rows of a 0..1 ramp give a horizontal gradient strip.
        gradient = np.linspace(0, 1, 256).reshape(1, -1)
        gradient = np.vstack((gradient, gradient))
        ax.imshow(gradient, aspect='auto', cmap=cmap)
        ax.text(128, 0.5, f"{risk_level} Risk", color='white' if risk_level in ["High", "Medium"] else 'black',
                ha='center', va='center', fontsize=24, fontweight="bold")
        ax.set_axis_off()
        plt.tight_layout()
        return fig
    except Exception as e:
        # pyplot keeps every open figure alive; release the half-built one so
        # repeated failures do not accumulate figures in a long-running app.
        if fig is not None:
            plt.close(fig)
        logger.error(f"Heatmap generation failed: {str(e)}")
        raise Exception(f"Heatmap generation failed: {str(e)}") from e
def format_warning_message(count: int, item_type: str, emoji: str) -> str:
    """Return a colour-coded HTML banner for the number of clauses found.

    The CSS class escalates with the count: ``success-box`` for 0,
    ``info-box`` for 1-2, ``warning-box`` for 3-4 and ``danger-box`` for 5 or
    more. The noun is singular when exactly one clause was found.

    Args:
        count: Number of clauses detected.
        item_type: Clause kind shown in the message (e.g. ``"penalty"``).
        emoji: Category glyph inserted after the status glyph.

    Returns:
        A single ``<div>`` element as a string.
    """
    # NOTE(review): the status glyphs in the received source were mis-decoded
    # and could not be recovered byte-for-byte. These are the closest
    # conventional choices for each severity (the info glyph in particular is
    # a guess) - confirm against the original file.
    noun = "clause" if count == 1 else "clauses"
    if count == 0:
        return f"""<div class="success-box">\u2705 {emoji} No {item_type} clauses detected!</div>"""
    if count < 3:
        return f"""<div class="info-box">\U0001F50D {emoji} {count} {item_type} {noun} detected</div>"""
    if count < 5:
        return f"""<div class="warning-box">\u26A0 {emoji} {count} {item_type} {noun} detected!</div>"""
    return f"""<div class="danger-box">\U0001F6A8 {emoji} {count} {item_type} {noun} detected!</div>"""
def format_clause_example(example: str, index: int) -> str:
    """Render one extracted clause as a numbered HTML block.

    The clause is wrapped to 80 columns and then HTML-escaped. It comes from
    an uploaded document, so any markup in it must be shown as text rather
    than interpreted by the browser.

    Args:
        example: Clause text taken from the document.
        index: Number shown in front of the clause.

    Returns:
        A ``<div class="clause-example">`` element as a string.
    """
    # Local import keeps this function usable without touching file imports.
    from html import escape

    # Wrap first so entity expansion does not distort the line widths.
    wrapped_text = escape(textwrap.fill(example, width=80))
    return f"""
<div class="clause-example">
    <span class="clause-number">{index}.</span> {wrapped_text}
</div>
"""
def format_keyword_matches(matches: List[Dict[str, str]]) -> str:
    """Render up to five keyword matches as HTML detail blocks.

    Args:
        matches: Match dicts with ``line_number``, ``context`` and
            ``full_line`` keys. ``context`` is inserted as-is because it
            carries the highlight markup; ``full_line`` is raw document text
            and is HTML-escaped here.

    Returns:
        The concatenated ``match-detail`` blocks for the first five matches,
        or a single ``success-box`` message when ``matches`` is empty.
    """
    # Local import keeps this function usable without touching file imports.
    from html import escape

    if not matches:
        # The check-mark glyph was mis-decoded in the received source;
        # U+2705 is the conventional success marker.
        return "<div class='success-box'>\u2705 No matches found for this keyword</div>"
    max_shown = 5  # keep the page readable when a keyword is very frequent
    blocks = [
        f"""
<div class="match-detail">
    <div><strong>Match {i}:</strong> Line {match['line_number']}</div>
    <div class="match-context">Context: {match['context']}</div>
    <div class="match-line">Full line: {escape(match['full_line'])}</div>
</div>
"""
        for i, match in enumerate(matches[:max_shown], 1)
    ]
    return "".join(blocks)
| def analyze_pdf(file_obj) -> List: | |
| """Main analysis function for Gradio interface""" | |
| try: | |
| if not file_obj: | |
| raise Exception("No PDF file uploaded. Please upload a valid PDF file.") | |
| try: | |
| sf = authenticate_salesforce() | |
| except Exception as e: | |
| raise Exception(f"Salesforce authentication failed: {str(e)}") | |
| try: | |
| text = extract_text_from_pdf(file_obj.name) | |
| if not text.strip(): | |
| raise Exception("No text extracted from PDF. It might be a scanned document.") | |
| except Exception as e: | |
| raise Exception(f"PDF text extraction failed: {str(e)}") | |
| try: | |
| sentiment_score = get_hugging_face_sentiment(text) | |
| except Exception as e: | |
| logger.warning(f"Sentiment analysis failed: {str(e)}. Using fallback score of 0.5.") | |
| sentiment_score = 0.5 | |
| penalty_keywords = ["penalty", "fine", "forfeit", "liquidated damages", "breach"] | |
| obligation_keywords = ["shall", "must", "required to", "obligated to", "duty"] | |
| delay_keywords = ["delay", "late", "overdue", "extension", "time is of the essence"] | |
| # Get detailed keyword matches with line numbers and context | |
| penalty_details = count_keywords_with_details(text, penalty_keywords) | |
| obligation_details = count_keywords_with_details(text, obligation_keywords) | |
| delay_details = count_keywords_with_details(text, delay_keywords) | |
| total_penalties = sum(details['count'] for details in penalty_details.values()) | |
| total_obligations = sum(details['count'] for details in obligation_details.values()) | |
| total_delays = sum(details['count'] for details in delay_details.values()) | |
| penalty_values = find_penalty_values(text) | |
| # Generate warning messages with emojis | |
| penalty_warning = format_warning_message(total_penalties, "penalty", "๐ฐ") | |
| obligation_warning = format_warning_message(total_obligations, "obligation", "๐") | |
| delay_warning = format_warning_message(total_delays, "delay", "โฑ") | |
| try: | |
| risk_score, risk_level = calculate_risk_score( | |
| total_penalties, penalty_values, total_obligations, total_delays | |
| ) | |
| except Exception as e: | |
| raise Exception(f"Risk score calculation failed: {str(e)}") | |
| try: | |
| heatmap = generate_heatmap(risk_level) | |
| risk_meter = generate_risk_meter(risk_score) | |
| sentiment_meter = generate_sentiment_meter(sentiment_score) | |
| except Exception as e: | |
| raise Exception(f"Visual generation failed: {str(e)}") | |
| # Format penalty details with match information | |
| penalty_html = [] | |
| for keyword, details in penalty_details.items(): | |
| penalty_html.append(f""" | |
| <div class='count-item'> | |
| <span class='count-label'><span style='color: var(--danger-color)'>โข</span> {keyword}</span> | |
| <span class='count-value'>{details['count']}</span> | |
| </div> | |
| {format_keyword_matches(details['matches'])} | |
| """) | |
| penalty_details_html = f""" | |
| {penalty_warning} | |
| <div class='penalty-box'> | |
| <div class='section-title'>๐ฐ Penalty Clause Details</div> | |
| {"".join(penalty_html)} | |
| </div> | |
| """ | |
| # Format obligation details with match information | |
| obligation_html = [] | |
| for keyword, details in obligation_details.items(): | |
| obligation_html.append(f""" | |
| <div class='count-item'> | |
| <span class='count-label'><span style='color: var(--warning-color)'>โข</span> {keyword}</span> | |
| <span class='count-value'>{details['count']}</span> | |
| </div> | |
| {format_keyword_matches(details['matches'])} | |
| """) | |
| obligation_details_html = f""" | |
| {obligation_warning} | |
| <div class='obligation-box'> | |
| <div class='section-title'>๐ Obligation Clause Details</div> | |
| {"".join(obligation_html)} | |
| </div> | |
| """ | |
| # Format delay details with match information | |
| delay_html = [] | |
| for keyword, details in delay_details.items(): | |
| delay_html.append(f""" | |
| <div class='count-item'> | |
| <span class='count-label'><span style='color: var(--info-color)'>โข</span> {keyword}</span> | |
| <span class='count-value'>{details['count']}</span> | |
| </div> | |
| {format_keyword_matches(details['matches'])} | |
| """) | |
| delay_details_html = f""" | |
| {delay_warning} | |
| <div class='delay-box'> | |
| <div class='section-title'>โฑ Delay Clause Details</div> | |
| {"".join(delay_html)} | |
| </div> | |
| """ | |
| penalty_amounts = "\n".join([f"<div class='count-item'><span class='count-label'>๐ฐ Amount</span><span class='count-value'>${amt:,.2f}</span></div>" for amt in penalty_values[:5]]) if penalty_values else "<div class='success-box'>โ No penalties found!</div>" | |
| penalty_sentences = [] | |
| for sentence in re.split(r'(?<=[.!?])\s+', text): | |
| if any(kw.lower() in sentence.lower() for kw in penalty_keywords): | |
| penalty_sentences.append(sentence.strip()) | |
| extracted_data = "\n".join([format_clause_example(sent, i+1) for i, sent in enumerate(penalty_sentences[:3])]) if penalty_sentences else "<div class='success-box'>โ No penalty clauses found!</div>" | |
| record_id = str(uuid.uuid4()) | |
| sf_data = { | |
| 'sentiment_score': sentiment_score, | |
| 'risk_score': risk_score, | |
| 'risk_level': risk_level, | |
| 'record_id': record_id, | |
| 'penalty_examples': extracted_data, | |
| 'penalty_details': "\n".join([f"{kw}: {details['count']} matches" for kw, details in penalty_details.items()]), | |
| 'penalty_amounts': "\n".join([f"${amt:,.2f}" for amt in penalty_values[:5]]) if penalty_values else "", | |
| 'obligation_details': "\n".join([f"{kw}: {details['count']} matches" for kw, details in obligation_details.items()]), | |
| 'delay_details': "\n".join([f"{kw}: {details['count']} matches" for kw, details in delay_details.items()]) | |
| } | |
| try: | |
| salesforce_id = save_to_salesforce(sf, sf_data) | |
| logger.info(f"Saved to Salesforce with Record ID: {salesforce_id}") | |
| except Exception as e: | |
| logger.error(f"Salesforce record creation failed: {str(e)}") | |
| salesforce_id = "N/A" | |
| # Prepare data for PDF report | |
| analysis_data = { | |
| 'document_name': os.path.basename(file_obj.name), | |
| 'sentiment_score': sentiment_score, | |
| 'risk_score': risk_score, | |
| 'risk_level': risk_level, | |
| 'penalty_count': total_penalties, | |
| 'penalty_values': penalty_values, | |
| 'obligation_count': total_obligations, | |
| 'delay_count': total_delays, | |
| 'record_id': record_id | |
| } | |
| try: | |
| pdf_buffer = generate_analysis_pdf(analysis_data) | |
| if pdf_buffer is None: | |
| raise Exception("Failed to generate PDF") | |
| # Save to a temporary file for Gradio to serve | |
| with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file: | |
| temp_file.write(pdf_buffer.getvalue()) | |
| temp_file_path = temp_file.name | |
| except Exception as e: | |
| logger.error(f"PDF generation failed: {str(e)}") | |
| temp_file_path = None | |
| box_class = "success-box" if risk_level == "Low" else "warning-box" if risk_level == "Medium" else "danger-box" | |
| risk_icon = "โ " if risk_level == "Low" else "โ " if risk_level == "Medium" else "๐จ" | |
| risk_advice = { | |
| "Low": "This contract appears to be low risk. Standard review recommended.", | |
| "Medium": "This contract has moderate risk. Careful review advised.", | |
| "High": "This contract is high risk! Immediate legal review required." | |
| } | |
| # Sentiment analysis output with PDF download prompt | |
| sentiment_analysis_output = f""" | |
| <div class='result-box'> | |
| <div class='section-title'>๐ Sentiment Analysis</div> | |
| <div class='risk-row'> | |
| <span class='risk-label'>Sentiment Score</span> | |
| <span class='risk-score'>{sentiment_score:.2f}</span> | |
| </div> | |
| {sentiment_meter} | |
| <div style='margin-top: 15px;'> | |
| <strong>Interpretation:</strong> { | |
| "Positive (favorable language)" if sentiment_score > 0.6 else | |
| "Negative (adversarial language)" if sentiment_score < 0.4 else | |
| "Neutral (balanced language)" | |
| } | |
| </div> | |
| <div style='margin-top: 10px;'> | |
| <strong>Full Report:</strong> Available for download below | |
| </div> | |
| </div> | |
| """ | |
| return [ | |
| f""" | |
| <div class='result-box'> | |
| <div class='section-title'>{risk_icon} Contract Risk Summary</div> | |
| <div class='risk-row'> | |
| <span class='risk-label'>Overall Risk Score</span> | |
| <span class='risk-score risk-{risk_level.lower()}'>{risk_score:.1f}/100</span> | |
| </div> | |
| {risk_meter} | |
| <div style='margin-top: 15px; font-size: 16px;'> | |
| <strong>Assessment:</strong> {risk_advice[risk_level]} | |
| </div> | |
| </div> | |
| """, | |
| "", # Empty string for hidden risk visualization | |
| penalty_details_html, | |
| f"<div class='penalty-box'><div class='section-title'>๐ฐ Penalty Amounts Found</div>{penalty_amounts}</div>", | |
| obligation_details_html, | |
| delay_details_html, | |
| f"<div class='result-box'><div class='section-title'>๐ Extracted Data</div>{extracted_data}</div>", | |
| sentiment_analysis_output, | |
| temp_file_path # Return temporary file path for PDF download | |
| ] | |
| except Exception as e: | |
| logger.error(f"Analysis failed: {str(e)}") | |
| error_message = f""" | |
| <div class='danger-box'> | |
| <div style='display: flex; align-items: center; gap: 10px;'> | |
| <span style='font-size: 24px;'>โ</span> | |
| <span style='font-size: 18px; font-weight: bold;'>Analysis Error</span> | |
| </div> | |
| <div style='margin-top: 10px;'>{str(e)}</div> | |
| <div style='margin-top: 15px; font-size: 14px;'> | |
| Please ensure you've uploaded a valid PDF document with selectable text. | |
| </div> | |
| </div> | |
| """ | |
| return [error_message] * 9 | |
# Create Gradio interface with dark mode compatibility
#
# NOTE(review): the row/column nesting below was reconstructed from a copy of
# this file whose indentation had been stripped -- confirm the layout against
# the running app before relying on it.

# Banner markup rendered at the very top of the page.
_PAGE_HEADER_HTML = """
<div style='text-align: center; margin-bottom: 30px;'>
<h1 style='color: var(--primary-color); margin-bottom: 10px;'>PDF Contract Analysis</h1>
<p style='color: var(--secondary-color); font-size: 16px;'>
Upload a contract PDF to analyze risks, obligations, and sentiment.
</p>
</div>
"""

# Hint markup rendered right after the upload widget.
_UPLOAD_HINT_HTML = """
<div class='file-info'>
Drag and drop your contract PDF file here.
</div>
"""

with gr.Blocks(
    css=css,
    title="PDF Contract Risk Analyzer",
    theme=gr.themes.Default(primary_hue="blue"),
) as demo:
    gr.Markdown(_PAGE_HEADER_HTML)

    with gr.Row():
        # First column (scale=1): upload control, hint text and trigger button.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload Contract PDF",
                file_types=[".pdf"],
                elem_classes="upload-area",
            )
            gr.Markdown(_UPLOAD_HINT_HTML)
            submit_btn = gr.Button("Analyze", variant="primary")

        # Second column (scale=3): every analysis result panel.
        with gr.Column(scale=3):
            risk_summary = gr.HTML(label="Contract Risk Summary")
            # Created with visible=False; analyze_pdf sends it an empty string.
            risk_visualization = gr.HTML(
                label="Risk Visualization",
                visible=False,
                elem_id="risk-visualization",
            )

            with gr.Row():
                with gr.Column():
                    penalty_count = gr.HTML(label="Penalty Clauses Analysis")
                    penalty_amounts = gr.HTML(label="Penalty Amounts Found")
                with gr.Column():
                    obligation_count = gr.HTML(label="Obligation Clauses Analysis")
                with gr.Column():
                    delay_count = gr.HTML(label="Delay Clauses Analysis")

            with gr.Row():
                extracted_data = gr.HTML(label="Extracted Data")

            with gr.Row():
                sentiment_analysis = gr.HTML(label="Sentiment Analysis")

            pdf_output = gr.File(
                label="Download Full Analysis Report (PDF)",
                file_types=[".pdf"],
            )

    # Order matters: these nine components receive, position by position, the
    # nine-element list that analyze_pdf returns.
    _analysis_outputs = [
        risk_summary,
        risk_visualization,
        penalty_count,
        penalty_amounts,
        obligation_count,
        delay_count,
        extracted_data,
        sentiment_analysis,
        pdf_output,
    ]
    submit_btn.click(
        fn=analyze_pdf,
        inputs=[file_input],
        outputs=_analysis_outputs,
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()