# ClauseWise – Streamlit legal-document assistant built on IBM Granite 3.2 (2B).
# (Header reconstructed; the original lines were Hugging Face Spaces page residue.)
| import streamlit as st | |
| import tempfile | |
| import os | |
| import re | |
| import io | |
| import json | |
| from typing import List, Dict, Tuple, Any, Optional | |
| import torch | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| from pypdf import PdfReader | |
| import docx | |
| import spacy | |
| import math | |
| import time | |
# -------------------------
# PAGE CONFIG
# -------------------------
# Bug fix: the original title/icon contained mojibake ("β", "βοΈ") from a bad
# encoding round-trip; restored the intended en dash and scales-of-justice emoji.
st.set_page_config(page_title="ClauseWise – Granite 3.2 (2B) Legal Assistant", page_icon="⚖️", layout="wide")
# -------------------------
# MODEL SETUP WITH OPTIMIZATIONS
# -------------------------
# Hugging Face model id of the instruction-tuned Granite 3.2 2B checkpoint.
MODEL_ID = "ibm-granite/granite-3.2-2b-instruct"
# Prefer GPU when available; bfloat16 halves memory on CUDA, full float32 on CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE = torch.bfloat16 if torch.cuda.is_available() else torch.float32
@st.cache_resource(show_spinner=False)
def load_llm_model():
    """Load and cache the Granite tokenizer/model pair.

    Bug fix: without ``st.cache_resource`` Streamlit re-executes this on every
    rerun (every widget interaction), reloading a multi-GB model each time.
    The decorator caches the pair for the lifetime of the server process.

    Returns:
        (tokenizer, model) on success, or (None, None) if loading fails;
        the failure is surfaced via ``st.error``.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            torch_dtype=DTYPE,
            device_map="auto" if DEVICE == "cuda" else None,
            trust_remote_code=True
        )
        # device_map="auto" already places the model on CUDA; only move manually on CPU.
        if DEVICE != "cuda":
            model.to(DEVICE)
        return tokenizer, model
    except Exception as e:
        st.error(f"Error loading model: {e}")
        return None, None
# Module-level singletons used by all analysis helpers below.
tokenizer, model = load_llm_model()
try:
    nlp = spacy.load("en_core_web_sm")
except (OSError, ImportError):
    # Bug fix: was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt. spacy.load raises OSError when the model package
    # is missing — catch only that (plus ImportError for a broken install).
    st.warning("spaCy model 'en_core_web_sm' not found. Please install with: python -m spacy download en_core_web_sm")
    nlp = None
| # ------------------------- | |
| # OPTIMIZED HELPER FUNCTIONS | |
| # ------------------------- | |
def build_chat_prompt(system_prompt: str, user_prompt: str) -> str:
    """Render a (system, user) message pair as a single prompt string.

    Prefers the tokenizer's own chat template; if that is unavailable or
    fails for any reason, falls back to a simple Granite-style tag layout.
    """
    messages = [{"role": "system", "content": system_prompt}] if system_prompt else []
    messages.append({"role": "user", "content": user_prompt})
    try:
        return tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    except Exception:
        prefix = f"<|system|>\n{system_prompt}\n" if system_prompt else ""
        return prefix + f"<|user|>\n{user_prompt}\n<|assistant|>\n"
def llm_generate_optimized(system_prompt: str, user_prompt: str, max_new_tokens=256, temperature=0.3, top_p=0.9) -> str:
    """Generate a chat completion with the module-level Granite model.

    Args:
        system_prompt: System instruction (may be empty).
        user_prompt: User message.
        max_new_tokens / temperature / top_p: sampling controls.

    Returns:
        The assistant's reply text, or a human-readable error string.
    """
    if model is None or tokenizer is None:
        return "Model not available. Please check model loading."
    try:
        prompt = build_chat_prompt(system_prompt, user_prompt)
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048).to(DEVICE)
        with torch.inference_mode():
            # Bug fix: removed `early_stopping=True` — it only applies to beam
            # search and is ignored (with a warning) under sampling (do_sample=True).
            output_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                repetition_penalty=1.1
            )
        full_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        # Strip the echoed prompt so only the assistant's reply remains.
        if "<|assistant|>" in full_text:
            response = full_text.split("<|assistant|>")[-1].strip()
        elif full_text.startswith(prompt):
            response = full_text[len(prompt):].strip()
        else:
            response = full_text.strip()
        return response
    except Exception as e:
        return f"Error generating response: {str(e)}"
| # ------------------------- | |
| # DOCUMENT LOADING | |
| # ------------------------- | |
def load_text_from_pdf(file_obj) -> str:
    """Extract plain text from every page of a PDF.

    Returns the concatenated page texts, or an "Error reading PDF: ..."
    string if the file cannot be parsed at all.
    """
    try:
        extracted = []
        for page in PdfReader(file_obj).pages:
            try:
                extracted.append(page.extract_text() or "")
            except Exception:
                # One unreadable page should not abort the whole document.
                extracted.append("")
        return "\n".join(extracted).strip()
    except Exception as e:
        return f"Error reading PDF: {str(e)}"
def load_text_from_docx(file_obj) -> str:
    """Extract the non-empty paragraph text from a DOCX upload.

    The stream is read fully, rewound for the caller, and parsed from an
    in-memory copy. Returns an "Error reading DOCX: ..." string on failure.
    """
    try:
        raw = file_obj.read()
        file_obj.seek(0)
        document = docx.Document(io.BytesIO(raw))
        lines = [para.text for para in document.paragraphs if para.text.strip()]
        return "\n".join(lines).strip()
    except Exception as e:
        return f"Error reading DOCX: {str(e)}"
def load_text_from_txt(file_obj) -> str:
    """Read a text upload, decoding bytes as UTF-8 with a Latin-1 fallback.

    Bug fix: the original decoded with ``errors="ignore"``, which never
    raises, so its Latin-1 fallback branch was dead code and invalid UTF-8
    bytes were silently dropped. A strict UTF-8 decode is now attempted
    first; Latin-1 (which accepts any byte) is the real fallback.
    """
    try:
        data = file_obj.read()
        if isinstance(data, bytes):
            try:
                data = data.decode("utf-8")
            except UnicodeDecodeError:
                data = data.decode("latin-1")
        return str(data).strip()
    except Exception as e:
        return f"Error reading TXT: {str(e)}"
| def load_document(file) -> str: | |
| if not file: | |
| return "" | |
| name = (file.name or "").lower() | |
| if name.endswith(".pdf"): | |
| return load_text_from_pdf(file) | |
| elif name.endswith(".docx"): | |
| return load_text_from_docx(file) | |
| elif name.endswith(".txt"): | |
| return load_text_from_txt(file) | |
| else: | |
| return "Unsupported file format" | |
def get_text_from_inputs(file, text):
    """Merge the two input channels (upload vs. pasted text).

    When both carry content the longer one wins (ties go to the pasted
    text); otherwise whichever is non-empty; "" when neither is.
    """
    doc_text = load_document(file) if file else ""
    typed_text = (text or "").strip()
    if doc_text and typed_text:
        return doc_text if len(doc_text) > len(typed_text) else typed_text
    return doc_text or typed_text or ""
| # ------------------------- | |
| # CLAUSE PROCESSING | |
| # ------------------------- | |
| CLAUSE_SPLIT_REGEX = re.compile(r"(?:(?:^\s*\d+(?:\.\d+)*[.)]\s+)|(?:^\s*[β’\-*]\s+)|(?:\n\s*\n))", re.MULTILINE) | |
def split_into_clauses(text: str, min_len: int = 20) -> List[str]:
    """Split a document into candidate clauses.

    Tries structural markers (numbering/bullets/blank lines) first, then
    falls back to sentence boundaries. Results are deduplicated (whitespace-
    and case-insensitive) and must be at least ``min_len`` characters.
    """
    if not text or not text.strip():
        return []
    pieces = re.split(CLAUSE_SPLIT_REGEX, text)
    if len(pieces) < 2:
        # Structural split found nothing useful — split on sentence ends.
        pieces = re.split(r"(?<=[.;!?])\s+(?=[A-Z])", text)
    candidates = [piece.strip() for piece in pieces if piece and len(piece.strip()) >= min_len]
    seen_keys = set()
    unique_clauses = []
    for candidate in candidates:
        # Collapse whitespace + lowercase so near-identical clauses dedupe.
        normalized = re.sub(r'\s+', ' ', candidate.lower()).strip()
        if normalized and normalized not in seen_keys and len(candidate) >= min_len:
            seen_keys.add(normalized)
            unique_clauses.append(candidate)
    return unique_clauses
| # ------------------------- | |
| # FAST CLAUSE SIMPLIFICATION | |
| # ------------------------- | |
def simplify_clause_fast(clause: str) -> str:
    """Ask the LLM for a plain-English rewrite of ``clause``.

    Input is capped at 1500 characters to keep generation fast; the wall
    time is reported in the Streamlit sidebar.
    """
    stripped = clause.strip()
    if not stripped:
        return "Please provide a clause to simplify."
    if len(stripped) < 10:
        return "Clause is too short for meaningful simplification."
    # Cap the input so generation stays responsive.
    snippet = clause[:1500]
    system_prompt = """You are a legal assistant that rewrites complex legal clauses into plain, understandable English.
