| import re |
| import json |
| import pytesseract |
| import pdf2image |
| import openai |
| from spacy import blank |
| from utilities import api_keys, prompt_constants, clean_text, constants |
| from image_to_text import image_to_text |
|
|
# Configure the OpenAI client once at import time, using the key held in the
# project's key store (see utilities.api_keys).
openai.api_key = api_keys.APIKeys().get_key('OPENAI_API_KEY')
# Module-level accumulator; not referenced anywhere in this file —
# NOTE(review): presumably populated/read by importers; confirm before removing.
key_words=set()
|
|
def ExtractInsights(chunk, prompt):
    """Summarize one text chunk with the insights model.

    The chunk is normalized via clean_text.CleanText, then sent to the
    chat-completion engine together with *prompt* as the system message.
    Returns the content string of the first completion choice.
    """
    cleaned = clean_text.CleanText(chunk)
    response = CompletionEngine(
        prompt,
        cleaned,
        constants.SUMMARY_TOKENS,
        1,
        constants.TEMP,
        constants.INSIGHTS_MODEL,
        constants.TOP_P,
    )
    return response.choices[0]['message']['content']
|
|
def CompletionEngine(sys_message, user_message, num_tokens, num_results, temperature, topic_model, top_p):
    """Issue a single chat-completion request and return the raw API response.

    sys_message / user_message become the system and user turns of a
    two-message conversation; the remaining parameters map directly onto
    the OpenAI ChatCompletion.create arguments.
    """
    conversation = [
        {"role": "system", "content": sys_message},
        {"role": "user", "content": user_message},
    ]
    return openai.ChatCompletion.create(
        model=topic_model,
        messages=conversation,
        max_tokens=num_tokens,
        n=num_results,
        temperature=temperature,
        top_p=top_p,
        stop=None,
    )
|
|
| |
def convert_pdf_to_text_ocr(pdf_path, txt_path):
    """OCR every page of a PDF and persist the combined text.

    Renders the PDF at 300 DPI with pdf2image, runs pytesseract over each
    page image, writes the concatenated text to *txt_path*, and returns it.

    Fixes over the previous version:
    - joins page texts once instead of repeated ``+=`` (avoids quadratic
      copying on long documents);
    - opens the output file with an explicit UTF-8 encoding — OCR output is
      frequently non-ASCII and the platform default encoding could raise
      UnicodeEncodeError.
    """
    pages = pdf2image.convert_from_path(pdf_path, 300)
    text_output = ''.join(pytesseract.image_to_string(page) for page in pages)
    with open(txt_path, 'w', encoding='utf-8') as f:
        f.write(text_output)
    return text_output
|
|
| |
def create_schema(text):
    """Build the structured policy schema from raw document text.

    Each section of the schema is produced by its dedicated parser; all
    parsers receive the same full text.
    """
    section_parsers = {
        'policy_info': parse_policy_info,
        'contact_info': parse_contact_info,
        'vehicles': parse_vehicle_info,
        'coverages': parse_coverage_info,
        'discounts': parse_discounts,
        'additional_info': parse_additional_info,
        'compliance_info': parse_compliance_info,
    }
    return {section: parser(text) for section, parser in section_parsers.items()}
|
|
|
|
|
|
| |
def parse_vehicle_info(text):
    """Extract vehicle records from the "Description of Vehicle(s)" section.

    The section text is chunked and each chunk is run through the LLM
    insight extractor; the model's output is then scanned with a regex for
    Make/Model/Year/VIN/BodyType tuples. Entries whose make, model or VIN
    came back as 'Unknown' are dropped.

    Returns a list of dicts with keys: make, model, year, vin, body_type.

    Fixes over the previous version:
    - removed a premature ``return vehicles`` that exited before all chunks
      (and the trailing debug print / final return) were processed;
    - removed a large commented-out JSON-parsing block (dead code);
    - the regex is compiled once instead of per chunk;
    - ``.replace(' ', ' ')`` was a no-op — now collapses double spaces,
      which is presumably what was intended (TODO confirm).
    """
    vehicles = []

    # Restrict to the vehicle-description section of the document.
    description_start = text.find("Description of Vehicle(s)")
    supplemental_start = text.find("Supplemental Coverage")
    relevant_text = text[description_start:supplemental_start]

    # Flatten line breaks so records wrapped across lines stay matchable.
    relevant_lines = relevant_text.split('\n')
    relevant_text = ' '.join(relevant_lines).replace('  ', ' ')

    # Compiled once; applied to every chunk's model output below.
    vehicle_pattern = re.compile(
        r'"Make":\s*"(.*?)",\s*"Model":\s*"(.*?)",\s*"Year":\s*"(.*?)",'
        r'\s*"VIN":\s*"(.*?)",\s*"BodyType":\s*"(.*?)"'
    )

    chunks = image_to_text.CreateChunks(relevant_text)
    for chunk in chunks:
        vehicle_data = ExtractInsights(chunk, prompt_constants.VEHICLE_INSIGHTS_PROMPT)
        print("** vehicle_data:", vehicle_data)

        # The model signals "nothing found" with the literal string "None".
        if vehicle_data != "None" and vehicle_data:
            for match in vehicle_pattern.finditer(vehicle_data):
                make, model, year, vin, body_type = match.groups()
                # Skip records the model could not fully identify.
                if make != 'Unknown' and model != 'Unknown' and vin != 'Unknown':
                    vehicles.append({
                        'make': make,
                        'model': model,
                        'year': year,
                        'vin': vin,
                        'body_type': body_type,
                    })

    print("**** vehicles:", vehicles)
    return vehicles
