Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """Pibit.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1Lug_ARdBOTu2e87sThol1luVksA0986T | |
| """ | |
| import re | |
| import json | |
| import datetime | |
| from collections import defaultdict, Counter | |
| from typing import List, Dict, Tuple, Set, Optional | |
| import unicodedata | |
| import math | |
| import gradio as gr | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
class PibitInsuranceTokenizer:
    """
    Specialized tokenizer for insurance domain documents, designed for Pibit.ai's
    underwriting automation platform. Handles loss run documents, policy documents,
    claims data, and other insurance-specific text processing needs.

    Pipeline:
      1. ``_preprocess_text`` replaces amounts, dates, policy numbers and
         percentages with placeholder tokens and expands abbreviations.
      2. ``_tokenize_with_domain_awareness`` lowercases and splits the text,
         keeps the placeholders atomic and joins known multi-word insurance
         terms with underscores (``general_liability``).
      3. When ``model_type == "insurance_bpe"`` every word is further split
         into BPE pieces; the last piece of each word ends with ``</w>``.
         Any other ``model_type`` keeps whole words as tokens.
    """

    # Marker appended to every word before BPE so word boundaries survive.
    _WORD_END = "</w>"

    # Placeholders produced by _preprocess_text; they are never split by BPE
    # and are kept by decode().
    _ENTITY_PLACEHOLDERS = ("<AMOUNT>", "<DATE>", "<POLICY>", "<PERCENTAGE>")

    # Tokenization happens on lowercased text, so placeholders are matched in
    # lowercase and mapped back to their canonical spelling.
    _PLACEHOLDER_BY_LOWER = {p.lower(): p for p in _ENTITY_PLACEHOLDERS}

    # Placeholders first, then words (with optional apostrophe part), numbers,
    # and finally any single non-word, non-space character.
    _TOKEN_PATTERN = re.compile(
        r"<(?:amount|date|policy|percentage)>"
        r"|[a-zA-Z]+(?:'[a-zA-Z]+)?|[0-9]+(?:\.[0-9]+)?|[^\w\s]"
    )

    # Abbreviation expansions, compiled once (matched case-insensitively).
    _ABBREVIATIONS = tuple(
        (re.compile(pattern, re.IGNORECASE), expansion)
        for pattern, expansion in (
            (r'\bGL\b', 'general liability'),
            (r'\bWC\b', 'workers compensation'),
            (r'\bAuto\b', 'automobile'),
            (r'\bD&O\b', 'directors officers'),
            (r'\bE&O\b', 'errors omissions'),
            (r'\bEPLI\b', 'employment practices liability'),
            (r'\bBI\b', 'bodily injury'),
            (r'\bPD\b', 'property damage'),
            (r'\bMP\b', 'medical payments'),
            (r'\bTPA\b', 'third party administrator'),
            (r'\bMGA\b', 'managing general agent'),
            (r'\bACV\b', 'actual cash value'),
            (r'\bRCV\b', 'replacement cost value'),
        )
    )

    # Single-word keywords treated as insurance terms in addition to the
    # phrases returned by _load_insurance_patterns.
    _INSURANCE_KEYWORDS = frozenset({
        'claim', 'policy', 'premium', 'deductible', 'coverage', 'liability',
        'underwrite', 'insured', 'carrier', 'risk', 'loss', 'damage',
        'accident', 'incident', 'hazard', 'peril', 'exposure', 'limit',
    })

    def __init__(self, vocab_size=15000, model_type="insurance_bpe"):
        """
        Create an untrained tokenizer.

        Args:
            vocab_size: Target vocabulary size, special tokens included.
            model_type: ``"insurance_bpe"`` for subword (BPE) tokens; any other
                value selects whole-word tokens.
        """
        self.vocab_size = vocab_size
        self.model_type = model_type
        self.special_tokens = [
            "<PAD>", "<UNK>", "<START>", "<END>", "<MASK>",
            "<CLAIM>", "<POLICY>", "<AMOUNT>", "<DATE>", "<RISK>",
            "<COVERAGE>", "<DEDUCTIBLE>", "<PREMIUM>", "<LOSS>",
            "<UNDERWRITER>", "<CARRIER>", "<INSURED>", "<PERCENTAGE>"
        ]
        self.vocab = {}
        self.id_to_token = {}
        self.token_frequencies = Counter()
        self.merges = []
        self.bpe_ranks = {}
        self.insurance_patterns = self._load_insurance_patterns()
        # Flattened view of every domain phrase for O(1) membership tests.
        self._domain_terms = frozenset(
            term for terms in self.insurance_patterns.values() for term in terms
        )
        # Digits must surround every comma so that the separator following an
        # amount ("$25,000, Incurred") is not captured as part of it.
        self.financial_pattern = re.compile(r'\$\d+(?:,\d+)*(?:\.\d{2})?')
        self.policy_pattern = re.compile(r'[A-Z]{2,4}[\d\-]{6,12}')
        self.date_pattern = re.compile(r'\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}|\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}')
        self.percentage_pattern = re.compile(r'\d+(?:\.\d+)?%')
        self._initialize_special_tokens()

    def _load_insurance_patterns(self) -> Dict[str, List[str]]:
        """Return the insurance phrases, grouped by category, used for compound merging and term detection."""
        return {
            'coverage_types': [
                'general liability', 'workers compensation', 'property coverage',
                'commercial auto', 'cyber liability', 'professional liability',
                'directors officers', 'employment practices', 'umbrella coverage',
                'commercial crime', 'boiler machinery', 'builders risk',
            ],
            'claim_types': [
                'bodily injury', 'property damage', 'medical payments',
                'personal injury', 'products liability', 'completed operations',
                'fire damage', 'theft', 'vandalism', 'water damage',
                'slip and fall', 'motor vehicle accident', 'workplace injury',
            ],
            'risk_factors': [
                'hazardous materials', 'high risk activity', 'prior claims',
                'safety violations', 'regulatory issues', 'financial distress',
                'industry classification', 'geographic risk', 'seasonal business',
                'new venture', 'construction defects', 'product recall',
            ],
            'financial_terms': [
                'deductible', 'premium', 'limit', 'retention', 'aggregate',
                'occurrence', 'claims made', 'prior acts', 'extended reporting',
                'loss ratio', 'experience modification', 'rate', 'exposure',
            ],
            'underwriting_terms': [
                'risk assessment', 'loss run', 'acord forms', 'submission',
                'renewal', 'policy period', 'effective date', 'expiration',
                'carrier', 'excess', 'reinsurance', 'facultative', 'treaty',
                'reserve', 'incurred', 'paid', 'outstanding', 'ibnr',
            ],
        }

    def _initialize_special_tokens(self):
        """Register the special tokens at ids ``0 .. len(special_tokens) - 1``."""
        for token_id, token in enumerate(self.special_tokens):
            self.vocab[token] = token_id
            self.id_to_token[token_id] = token

    def _reset_learned_state(self) -> None:
        """Drop everything learned by a previous ``train`` call, keeping only the special tokens."""
        self.vocab.clear()
        self.id_to_token.clear()
        self.token_frequencies.clear()
        self.merges.clear()
        self.bpe_ranks.clear()
        self._initialize_special_tokens()

    @classmethod
    def _strip_word_end(cls, token: str) -> str:
        """Return ``token`` without a trailing end-of-word marker, if it has one."""
        if token.endswith(cls._WORD_END):
            return token[:-len(cls._WORD_END)]
        return token

    def _preprocess_text(self, text: str) -> str:
        """
        Insurance-specific text preprocessing.

        Applies NFKC normalization, replaces amounts, dates, policy numbers and
        percentages (in that order) with their placeholder tokens, expands
        known abbreviations and strips surrounding whitespace.
        """
        text = unicodedata.normalize('NFKC', text)
        text = self.financial_pattern.sub('<AMOUNT>', text)
        text = self.date_pattern.sub('<DATE>', text)
        text = self.policy_pattern.sub('<POLICY>', text)
        text = self.percentage_pattern.sub('<PERCENTAGE>', text)
        text = self._normalize_insurance_terms(text)
        return text.strip()

    def _normalize_insurance_terms(self, text: str) -> str:
        """Expand insurance abbreviations (GL, WC, D&O, ...) to their full forms."""
        for pattern, expansion in self._ABBREVIATIONS:
            text = pattern.sub(expansion, text)
        return text

    def _extract_insurance_entities(self, text: str) -> List[Tuple[str, str]]:
        """
        Extract ``(entity_type, matched_text)`` pairs from the raw text.

        Entities are grouped by type in the order AMOUNT, DATE, POLICY,
        PERCENTAGE; within a type they appear in document order.
        """
        extractors = (
            ('AMOUNT', self.financial_pattern),
            ('DATE', self.date_pattern),
            ('POLICY', self.policy_pattern),
            ('PERCENTAGE', self.percentage_pattern),
        )
        return [
            (label, match.group())
            for label, pattern in extractors
            for match in pattern.finditer(text)
        ]

    def _tokenize_with_domain_awareness(self, text: str) -> List[str]:
        """
        Domain-aware tokenization that preserves insurance terminology.

        The text is lowercased and split into words, numbers and punctuation.
        Entity placeholders are kept as single upper-case tokens, and runs of
        three or two tokens that spell a known insurance phrase are merged
        into one underscore-joined token (longest match first).
        """
        tokens = [
            self._PLACEHOLDER_BY_LOWER.get(raw, raw)
            for raw in self._TOKEN_PATTERN.findall(text.lower())
        ]
        merged_tokens = []
        position = 0
        total = len(tokens)
        while position < total:
            for length in (3, 2):
                window = tokens[position:position + length]
                if len(window) == length and ' '.join(window) in self._domain_terms:
                    merged_tokens.append('_'.join(window))
                    position += length
                    break
            else:
                # No compound starts here: keep the single token.
                merged_tokens.append(tokens[position])
                position += 1
        return merged_tokens

    def _get_word_frequencies_insurance(self, texts: List[str]) -> Dict[str, int]:
        """
        Count words for BPE training, keyed by their space-separated symbols.

        Each word is stored as ``"c l a i m </w>"``. Insurance terms are
        counted three times per occurrence so their merges are learned first.
        Entity placeholders are skipped because they are never split.
        """
        word_freqs = defaultdict(int)
        for text in texts:
            for token in self._tokenize_with_domain_awareness(self._preprocess_text(text)):
                if token in self._ENTITY_PLACEHOLDERS:
                    continue
                key = ' '.join(list(token) + [self._WORD_END])
                word_freqs[key] += 3 if self._is_insurance_term(token) else 1
        return word_freqs

    def _is_insurance_term(self, token: str) -> bool:
        """
        Check if token is an insurance-specific term.

        The comparison is case-insensitive, ignores a trailing end-of-word
        marker and treats underscores as spaces, so both ``"claim</w>"`` and
        ``"general_liability"`` are recognized.
        """
        normalized = self._strip_word_end(token.lower()).replace('_', ' ')
        return normalized in self._domain_terms or normalized in self._INSURANCE_KEYWORDS

    def _get_pairs(self, word: List[str]) -> Set[Tuple[str, str]]:
        """Return the set of adjacent symbol pairs in ``word`` (empty for fewer than two symbols)."""
        return set(zip(word, word[1:]))

    @staticmethod
    def _merge_symbols(symbols: List[str], pair: Tuple[str, str]) -> List[str]:
        """Return ``symbols`` with every non-overlapping occurrence of ``pair`` fused, scanning left to right."""
        first, second = pair
        merged = []
        index = 0
        count = len(symbols)
        while index < count:
            if index + 1 < count and symbols[index] == first and symbols[index + 1] == second:
                merged.append(first + second)
                index += 2
            else:
                merged.append(symbols[index])
                index += 1
        return merged

    def _merge_word(self, word: str, pair: Tuple[str, str]) -> str:
        """
        Merge a specific pair in a space-separated word.

        The merge compares whole symbols, so the pair ``('a', 'b')`` never
        matches the tail of a longer symbol such as ``'xa'``.
        """
        return ' '.join(self._merge_symbols(word.split(), pair))

    def _train_insurance_bpe(self, texts: List[str]) -> None:
        """
        Learn BPE merges from ``texts`` (the computation-heavy part of training).

        Repeatedly merges the most frequent adjacent symbol pair until the
        merge budget is used up or no pair is left. A pair is counted once per
        distinct word, weighted by the word's frequency. Pairs are visited in
        first-seen order, so ties always resolve the same way for the same
        input. Learned merges are appended to ``self.merges`` and ranked in
        ``self.bpe_ranks``.
        """
        word_freqs = self._get_word_frequencies_insurance(texts)
        base_symbols = set()
        for word in word_freqs:
            base_symbols.update(word.split())
        for terms in self.insurance_patterns.values():
            base_symbols.update(term.replace(' ', '_') for term in terms)
        # Budget left once special tokens and base symbols are accounted for;
        # a negative value simply yields no merges.
        num_merges = self.vocab_size - len(self.vocab) - len(base_symbols)
        for merge_idx in range(num_merges):
            pair_counts = {}
            for word, freq in word_freqs.items():
                symbols = word.split()
                # dict.fromkeys de-duplicates while preserving order.
                for pair in dict.fromkeys(zip(symbols, symbols[1:])):
                    pair_counts[pair] = pair_counts.get(pair, 0) + freq
            if not pair_counts:
                break
            best_pair = max(pair_counts, key=pair_counts.get)
            merged_freqs = {}
            for word, freq in word_freqs.items():
                merged_word = self._merge_word(word, best_pair)
                merged_freqs[merged_word] = merged_freqs.get(merged_word, 0) + freq
            word_freqs = merged_freqs
            self.merges.append(best_pair)
            self.bpe_ranks[best_pair] = merge_idx

    def _apply_bpe(self, word: str) -> List[str]:
        """
        Split ``word`` into BPE pieces using the learned merges.

        Entity placeholders are returned unchanged. For every other word the
        end-of-word marker is appended first, so the last piece always ends
        with ``</w>`` (also for one-character words).
        """
        if word in self._ENTITY_PLACEHOLDERS:
            return [word]
        symbols = list(word) + [self._WORD_END]
        unranked = float('inf')
        while len(symbols) > 1:
            candidate = min(
                self._get_pairs(symbols),
                key=lambda pair: self.bpe_ranks.get(pair, unranked),
            )
            if candidate not in self.bpe_ranks:
                break
            symbols = self._merge_symbols(symbols, candidate)
        return symbols

    def train(self, texts: List[str]) -> None:
        """
        Train the insurance domain tokenizer.

        Any previously learned vocabulary and merges are discarded first.
        Token ids are assigned in sorted token order after the special tokens.
        In word mode, if the corpus has more distinct tokens than fit in
        ``vocab_size``, insurance terms are kept first and the remainder is
        filled by descending frequency. In BPE mode no truncation is applied;
        the merge budget keeps the vocabulary within ``vocab_size`` as long as
        ``vocab_size`` can hold the special tokens plus the base symbols.
        """
        self._reset_learned_state()
        tokenized_texts = [
            self._tokenize_with_domain_awareness(self._preprocess_text(text))
            for text in texts
        ]
        learned = set()
        if self.model_type == "insurance_bpe":
            self._train_insurance_bpe(texts)
            for tokens in tokenized_texts:
                for token in tokens:
                    # Placeholders already own a special-token id.
                    if token not in self._ENTITY_PLACEHOLDERS:
                        learned.update(self._apply_bpe(token))
        else:
            for tokens in tokenized_texts:
                for token in tokens:
                    if token in self._ENTITY_PLACEHOLDERS:
                        continue
                    learned.add(token)
                    self.token_frequencies[token] += 3 if self._is_insurance_term(token) else 1
            budget = max(0, self.vocab_size - len(self.special_tokens))
            if len(learned) > budget:
                ranked = sorted(
                    learned,
                    key=lambda t: (not self._is_insurance_term(t), -self.token_frequencies[t], t),
                )
                learned = ranked[:budget]
        first_id = len(self.special_tokens)
        for offset, token in enumerate(sorted(learned)):
            self.vocab[token] = first_id + offset
            self.id_to_token[first_id + offset] = token

    def tokenize(self, text: str) -> List[str]:
        """
        Tokenize insurance document text.

        Returns whole-word tokens in word mode. In BPE mode every word is
        replaced by its BPE pieces, while entity placeholders stay atomic.
        """
        words = self._tokenize_with_domain_awareness(self._preprocess_text(text))
        if self.model_type != "insurance_bpe":
            return words
        pieces = []
        for word in words:
            pieces.extend(self._apply_bpe(word))
        return pieces

    def encode(self, text: str) -> List[int]:
        """Encode text to token IDs; tokens missing from the vocabulary map to ``<UNK>``."""
        unknown_id = self.vocab["<UNK>"]
        return [self.vocab.get(token, unknown_id) for token in self.tokenize(text)]

    def decode(self, token_ids: List[int]) -> str:
        """
        Decode token IDs back to text.

        Unknown ids are ignored. In BPE mode consecutive pieces are glued
        together until a piece ending in ``</w>`` closes the word. Words are
        joined with single spaces and underscores in compound terms become
        spaces. Special tokens are dropped, except the entity placeholders
        (``<AMOUNT>``, ``<DATE>``, ``<POLICY>``, ``<PERCENTAGE>``).
        """
        specials = set(self.special_tokens)
        use_bpe = self.model_type == "insurance_bpe"
        marker_length = len(self._WORD_END)
        words = []
        pending = []  # pieces of the word currently being rebuilt
        for token_id in token_ids:
            if token_id not in self.id_to_token:
                continue
            token = self.id_to_token[token_id]
            if token in specials:
                # A special token always terminates an unfinished word.
                if pending:
                    words.append(''.join(pending))
                    pending = []
                if token in self._ENTITY_PLACEHOLDERS:
                    words.append(token)
            elif not use_bpe:
                words.append(token)
            elif token.endswith(self._WORD_END):
                pending.append(token[:-marker_length])
                words.append(''.join(pending))
                pending = []
            else:
                pending.append(token)
        if pending:
            words.append(''.join(pending))
        # Special tokens contain no underscores, so this only affects compounds.
        return ' '.join(word for word in words if word).replace('_', ' ').strip()

    def analyze_document(self, text: str) -> Dict:
        """
        Analyze insurance document and extract key information.

        Returns a dict with ``document_type``, ``total_tokens``,
        ``insurance_terms``, ``insurance_term_ratio``, ``entities``,
        ``key_terms`` (up to 20 distinct insurance terms, sorted, without
        end-of-word markers), ``risk_score`` and ``confidence`` (twice the
        insurance-term ratio, capped at 0.95).
        """
        entities = self._extract_insurance_entities(text)
        tokens = self.tokenize(text)
        if not tokens:
            return {
                'document_type': 'Unknown', 'total_tokens': 0,
                'insurance_terms': 0, 'insurance_term_ratio': 0,
                'entities': [], 'key_terms': [],
                'risk_score': 0, 'confidence': 0,
            }
        domain_tokens = [token for token in tokens if self._is_insurance_term(token)]
        term_ratio = len(domain_tokens) / len(tokens)
        return {
            'document_type': self._identify_document_type(text, tokens),
            'total_tokens': len(tokens),
            'insurance_terms': len(domain_tokens),
            'insurance_term_ratio': term_ratio,
            'entities': entities,
            'key_terms': sorted({self._strip_word_end(token) for token in domain_tokens})[:20],
            'risk_score': self._calculate_risk_score(text, tokens, entities),
            'confidence': min(0.95, term_ratio * 2),
        }

    def _calculate_risk_score(self, text: str, tokens: List[str], entities: List[Tuple[str, str]]) -> float:
        """
        Calculate risk score based on document content.

        Starts at 0.5, adds 0.1 for each high-risk phrase found (as a token or
        in the lowercased text) and adds 0.2 / 0.1 when the largest detected
        amount exceeds 1,000,000 / 100,000. The result is clamped to [0, 1].
        """
        risk_score = 0.5
        high_risk_terms = [
            'prior claims', 'safety violations', 'hazardous materials',
            'high risk activity', 'regulatory issues', 'financial distress',
        ]
        lowered_text = text.lower()
        plain_tokens = {self._strip_word_end(token) for token in tokens}
        for term in high_risk_terms:
            if term.replace(' ', '_') in plain_tokens or term in lowered_text:
                risk_score += 0.1
        amounts = []
        for entity_type, raw_value in entities:
            if entity_type != 'AMOUNT':
                continue
            try:
                amounts.append(float(re.sub(r'[\$,]', '', raw_value)))
            except ValueError:
                # Not a parseable number; ignore it for scoring.
                continue
        if amounts:
            max_amount = max(amounts)
            if max_amount > 1000000:
                risk_score += 0.2
            elif max_amount > 100000:
                risk_score += 0.1
        return min(1.0, max(0.0, risk_score))

    def _identify_document_type(self, text: str, tokens: List[str]) -> str:
        """
        Identify the type of insurance document.

        Each indicator scores 2 when present as a token and otherwise 1 when
        its space-separated form occurs in the lowercased text. The type with
        the highest total wins (first listed on ties); ``'general_insurance'``
        is returned when nothing matches.
        """
        doc_indicators = {
            'loss_run': ['loss_run', 'claims_history', 'paid_losses', 'incurred', 'reserves', 'loss_ratio'],
            'policy': ['policy_number', 'effective_date', 'expiration_date', 'coverage_limit', 'policy_period'],
            'claim': ['claim_number', 'date_of_loss', 'claimant', 'adjuster', 'claim_details'],
            'submission': ['submission', 'application', 'proposal', 'quote_request', 'underwriting'],
            'certificate': ['certificate', 'evidence_of_coverage', 'additional_insured']
        }
        lowered_text = text.lower()
        plain_tokens = {self._strip_word_end(token) for token in tokens}
        scores = {}
        for doc_type, indicators in doc_indicators.items():
            total = 0
            for indicator in indicators:
                if indicator.replace(' ', '_') in plain_tokens:
                    total += 2
                elif indicator.replace('_', ' ') in lowered_text:
                    total += 1
            scores[doc_type] = total
        if max(scores.values()) == 0:
            return 'general_insurance'
        return max(scores, key=scores.get)

    def get_vocab_size(self) -> int:
        """Return the number of entries in the vocabulary, special tokens included."""
        return len(self.vocab)
