Spaces:
Runtime error
Runtime error
import gradio as gr
import PyPDF2
import nltk
import seaborn as sns
import matplotlib.pyplot as plt
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import json
import os
from io import BytesIO
import numpy as np
import logging

# Set up module-level logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Download NLTK sentence-tokenizer data. NLTK >= 3.8.2 looks up the
# "punkt_tab" resource for sent_tokenize instead of "punkt", so fetch both;
# older releases that lack "punkt_tab" are tolerated.
nltk.download('punkt', quiet=True)
try:
    nltk.download('punkt_tab', quiet=True)
except Exception:  # resource does not exist on older NLTK versions
    pass

# Clause taxonomy and the per-type base risk weight used for scoring.
CLAUSE_TYPES = ["penalty", "obligation", "delay"]
RISK_WEIGHTS = {"penalty": 0.8, "obligation": 0.5, "delay": 0.6}

# Keyword-based heuristic for clause classification.
KEYWORD_MAP = {
    "penalty": ["penalty", "fee", "fine", "charge", "incur"],
    "obligation": ["shall", "must", "obligated", "required", "responsible"],
    "delay": ["delay", "late", "beyond", "postpone", "deferred"]
}
def extract_text_from_pdf(pdf_file):
    """Return the concatenated text of every page in *pdf_file*.

    On any extraction failure — or when the PDF yields no text at all —
    a string beginning with "Error" is returned instead of raising,
    which is the contract the caller relies on.
    """
    try:
        pages = PyPDF2.PdfReader(pdf_file).pages
        # Join pages with a trailing newline after each one; extract_text()
        # may return None for image-only pages, hence the `or ""`.
        text = "".join((page.extract_text() or "") + "\n" for page in pages)
        logger.info(f"Extracted text length: {len(text)} characters")
        logger.debug(f"Extracted text sample: {text[:500]}")
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
    if not text.strip():
        return "Error: No text extracted from PDF."
    return text
def parse_contract(text):
    """Split contract text into sentences and flag risk-prone clauses.

    Each sentence is matched against KEYWORD_MAP; the first clause type
    whose keyword appears wins. Returns a pair:
    (list of clause-record dicts, parallel list of raw risk scores).
    """
    # Clean text: collapse doubled newlines, turn tabs into spaces.
    text = text.replace("\n\n", "\n").replace("\t", " ")
    sentences = nltk.sent_tokenize(text)
    logger.info(f"Number of sentences tokenized: {len(sentences)}")
    logger.debug(f"Sample sentences: {sentences[:3]}")

    results, risk_scores = [], []
    for idx, raw in enumerate(sentences):
        clause = raw.strip()
        if len(clause) < 10:  # too short to be a meaningful clause
            logger.debug(f"Skipping short sentence (length {len(clause)}): {clause}")
            continue

        # Heuristic classification: first clause type with a keyword hit.
        lowered = clause.lower()
        matched = next(
            (c_type for c_type, keywords in KEYWORD_MAP.items()
             if any(kw in lowered for kw in keywords)),
            None,
        )
        if matched not in CLAUSE_TYPES:
            logger.debug(f"No relevant clause type for sentence {idx}: {clause}")
            continue

        # Dummy confidence factor (0.9) standing in for a model score.
        score = RISK_WEIGHTS[matched] * 0.9
        results.append({
            "clause_id": idx,
            "text": clause,
            "clause_type": matched,
            "risk_score": round(score, 2),
        })
        risk_scores.append(score)
        logger.info(f"Detected clause {idx}: {matched} with risk score {score}")
    return results, risk_scores
def generate_heatmap(risk_scores):
    """Render *risk_scores* as a one-row heatmap PNG.

    Returns a BytesIO positioned at 0 holding the PNG bytes, or None when
    there are no scores to plot.
    """
    if not risk_scores:
        logger.warning("No risk scores to generate heatmap.")
        return None

    row = np.array(risk_scores).reshape(1, -1)
    plt.figure(figsize=(10, 2))
    sns.heatmap(
        row,
        cmap="YlOrRd",
        annot=True,
        fmt=".2f",
        cbar_kws={'label': 'Risk Score'},
    )
    plt.title("Contract Risk Heatmap")
    plt.xlabel("Clause Index")
    plt.ylabel("Risk")

    png = BytesIO()
    plt.savefig(png, format="png", bbox_inches="tight")
    plt.close()  # release the figure so repeated calls don't accumulate
    png.seek(0)
    return png