|
|
def parse_policy_info(text):
    """Extract policy number, effective/expiration dates, insured name and
    address from raw policy text.

    Only fields whose pattern matches are present in the returned dict.
    """
    info = {}

    match = re.search(r'POLICY NUMBER\s*:\s*(\S+)', text)
    if match:
        info['policy_number'] = match.group(1)

    match = re.search(r'EFFECTIVE\s+(\S+)\s+TO\s+(\S+)', text)
    if match:
        info['effective_date'], info['expiration_date'] = match.groups()

    match = re.search(r'Named Insured and Address\s*([\w\s]+)\s', text)
    if match:
        info['named_insured'] = match.group(1).strip()

    match = re.search(r'Address\s*([\d\w\s]+)\s*', text)
    if match:
        info['address'] = match.group(1).strip()

    return info
|
|
def parse_contact_info(text):
    """Extract insurer contact details (company, phone numbers, website,
    mailing address) from raw policy text.

    Fields whose pattern does not match are simply omitted from the result.
    """
    field_patterns = (
        ('insurance_company', r'Insurance Company\s*:\s*([\w\s]+)'),
        ('customer_service', r'customer service\s*:\s*(\d+-\d+-\d+)'),
        ('claims', r'claims\s*:\s*(\d+-\d+-\d+)'),
        ('website', r'website\s*:\s*(\S+)'),
        ('mailing_address', r'Mailing Address\s*:\s*([\d\w\s,]+)'),
    )

    contact_info = {}
    for field, pattern in field_patterns:
        found = re.search(pattern, text)
        if found:
            contact_info[field] = found.group(1).strip()
    return contact_info
|
|
def parse_coverage_info(text):
    """Extract coverage parts and their liability limits from raw policy text.

    Matches "PART <id> ..." headers and takes the following line as the
    limits of liability. Deductible is not parsed here and is always None.
    """
    part_pattern = r'PART\s+(\w+)\s+[-\w\s]*\n([^\n]+)'
    return [
        {
            'type': part_id,
            'limits_of_liability': limits.strip(),
            'deductible': None,
        }
        for part_id, limits in re.findall(part_pattern, text)
    ]
|
|
def parse_discounts(text):
    """Extract discount line items from raw policy text.

    Matches two-word discount names followed by "DISCOUNT -$<amount>" and
    returns them as dicts with 'type' and a '-$'-prefixed 'amount' string.
    """
    discount_pattern = r'(\w+\s+\w+)\s+DISCOUNT\s+-\$\s*(\d+\.\d+)'
    return [
        {'type': name.strip(), 'amount': f"-${amount}"}
        for name, amount in re.findall(discount_pattern, text)
    ]
|
|
def parse_additional_info(text):
    """Extract total premium, inception-due flag, and endorsement changes
    from raw policy text.

    Keys are only present when their pattern matched.
    """
    additional_info = {}

    premium = re.search(r'TOTAL PREMIUM\s*\$\s*(\d+\.\d+)', text)
    if premium:
        additional_info['total_premium'] = '$' + premium.group(1)

    # Presence of the phrase alone is the signal; no value is captured.
    if re.search(r'PREMIUM DUE AT INCEPTION', text):
        additional_info['premium_due_at_inception'] = "Yes"

    endorsements = re.findall(r'ENDORSEMENTS\s*:\s*([\w, ]+)', text)
    if endorsements:
        additional_info['policy_changes'] = [item.strip() for item in endorsements]

    return additional_info
|
|
def parse_compliance_info(text):
    """Extract state-requirement and coverage-rejection notes from raw
    policy text.

    Each field is included only when its labelled pattern matches.
    """
    field_patterns = (
        ('state_requirements', r'state requirements\s*:\s*([\w\s]+)'),
        ('coverage_rejections', r'coverage rejections\s*:\s*([\w\s]+)'),
    )

    compliance_info = {}
    for field, pattern in field_patterns:
        hit = re.search(pattern, text)
        if hit:
            compliance_info[field] = hit.group(1).strip()
    return compliance_info
|
|
|
|
|
|
|
|
|
|
|
|