# ai-kit/chat_bot/process_policies.py
# Author: Kim Adams — "adding llm to inference" (commit 31643df)
import re
import json
import pytesseract
import pdf2image
import openai
from spacy import blank
from utilities import api_keys, prompt_constants, clean_text, constants
from image_to_text import image_to_text
# Configure the OpenAI client once at import time with the project-managed key.
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# NOTE(review): key_words is never referenced in this file — confirm it is used
# by an importer before removing.
key_words=set()
def ExtractInsights(chunk, prompt):
    """Summarize one text chunk with the chat-completion helper.

    Args:
        chunk: Raw text excerpt from a policy document.
        prompt: System prompt steering the model's extraction.

    Returns:
        The content string of the first completion choice.
    """
    cleaned = clean_text.CleanText(chunk)
    response = CompletionEngine(
        prompt,
        cleaned,
        constants.SUMMARY_TOKENS,
        1,
        constants.TEMP,
        constants.INSIGHTS_MODEL,
        constants.TOP_P,
    )
    return response.choices[0]['message']['content']
def CompletionEngine(sys_message, user_message, num_tokens, num_results, temperature, topic_model, top_p):
    """Thin wrapper around ``openai.ChatCompletion.create``.

    Args:
        sys_message: Content for the system role message.
        user_message: Content for the user role message.
        num_tokens: ``max_tokens`` cap for the completion.
        num_results: Number of completions to request (``n``).
        temperature: Sampling temperature.
        topic_model: Model identifier to use.
        top_p: Nucleus-sampling parameter.

    Returns:
        The raw ChatCompletion response object.
    """
    conversation = [
        {"role": "system", "content": sys_message},
        {"role": "user", "content": user_message},
    ]
    return openai.ChatCompletion.create(
        model=topic_model,
        messages=conversation,
        max_tokens=num_tokens,
        n=num_results,
        temperature=temperature,
        stop=None,
        top_p=top_p,
    )
#---------- 1. Convert PDF to text using OCR
def convert_pdf_to_text_ocr(pdf_path, txt_path):
    """OCR every page of a PDF and persist the combined text.

    Args:
        pdf_path: Path to the source PDF.
        txt_path: Path where the extracted text is written.

    Returns:
        The full OCR text of the document.
    """
    # 300 DPI rasterization; higher DPI improves OCR at the cost of memory.
    pages = pdf2image.convert_from_path(pdf_path, 300)
    # Single join instead of repeated += (which is quadratic in total size).
    text_output = ''.join(pytesseract.image_to_string(page) for page in pages)
    # Explicit UTF-8 so non-ASCII OCR output doesn't fail on platform
    # default encodings (e.g. cp1252 on Windows).
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.write(text_output)
    return text_output
#---------- 2. Create schema from policy text using parsers
def create_schema(text):
    """Assemble the full policy schema by running every section parser.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        dict mapping each schema section name to its parser's output.
    """
    parsers = {
        'policy_info': parse_policy_info,
        'contact_info': parse_contact_info,
        'vehicles': parse_vehicle_info,
        'coverages': parse_coverage_info,
        'discounts': parse_discounts,
        'additional_info': parse_additional_info,
        'compliance_info': parse_compliance_info,
    }
    return {section: parser(text) for section, parser in parsers.items()}
#---------- Parsers for each type of policy data, leverage regex to identify patterns in data
def parse_vehicle_info(text):
    """Extract vehicle records from the vehicle-description section via the LLM.

    The section between "Description of Vehicle(s)" and "Supplemental
    Coverage" is chunked, each chunk is summarized by the LLM, and the
    JSON-like reply is scraped with a regex for Make/Model/Year/VIN/BodyType.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        list of dicts with keys 'make', 'model', 'year', 'vin', 'body_type';
        entries whose make, model, or VIN came back as 'Unknown' are dropped.
    """
    vehicles = []
    # Locate the relevant section; fall back to the whole text if a marker
    # is missing (str.find returns -1, which would otherwise slice wrongly).
    description_start = text.find("Description of Vehicle(s)")
    if description_start == -1:
        description_start = 0
    supplemental_start = text.find("Supplemental Coverage")
    if supplemental_start == -1:
        supplemental_start = len(text)
    relevant_text = text[description_start:supplemental_start]
    # Flatten line breaks and collapse runs of whitespace to single spaces.
    # (The original `.replace(' ', ' ')` was a no-op; double-space squeezing
    # was presumably intended.)
    relevant_text = re.sub(r'\s+', ' ', relevant_text)
    # Compile once, outside the chunk loop.
    vehicle_pattern = re.compile(
        r'"Make":\s*"(.*?)",\s*"Model":\s*"(.*?)",\s*"Year":\s*"(.*?)",\s*"VIN":\s*"(.*?)",\s*"BodyType":\s*"(.*?)"'
    )
    for chunk in image_to_text.CreateChunks(relevant_text):
        vehicle_data = ExtractInsights(chunk, prompt_constants.VEHICLE_INSIGHTS_PROMPT)
        print("** vehicle_data:", vehicle_data)
        # The model may answer the literal string "None" when no vehicles
        # appear in the chunk.
        if not vehicle_data or vehicle_data == "None":
            continue
        for match in vehicle_pattern.finditer(vehicle_data):
            make, model, year, vin, body_type = match.groups()
            if make != 'Unknown' and model != 'Unknown' and vin != 'Unknown':
                vehicles.append({
                    'make': make,
                    'model': model,
                    'year': year,
                    'vin': vin,
                    'body_type': body_type
                })
    # Previously an early `return` made this final log line unreachable and
    # a dead commented-out JSON-parsing variant followed it; both removed.
    print ("**** vehicles:", vehicles)
    return vehicles
