Spaces:
Sleeping
Sleeping
import base64
import functools
import html
import logging
import os
import re
import tempfile
import textwrap
import uuid
from io import BytesIO
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pdfplumber
from simple_salesforce import Salesforce
from word2number import w2n
# Set up logging: configure the root logger once at import time (INFO and
# above) and create the module-level logger used by every helper in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Custom CSS for styling with dark mode compatibility.
# Colours are exposed as CSS custom properties on :root and overridden under
# the `.dark` class. `--bg-color-rgb` carries the background as a bare
# "r, g, b" triplet because rgba() cannot take a hex colour through var():
# `rgba(var(--bg-color), 0.97)` expands to `rgba(#ffffff, 0.97)`, which is
# invalid and makes the browser drop the whole declaration.
css = """
:root {
    --primary-color: #1e90ff;
    --secondary-color: #4169e1;
    --text-color: #2c3e50;
    --bg-color: #ffffff;
    --bg-color-rgb: 255, 255, 255;
    --box-bg: #ffffff;
    --border-color: #e0e0e0;
    --success-color: #28a745;
    --warning-color: #ff9800;
    --danger-color: #f44336;
    --info-color: #2196F3;
}
.dark {
    --primary-color: #4a89dc;
    --secondary-color: #3b7dd8;
    --text-color: #f0f0f0;
    --bg-color: #1e1e1e;
    --bg-color-rgb: 30, 30, 30;
    --box-bg: #2d2d2d;
    --border-color: #444;
    --success-color: #4CAF50;
    --warning-color: #FFC107;
    --danger-color: #F44336;
    --info-color: #2196F3;
}
body {
    background-image: url('https://images.unsplash.com/photo-1604147706283-d7119b5b822c?ixlib=rb-1.2.1&auto=format&fit=crop&w=1920&q=80');
    background-size: cover;
    background-position: center;
    background-attachment: fixed;
    background-repeat: no-repeat;
    min-height: 100vh;
    margin: 0;
    padding: 0;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    color: var(--text-color);
}
.gradio-container {
    background-color: rgba(var(--bg-color-rgb), 0.97) !important;
    border-radius: 15px;
    padding: 25px;
    margin: 20px auto;
    max-width: 1200px;
    box-shadow: 0 8px 24px rgba(0,0,0,0.12);
    min-height: 90vh;
    border: 1px solid var(--primary-color) !important;
}
.risk-low { color: var(--success-color); font-weight: bold; }
.risk-medium { color: var(--warning-color); font-weight: bold; }
.risk-high { color: var(--danger-color); font-weight: bold; }
.result-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 25px;
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
    border-left: 5px solid var(--primary-color) !important;
    color: var(--text-color);
}
.penalty-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--danger-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.obligation-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--warning-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.delay-box {
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    border-left: 5px solid var(--info-color);
    background-color: var(--box-bg);
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.combined-risk-container {
    display: flex;
    flex-direction: column;
    gap: 15px;
    margin-bottom: 25px;
    background-color: var(--box-bg);
    padding: 20px;
    border-radius: 10px;
    box-shadow: 0 2px 8px rgba(0,0,0,0.08);
}
.risk-row {
    display: flex;
    align-items: center;
    gap: 20px;
    padding: 15px;
    border-radius: 8px;
    background-color: rgba(0,0,0,0.05);
    border: 1px solid var(--border-color) !important;
    transition: all 0.3s ease;
}
.risk-row:hover {
    transform: translateY(-2px);
    box-shadow: 0 4px 12px rgba(0,0,0,0.1);
}
.risk-label {
    width: 150px;
    font-weight: 600;
    font-size: 16px;
    color: var(--text-color) !important;
}
.risk-score {
    width: 120px;
    font-size: 20px;
    text-align: center;
    padding: 8px 12px;
    border-radius: 6px;
}
.warning-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(255, 243, 205, 0.3);
    border-left: 5px solid var(--warning-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.danger-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(248, 215, 218, 0.3);
    border-left: 5px solid var(--danger-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.success-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(212, 237, 218, 0.3);
    border-left: 5px solid var(--success-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.info-box {
    padding: 18px;
    border-radius: 8px;
    margin: 15px 0;
    background-color: rgba(227, 242, 253, 0.3);
    border-left: 5px solid var(--info-color);
    font-weight: 600;
    box-shadow: 0 2px 8px rgba(0,0,0,0.05);
}
.section-title {
    font-size: 22px;
    font-weight: 700;
    margin-bottom: 18px;
    color: var(--primary-color) !important;
    display: flex;
    align-items: center;
    gap: 10px;
}
.count-item {
    display: flex;
    justify-content: space-between;
    padding: 12px 0;
    border-bottom: 1px solid var(--border-color) !important;
    transition: all 0.2s ease;
}
.count-item:hover {
    background-color: rgba(0,0,0,0.05);
    transform: translateX(5px);
}
.count-label {
    font-weight: 600;
    color: var(--text-color) !important;
    display: flex;
    align-items: center;
    gap: 8px;
}
.count-value {
    color: var(--secondary-color) !important;
    font-weight: 600;
    font-size: 16px;
}
button {
    background: linear-gradient(135deg, var(--primary-color), var(--secondary-color)) !important;
    border: none !important;
    color: white !important;
    font-weight: 600 !important;
    padding: 12px 24px !important;
    border-radius: 8px !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
}
button:hover {
    background: linear-gradient(135deg, var(--secondary-color), var(--primary-color)) !important;
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 12px rgba(0,0,0,0.15) !important;
}
.upload-area {
    border: 2px dashed var(--primary-color) !important;
    background-color: rgba(240, 248, 255, 0.3) !important;
    border-radius: 10px !important;
    padding: 30px !important;
    transition: all 0.3s ease !important;
}
.upload-area:hover {
    background-color: rgba(224, 255, 255, 0.3) !important;
    border-color: var(--secondary-color) !important;
}
.risk-meter {
    width: 100%;
    height: 20px;
    background: linear-gradient(90deg, var(--success-color), var(--warning-color), var(--danger-color));
    border-radius: 10px;
    margin: 15px 0;
    position: relative;
}
.risk-meter-indicator {
    position: absolute;
    top: -5px;
    width: 3px;
    height: 30px;
    background-color: var(--text-color);
    transform: translateX(-50%);
}
.risk-meter-labels {
    display: flex;
    justify-content: space-between;
    margin-top: 5px;
    font-size: 12px;
    color: var(--text-color);
}
.clause-example {
    background-color: rgba(0,0,0,0.05);
    padding: 15px;
    border-radius: 8px;
    margin-bottom: 10px;
    border-left: 3px solid var(--primary-color);
    font-family: 'Courier New', monospace;
    line-height: 1.5;
    color: var(--text-color);
}
.clause-number {
    font-weight: bold;
    color: var(--primary-color);
    margin-right: 8px;
}
.sentiment-meter {
    width: 100%;
    height: 20px;
    background: linear-gradient(90deg, var(--danger-color), var(--warning-color), var(--success-color));
    border-radius: 10px;
    margin: 15px 0;
}
.sentiment-score {
    height: 100%;
    border-radius: 10px;
    background-color: rgba(255,255,255,0.3);
}
/* Hide elements */
footer, .gradio-footer, .hide, [data-testid="Use via API"], [data-testid="mmsettings"],
#sentiment-analysis, #risk-visualization {
    display: none !important;
    visibility: hidden !important;
    height: 0 !important;
    width: 0 !important;
    padding: 0 !important;
    margin: 0 !important;
}
.file-info {
    margin-top: -15px;
    margin-bottom: 15px;
    color: var(--text-color);
    font-size: 13px;
}
/* Dark mode specific adjustments */
.dark .clause-example {
    background-color: rgba(255,255,255,0.05);
}
.dark .risk-row {
    background-color: rgba(255,255,255,0.05);
}
.dark .count-item:hover {
    background-color: rgba(255,255,255,0.05);
}
"""
# Salesforce credentials.
# SECURITY: these values were previously hard-coded in this file. They are now
# read from the environment (for example, secrets configured on the hosting
# platform) so they never live in source control. The credentials that were
# committed earlier must be treated as compromised and rotated.
# Missing variables fall back to "" so the module still imports; the login
# attempt itself will then fail with a clear authentication error.
SF_USERNAME = os.environ.get("SF_USERNAME", "")
SF_PASSWORD = os.environ.get("SF_PASSWORD", "")
SF_TOKEN = os.environ.get("SF_TOKEN", "")
def authenticate_salesforce() -> Salesforce:
    """Log in to Salesforce and return the client.

    Uses the module-level SF_USERNAME / SF_PASSWORD / SF_TOKEN values.

    Returns:
        An authenticated ``Salesforce`` client.

    Raises:
        Exception: with the message "Salesforce authentication failed: ..."
            when the login attempt fails. The underlying error is attached as
            ``__cause__`` so the original traceback is not lost.
    """
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_TOKEN,
        )
    except Exception as e:
        logger.error(f"Failed to authenticate with Salesforce: {str(e)}")
        raise Exception(f"Salesforce authentication failed: {str(e)}") from e
    logger.info("Successfully authenticated with Salesforce")
    return sf