# --- SHARED TOKENIZER INSTANCE ---
# One tokenizer is built and trained at import time; every handler below
# reuses it instead of training again per request.

# Built-in sample documents the tokenizer is trained on.
default_training_docs = [
    "Loss Run Report for Policy GL2024-001234. Effective Date: 01/01/2024 to 12/31/2024. Insured: ABC Manufacturing Company. Coverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate. Claims History: Claim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed. Claim 2: Product liability claim, Date of Loss: 07/22/2024, Paid: $0, Incurred: $75,000, Status: Open, Reserves: $75,000. Total Paid Losses: $25,000. Total Incurred Losses: $120,000. Loss Ratio: 12%. Experience Modification Factor: 0.85",
    "Workers Compensation Submission. Applicant: XYZ Construction LLC. Policy Period: 01/01/2025 to 01/01/2026. Industry Code: 5645 - Residential Construction. Annual Payroll: $2,500,000. Prior Coverage: Carrier ABC, Premium: $125,000, Deductible: $5,000. Claims Experience: 3 claims in past 3 years. Workplace injury, back strain: $15,000 paid. Fall from height accident: $85,000 incurred. Repetitive motion injury: $12,000 paid. Risk Factors: High-risk construction activities, prior OSHA violations. Underwriting Notes: Requires safety program implementation.",
    "Commercial Property Loss Notice. Policy Number: CP2024-567890. Insured: Downtown Office Building LLC. Date of Loss: 11/08/2024. Cause of Loss: Water damage from burst pipe. Coverage Details: Building Coverage: $5,000,000. Business Personal Property: $500,000. Deductible: $10,000. Claim Details: Estimated Repair Cost: $125,000. Business Interruption Loss: $25,000. Adjuster: John Smith, License #12345. Initial Reserve: $150,000.",
    "Underwriting Risk Assessment Report. Account: Tech Startup Solutions Inc. Line of Business: Cyber Liability Insurance. Requested Limits: $5,000,000 per claim, $10,000,000 aggregate. Risk Factors Analysis: Industry: Technology/Software Development. Revenue: $15,000,000 annually. Security Measures: Multi-factor authentication: Yes. Encryption protocols: AES-256. Employee training: Quarterly. Incident response plan: In place. Prior Claims: None reported. Competitive Premium Quote: $45,000 annually. Recommended Deductible: $25,000.",
]