Be concise and focus on the main points. Keep responses under 200 words."""
    user_prompt = f"""Rewrite this legal clause in simple English. Focus on the key obligations and rights:
{snippet}
Provide a clear, simple explanation:"""
    started = time.time()
    simplified = llm_generate_optimized(
        system_prompt,
        user_prompt,
        max_new_tokens=200,
        temperature=0.4
    )
    st.sidebar.info(f"Simplification took {time.time() - started:.1f} seconds")
    return simplified
def simplify_clause_with_progress(clause: str) -> str:
    """Run the fast simplifier while driving Streamlit progress widgets."""
    if not clause.strip():
        return "Please provide a clause to simplify."
    bar = st.progress(0)
    status = st.empty()
    status.text("Initializing simplification...")
    bar.progress(10)
    time.sleep(0.5)
    if model is None:
        # No model loaded — report and bail out without generating.
        bar.progress(100)
        status.text("Using basic simplification (model not available)")
        return "Model not available. Please check if the model loaded correctly."
    status.text("Analyzing legal language...")
    bar.progress(30)
    time.sleep(0.5)
    status.text("Generating plain English version...")
    bar.progress(60)
    simplified = simplify_clause_fast(clause)
    bar.progress(90)
    status.text("Finalizing output...")
    time.sleep(0.5)
    bar.progress(100)
    status.text("Simplification complete!")
    time.sleep(1)
    # Remove the transient progress widgets once done.
    bar.empty()
    status.empty()
    return simplified
def simplify_clause(clause: str) -> str:
    """Public entry point: clause simplification with UI progress feedback."""
    return simplify_clause_with_progress(clause)
def ner_entities(text: str) -> Dict[str, List[str]]:
    """Run spaCy NER and group unique entity texts by label.

    Returns {} for empty input, or an {"ERROR": [...]} payload when the
    spaCy model is missing or processing fails.
    """
    if not text or not text.strip():
        return {}
    if nlp is None:
        return {"ERROR": ["spaCy model not available. Please install en_core_web_sm"]}
    try:
        # Guard against pathological inputs; cap the document size.
        if len(text) > 1000000:  # ~1MB limit
            text = text[:1000000]
        grouped: Dict[str, List[str]] = {}
        for ent in nlp(text).ents:
            grouped.setdefault(ent.label_, []).append(ent.text)
        # Deduplicate and sort each label's surface forms.
        return {label: sorted(set(values)) for label, values in grouped.items()}
    except Exception as e:
        return {"ERROR": [f"NER processing error: {str(e)}"]}
def extract_clauses(text: str) -> List[str]:
    # Thin alias kept for UI readability; delegates to split_into_clauses.
    return split_into_clauses(text)
| # ------------------------- | |
| # DOCUMENT CLASSIFICATION | |
| # ------------------------- | |
# Closed set of categories the document classifier may return.
DOC_TYPES = [
    "Non-Disclosure Agreement (NDA)",
    "Lease Agreement",
    "Employment Contract",
    "Service Agreement",
    "Sales Agreement",
    "Consulting Agreement",
    "End User License Agreement (EULA)",
    "Terms of Service",
    "Partnership Agreement",
    "Loan Agreement"
]
def classify_document(text: str) -> str:
    """Classify ``text`` as one of DOC_TYPES.

    Strategy: ask the LLM, accept its answer only when it names a known type
    verbatim, then fall back to keyword heuristics on the document itself.

    Bug fix: the original accepted the first type sharing ANY word with the
    LLM reply, so a reply of "Lease Agreement" matched "Non-Disclosure
    Agreement (NDA)" first via the generic word "agreement". Matching now
    requires the full type name to appear in the reply.
    """
    if not text or not text.strip():
        return "No text provided for classification"
    system_prompt = """You are a legal document classification expert. Analyze the provided text and determine the most appropriate document type from the given list."""
    labels = "\n".join(f"- {t}" for t in DOC_TYPES)
    user_prompt = f"""Classify the following legal document into one of these types:
Available types:
{labels}
Document text (first 3000 characters):
{text[:3000]}
Provide only the most appropriate document type from the list above."""
    resp = llm_generate_optimized(system_prompt, user_prompt, max_new_tokens=100)
    resp_lower = resp.lower()
    text_lower = text.lower()
    # Accept the LLM answer only when it contains a full type name verbatim.
    for doc_type in DOC_TYPES:
        if doc_type.lower() in resp_lower:
            return doc_type
    # Fallback: keyword heuristics on the document text itself.
    if "confidential" in text_lower or "non-disclosure" in text_lower or "nda" in text_lower:
        return "Non-Disclosure Agreement (NDA)"
    elif "lease" in text_lower or "tenant" in text_lower or "landlord" in text_lower:
        return "Lease Agreement"
    elif "employment" in text_lower or "employee" in text_lower or "employer" in text_lower:
        return "Employment Contract"
    elif "service" in text_lower and "agreement" in text_lower:
        return "Service Agreement"
    elif "sale" in text_lower or "purchase" in text_lower:
        return "Sales Agreement"
    elif "consulting" in text_lower:
        return "Consulting Agreement"
    elif "eula" in text_lower or "end user" in text_lower:
        return "End User License Agreement (EULA)"
    elif "terms of service" in text_lower or "terms and conditions" in text_lower:
        return "Terms of Service"
    return "Unknown Document Type"
| # ------------------------- | |
| # FAST FAIRNESS BALANCE METER (Optimized - No LLM for scoring) | |
| # ------------------------- | |
def calculate_fairness_score_fast(text: str) -> int:
    """Heuristic 0-100 fairness score from weighted keyword hits (no LLM).

    50 means neutral / no signal; higher means more mutual language, lower
    means more one-sided language. Clamped to [10, 90].
    """
    if not text.strip():
        return 50
    lowered = text.lower()
    # Terms that typically favor the drafting party.
    one_sided_keywords = {
        "sole discretion": 15, "unilateral": 12, "without cause": 10,
        "without notice": 8, "indemnify": 8, "hold harmless": 8,
        "liable for": 6, "waive": 6, "proprietary": 4,
        "confidential information": 4, "non-compete": 7, "non-solicit": 6,
        "assignment": 5, "termination for convenience": 9, "exclusive": 5,
        "irrevocable": 6, "perpetual": 7, "warranty": 4
    }
    # Terms that indicate mutual / balanced obligations.
    balanced_keywords = {
        "mutual": 12, "both parties": 10, "either party": 8,
        "agree": 6, "reasonable": 8, "good faith": 9, "joint": 7,
        "shared": 6, "pro rata": 5, "mediation": 7, "arbitration": 6,
        "negotiate": 5, "consent": 4, "review": 3, "discuss": 3
    }
    one_sided_total = sum(weight for phrase, weight in one_sided_keywords.items() if phrase in lowered)
    balanced_total = sum(weight for phrase, weight in balanced_keywords.items() if phrase in lowered)
    if one_sided_total + balanced_total == 0:
        return 50  # no signal either way
    # Shift away from the 50% midpoint by half the net weight, clamped.
    return int(max(10, min(90, 50 + (balanced_total - one_sided_total) / 2)))
def get_fairness_analysis_fast(score: int, text: str) -> Dict[str, Any]:
    """Map a fairness score onto a label, narrative and canned recommendations."""
    # (threshold, level, analysis, recommendations) — first matching tier wins.
    tiers = [
        (75, "Highly Balanced",
         "This clause appears fair and balanced between both parties with mutual obligations.",
         ["Maintain current terms", "Ensure mutual benefits are clear"]),
        (60, "Moderately Balanced",
         "Generally fair with some areas that could be more balanced.",
         ["Consider mutual termination rights", "Review liability caps"]),
        (40, "Neutral",
         "Neither strongly balanced nor one-sided. Standard contractual terms.",
         ["Monitor for fairness during negotiation", "Clarify ambiguous terms"]),
        (25, "Slightly One-Sided",
         "Some terms favor one party more than the other.",
         ["Request mutual obligations", "Limit unilateral rights", "Add review clauses"]),
    ]
    for threshold, level, analysis, recommendations in tiers:
        if score >= threshold:
            break
    else:
        level = "Highly One-Sided"
        analysis = "Significant imbalance favoring one party. Important terms need renegotiation."
        recommendations = ["Seek legal advice", "Request balanced terms", "Add mutual protections"]
    # Synthetic per-category breakdown derived from the single score.
    categories = {
        "Termination Rights": max(20, 100 - score),
        "Liability Balance": score,
        "IP Ownership": max(15, 100 - score * 0.7),
        "Restrictions": max(25, 100 - score * 0.8),
        "Obligations": max(30, score * 0.9)
    }
    return {
        "score": score,
        "level": level,
        "analysis": analysis,
        "recommendations": recommendations,
        "categories": categories,
        # Rough proxy for "substantive" words in the clause.
        "keyword_count": len([w for w in text.lower().split() if len(w) > 4])
    }
def fairness_balance_meter_fast(clause: str) -> Tuple[int, str, Dict[str, Any]]:
    """Score a clause's fairness and package the supporting analysis (no LLM)."""
    if not clause.strip():
        return 50, "No clause provided", {}
    score = calculate_fairness_score_fast(clause)
    details = get_fairness_analysis_fast(score, clause)
    rationale = f"Fairness Score: {score}% - {details['level']}"
    return score, rationale, details