# Salesforce record Ids are 15 or 18 alphanumeric characters.
_SALESFORCE_ID_RE = re.compile(r'[A-Za-z0-9]{15}(?:[A-Za-z0-9]{3})?')


def upload_pdf_to_salesforce(pdf_report: BytesIO, project_title: str, record_id: str) -> Tuple[Optional[str], Optional[str]]:
    """Upload a PDF to Salesforce as a ContentVersion.

    Args:
        pdf_report: In-memory PDF; its full buffer is uploaded.
        project_title: Used for the document title and client file name.
        record_id: Record to publish the file to. It is only sent as
            ``FirstPublishLocationId`` when it has the shape of a Salesforce
            Id; anything else (e.g. a locally generated UUID) is left out,
            because Salesforce rejects the whole insert on a malformed Id.

    Returns:
        ``(content_document_id, pdf_url)`` on success, ``(None, None)`` on any
        failure. This function never raises; failures are logged.
    """
    try:
        sf = authenticate_salesforce()
        # The REST API expects the file body base64-encoded in VersionData.
        encoded_content = base64.b64encode(pdf_report.getvalue()).decode('utf-8')
        content_version = {
            'Title': f"{project_title} Risk Analysis Report",
            'PathOnClient': f"{project_title}_report.pdf",
            'VersionData': encoded_content,
            'ReasonForChange': 'Initial upload',
        }
        if record_id and _SALESFORCE_ID_RE.fullmatch(record_id):
            content_version['FirstPublishLocationId'] = record_id
        else:
            logger.warning(
                "record_id %r is not a Salesforce Id; uploading PDF without FirstPublishLocationId",
                record_id,
            )
        result = sf.ContentVersion.create(content_version)
        if not result.get('success'):
            logger.error(f"Failed to upload PDF to Salesforce: {result}")
            return None, None
        # Read the new version back to learn which ContentDocument it belongs to.
        content_doc = sf.ContentVersion.get(result['id'])
        content_document_id = content_doc['ContentDocumentId']
        pdf_url = f"https://{sf.sf_instance}/lightning/r/ContentDocument/{content_document_id}/view"
        logger.info(f"Successfully uploaded PDF to Salesforce. ContentDocumentId: {content_document_id}")
        return content_document_id, pdf_url
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {str(e)}")
        return None, None