print("Initializing and training the Pibit.ai Insurance Tokenizer... Please wait.")
tokenizer = PibitInsuranceTokenizer(vocab_size=8000, model_type="insurance_bpe")
tokenizer.train(default_training_docs)
print("Tokenizer is ready!")
| # --- Gradio App Functions --- | |
def create_analysis_plots(analysis_data):
    """
    Build the two figures shown next to an analysis report.

    Args:
        analysis_data: Mapping with at least ``risk_score`` (scaled by 100
            for display), ``insurance_terms`` and ``total_tokens``.

    Returns:
        ``(fig_gauge, fig_pie)``: a gauge of the risk score on a 0-100 axis
        and a pie chart of insurance terms versus all other tokens.
    """
    # Colored bands behind the gauge needle.
    gauge_bands = [
        {'range': [0, 40], 'color': "lightgreen"},
        {'range': [40, 70], 'color': "yellow"},
        {'range': [70, 100], 'color': "lightcoral"},
    ]
    risk_indicator = go.Indicator(
        mode="gauge+number",
        value=analysis_data['risk_score'] * 100,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "Risk Score"},
        gauge={
            'axis': {'range': [None, 100]},
            'bar': {'color': "#2E86AB"},
            'steps': gauge_bands,
        },
    )
    fig_gauge = go.Figure(risk_indicator)
    fig_gauge.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))

    domain_count = analysis_data['insurance_terms']
    remaining_count = analysis_data['total_tokens'] - domain_count
    fig_pie = px.pie(
        values=[domain_count, remaining_count],
        names=['Insurance Terms', 'Other Terms'],
        title='Token Distribution',
        color_discrete_sequence=['#FF6B6B', '#4ECDC4'],
    )
    fig_pie.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))
    return fig_gauge, fig_pie
