Natwar committed on
Commit
2abea01
·
verified ·
1 Parent(s): 132935b

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +475 -0
app.py ADDED
@@ -0,0 +1,475 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # -*- coding: utf-8 -*-
2
+ """Pibit.ipynb
3
+
4
+ Automatically generated by Colab.
5
+
6
+ Original file is located at
7
+ https://colab.research.google.com/drive/1Lug_ARdBOTu2e87sThol1luVksA0986T
8
+ """
9
+
10
+ import re
11
+ import json
12
+ import datetime
13
+ from collections import defaultdict, Counter
14
+ from typing import List, Dict, Tuple, Set, Optional
15
+ import unicodedata
16
+ import math
17
+ import gradio as gr
18
+ import pandas as pd
19
+ import plotly.express as px
20
+ import plotly.graph_objects as go
21
+ from plotly.subplots import make_subplots
22
+
23
class PibitInsuranceTokenizer:
    """
    Specialized tokenizer for insurance domain documents, designed for Pibit.ai's
    underwriting automation platform. Handles loss run documents, policy documents,
    claims data, and other insurance-specific text processing needs.

    ``model_type == "insurance_bpe"`` enables character-level BPE training with a
    frequency boost for insurance vocabulary; any other value falls back to a
    plain whole-word vocabulary.
    """

    def __init__(self, vocab_size=15000, model_type="insurance_bpe"):
        """
        Args:
            vocab_size: Target vocabulary size, special tokens included.
            model_type: "insurance_bpe" for BPE mode; any other string selects
                whole-word mode.
        """
        self.vocab_size = vocab_size
        self.model_type = model_type

        # Reserved tokens. The entity placeholders (<AMOUNT>, <DATE>, <POLICY>,
        # <PERCENTAGE>) are substituted into raw text during preprocessing.
        self.special_tokens = [
            "<PAD>", "<UNK>", "<START>", "<END>", "<MASK>",
            "<CLAIM>", "<POLICY>", "<AMOUNT>", "<DATE>", "<RISK>",
            "<COVERAGE>", "<DEDUCTIBLE>", "<PREMIUM>", "<LOSS>",
            "<UNDERWRITER>", "<CARRIER>", "<INSURED>", "<PERCENTAGE>"
        ]

        self.vocab = {}                     # token -> id
        self.id_to_token = {}               # id -> token
        self.token_frequencies = Counter()  # word-mode frequency counts
        self.merges = []                    # ordered BPE merge pairs
        self.bpe_ranks = {}                 # pair -> merge rank (lower merges first)

        # Regex patterns are compiled once here and reused on every call —
        # avoids re-compiling the same pattern per invocation.
        self.insurance_patterns = self._load_insurance_patterns()
        self.financial_pattern = re.compile(r'\$[\d,]+(?:\.\d{2})?')
        self.policy_pattern = re.compile(r'[A-Z]{2,4}[\d\-]{6,12}')
        self.date_pattern = re.compile(r'\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}|\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}')
        self.percentage_pattern = re.compile(r'\d+(?:\.\d+)?%')

        self._initialize_special_tokens()

    def _load_insurance_patterns(self) -> Dict[str, List[str]]:
        """Load insurance domain-specific patterns and terminology."""
        return {
            'coverage_types': ['general liability', 'workers compensation', 'property coverage', 'commercial auto', 'cyber liability', 'professional liability', 'directors officers', 'employment practices', 'umbrella coverage', 'commercial crime', 'boiler machinery', 'builders risk'],
            'claim_types': ['bodily injury', 'property damage', 'medical payments', 'personal injury', 'products liability', 'completed operations', 'fire damage', 'theft', 'vandalism', 'water damage', 'slip and fall', 'motor vehicle accident', 'workplace injury'],
            'risk_factors': ['hazardous materials', 'high risk activity', 'prior claims', 'safety violations', 'regulatory issues', 'financial distress', 'industry classification', 'geographic risk', 'seasonal business', 'new venture', 'construction defects', 'product recall'],
            'financial_terms': ['deductible', 'premium', 'limit', 'retention', 'aggregate', 'occurrence', 'claims made', 'prior acts', 'extended reporting', 'loss ratio', 'experience modification', 'rate', 'exposure'],
            'underwriting_terms': ['risk assessment', 'loss run', 'acord forms', 'submission', 'renewal', 'policy period', 'effective date', 'expiration', 'carrier', 'excess', 'reinsurance', 'facultative', 'treaty', 'reserve', 'incurred', 'paid', 'outstanding', 'ibnr']
        }

    def _initialize_special_tokens(self):
        """Assign the lowest ids (0..N-1) to the special tokens, in list order."""
        for i, token in enumerate(self.special_tokens):
            self.vocab[token] = i
            self.id_to_token[i] = token

    def _preprocess_text(self, text: str) -> str:
        """
        Insurance-specific text preprocessing.

        Normalizes Unicode (NFKC), then replaces financial amounts, dates,
        policy numbers and percentages with their placeholder tokens, and
        expands common insurance abbreviations.
        """
        text = unicodedata.normalize('NFKC', text)
        # Substitution order matters: amounts/dates are replaced before the
        # policy pattern so digit runs are not mistaken for policy numbers.
        text = self.financial_pattern.sub('<AMOUNT>', text)
        text = self.date_pattern.sub('<DATE>', text)
        text = self.policy_pattern.sub('<POLICY>', text)
        text = self.percentage_pattern.sub('<PERCENTAGE>', text)
        text = self._normalize_insurance_terms(text)
        return text.strip()

    def _normalize_insurance_terms(self, text: str) -> str:
        """Expand common insurance abbreviations to their full forms (case-insensitive)."""
        abbreviations = {
            r'\bGL\b': 'general liability', r'\bWC\b': 'workers compensation', r'\bAuto\b': 'automobile',
            r'\bD&O\b': 'directors officers', r'\bE&O\b': 'errors omissions', r'\bEPLI\b': 'employment practices liability',
            r'\bBI\b': 'bodily injury', r'\bPD\b': 'property damage', r'\bMP\b': 'medical payments',
            r'\bTPA\b': 'third party administrator', r'\bMGA\b': 'managing general agent', r'\bACV\b': 'actual cash value',
            r'\bRCV\b': 'replacement cost value'
        }
        for abbrev, full_form in abbreviations.items():
            text = re.sub(abbrev, full_form, text, flags=re.IGNORECASE)
        return text

    def _extract_insurance_entities(self, text: str) -> List[Tuple[str, str]]:
        """Extract (entity_type, raw_text) pairs from *raw* (un-preprocessed) text.

        Results are grouped by type in the order AMOUNT, DATE, POLICY, PERCENTAGE.
        """
        entities = []
        for label, pattern in (
            ('AMOUNT', self.financial_pattern),
            ('DATE', self.date_pattern),
            ('POLICY', self.policy_pattern),
            ('PERCENTAGE', self.percentage_pattern),
        ):
            for match in pattern.finditer(text):
                entities.append((label, match.group()))
        return entities

    def _tokenize_with_domain_awareness(self, text: str) -> List[str]:
        """
        Domain-aware tokenization that preserves insurance terminology.

        Lowercases and splits on words/numbers/punctuation, then merges known
        multi-word insurance terms into single underscore-joined tokens
        (e.g. 'general liability' -> 'general_liability').

        NOTE(review): lowercasing splits placeholder tokens like '<AMOUNT>'
        into '<', 'amount', '>' — confirm this is the intended behavior.
        """
        word_pattern = r"[a-zA-Z]+(?:'[a-zA-Z]+)?|[0-9]+(?:\.[0-9]+)?|[^\w\s]"
        tokens = re.findall(word_pattern, text.lower())

        # Union of all known compound terms for O(1) membership tests in the
        # merge loop (instead of scanning every category per candidate).
        compound_terms = {term for terms in self.insurance_patterns.values() for term in terms}

        merged_tokens = []
        i = 0
        while i < len(tokens):
            matched_len = 0
            # Prefer the longest compound (tri-gram before bi-gram).
            for length in (3, 2):
                if i + length <= len(tokens) and ' '.join(tokens[i:i + length]) in compound_terms:
                    matched_len = length
                    break
            if matched_len:
                merged_tokens.append('_'.join(tokens[i:i + matched_len]))
                i += matched_len
            else:
                merged_tokens.append(tokens[i])
                i += 1
        return merged_tokens

    def _get_word_frequencies_insurance(self, texts: List[str]) -> Dict[str, int]:
        """Build BPE word frequencies (char-spaced words ending in '</w>'),
        counting insurance terms 3x (a +2 boost) to bias merges toward them."""
        word_freqs = defaultdict(int)
        for text in texts:
            preprocessed_text = self._preprocess_text(text)
            tokens = self._tokenize_with_domain_awareness(preprocessed_text)
            for token in tokens:
                token_chars = ' '.join(token) + ' </w>'
                word_freqs[token_chars] += 1
                if self._is_insurance_term(token):
                    word_freqs[token_chars] += 2
        return word_freqs

    def _is_insurance_term(self, token: str) -> bool:
        """Check if *token* is an insurance-specific term (compound or keyword)."""
        token_lower = token.lower().replace('_', ' ')
        for terms in self.insurance_patterns.values():
            if token_lower in terms:
                return True
        insurance_keywords = {'claim', 'policy', 'premium', 'deductible', 'coverage', 'liability', 'underwrite', 'insured', 'carrier', 'risk', 'loss', 'damage', 'accident', 'incident', 'hazard', 'peril', 'exposure', 'limit'}
        return token_lower in insurance_keywords

    def _get_pairs(self, word: List[str]) -> Set[Tuple[str, str]]:
        """Return the set of adjacent symbol pairs in *word* (a list of symbols).

        FIX: annotated as List[str] — every call site passes ``word.split()``,
        not a raw string — and the empty case now returns an empty set instead
        of raising IndexError.
        """
        pairs = set()
        if not word:
            return pairs
        prev_char = word[0]
        for char in word[1:]:
            pairs.add((prev_char, char))
            prev_char = char
        return pairs

    def _merge_word(self, word: str, pair: Tuple[str, str]) -> str:
        """Merge every adjacent occurrence of *pair* in a space-separated symbol string.

        FIX: the previous implementation used plain substring replacement
        ('a b' -> 'ab'), which could also match across symbol boundaries —
        e.g. pair ('a', 'b') would corrupt 'xa b' into 'xab'. This version
        only merges whole symbols.
        """
        first, second = pair
        symbols = word.split()
        merged = []
        i = 0
        while i < len(symbols):
            if i + 1 < len(symbols) and symbols[i] == first and symbols[i + 1] == second:
                merged.append(first + second)
                i += 2
            else:
                merged.append(symbols[i])
                i += 1
        return ' '.join(merged)

    def _train_insurance_bpe(self, texts: List[str]) -> None:
        """
        Train BPE merges on *texts*.

        COMPUTATION-HEAVY: each merge iteration rescans all words, so this
        should be run ONCE at startup — never during user interaction.
        """
        word_freqs = self._get_word_frequencies_insurance(texts)
        vocab = set()
        for word in word_freqs:
            vocab.update(word.split())
        # Seed the symbol vocabulary with all known compound terms.
        for terms in self.insurance_patterns.values():
            for term in terms:
                vocab.add(term.replace(' ', '_'))

        # May be <= 0 when the base vocabulary already exceeds the target;
        # range() then simply yields nothing.
        num_merges = self.vocab_size - len(self.vocab) - len(vocab)

        for merge_idx in range(num_merges):
            # Count every adjacent pair, weighted by word frequency.
            pairs = defaultdict(int)
            for word, freq in word_freqs.items():
                for pair in self._get_pairs(word.split()):
                    pairs[pair] += freq
            if not pairs:
                break  # every word fully merged — nothing left to do
            best_pair = max(pairs, key=pairs.get)

            # Apply the winning merge to every word.
            word_freqs = {self._merge_word(word, best_pair): freq
                          for word, freq in word_freqs.items()}

            self.merges.append(best_pair)
            self.bpe_ranks[best_pair] = merge_idx
            vocab.add(best_pair[0] + best_pair[1])

    def _apply_bpe(self, word: str) -> List[str]:
        """Apply the learned BPE merges to a single word, greedily applying the
        lowest-ranked (earliest-learned) merge until none apply."""
        if len(word) == 1:
            # NOTE(review): single characters bypass the '</w>' end-of-word
            # marker, unlike longer words — preserved as-is; confirm intended.
            return [word]
        word = ' '.join(list(word)) + ' </w>'

        while True:
            pairs = self._get_pairs(word.split())
            if not pairs:
                break
            # Pick the pair with the best (lowest) merge rank.
            bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float('inf')))
            if bigram not in self.bpe_ranks:
                break  # no remaining pair was ever learned
            word = self._merge_word(word, bigram)

        return word.split()

    def train(self, texts: List[str]) -> None:
        """Train the insurance domain tokenizer and build the final vocabulary."""
        if self.model_type == "insurance_bpe":
            self._train_insurance_bpe(texts)
            # Collect every subword the trained model actually produces.
            all_tokens = set()
            for text in texts:
                preprocessed = self._preprocess_text(text)
                tokens = self._tokenize_with_domain_awareness(preprocessed)
                for token in tokens:
                    all_tokens.update(self._apply_bpe(token))
        else:
            # Whole-word mode: vocabulary is the set of surface tokens,
            # with insurance terms counted 3x for later truncation ranking.
            all_tokens = set()
            for text in texts:
                preprocessed = self._preprocess_text(text)
                tokens = self._tokenize_with_domain_awareness(preprocessed)
                all_tokens.update(tokens)
                for token in tokens:
                    self.token_frequencies[token] += 3 if self._is_insurance_term(token) else 1

        if len(all_tokens) > self.vocab_size - len(self.special_tokens):
            # NOTE(review): truncation only happens in word mode; BPE mode may
            # exceed vocab_size — confirm this asymmetry is intended.
            if self.model_type != "insurance_bpe":
                # Keep all insurance terms, then the most frequent others.
                insurance_terms = [t for t in all_tokens if self._is_insurance_term(t)]
                other_terms = sorted([t for t in all_tokens if not self._is_insurance_term(t)], key=lambda x: self.token_frequencies[x], reverse=True)
                max_others = self.vocab_size - len(self.special_tokens) - len(insurance_terms)
                all_tokens = insurance_terms + other_terms[:max_others]

        # Deterministic id assignment: sorted tokens after the special block.
        start_idx = len(self.special_tokens)
        for i, token in enumerate(sorted(all_tokens)):
            token_id = start_idx + i
            self.vocab[token] = token_id
            self.id_to_token[token_id] = token

    def tokenize(self, text: str) -> List[str]:
        """Tokenize insurance document text into (sub)word tokens."""
        preprocessed = self._preprocess_text(text)
        tokens = self._tokenize_with_domain_awareness(preprocessed)
        if self.model_type == "insurance_bpe":
            result = []
            for token in tokens:
                # Known whole tokens pass through; unknowns get BPE-split.
                if token in self.vocab:
                    result.append(token)
                else:
                    result.extend(self._apply_bpe(token))
            return result
        return tokens

    def encode(self, text: str) -> List[int]:
        """Encode text to token IDs; unknown tokens map to <UNK>."""
        tokens = self.tokenize(text)
        return [self.vocab.get(token, self.vocab["<UNK>"]) for token in tokens]

    def decode(self, token_ids: List[int]) -> str:
        """Decode token IDs back to text; unknown ids are silently skipped."""
        tokens = [self.id_to_token[tid] for tid in token_ids if tid in self.id_to_token]
        text = ' '.join(tokens).replace(' </w>', '').replace('_', ' ')
        # Strip special tokens, but keep the entity placeholders since they
        # carry meaning in the reconstructed text.
        for special in self.special_tokens:
            if special not in ["<AMOUNT>", "<DATE>", "<POLICY>", "<PERCENTAGE>"]:
                text = text.replace(special, '')
        return text.strip()

    def analyze_document(self, text: str) -> Dict:
        """Analyze an insurance document: classify it, extract entities, and
        compute a heuristic risk score. Returns a flat dict of results."""
        entities = self._extract_insurance_entities(text)
        tokens = self.tokenize(text)
        if not tokens:
            # Empty/whitespace input: return a fully-populated zero report.
            return {'document_type': 'Unknown', 'total_tokens': 0, 'insurance_terms': 0, 'insurance_term_ratio': 0, 'entities': [], 'key_terms': [], 'risk_score': 0, 'confidence': 0}

        insurance_term_count = sum(1 for token in tokens if self._is_insurance_term(token))
        doc_type = self._identify_document_type(text, tokens)
        risk_score = self._calculate_risk_score(text, tokens, entities)

        return {
            'document_type': doc_type, 'total_tokens': len(tokens),
            'insurance_terms': insurance_term_count, 'insurance_term_ratio': insurance_term_count / len(tokens),
            'entities': entities, 'key_terms': list(set([t for t in tokens if self._is_insurance_term(t)]))[:20],
            'risk_score': risk_score, 'confidence': min(0.95, insurance_term_count / len(tokens) * 2)
        }

    def _calculate_risk_score(self, text: str, tokens: List[str], entities: List[Tuple[str, str]]) -> float:
        """Heuristic risk score in [0, 1]: base 0.5, +0.1 per high-risk term
        present, +0.1/+0.2 for large dollar amounts."""
        risk_score = 0.5
        high_risk_terms = ['prior claims', 'safety violations', 'hazardous materials', 'high risk activity', 'regulatory issues', 'financial distress']
        for term in high_risk_terms:
            if term.replace(' ', '_') in tokens or term in text.lower():
                risk_score += 0.1

        # Parse dollar amounts defensively: only values that survive the
        # digit check are converted.
        amounts = [float(re.sub(r'[\$,]', '', entity[1])) for entity in entities if entity[0] == 'AMOUNT' and re.sub(r'[\$,]', '', entity[1]).replace('.', '', 1).isdigit()]
        if amounts:
            max_amount = max(amounts)
            if max_amount > 1000000:
                risk_score += 0.2
            elif max_amount > 100000:
                risk_score += 0.1
        return min(1.0, max(0.0, risk_score))

    def _identify_document_type(self, text: str, tokens: List[str]) -> str:
        """Classify the document by scoring indicator phrases per type:
        2 points for a token match, 1 for a raw-text match."""
        doc_indicators = {
            'loss_run': ['loss_run', 'claims_history', 'paid_losses', 'incurred', 'reserves', 'loss_ratio'],
            'policy': ['policy_number', 'effective_date', 'expiration_date', 'coverage_limit', 'policy_period'],
            'claim': ['claim_number', 'date_of_loss', 'claimant', 'adjuster', 'claim_details'],
            'submission': ['submission', 'application', 'proposal', 'quote_request', 'underwriting'],
            'certificate': ['certificate', 'evidence_of_coverage', 'additional_insured']
        }
        scores = {doc_type: sum(2 if ind.replace(' ', '_') in tokens else 1 if ind.replace('_', ' ') in text.lower() else 0 for ind in indicators) for doc_type, indicators in doc_indicators.items()}
        if not scores or max(scores.values()) == 0:
            return 'general_insurance'
        return max(scores, key=scores.get)

    def get_vocab_size(self) -> int:
        """Return the current vocabulary size (special tokens included)."""
        return len(self.vocab)