@functools.lru_cache(maxsize=1)
def _load_sentiment_classifier():
    """Build the SST-2 DistilBERT pipeline once and reuse it on later calls.

    A failed load raises and is therefore not cached, so it is retried on the
    next call.
    """
    # Imported lazily so the app still starts when transformers is unavailable.
    from transformers import pipeline
    return pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")


def get_hugging_face_sentiment(text: str) -> float:
    """Score the sentiment of ``text`` with a Hugging Face model.

    Only the first 512 characters of ``text`` are classified.

    Returns:
        A value rounded to two decimals: the model's confidence when the label
        is POSITIVE, otherwise ``1 - confidence``. Returns the neutral
        fallback ``0.5`` if the model cannot be loaded or run (the failure is
        logged, never raised).
    """
    try:
        classifier = _load_sentiment_classifier()
        result = classifier(text[:512])[0]
        score = result['score'] if result['label'] == 'POSITIVE' else 1 - result['score']
        return round(score, 2)
    except Exception as e:
        logger.error(f"Hugging Face sentiment analysis failed: {str(e)}. Using fallback score.")
        return 0.5
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF using pdfplumber.

    Pages that yield no text are skipped. Page texts are joined with a
    newline so the last word of one page and the first word of the next are
    not fused into a single token (which would hide them from the
    word-boundary keyword search).

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page text; an empty string if no page has text.

    Raises:
        Exception: "PDF text extraction failed: ..." wrapping (and chained
            to) whatever error occurred while opening or reading the file.
    """
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page_texts = [page.extract_text() for page in pdf.pages]
        return "\n".join(page_text for page_text in page_texts if page_text)
    except Exception as e:
        logger.error(f"PDF text extraction failed: {str(e)}")
        raise Exception(f"PDF text extraction failed: {str(e)}") from e
def count_keywords(text: str, keywords: List[str]) -> Dict[str, int]:
    """Return, for each keyword, how many times it occurs in ``text``.

    Matching is case-insensitive and anchored on word boundaries, so "fine"
    does not match inside "fines" or "defined".
    """
    def occurrences(term: str) -> int:
        # Escape the keyword so multi-word phrases are matched literally.
        whole_word = r'\b' + re.escape(term) + r'\b'
        return sum(1 for _ in re.finditer(whole_word, text, flags=re.IGNORECASE))

    return {term: occurrences(term) for term in keywords}
# Vocabulary of spelled-out numbers recognised before the word "dollars".
_NUMBER_WORDS = (
    "zero one two three four five six seven eight nine ten eleven twelve "
    "thirteen fourteen fifteen sixteen seventeen eighteen nineteen twenty "
    "thirty forty fifty sixty seventy eighty ninety hundred thousand "
    "million billion"
).split()
_NUMBER_WORD_ALT = "|".join(_NUMBER_WORDS)

# Amounts written with digits: "$1,000.50", "USD 200", "10 percent" / "10%".
_NUMERIC_AMOUNT_PATTERNS = [
    re.compile(r'\$\s*[\d,]+(?:\.\d+)?', re.IGNORECASE),
    re.compile(r'(?:USD|usd)\s*[\d,]+(?:\.\d+)?', re.IGNORECASE),
    re.compile(r'\d+\s*(?:percent|%)', re.IGNORECASE),
]

# Amounts written in words: a run of number words (optionally joined by "and"
# or hyphens) directly followed by "dollars", e.g. "twenty-five thousand
# dollars". Matching only number words keeps unrelated prose (and stray
# number words earlier in the sentence) out of the value.
_WORD_AMOUNT_PATTERN = re.compile(
    rf'\b(?:(?:{_NUMBER_WORD_ALT}|and)[\s-]+)*(?:{_NUMBER_WORD_ALT})[\s-]+dollars\b',
    re.IGNORECASE,
)


def find_penalty_values(text: str) -> List[float]:
    """Find monetary (and percentage) amounts mentioned in ``text``.

    Results are grouped by notation in this order: "$" amounts, "USD"
    amounts, percentages, then spelled-out dollar amounts. Within each group
    the values appear in document order. Amounts that cannot be parsed are
    skipped.

    NOTE(review): percentages are returned as plain numbers alongside dollar
    amounts (e.g. "10 percent" -> 10.0), as before; confirm that callers
    really want them mixed.

    Returns:
        The parsed values; spelled-out amounts come back as whatever numeric
        type ``w2n.word_to_num`` returns.
    """
    penalties = []
    for pattern in _NUMERIC_AMOUNT_PATTERNS:
        for match in pattern.finditer(text):
            digits = re.sub(r'[^\d.]', '', match.group())
            try:
                penalties.append(float(digits))
            except ValueError:
                # e.g. "$," matches the pattern but contains no digits.
                continue
    for match in _WORD_AMOUNT_PATTERN.finditer(text):
        number_words = re.sub(r'[\s-]*dollars$', '', match.group(), flags=re.IGNORECASE)
        try:
            penalties.append(w2n.word_to_num(number_words))
        except (ValueError, IndexError):
            # Malformed word sequences such as "hundred hundred" are skipped.
            continue
    return penalties
def calculate_risk_score(penalty_count: int, penalty_values: List[float], obligation_count: int, delay_count: int) -> Tuple[float, str]:
    """Combine clause counts and penalty sizes into a 0-100 risk score.

    Components (each capped): 5 points per penalty clause (max 30), 5-40
    points for the average penalty amount when any amounts were found,
    2 points per obligation clause (max 20), and 10 points per delay clause
    (max 30). The sum is capped at 100.

    Returns:
        ``(score, level)`` where level is "Low" below 30, "Medium" below 70,
        and "High" otherwise.
    """
    clause_points = min(penalty_count * 5, 30)

    amount_points = 0
    if penalty_values:
        mean_penalty = sum(penalty_values) / len(penalty_values)
        # Tiers are checked from largest to smallest; the first hit wins.
        for threshold, points in ((1000000, 40), (100000, 25), (10000, 15)):
            if mean_penalty > threshold:
                amount_points = points
                break
        else:
            amount_points = 5

    obligation_points = min(obligation_count * 2, 20)
    delay_points = min(delay_count * 10, 30)

    total = min(clause_points + amount_points + obligation_points + delay_points, 100)

    if total >= 70:
        level = "High"
    elif total >= 30:
        level = "Medium"
    else:
        level = "Low"
    return total, level
def generate_risk_meter(risk_score: float) -> str:
    """Return the HTML for a gradient risk bar with a position marker.

    The marker's ``left`` offset is the score as a percentage, clamped to
    0-100 so an out-of-range score cannot place it outside the bar. The
    legend ranges match the Low (< 30) / Medium (< 70) / High thresholds
    used when scores are classified, so a score of exactly 30 is shown in the
    Medium band.
    """
    position = max(0, min(100, risk_score))
    return f"""
    <div class="risk-meter">
        <div class="risk-meter-indicator" style="left: {position}%"></div>
    </div>
    <div class="risk-meter-labels">
        <span>Low (0-29)</span>
        <span>Medium (30-69)</span>
        <span>High (70-100)</span>
    </div>
    """
def generate_sentiment_meter(sentiment_score: float) -> str:
    """Return the HTML for a sentiment bar.

    The inner bar's width is ``sentiment_score * 100`` percent, followed by a
    Negative / Neutral / Positive legend.
    """
    fill_percent = sentiment_score * 100
    bar = f"""
    <div class="sentiment-meter">
        <div class="sentiment-score" style="width: {fill_percent}%"></div>
    </div>
    <div style="display: flex; justify-content: space-between; margin-top: 5px;">
        <span>Negative</span>
        <span>Neutral</span>
        <span>Positive</span>
    </div>
    """
    return bar
def generate_heatmap(risk_level: str):
    """Draw a gradient banner labelled with the risk level.

    "Low" uses the Blues colormap, "Medium" Oranges, anything else Reds. The
    label is white for "High"/"Medium" and black otherwise.

    Returns:
        The matplotlib figure. The caller owns it and should close it
        (``plt.close(fig)``) once it is no longer needed.

    Raises:
        Exception: "Heatmap generation failed: ..." chained to the original
            error. A partially built figure is closed first so it does not
            linger in pyplot's figure registry.
    """
    fig = None
    try:
        fig, ax = plt.subplots(figsize=(8, 2))
        if risk_level == "Low":
            cmap = plt.cm.Blues
        elif risk_level == "Medium":
            cmap = plt.cm.Oranges
        else:
            cmap = plt.cm.Reds
        # Two identical rows of 256 steps from 0 to 1 form the gradient strip.
        gradient = np.linspace(0, 1, 256).reshape(1, -1)
        gradient = np.vstack((gradient, gradient))
        ax.imshow(gradient, aspect='auto', cmap=cmap)
        ax.text(128, 0.5, f"{risk_level} Risk", color='white' if risk_level in ["High", "Medium"] else 'black',
                ha='center', va='center', fontsize=24, fontweight='bold')
        ax.set_axis_off()
        # Lay out this figure explicitly rather than pyplot's "current" one.
        fig.tight_layout()
        return fig
    except Exception as e:
        if fig is not None:
            plt.close(fig)
        logger.error(f"Heatmap generation failed: {str(e)}")
        raise Exception(f"Heatmap generation failed: {str(e)}") from e
def save_to_salesforce(sf: Salesforce, data: Dict):
    """Create a Custom_Risk_Analysis__c record from the analysis results.

    Args:
        sf: Authenticated Salesforce client.
        data: Analysis results keyed by 'sentiment_score', 'risk_score',
            'risk_level', 'record_id', 'penalty_examples', 'penalty_details',
            'penalty_amounts', 'obligation_details', 'delay_details' and
            'pdf_url'.

    Returns:
        The Id of the created record.

    Raises:
        Exception: "Salesforce record creation failed: ..." on any error.
    """
    # The five detail fields are each cut to this many characters.
    max_chars = 131072
    truncated_fields = (
        ('Penalty_Examples__c', 'penalty_examples'),
        ('Penalty_Details__c', 'penalty_details'),
        ('Penalty_Amounts__c', 'penalty_amounts'),
        ('Obligation_Details__c', 'obligation_details'),
        ('Delay_Details__c', 'delay_details'),
    )
    try:
        payload = {
            'Sentiment_Score__c': data['sentiment_score'],
            'Risk_Score__c': data['risk_score'],
            'Risk_Level__c': data['risk_level'],
            'Record_Id__c': data['record_id'],
        }
        for field_name, source_key in truncated_fields:
            payload[field_name] = data[source_key][:max_chars]
        payload['PDF_URL__c'] = data['pdf_url']

        created = sf.Custom_Risk_Analysis__c.create(payload)
        new_id = created['id']
        logger.info(f"Successfully created Salesforce record: {new_id}")
        return new_id
    except Exception as e:
        logger.error(f"Failed to save to Salesforce: {str(e)}")
        raise Exception(f"Salesforce record creation failed: {str(e)}")
def format_warning_message(count: int, item_type: str, emoji: str) -> str:
    """Build the coloured banner that summarises how many clauses were found.

    Zero yields a success box, fewer than 3 an info box, fewer than 5 a
    warning box, and anything else a danger box.
    """
    # NOTE(review): the leading icon characters below look mis-encoded in the
    # source; they are kept exactly as found.
    if count == 0:
        css_class = "success-box"
        message = f"β {emoji} No {item_type} clauses detected - This is excellent!"
    elif count < 3:
        css_class = "info-box"
        message = f"βΉοΈ {emoji} {count} {item_type} clauses detected - Standard contract language"
    elif count < 5:
        css_class = "warning-box"
        message = f"β οΈ {emoji} {count} {item_type} clauses detected - Review recommended"
    else:
        css_class = "danger-box"
        message = f"π¨ {emoji} {count} {item_type} clauses detected - High Risk! Needs immediate attention"
    return f"""<div class="{css_class}">{message}</div>"""
def format_clause_example(example: str, index: int) -> str:
    """Render one extracted clause as a numbered HTML snippet.

    The clause text comes from the uploaded document, so it is HTML-escaped
    before being embedded; otherwise markup inside the PDF text would be
    interpreted by the browser. Wrapping to 80 columns happens before
    escaping so entity expansion does not distort the line width.

    Args:
        example: Raw clause text.
        index: Number shown in front of the clause.
    """
    wrapped_text = html.escape(textwrap.fill(example, width=80))
    return f"""
    <div class="clause-example">
        <span class="clause-number">{index}.</span> {wrapped_text}
    </div>
    """
def _render_keyword_box(warning_html: str, box_class: str, title: str, bullet_color: str, counts: Dict[str, int]) -> str:
    """Render one keyword panel: the warning banner plus a row per keyword."""
    rows = "".join(
        f"<div class='count-item'><span class='count-label'>"
        f"<span style='color: var(--{bullet_color})'>β’</span> {kw}</span>"
        f"<span class='count-value'>{count}</span></div>"
        for kw, count in counts.items()
    )
    return f"""
    {warning_html}
    <div class='{box_class}'>
    <div class='section-title'>{title}</div>
    {rows}
    </div>
    """


def _build_pdf_report(project_title: str, risk_score, risk_level: str, sentiment_score,
                      total_penalties: int, total_obligations: int, total_delays: int):
    """Build the one-page summary PDF; return it as BytesIO, or None on failure."""
    try:
        from fpdf import FPDF
        # NOTE(review): the built-in "Arial" font is assumed to be Latin-1
        # only, so characters outside Latin-1 in the file name are replaced
        # rather than aborting the report.
        safe_title = project_title.encode('latin-1', 'replace').decode('latin-1')
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)
        pdf.cell(200, 10, txt=f"Contract Risk Analysis Report: {safe_title}", ln=1, align='C')
        pdf.ln(10)
        pdf.cell(200, 10, txt=f"Risk Score: {risk_score:.1f}/100 ({risk_level} Risk)", ln=1)
        pdf.ln(5)
        pdf.cell(200, 10, txt=f"Sentiment Score: {sentiment_score:.2f}", ln=1)
        pdf.ln(5)
        pdf.cell(200, 10, txt="Summary Findings:", ln=1)
        pdf.cell(200, 10, txt=f"- Total Penalty Clauses: {total_penalties}", ln=1)
        pdf.cell(200, 10, txt=f"- Total Obligation Clauses: {total_obligations}", ln=1)
        pdf.cell(200, 10, txt=f"- Total Delay Clauses: {total_delays}", ln=1)
        raw = pdf.output(dest='S')
        # Legacy PyFPDF returns a latin-1 str here; fpdf2 returns a bytearray.
        if isinstance(raw, str):
            raw = raw.encode('latin1')
        return BytesIO(bytes(raw))
    except Exception as e:
        logger.error(f"PDF report generation failed: {str(e)}")
        return None


def analyze_pdf(file_obj) -> List:
    """Main analysis function for the Gradio interface.

    Authenticates with Salesforce, extracts the PDF text, counts penalty /
    obligation / delay keywords, parses penalty amounts, scores the risk,
    builds and uploads a summary PDF, saves a Salesforce record, and renders
    the result panels.

    Args:
        file_obj: The uploaded file as handed over by Gradio: either an
            object exposing the path as ``.name`` or a plain path string.

    Returns:
        Eight HTML strings, in the order of the click handler's outputs: risk
        summary, (empty) risk visualization, penalty details, penalty
        amounts, obligation details, delay details, extracted clauses,
        (empty) sentiment analysis. On failure all eight slots carry the same
        error panel; this function does not raise.

    PDF upload and Salesforce record creation are best-effort: their failures
    are logged and the analysis is still returned. A failed Salesforce login,
    however, aborts the analysis.
    """
    # NOTE(review): several icon characters in the HTML below look
    # mis-encoded in the source; they are kept exactly as found.
    pdf_url = "Not uploaded"
    record_id = str(uuid.uuid4())
    try:
        if not file_obj:
            raise Exception("No PDF file uploaded. Please upload a valid PDF file.")
        # Gradio hands over either a temp-file wrapper or a path string.
        pdf_path = file_obj if isinstance(file_obj, str) else file_obj.name

        # Both helpers already raise with their own "... failed:" prefix, so
        # they are not re-wrapped (that used to duplicate the prefix).
        sf = authenticate_salesforce()
        text = extract_text_from_pdf(pdf_path)
        if not text.strip():
            raise Exception(
                "PDF text extraction failed: No text extracted from PDF. It might be a scanned document."
            )

        try:
            sentiment_score = get_hugging_face_sentiment(text)
        except Exception as e:
            logger.warning(f"Sentiment analysis failed: {str(e)}. Using fallback score of 0.5.")
            sentiment_score = 0.5

        penalty_keywords = ["penalty", "fine", "forfeit", "liquidated damages", "breach"]
        obligation_keywords = ["shall", "must", "required to", "obligated to", "duty"]
        delay_keywords = ["delay", "late", "overdue", "extension", "time is of the essence"]
        penalty_counts = count_keywords(text, penalty_keywords)
        obligation_counts = count_keywords(text, obligation_keywords)
        delay_counts = count_keywords(text, delay_keywords)
        penalty_values = find_penalty_values(text)
        total_penalties = sum(penalty_counts.values())
        total_obligations = sum(obligation_counts.values())
        total_delays = sum(delay_counts.values())

        penalty_warning = format_warning_message(total_penalties, "penalty", "π°")
        obligation_warning = format_warning_message(total_obligations, "obligation", "π")
        delay_warning = format_warning_message(total_delays, "delay", "β±οΈ")

        try:
            risk_score, risk_level = calculate_risk_score(
                total_penalties, penalty_values, total_obligations, total_delays
            )
        except Exception as e:
            raise Exception(f"Risk score calculation failed: {str(e)}") from e

        # Only the risk meter is displayed. The heatmap figure and sentiment
        # meter were built but never shown (their outputs are hidden), and
        # the heatmap figure was never closed, so they are no longer created.
        risk_meter = generate_risk_meter(risk_score)

        penalty_details = _render_keyword_box(
            penalty_warning, "penalty-box", "π° Penalty Clause Details", "danger-color", penalty_counts
        )
        obligation_details = _render_keyword_box(
            obligation_warning, "obligation-box", "π Obligation Clause Details", "warning-color", obligation_counts
        )
        delay_details = _render_keyword_box(
            delay_warning, "delay-box", "β±οΈ Delay Clause Details", "info-color", delay_counts
        )

        if penalty_values:
            penalty_amounts = "\n".join(
                f"<div class='count-item'><span class='count-label'>π° Amount</span>"
                f"<span class='count-value'>${amt:,.2f}</span></div>"
                for amt in penalty_values[:5]
            )
        else:
            penalty_amounts = "<div class='success-box'>β No specific penalty amounts found - This is good news!</div>"

        # Same whole-word, case-insensitive matching as the keyword counts, so
        # "fine" no longer selects sentences that merely contain "defined".
        penalty_pattern = re.compile(
            "|".join(r'\b' + re.escape(kw) + r'\b' for kw in penalty_keywords),
            re.IGNORECASE,
        )
        penalty_sentences = [
            sentence.strip()
            for sentence in re.split(r'(?<=[.!?])\s+', text)
            if penalty_pattern.search(sentence)
        ]
        if penalty_sentences:
            extracted_data = "\n".join(
                format_clause_example(sent, i + 1) for i, sent in enumerate(penalty_sentences[:3])
            )
        else:
            extracted_data = "<div class='success-box'>β No penalty clauses found - Excellent contract terms!</div>"

        project_title = os.path.splitext(os.path.basename(pdf_path))[0]

        pdf_report = _build_pdf_report(
            project_title, risk_score, risk_level, sentiment_score,
            total_penalties, total_obligations, total_delays,
        )
        if pdf_report:
            pdf_content_id, pdf_url = upload_pdf_to_salesforce(pdf_report, project_title, record_id)
            if not pdf_content_id:
                logger.warning("Failed to upload PDF to Salesforce")
                pdf_url = "Not uploaded"

        sf_data = {
            'sentiment_score': sentiment_score,
            'risk_score': risk_score,
            'risk_level': risk_level,
            'record_id': record_id,
            'penalty_examples': extracted_data,
            'penalty_details': "\n".join(f"{kw}: {count}" for kw, count in penalty_counts.items()),
            'penalty_amounts': "\n".join(f"${amt:,.2f}" for amt in penalty_values[:5]) if penalty_values else "No specific penalty amounts found",
            'obligation_details': "\n".join(f"{kw}: {count}" for kw, count in obligation_counts.items()),
            'delay_details': "\n".join(f"{kw}: {count}" for kw, count in delay_counts.items()),
            'pdf_url': pdf_url,
        }
        try:
            salesforce_record_id = save_to_salesforce(sf, sf_data)
            logger.info(f"Saved to Salesforce with ID: {salesforce_record_id}")
        except Exception as e:
            # Best-effort: the user still gets the analysis if saving fails.
            logger.error(f"Salesforce record creation failed: {str(e)}")

        risk_icon = "β " if risk_level == "Low" else "β οΈ" if risk_level == "Medium" else "π¨"
        risk_advice = {
            "Low": "This contract appears to be low risk. Standard review recommended.",
            "Medium": "This contract has moderate risk. Careful review advised.",
            "High": "This contract is high risk! Immediate legal review required."
        }
        return [
            f"""
            <div class='result-box'>
            <div class='section-title'>{risk_icon} Contract Risk Summary</div>
            <div class='risk-row'>
            <span class='risk-label'>Overall Risk Score</span>
            <span class='risk-score risk-{risk_level.lower()}'>{risk_score:.1f}/100</span>
            </div>
            {risk_meter}
            <div style='margin-top: 15px; font-size: 16px;'>
            <strong>Assessment:</strong> {risk_advice[risk_level]}
            </div>
            </div>
            """,
            "",  # Empty string for hidden risk visualization
            penalty_details,
            f"<div class='penalty-box'><div class='section-title'>π° Penalty Amounts Found</div>{penalty_amounts}</div>",
            obligation_details,
            delay_details,
            f"<div class='result-box'><div class='section-title'>π Extracted Data</div>{extracted_data}</div>",
            ""  # Empty string for hidden sentiment analysis
        ]
    except Exception as e:
        logger.error(f"Analysis failed: {str(e)}")
        # The error text can echo file names or library output; escape it
        # before embedding it in HTML.
        error_message = f"""
        <div class='danger-box'>
        <div style='display: flex; align-items: center; gap: 10px;'>
        <span style='font-size: 24px;'>β</span>
        <span style='font-size: 18px; font-weight: bold;'>Analysis Error</span>
        </div>
        <div style='margin-top: 10px;'>{html.escape(str(e))}</div>
        <div style='margin-top: 15px; font-size: 14px;'>
        Please ensure you've uploaded a valid PDF document with selectable text.
        </div>
        </div>
        """
        return [error_message] * 8
# Create Gradio interface with dark mode compatibility.
# NOTE(review): the source lost its indentation, so the nesting of the rows
# and columns below is reconstructed -- confirm against the intended layout.
with gr.Blocks(css=css, title="PDF Contract Risk Analyzer", theme=gr.themes.Default(primary_hue="blue")) as demo:
    # Page header.
    gr.Markdown("""
    <div style='text-align: center; margin-bottom: 30px;'>
    <h1 style='color: var(--primary-color); margin-bottom: 10px;'>PDF Contract Analysis</h1>
    <p style='color: var(--secondary-color); font-size: 16px;'>
    Upload a contract PDF to analyze risks, obligations, and sentiment. Get instant insights into potential issues.
    </p>
    </div>
    """)
    with gr.Row():
        # Left column: upload control and the button that starts the analysis.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload Contract PDF",
                file_types=[".pdf"],
                elem_classes="upload-area"
            )
            gr.Markdown("""
            <div class='file-info'>
            Drag and drop your contract PDF file here.
            </div>
            """)
            submit_btn = gr.Button("Analyze Contract", variant="primary")
        # Right column: result panels filled by analyze_pdf.
        with gr.Column(scale=3):
            risk_summary = gr.HTML(label="Contract Risk Summary")
            # Created hidden; analyze_pdf returns an empty string for this slot.
            risk_visualization = gr.HTML(label="Risk Visualization", visible=False, elem_id="risk-visualization")
            with gr.Row():
                with gr.Column():
                    penalty_count = gr.HTML(label="Penalty Clauses Analysis")
                    penalty_amounts = gr.HTML(label="Penalty Amounts Found")
                with gr.Column():
                    obligation_count = gr.HTML(label="Obligation Clauses Analysis")
                with gr.Column():
                    delay_count = gr.HTML(label="Delay Clauses Analysis")
            with gr.Row():
                extracted_data = gr.HTML(label="Extracted Data")
            with gr.Row():
                # Created hidden; analyze_pdf returns an empty string for this slot.
                sentiment_analysis = gr.HTML(label="Sentiment Analysis", visible=False, elem_id="sentiment-analysis")
    # The outputs list has eight components, in the same order as the eight
    # values analyze_pdf returns.
    submit_btn.click(
        fn=analyze_pdf,
        inputs=[file_input],
        outputs=[
            risk_summary, risk_visualization,
            penalty_count, penalty_amounts,
            obligation_count, delay_count,
            extracted_data, sentiment_analysis
        ]
    )
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()