def analyze_insurance_document(text):
    """
    Analyze a document with the shared, pre-trained tokenizer.

    Args:
        text: Raw document text from the UI.

    Returns:
        A 5-tuple ``(summary_markdown, risk_gauge_figure, token_pie_figure,
        entities_dataframe, tokenization_sample_markdown)``. For blank input
        the summary is a prompt message and the other values are empty.
    """
    if not text.strip():
        return "Please enter some text to analyze.", go.Figure(), go.Figure(), pd.DataFrame(), ""

    analysis = tokenizer.analyze_document(text)
    detected = analysis['entities']

    # Map the numeric score onto the three displayed levels.
    if analysis['risk_score'] > 0.7:
        risk_level = "π΄ HIGH"
    elif analysis['risk_score'] > 0.4:
        risk_level = "π‘ MEDIUM"
    else:
        risk_level = "π’ LOW"

    report_parts = [f"""
## π Pibit.ai Insurance Document Analysis Report
### π’ Document Classification
- **Document Type**: {analysis['document_type'].title().replace('_', ' ')}
- **Analysis Confidence**: {analysis['confidence']:.1%}
### π Token Analysis
- **Total Tokens**: {analysis['total_tokens']:,}
- **Insurance-Specific Terms**: {analysis['insurance_terms']:,}
- **Domain Relevance**: {analysis['insurance_term_ratio']:.1%}
### β οΈ Risk Assessment
- **Risk Score**: {analysis['risk_score']:.2f} / 1.00
- **Risk Level**: {risk_level}
### π·οΈ Entities Detected
{len(detected)} entities found:
"""]
    # List at most ten entities and summarize the rest.
    report_parts.extend(
        f"- **{entity_type}**: {entity_value}\n"
        for entity_type, entity_value in detected[:10]
    )
    hidden_count = len(detected) - 10
    if hidden_count > 0:
        report_parts.append(f"- ... and {hidden_count} more\n")
    report_parts.append("\n### π Key Insurance Terms\n")
    report_parts.append(
        ", ".join(f"`{term.replace('_', ' ')}`" for term in analysis['key_terms'])
    )
    summary = "".join(report_parts)

    fig_gauge, fig_pie = create_analysis_plots(analysis)
    entities_df = pd.DataFrame(detected, columns=['Entity Type', 'Value'])

    # Only the first 500 characters are tokenized for the preview.
    sample_tokens = tokenizer.tokenize(text[:500])
    tokenization_example = (
        f"**Sample Tokenization** (first 500 characters):\n\n{' | '.join(sample_tokens[:20])}"
    )
    if len(sample_tokens) > 20:
        tokenization_example += f" | ... ({len(sample_tokens)} total tokens)"
    return summary, fig_gauge, fig_pie, entities_df, tokenization_example