| # ------------------------- | |
| # FAST FUTURE RISK PREDICTOR (Optimized - Minimal LLM) | |
| # ------------------------- | |
def analyze_risk_keywords_fast(text: str) -> Dict[str, float]:
    """Score high/medium/low contractual risk from weighted keyword hits."""
    lowered = text.lower()
    # Terms that tend to carry significant exposure.
    high_risk_terms = {
        "indemnify": 0.3, "liable": 0.2, "damages": 0.2, "penalty": 0.3,
        "termination": 0.1, "breach": 0.2, "default": 0.2, "warranty": 0.1,
        "guarantee": 0.2, "irrevocable": 0.2, "perpetual": 0.3
    }
    # Terms that imply ongoing obligations or oversight.
    medium_risk_terms = {
        "obligation": 0.1, "responsibility": 0.1, "compliance": 0.1,
        "audit": 0.1, "inspection": 0.1, "approval": 0.05,
        "consent": 0.05, "restriction": 0.1, "limitation": 0.1
    }
    high = sum((weight for term, weight in high_risk_terms.items() if term in lowered), 0.0)
    medium = sum((weight for term, weight in medium_risk_terms.items() if term in lowered), 0.0)
    return {
        "high_risk": high,
        "medium_risk": medium,
        # Low risk mirrors the absence of high-risk signals.
        "low_risk": max(0, 1.0 - high),
    }