332
+
333
# --- SINGLE GLOBAL INSTANCE ---
# The tokenizer is created and trained only ONCE when the script starts.
# All functions will now use this single, pre-trained instance.
print("Initializing and training the Pibit.ai Insurance Tokenizer... Please wait.")
tokenizer = PibitInsuranceTokenizer(vocab_size=8000, model_type="insurance_bpe")

# Default training documents: four synthetic P&C samples (loss run, workers
# comp submission, property loss notice, cyber risk assessment) used as the
# BPE training corpus at import time.
default_training_docs = [
    "Loss Run Report for Policy GL2024-001234. Effective Date: 01/01/2024 to 12/31/2024. Insured: ABC Manufacturing Company. Coverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate. Claims History: Claim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed. Claim 2: Product liability claim, Date of Loss: 07/22/2024, Paid: $0, Incurred: $75,000, Status: Open, Reserves: $75,000. Total Paid Losses: $25,000. Total Incurred Losses: $120,000. Loss Ratio: 12%. Experience Modification Factor: 0.85",
    "Workers Compensation Submission. Applicant: XYZ Construction LLC. Policy Period: 01/01/2025 to 01/01/2026. Industry Code: 5645 - Residential Construction. Annual Payroll: $2,500,000. Prior Coverage: Carrier ABC, Premium: $125,000, Deductible: $5,000. Claims Experience: 3 claims in past 3 years. Workplace injury, back strain: $15,000 paid. Fall from height accident: $85,000 incurred. Repetitive motion injury: $12,000 paid. Risk Factors: High-risk construction activities, prior OSHA violations. Underwriting Notes: Requires safety program implementation.",
    "Commercial Property Loss Notice. Policy Number: CP2024-567890. Insured: Downtown Office Building LLC. Date of Loss: 11/08/2024. Cause of Loss: Water damage from burst pipe. Coverage Details: Building Coverage: $5,000,000. Business Personal Property: $500,000. Deductible: $10,000. Claim Details: Estimated Repair Cost: $125,000. Business Interruption Loss: $25,000. Adjuster: John Smith, License #12345. Initial Reserve: $150,000.",
    "Underwriting Risk Assessment Report. Account: Tech Startup Solutions Inc. Line of Business: Cyber Liability Insurance. Requested Limits: $5,000,000 per claim, $10,000,000 aggregate. Risk Factors Analysis: Industry: Technology/Software Development. Revenue: $15,000,000 annually. Security Measures: Multi-factor authentication: Yes. Encryption protocols: AES-256. Employee training: Quarterly. Incident response plan: In place. Prior Claims: None reported. Competitive Premium Quote: $45,000 annually. Recommended Deductible: $25,000."
]
# Train once at import; BPE training is the expensive step (see class docs).
tokenizer.train(default_training_docs)
print("Tokenizer is ready!")
348
+
349
+ # --- Gradio App Functions ---
350
+
351
def create_analysis_plots(analysis_data):
    """Build the two analysis figures: a risk gauge and a token-distribution pie.

    Args:
        analysis_data: dict produced by ``PibitInsuranceTokenizer.analyze_document``;
            reads 'risk_score', 'insurance_terms' and 'total_tokens'.

    Returns:
        (gauge figure, pie figure) as plotly Figure objects, both 300px tall.
    """
    # Risk gauge: rescale the [0, 1] score to a 0-100 dial with
    # green / yellow / red bands.
    gauge_steps = [
        {'range': [0, 40], 'color': "lightgreen"},
        {'range': [40, 70], 'color': "yellow"},
        {'range': [70, 100], 'color': "lightcoral"},
    ]
    indicator = go.Indicator(
        mode="gauge+number",
        value=analysis_data['risk_score'] * 100,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "Risk Score"},
        gauge={'axis': {'range': [None, 100]}, 'bar': {'color': "#2E86AB"}, 'steps': gauge_steps},
    )
    gauge_fig = go.Figure(indicator)
    gauge_fig.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))

    # Token distribution pie: insurance-specific terms vs everything else.
    domain_count = analysis_data['insurance_terms']
    remainder = analysis_data['total_tokens'] - domain_count
    pie_fig = px.pie(
        values=[domain_count, remainder],
        names=['Insurance Terms', 'Other Terms'],
        title='Token Distribution',
        color_discrete_sequence=['#FF6B6B', '#4ECDC4'],
    )
    pie_fig.update_layout(height=300, margin=dict(l=20, r=20, t=50, b=20))
    return gauge_fig, pie_fig