def tokenize_text(text):
    """
    Tokenize ``text`` with the shared tokenizer and format the result as Markdown.

    Shows every token and the first 50 token ids; when there are more ids a
    note with the total count is appended. Blank input returns a prompt
    message instead.
    """
    if not text.strip():
        return "Please enter some text to tokenize."

    tokens = tokenizer.tokenize(text)
    token_ids = tokenizer.encode(text)
    shown_ids = ' '.join(str(token_id) for token_id in token_ids[:50])
    result = f"**Tokens ({len(tokens)}):**\n{' | '.join(tokens)}\n\n**Token IDs:**\n{shown_ids}"
    if len(token_ids) > 50:
        result = result + f" ... ({len(token_ids)} total IDs)"
    return result
def get_tokenizer_stats():
    """
    Summarize the shared tokenizer as Markdown.

    Reports the vocabulary size, how many vocabulary entries are recognized
    as insurance terms, the number of special tokens and the model type.
    """
    vocab_size = tokenizer.get_vocab_size()
    insurance_terms = 0
    for entry in tokenizer.vocab:
        if tokenizer._is_insurance_term(entry):
            insurance_terms += 1
    special_count = len(tokenizer.special_tokens)
    return f"""
## π§ Pibit.ai Insurance Tokenizer Statistics
- **Total Vocabulary Size**: {vocab_size:,}
- **Insurance-Specific Terms**: {insurance_terms:,}
- **Special Tokens**: {special_count}
- **Model Type**: {tokenizer.model_type}
"""
# --- Gradio interface ---
# Static content is defined first; components are then created in display order.