def generate_risk_timeline_fast(risk_factors: Dict[str, float], clause_length: int) -> List[Dict[str, Any]]:
    """Project a 5-year risk timeline from keyword risk factors (no LLM).

    ``clause_length`` is accepted for interface compatibility but unused.
    """
    # Base risk starts at 20 and is pushed up by high-risk keyword weight.
    baseline = min(80, 20 + risk_factors["high_risk"] * 60)
    descriptions = [
        "Contract interpretation disputes",
        "Performance and compliance issues",
        "Financial and liability exposures",
        "Relationship and operational conflicts",
        "Regulatory and legal changes impact",
    ]
    mitigations = [
        "Clarify ambiguous terms in writing",
        "Establish clear performance metrics",
        "Implement regular compliance reviews",
        "Maintain open communication channels",
        "Monitor regulatory changes proactively",
    ]
    entries = []
    for year in range(1, 6):
        # Risk drifts upward moderately over time, capped at 90.
        year_score = min(90, baseline + (year - 1) * 8)
        if year_score >= 70:
            level = "High"
        elif year_score >= 40:
            level = "Medium"
        else:
            level = "Low"
        entries.append({
            "year": year,
            "risk_score_0_100": int(year_score),
            "risk_level": level,
            "key_risks": [descriptions[(year + offset) % len(descriptions)] for offset in range(2)],
            "mitigation": [mitigations[(year + offset) % len(mitigations)] for offset in range(2)],
            "financial_impact": f"${year * 2500}-${year * 15000}",
            "probability": f"{int(year_score)}%",
        })
    return entries