365
+
366
def analyze_insurance_document(text):
    """
    Main function to analyze insurance documents.
    This now uses the single, globally-trained tokenizer and is very fast.

    Returns a 5-tuple matching the Gradio outputs:
        (markdown summary, risk gauge figure, token pie figure,
         entities DataFrame, tokenization sample markdown).
    """
    if not text.strip():
        # Empty figures/frames keep all five Gradio outputs well-formed.
        return "Please enter some text to analyze.", go.Figure(), go.Figure(), pd.DataFrame(), ""

    # The core change: No more retraining! Just analyze.
    analysis = tokenizer.analyze_document(text)

    # Markdown report header; values come straight from the analysis dict.
    summary = f"""
## πŸ“Š Pibit.ai Insurance Document Analysis Report
### 🏒 Document Classification
- **Document Type**: {analysis['document_type'].title().replace('_', ' ')}
- **Analysis Confidence**: {analysis['confidence']:.1%}
### πŸ“ˆ Token Analysis
- **Total Tokens**: {analysis['total_tokens']:,}
- **Insurance-Specific Terms**: {analysis['insurance_terms']:,}
- **Domain Relevance**: {analysis['insurance_term_ratio']:.1%}
### ⚠️ Risk Assessment
- **Risk Score**: {analysis['risk_score']:.2f} / 1.00
- **Risk Level**: {"πŸ”΄ HIGH" if analysis['risk_score'] > 0.7 else "🟑 MEDIUM" if analysis['risk_score'] > 0.4 else "🟒 LOW"}
### 🏷️ Entities Detected
{len(analysis['entities'])} entities found:
"""
    # Show at most 10 entities inline; summarize the rest with a count.
    for entity_type, entity_value in analysis['entities'][:10]:
        summary += f"- **{entity_type}**: {entity_value}\n"
    if len(analysis['entities']) > 10:
        summary += f"- ... and {len(analysis['entities']) - 10} more\n"

    summary += f"\n### πŸ”‘ Key Insurance Terms\n"
    summary += ", ".join([f"`{term.replace('_', ' ')}`" for term in analysis['key_terms']])

    fig_gauge, fig_pie = create_analysis_plots(analysis)
    entities_df = pd.DataFrame(analysis['entities'], columns=['Entity Type', 'Value'])
    # Tokenize only a 500-char prefix for the sample, and show 20 tokens max.
    tokens = tokenizer.tokenize(text[:500])
    tokenization_example = f"**Sample Tokenization** (first 500 characters):\n\n{' | '.join(tokens[:20])}"
    if len(tokens) > 20:
        tokenization_example += f" | ... ({len(tokens)} total tokens)"

    return summary, fig_gauge, fig_pie, entities_df, tokenization_example