# Banner rendered at the top of the page.
_HEADER_HTML = """<div style="text-align: center; padding: 20px;"><h1 style="color: #2E86AB;">π’ Pibit.ai Insurance Tokenizer</h1><p style="font-size: 18px; color: #666;">Specialized NLP Tokenizer for Property & Casualty Insurance Documents</p></div>"""

# Sample documents offered below the analysis input.
_EXAMPLE_DOCUMENTS = [
    ["Loss Run Report for Policy GL2024-001234\nEffective Date: 01/01/2024 to 12/31/2024\nInsured: ABC Manufacturing Company\nCoverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate\nClaims History:\nClaim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed"],
    ["Workers Compensation Submission\nApplicant: XYZ Construction LLC\nPolicy Period: 01/01/2025 to 01/01/2026\nAnnual Payroll: $2,500,000\nRisk Factors: High-risk construction activities, prior OSHA violations"],
    ["Underwriting Risk Assessment Report\nAccount: Tech Startup Solutions Inc.\nLine of Business: Cyber Liability Insurance\nRequested Limits: $5,000,000 per claim\nSecurity Measures:\n- Multi-factor authentication: Yes\n- Incident response plan: In place\nPrior Claims: None reported"],
]

with gr.Blocks(theme=gr.themes.Soft(), title="Pibit.ai Insurance Tokenizer") as demo:
    gr.HTML(_HEADER_HTML)
    with gr.Tabs():
        # Tab 1: full document analysis (report, charts, entities, token sample).
        with gr.Tab("π Document Analysis"):
            with gr.Row():
                with gr.Column(scale=2):
                    input_text = gr.Textbox(
                        lines=15,
                        placeholder="Paste your insurance document here...",
                        label="π Insurance Document Text",
                    )
                    analyze_btn = gr.Button("π Analyze Document", variant="primary", size="lg")
                with gr.Column(scale=3):
                    analysis_output = gr.Markdown(label="π Analysis Report")
            with gr.Row():
                risk_gauge = gr.Plot(label="β οΈ Risk Assessment")
                token_pie = gr.Plot(label="π₯§ Token Distribution")
            entities_table = gr.DataFrame(label="π·οΈ Detected Entities")
            tokenization_sample = gr.Markdown(label="π§ Tokenization Sample")
            # The handler returns its five outputs in exactly this order.
            analyze_btn.click(
                analyze_insurance_document,
                inputs=[input_text],
                outputs=[analysis_output, risk_gauge, token_pie, entities_table, tokenization_sample],
            )
            gr.Examples(examples=_EXAMPLE_DOCUMENTS, inputs=input_text)
        # Tab 2: raw tokenization of arbitrary text.
        with gr.Tab("π§ Tokenization Tool"):
            with gr.Row():
                with gr.Column():
                    tokenize_input = gr.Textbox(
                        lines=8,
                        placeholder="Enter text to tokenize...",
                        label="π Text to Tokenize",
                    )
                    tokenize_btn = gr.Button("π§ Tokenize", variant="primary")
                with gr.Column():
                    tokenize_output = gr.Markdown(label="π― Tokenization Results")
            tokenize_btn.click(tokenize_text, inputs=tokenize_input, outputs=tokenize_output)
        # Tab 3: vocabulary statistics, filled in when the page loads.
        with gr.Tab("βΉοΈ Tokenizer Info"):
            tokenizer_info = gr.Markdown()
            demo.load(get_tokenizer_stats, inputs=None, outputs=tokenizer_info)

if __name__ == "__main__":
    demo.launch(debug=True)