def future_risk_predictor_fast(clause: str) -> Tuple[List[Dict[str, Any]], str]:
    """Produce a (timeline, summary) risk projection for a clause (no LLM)."""
    if not clause.strip():
        return [], "No clause provided"
    factors = analyze_risk_keywords_fast(clause)
    timeline = generate_risk_timeline_fast(factors, len(clause))
    # Summarize from the mean projected risk over the 5-year horizon.
    mean_risk = sum(entry["risk_score_0_100"] for entry in timeline) / len(timeline)
    if mean_risk >= 70:
        summary = "High overall risk detected. Recommend thorough legal review and risk mitigation planning."
    elif mean_risk >= 50:
        summary = "Moderate risk level. Standard precautions and monitoring recommended."
    else:
        summary = "Lower risk profile. Maintain standard contractual safeguards."
    return timeline, summary
| # ------------------------- | |
| # OPTIMIZED UI | |
| # ------------------------- | |
# Bug fix: the original title string contained a mojibake "β" where an en dash belongs.
st.title("ClauseWise – Granite 3.2 (2B) Legal Assistant")
st.markdown("Upload a PDF/DOCX/TXT or paste text below. Tabs provide different legal analysis tools.")
# Sidebar: document upload, pasted text, and usage tips.
with st.sidebar:
    st.header("Document Input")
    uploaded_file = st.file_uploader("Upload PDF/DOCX/TXT", type=["pdf","docx","txt"])
    pasted_text = st.text_area("Or paste text here", height=200, placeholder="Paste your legal text here...")
    # Performance info
    st.header("Performance Tips")
    st.info("""
- Keep clauses under 1500 characters for faster processing
- Use specific clauses rather than entire documents
- Model loads faster on GPU (CUDA)
""")
    if uploaded_file:
        st.info(f"Uploaded: {uploaded_file.name}")
    if pasted_text:
        st.info("Text input received")
# Resolve the effective input once; every tab below reads `text_data`.
text_data = get_text_from_inputs(uploaded_file, pasted_text)
# Show text preview with length info
if text_data and text_data not in ["", "Unsupported file format"]:
    with st.expander(f"Preview Extracted Text ({len(text_data)} characters)", expanded=False):
        st.text_area("Text Preview", text_data[:1500] + ("..." if len(text_data) > 1500 else ""), height=200, key="preview")
    if len(text_data) > 1500:
        st.warning(f"Document is large ({len(text_data)} characters). For faster processing, consider analyzing specific clauses.")
else:
    st.warning("Please upload a document or paste text to get started")