def generate_pdf_report(results, heatmap_buffer):
    """Build a one-page PDF report: clause summary plus the heatmap image.

    Parameters
    ----------
    results : list[dict]
        Clause records as produced by parse_contract().
    heatmap_buffer : BytesIO | None
        PNG bytes from generate_heatmap(); may be None.

    Returns a BytesIO positioned at 0 containing the PDF bytes.
    """
    # Local import: ImageReader is reportlab's supported wrapper for
    # in-memory images — Canvas.drawImage only accepts a filename or an
    # ImageReader, so passing a raw BytesIO (as before) raised at runtime.
    from reportlab.lib.utils import ImageReader

    buffer = BytesIO()
    c = canvas.Canvas(buffer, pagesize=letter)
    c.setFont("Helvetica", 12)
    c.drawString(50, 750, "Contract Risk Analysis Report")

    # Summary
    c.drawString(50, 720, "Summary of Risk-Prone Clauses:")
    y = 700
    for result in results[:5]:  # Limit to top 5 for brevity
        text = f"Clause {result['clause_id']}: {result['clause_type'].capitalize()} (Risk: {result['risk_score']})"
        c.drawString(50, y, text[:80] + "..." if len(text) > 80 else text)
        y -= 20

    # Embed heatmap. Rewind the buffer before and after reading so the
    # caller can still consume it (the draw reads the stream to EOF).
    if heatmap_buffer:
        heatmap_buffer.seek(0)
        c.drawImage(ImageReader(heatmap_buffer), 50, y - 200,
                    width=500, height=100)
        heatmap_buffer.seek(0)

    c.showPage()
    c.save()
    buffer.seek(0)
    return buffer
def process_contract(pdf_file):
    """Main pipeline for an uploaded contract PDF.

    Returns a 4-tuple matching the Gradio outputs:
    (json_text, heatmap_png_path_or_None, report_pdf_path_or_None, summary_dict).

    The heatmap and report are written to temporary files because Gradio's
    Image and File components expect file paths (or arrays), not BytesIO
    objects — returning raw buffers here was a runtime error.
    """
    import tempfile  # stdlib; local import keeps the module import block untouched

    # Extract text
    text = extract_text_from_pdf(pdf_file)
    # Both failure modes of extract_text_from_pdf start with "Error";
    # startswith avoids false positives on contracts containing that word.
    if text.startswith("Error"):
        return text, None, None, {"Error": text}

    # Parse and classify
    results, risk_scores = parse_contract(text)
    if not results:
        return "No relevant clauses detected.", None, None, {"Summary": "No risk-prone clauses found."}

    # Generate outputs
    json_output = json.dumps(results, indent=2)

    heatmap_buffer = generate_heatmap(risk_scores)
    heatmap_path = None
    if heatmap_buffer is not None:
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
            f.write(heatmap_buffer.getvalue())  # getvalue() ignores stream position
            heatmap_path = f.name

    pdf_buffer = generate_pdf_report(results, heatmap_buffer)
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as f:
        f.write(pdf_buffer.getvalue())
        report_path = f.name

    return json_output, heatmap_path, report_path, {"Summary": f"Detected {len(results)} risk-prone clauses."}
# Gradio UI wiring: one PDF upload in, four result panels out.
iface = gr.Interface(
    fn=process_contract,
    inputs=gr.File(label="Upload Contract PDF"),
    outputs=[
        gr.Textbox(label="JSON Output"),
        gr.Image(label="Risk Heatmap"),
        gr.File(label="Download PDF Report"),
        gr.JSON(label="Summary"),
    ],
    title="Contract Risk Analyzer",
    description="Upload a contract PDF to analyze risk-prone clauses and visualize results.",
)

# Launch only when run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()