408
+
409
def tokenize_text(text):
    """Run the shared global tokenizer over *text* and format a markdown report
    of the tokens and their IDs (first 50 IDs shown inline)."""
    if not text.strip():
        return "Please enter some text to tokenize."

    token_list = tokenizer.tokenize(text)
    id_list = tokenizer.encode(text)

    shown_ids = ' '.join(str(tid) for tid in id_list[:50])
    report = (
        f"**Tokens ({len(token_list)}):**\n{' | '.join(token_list)}"
        f"\n\n**Token IDs:**\n{shown_ids}"
    )
    # Flag truncation when more than 50 IDs exist.
    if len(id_list) > 50:
        report += f" ... ({len(id_list)} total IDs)"
    return report
417
+
418
def get_tokenizer_stats():
    """Get tokenizer statistics as a markdown string for the info tab."""
    vocab_size = tokenizer.get_vocab_size()
    # Full vocab scan; runs once per page load via demo.load, so cost is fine.
    insurance_terms = sum(1 for token in tokenizer.vocab.keys() if tokenizer._is_insurance_term(token))
    return f"""
## πŸ”§ Pibit.ai Insurance Tokenizer Statistics
- **Total Vocabulary Size**: {vocab_size:,}
- **Insurance-Specific Terms**: {insurance_terms:,}
- **Special Tokens**: {len(tokenizer.special_tokens)}
- **Model Type**: {tokenizer.model_type}
"""
429
+
430
# Create the Gradio interface
with gr.Blocks(theme=gr.themes.Soft(), title="Pibit.ai Insurance Tokenizer") as demo:
    gr.HTML("""<div style="text-align: center; padding: 20px;"><h1 style="color: #2E86AB;">🏒 Pibit.ai Insurance Tokenizer</h1><p style="font-size: 18px; color: #666;">Specialized NLP Tokenizer for Property & Casualty Insurance Documents</p></div>""")

    with gr.Tabs():
        # Tab 1: full document analysis (report + plots + entity table).
        with gr.Tab("πŸ“Š Document Analysis"):
            with gr.Row():
                with gr.Column(scale=2):
                    input_text = gr.Textbox(lines=15, placeholder="Paste your insurance document here...", label="πŸ“„ Insurance Document Text")
                    analyze_btn = gr.Button("πŸ” Analyze Document", variant="primary", size="lg")
                with gr.Column(scale=3):
                    analysis_output = gr.Markdown(label="πŸ“‹ Analysis Report")
            with gr.Row():
                risk_gauge = gr.Plot(label="⚠️ Risk Assessment")
                token_pie = gr.Plot(label="πŸ₯§ Token Distribution")
            entities_table = gr.DataFrame(label="🏷️ Detected Entities")
            tokenization_sample = gr.Markdown(label="πŸ”§ Tokenization Sample")

            # The custom_training input has been removed to fix the performance issue.
            analyze_btn.click(analyze_insurance_document, inputs=[input_text], outputs=[analysis_output, risk_gauge, token_pie, entities_table, tokenization_sample])

            # Pre-canned sample documents users can click to populate the input.
            gr.Examples(
                examples=[
                    ["Loss Run Report for Policy GL2024-001234\nEffective Date: 01/01/2024 to 12/31/2024\nInsured: ABC Manufacturing Company\nCoverage: General Liability, $1,000,000 per occurrence, $2,000,000 aggregate\nClaims History:\nClaim 1: Slip and fall incident, Date of Loss: 03/15/2024, Paid: $25,000, Incurred: $45,000, Status: Closed"],
                    ["Workers Compensation Submission\nApplicant: XYZ Construction LLC\nPolicy Period: 01/01/2025 to 01/01/2026\nAnnual Payroll: $2,500,000\nRisk Factors: High-risk construction activities, prior OSHA violations"],
                    ["Underwriting Risk Assessment Report\nAccount: Tech Startup Solutions Inc.\nLine of Business: Cyber Liability Insurance\nRequested Limits: $5,000,000 per claim\nSecurity Measures:\n- Multi-factor authentication: Yes\n- Incident response plan: In place\nPrior Claims: None reported"],
                ],
                inputs=input_text
            )

        # Tab 2: raw tokenization tool (tokens + IDs).
        with gr.Tab("πŸ”§ Tokenization Tool"):
            with gr.Row():
                with gr.Column():
                    tokenize_input = gr.Textbox(lines=8, placeholder="Enter text to tokenize...", label="πŸ“ Text to Tokenize")
                    tokenize_btn = gr.Button("πŸ”§ Tokenize", variant="primary")
                with gr.Column():
                    tokenize_output = gr.Markdown(label="🎯 Tokenization Results")
            tokenize_btn.click(tokenize_text, inputs=tokenize_input, outputs=tokenize_output)

        # Tab 3: static tokenizer statistics, refreshed on page load.
        with gr.Tab("ℹ️ Tokenizer Info"):
            tokenizer_info = gr.Markdown()
            demo.load(get_tokenizer_stats, inputs=None, outputs=tokenizer_info)

if __name__ == "__main__":
    demo.launch(debug=True)
475
+