| # Create only the core working tabs | |
| tabs = st.tabs([ | |
| "π Clause Simplification", | |
| "π Named Entity Recognition", | |
| "π Clause Extraction", | |
| "π Document Classification", | |
| "β‘ Fairness Balance", | |
| "π Risk Predictor" | |
| ]) | |
# Tab 1: OPTIMIZED Clause Simplification
with tabs[0]:
    st.header("Clause Simplification")
    st.markdown("Convert complex legal language into plain English")
    # Smart input selection: pasted clause on the left, options on the right.
    col1, col2 = st.columns([2, 1])
    with col1:
        clause_input = st.text_area(
            "Enter specific clause to simplify:",
            height=120,
            placeholder="Paste a complex legal clause here (recommended: under 1500 characters)...",
            key="simplify_input"
        )
    with col2:
        st.markdown("### Options")
        # Defaults to the uploaded document when no clause is typed.
        use_document_text = st.checkbox(
            "Use uploaded document",
            value=not bool(clause_input.strip()),
            help="Use the entire uploaded document for simplification"
        )
    # Character count and warnings
    if clause_input.strip():
        char_count = len(clause_input)
        if char_count > 1500:
            st.warning(f"Clause is long ({char_count} characters). This may take longer to process.")
        else:
            st.info(f"Clause length: {char_count} characters")
    if st.button("Simplify Clause", key="simplify", type="primary", use_container_width=True):
        # Pick the input source; large documents are truncated for speed.
        if use_document_text and text_data and text_data not in ["", "Unsupported file format"]:
            if len(text_data) > 2000:
                st.warning("Document is large. Simplifying first 1500 characters for speed.")
                target = text_data[:1500]
            else:
                target = text_data
            source = "uploaded document"
        elif clause_input.strip():
            target = clause_input
            source = "text input"
        else:
            st.error("Please provide a clause to simplify either through text input or document upload")
            target = None
        if target:
            result = simplify_clause_with_progress(target)
            st.subheader("Simplified Output")
            # Display result in a nice container
            with st.container():
                st.success("β Simplification Complete")
                st.text_area(
                    "Plain English Version",
                    result,
                    height=300,
                    key="result_output"
                )
            # Add some metrics
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Original Length", f"{len(target)} chars")
            with col2:
                st.metric("Simplified Length", f"{len(result)} chars")
            with col3:
                reduction = max(0, len(target) - len(result))
                st.metric("Reduction", f"{reduction} chars")
# Tab 2: Named Entity Recognition
with tabs[1]:
    st.header("Named Entity Recognition")
    st.markdown("Identify people, organizations, dates, and other entities in your legal documents")
    if st.button("Extract Entities", key="ner", type="primary"):
        if text_data and text_data not in ["", "Unsupported file format"]:
            with st.spinner("Analyzing entities..."):
                entities = ner_entities(text_data)
                st.subheader("Extracted Entities")
                st.json(entities)
        else:
            st.error("Please upload a document or paste text first")
# Tab 3: Clause Extraction
with tabs[2]:
    st.header("Clause Extraction")
    st.markdown("Automatically identify and extract individual clauses from legal documents")
    if st.button("Extract Clauses", key="extract", type="primary"):
        if text_data and text_data not in ["", "Unsupported file format"]:
            with st.spinner("Extracting clauses..."):
                clauses = extract_clauses(text_data)
                st.subheader(f"Found {len(clauses)} Clauses")
                if clauses:
                    # One expander per clause keeps long documents scannable.
                    for i, clause in enumerate(clauses, 1):
                        with st.expander(f"Clause {i} (Length: {len(clause)} chars)"):
                            st.text(clause)
                else:
                    st.info("No clauses could be automatically extracted. Try using the full text in other analysis tools.")
        else:
            st.error("Please upload a document or paste text first")
# Tab 4: Document Classification
with tabs[3]:
    st.header("Document Classification")
    st.markdown("Automatically identify the type of legal document")
    if st.button("Classify Document", key="classify", type="primary"):
        if text_data and text_data not in ["", "Unsupported file format"]:
            with st.spinner("Analyzing document type..."):
                doc_type = classify_document(text_data)
                st.subheader("Document Classification")
                st.info(f"**Predicted Document Type:** {doc_type}")
        else:
            st.error("Please upload a document or paste text first")