def parse_policy_info(text):
    """Pull top-level policy identifiers out of the raw policy text.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        dict containing whichever of 'policy_number', 'effective_date',
        'expiration_date', 'named_insured', and 'address' could be
        located; keys are simply omitted when a pattern does not match.
    """
    info = {}
    number_match = re.search(r'POLICY NUMBER\s*:\s*(\S+)', text)
    if number_match:
        info['policy_number'] = number_match.group(1)
    term_match = re.search(r'EFFECTIVE\s+(\S+)\s+TO\s+(\S+)', text)
    if term_match:
        info['effective_date'], info['expiration_date'] = term_match.groups()
    insured_match = re.search(r'Named Insured and Address\s*([\w\s]+)\s', text)
    if insured_match:
        info['named_insured'] = insured_match.group(1).strip()
    address_match = re.search(r'Address\s*([\d\w\s]+)\s*', text)
    if address_match:
        info['address'] = address_match.group(1).strip()
    return info
def parse_contact_info(text):
    """Extract insurer contact details from the policy text.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        dict with any of 'insurance_company', 'customer_service',
        'claims', 'website', and 'mailing_address' that matched;
        unmatched fields are left out.
    """
    field_patterns = (
        ('insurance_company', r'Insurance Company\s*:\s*([\w\s]+)'),
        ('customer_service', r'customer service\s*:\s*(\d+-\d+-\d+)'),
        ('claims', r'claims\s*:\s*(\d+-\d+-\d+)'),
        ('website', r'website\s*:\s*(\S+)'),
        ('mailing_address', r'Mailing Address\s*:\s*([\d\w\s,]+)'),
    )
    contact_info = {}
    for key, pattern in field_patterns:
        hit = re.search(pattern, text)
        if hit:
            contact_info[key] = hit.group(1).strip()
    return contact_info
def parse_coverage_info(text):
    """Extract coverage parts and their liability limits.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        list of dicts with 'type', 'limits_of_liability', and a
        'deductible' placeholder (always None; not parsed from text).
    """
    matches = re.findall(r'PART\s+(\w+)\s+[-\w\s]*\n([^\n]+)', text)
    return [
        {
            'type': part,
            'limits_of_liability': limits.strip(),
            'deductible': None,  # not present in the matched line
        }
        for part, limits in matches
    ]
def parse_discounts(text):
    """Extract two-word discount entries and their dollar amounts.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        list of dicts with 'type' (discount name) and 'amount'
        (formatted as "-$NN.NN").
    """
    found = re.findall(r'(\w+\s+\w+)\s+DISCOUNT\s+-\$\s*(\d+\.\d+)', text)
    return [
        {'type': name.strip(), 'amount': f"-${amount}"}
        for name, amount in found
    ]
def parse_additional_info(text):
    """Extract premium totals, inception-due flag, and endorsements.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        dict with any of 'total_premium', 'premium_due_at_inception'
        (the string "Yes" when the phrase appears), and 'policy_changes'
        (stripped endorsement strings) that were found.
    """
    details = {}
    premium_match = re.search(r'TOTAL PREMIUM\s*\$\s*(\d+\.\d+)', text)
    if premium_match:
        details['total_premium'] = f"${premium_match.group(1)}"
    if re.search(r'PREMIUM DUE AT INCEPTION', text):
        details['premium_due_at_inception'] = "Yes"
    endorsements = re.findall(r'ENDORSEMENTS\s*:\s*([\w, ]+)', text)
    if endorsements:
        details['policy_changes'] = [entry.strip() for entry in endorsements]
    return details
def parse_compliance_info(text):
    """Extract state-requirement and coverage-rejection statements.

    Args:
        text: Full OCR'd policy document text.

    Returns:
        dict with 'state_requirements' and/or 'coverage_rejections'
        when the corresponding labeled line appears; empty otherwise.
    """
    field_patterns = (
        ('state_requirements', r'state requirements\s*:\s*([\w\s]+)'),
        ('coverage_rejections', r'coverage rejections\s*:\s*([\w\s]+)'),
    )
    compliance = {}
    for key, pattern in field_patterns:
        hit = re.search(pattern, text)
        if hit:
            compliance[key] = hit.group(1).strip()
    return compliance