# Tab 5: OPTIMIZED Fairness Balance Meter
with tabs[4]:
    st.header("β‘ Fairness Balance Meter")
    st.markdown("**Fast analysis using keyword patterns**")
    fairness_clause = st.text_area("Clause to evaluate:", height=120, key="fairness_input")
    use_doc_for_fairness = st.checkbox("Use uploaded document", value=not bool(fairness_clause.strip()), key="use_doc_fairness")
    if st.button("Analyze Fairness", key="fairness_btn", type="primary"):
        start_time = time.time()
        # Document input is capped at 2000 chars for responsiveness.
        if use_doc_for_fairness and text_data and text_data not in ["", "Unsupported file format"]:
            target = text_data[:2000]
        elif fairness_clause.strip():
            target = fairness_clause
        else:
            st.error("Please provide a clause for analysis")
            target = None
        if target:
            with st.spinner("Analyzing fairness..."):
                score, rationale, analysis_data = fairness_balance_meter_fast(target)
            end_time = time.time()
            st.success(f"Analysis completed in {end_time - start_time:.1f} seconds")
            # Display score
            col1, col2 = st.columns([1, 2])
            with col1:
                st.metric("Fairness Score", f"{score}%")
                st.progress(score/100)
            with col2:
                st.info(f"**{analysis_data['level']}**")
                st.write(analysis_data['analysis'])
            # Category breakdown
            st.subheader("Balance Analysis")
            # NOTE(review): third-party imports inside a handler — pandas and
            # plotly must be installed for this tab to render the chart.
            import pandas as pd
            import plotly.express as px
            categories_df = pd.DataFrame([
                {"Category": cat, "Balance Score": val}
                for cat, val in analysis_data['categories'].items()
            ])
            fig = px.bar(categories_df, x="Balance Score", y="Category", orientation='h',
                         title="Fairness by Category")
            fig.update_layout(height=300)
            st.plotly_chart(fig, use_container_width=True, key="fairness_chart")
            # Recommendations
            st.subheader("Recommendations")
            for rec in analysis_data['recommendations']:
                st.write(f"β’ {rec}")
# Tab 6: OPTIMIZED Future Risk Predictor
with tabs[5]:
    st.header("π Risk Predictor")
    st.markdown("**Fast risk assessment using pattern analysis**")
    risk_clause = st.text_area("Clause for risk prediction:", height=120, key="risk_input")
    use_doc_for_risk = st.checkbox("Use uploaded document for risk", value=not bool(risk_clause.strip()), key="use_doc_risk")
    if st.button("Predict Risks", key="risk_btn", type="primary"):
        start_time = time.time()
        # Document input is capped at 2000 chars for responsiveness.
        if use_doc_for_risk and text_data and text_data not in ["", "Unsupported file format"]:
            target = text_data[:2000]
        elif risk_clause.strip():
            target = risk_clause
        else:
            st.error("Please provide a clause for analysis")
            target = None
        if target:
            with st.spinner("Analyzing risks..."):
                timeline, summary = future_risk_predictor_fast(target)
            end_time = time.time()
            st.success(f"Risk analysis completed in {end_time - start_time:.1f} seconds")
            st.info(summary)
            # Yearly risk metrics
            st.subheader("Risk Timeline")
            cols = st.columns(5)
            for i, year_data in enumerate(timeline):
                with cols[i]:
                    # Determine badge color based on risk level
                    if year_data["risk_level"] == "High":
                        badge_color = "π΄"
                    elif year_data["risk_level"] == "Medium":
                        badge_color = "π‘"
                    else:
                        badge_color = "π’"
                    st.metric(
                        f"Year {year_data['year']} {badge_color}",
                        f"{year_data['risk_score_0_100']}%",
                        year_data["risk_level"]
                    )
            # Detailed view - FIXED: Remove 'key' parameter from expander
            for year_data in timeline:
                with st.expander(f"Year {year_data['year']} Details"):
                    col1, col2 = st.columns(2)
                    with col1:
                        st.write("**Risks:**")
                        for risk in year_data['key_risks']:
                            st.write(f"β’ {risk}")
                    with col2:
                        st.write("**Mitigation:**")
                        for mitigation in year_data['mitigation']:
                            st.write(f"β’ {mitigation}")
                    st.write(f"**Financial Impact:** {year_data['financial_impact']}")
# Footer
st.markdown("---")
st.caption("ClauseWise Legal Assistant - Powered by Granite 3.2 2B Model | Core Features Only")