#!/usr/bin/env python3 #/* DARNA.HI # * Copyright (c) 2023 Seapoe1809 # * Copyright (c) 2023 pnmeka # * # * # * This program is free software: you can redistribute it and/or modify # * it under the terms of the GNU General Public License as published by # * the Free Software Foundation, either version 3 of the License, or # * (at your option) any later version. # * # * This program is distributed in the hope that it will be useful, # * but WITHOUT ANY WARRANTY; without even the implied warranty of # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # * GNU General Public License for more details. # * # * You should have received a copy of the GNU General Public License # * along with this program. If not, see . import pytesseract from pdf2image import convert_from_path import os, subprocess from variables import variables from variables import variables2 import re from PIL import Image, ImageFile from datetime import datetime import json import fitz # PyMuPDF import chromadb from tqdm import tqdm #from install_module.Analyze.pdf_sectionreader import * #from install_module.Analyze.nlp_process import * ImageFile.LOAD_TRUNCATED_IMAGES = True HS_path = os.getcwd() print(HS_path) folderpath = os.environ.get('FOLDERPATH') print("folderpath is", folderpath) if folderpath: ocr_files = f"{folderpath}/ocr_files" else: print("Session FOLDERPATH environment variable not set.") APP_dir = f"{HS_path}/install_module" ocr_files = f"{folderpath}/ocr_files" upload_dir = f"{folderpath}/upload" ip_address = variables.ip_address age = variables2.age sex = variables2.sex try: formatted_ignore_words = variables2.ignore_words if hasattr(variables2, 'ignore_words') else None except NameError: formatted_ignore_words = None # Path to the Tesseract OCR executable (change this if necessary) pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract' ocr_files_dir = f'{ocr_files}/' output_dir = os.path.join(ocr_files_dir, 'Darna_tesseract') os.makedirs(output_dir, exist_ok=True) # 
# Define the patterns to identify and deidentify
# remove anything after keyword
# Case-insensitive: matches a PHI field label (Name:, DOB:, MRN:, ...) and
# consumes the remainder of that line so it can be redacted wholesale.
KEYWORDS_REGEX = r'(?i)(?:Name|DOB|Date of birth|Birth|Address|Phone|PATIENT|Patient|MRN|Medical Record Number|APT|House|Street|ST|zip|pin):.*?(\n|$)'
# remove specific words
# NOTE(review): the file is TRUNCATED here in this copy — the tail of the
# IGNORE_REGEX pattern (it presumably interpolated `formatted_ignore_words`)
# and the definitions of process_ocr_files, deidentify_records,
# convert_pdf_to_images and collate_images (all called by run_analyzer below)
# have been lost; only the interior of a directory-walk loop survives.
# Recover the missing block from version control.  As written, this region is
# a syntax error: the fragment below references `root`, `files` and `images`
# from the lost enclosing os.walk() loop.
IGNORE_REGEX = rf'(?i)(?'  # truncated mid-pattern
# ' subdirectory   <- orphaned tail of a lost comment line ("... 'Darna_tesseract' subdirectory")
if os.path.basename(root) == 'Darna_tesseract':
    continue
for file in files:
    # Skip all .txt files
    if file.lower().endswith('.txt'):
        continue
    file_path = os.path.join(root, file)
    try:
        if file.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
            img = Image.open(file_path)
            if img.size[0] > 0 and img.size[1] > 0:  # Check if the image is not empty
                images.append(img)
            # NOTE(review): closing the image before use — the appended PIL
            # object's data may already be needed later; confirm intent.
            img.close()
        elif file.lower().endswith(('.pdf', '.PDF')):
            pdf_images = convert_pdf_to_images(file_path)
            if pdf_images is not None:
                for pdf_img in pdf_images:
                    if pdf_img.size[0] > 0 and pdf_img.size[1] > 0:  # Check if the image is not empty
                        images.append(pdf_img)
                # No need to close PIL Images created from bytes
    except Exception as e:
        print(f"Error processing image: {file_path}")
        print(f"Error message: {str(e)}")
        continue


def get_recommendations(age=None, sex=None, ancestry=None, pack_years=None, smoking=None, quit_within_past_15_years=None, overweight_or_obesity=None, cardiovascular_risk=None, cardiovascular_risk_7_5_to_10=None, rh_d_negative=None, pregnant=None, new_mother=None, substance_abuse_risk=None, skin_type=None):
    """Return a list of USPSTF-style screening recommendation strings.

    Only `age` (int) and `sex` ('male' / 'female' / 'pregnant') are passed by
    the caller in this file; every other risk-factor parameter defaults to
    None and is then replaced by the sentinel string "not None" below, which
    makes all the `... is not None` gates pass.  NOTE(review): that means the
    risk-factor conditions are effectively always true unless a caller
    explicitly opts out — presumably deliberate (show everything potentially
    relevant), but confirm.
    """
    recommendations = []
    # Set default values when not specified
    if ancestry is None:
        ancestry = "not None"
    if pack_years is None:
        pack_years = 5
    if smoking is None:
        smoking = "not None"
    if quit_within_past_15_years is None:
        quit_within_past_15_years = "not None"
    if overweight_or_obesity is None:
        overweight_or_obesity = "not None"
    if cardiovascular_risk is None:
        cardiovascular_risk = "not None"
    if rh_d_negative is None:
        rh_d_negative = "not None"
    if cardiovascular_risk_7_5_to_10 is None:
        cardiovascular_risk_7_5_to_10 = "not None"
    if substance_abuse_risk is None:
        substance_abuse_risk = "not None"
    if skin_type is None:
        skin_type = "not None"
    # B - Recommended (39)
    if (sex == 'female') and (age is not None) and (age >= 21 and age <= 65):
        recommendations.append("Pap Smear: Cervical Cancer: Screening -- Women aged 21 to 65 years")
    if age is not None and (age >= 50 and age <= 75):
        recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 50 to 75 years")
    if age is not None and (age >= 18):
        recommendations.append("BP: Blood pressure screening in office screening -- Adults aged 18 years and above")
    # NOTE(review): the next two comparisons use bare `age >= N` without an
    # `age is not None` guard — raises TypeError if age is None; confirm
    # callers always supply an int (generate_recommendations does).
    if sex == 'female' and age >= 45:
        recommendations.append("Coronary Risk: Screening women aged 45 and older for lipid disorders if they are at increased risk for coronary heart disease.")
    if sex == 'male' and age >= 35:
        recommendations.append("Fasting Lipid: Screening Men aged 35 and older for lipid disorders with fasting lipid profile.")
    if sex == 'female' and (ancestry is not None):
        recommendations.append("BRCA: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if sex == 'female' and age >= 35:
        recommendations.append("Breast Cancer: Medication Use to Reduce Risk -- Women at increased risk for breast cancer aged 35 years or older")
    if (sex == 'female') and age is not None and (age >= 50 and age <= 74):
        recommendations.append("Mammogram: Breast Cancer: Screening -- Women aged 50 to 74 years")
    if (sex == 'female' or (new_mother is not None and new_mother)):
        recommendations.append("Breastfeeding: Primary Care Interventions -- Pregnant women, new mothers, and their children")
    if sex == 'female':
        recommendations.append("Sti screen: Chlamydia and Gonorrhea: Screening -- Sexually active women, including pregnant persons")
    if age is not None and (age >= 45 and age <= 49):
        recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 45 to 49 years")
    if age is not None and (age >= 8 and age <= 18):
        recommendations.append("Anxiety Questionnaire: Anxiety in Children and Adolescents: Screening -- Children and adolescents aged 8 to 18 years")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Aspirin for High Risk: Aspirin Use to Prevent Preeclampsia and Related Morbidity and Mortality: Preventive Medication -- Pregnant persons at high risk for preeclampsia")
    if sex == 'pregnant':
        recommendations.append("Urinalysis: Asymptomatic Bacteriuria in Adults: Screening -- Pregnant persons")
    if sex == 'male' and (ancestry is not None):
        recommendations.append("Brca Gene Test: BRCA-Related Cancer: If screen positive, risk Assessment, Genetic Counseling, and Genetic Testing -- Men with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    # NOTE(review): message says "aged 65 to 75" but the condition has no
    # upper bound (age >= 65 only) — confirm whether `age <= 75` was intended.
    if sex == 'male' and age >= 65 and (pack_years is not None and pack_years > 0):
        recommendations.append("Ultrasound Doppler Abdomen: Abdominal Aortic Aneurysm: Screening -- Men aged 65 to 75 years who have ever smoked")
    if age is not None and (age >= 12 and age <= 18):
        recommendations.append("Depression Screen Questionnaire: Depression and Suicide Risk in Children and Adolescents: Screening -- Adolescents aged 12 to 18 years")
    if age is not None and (age >= 65):
        recommendations.append("Falls Screen Questionnaire: Falls Prevention in Community-Dwelling Older Adults: Interventions -- Adults 65 years or older")
    # NOTE(review): `age >= 24` here compares the patient's age in years, but
    # the text refers to 24 weeks of gestation — likely a units mix-up; verify.
    if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (age is not None and (age >= 24)):
        recommendations.append("Fasting Blood Glucose: Gestational Diabetes: Screening -- Asymptomatic pregnant persons at 24 weeks of gestation or after")
    if overweight_or_obesity is not None:
        recommendations.append("Bmi screen: If elevated BMI consider Healthy Diet and Physical Activity for Cardiovascular Disease Prevention in Adults With Cardiovascular Risk Factors: Behavioral Counseling Interventions -- Adults with cardiovascular disease risk factors")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Weight Trend: Healthy Weight and Weight Gain In Pregnancy: Behavioral Counseling Interventions -- Pregnant persons")
    if sex == 'female' and (age is not None and (age >= 18)):
        recommendations.append("Hepatitis B Blood Test: Hepatitis B Virus Infection in Adolescents and Adults: Screening -- Adolescents and adults at increased risk for infection")
    if sex == 'male' and (age is not None and (age >= 18 and age <= 79)):
        recommendations.append("Hepatitis C Blood Test: Hepatitis C Virus Infection in Adolescents and Adults: Screening -- Adults aged 18 to 79 years")
    if sex == 'female' and (age is not None and (age >= 14)):
        recommendations.append("Violence Questionnaire screen: Intimate Partner Violence, Elder Abuse, and Abuse of Vulnerable Adults: Screening -- Women of reproductive age")
    if age is not None and (age >= 6 and age <= 60):
        recommendations.append("Tb Screen Test/ Questionnaire: Latent Tuberculosis Infection in Adults: Screening -- Asymptomatic adults at increased risk of latent tuberculosis infection (LTBI)")
    if (sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant))) and (age is not None and (age >= 50 and age <= 80) and (pack_years is not None) and (smoking is not None)):
        recommendations.append("Ct Chest: Lung Cancer screening if you smoked more that 20 pack years: Screening -- Adults aged 50 to 80 years who have a 20 pack-year smoking history and currently smoke or have quit within the past 15 years")
    if age is not None and (age >= 6 and age <= 18):
        recommendations.append("Bmi Screen: Obesity in Children and Adolescents: Screening -- Children and adolescents 6 years and older")
    if sex == 'female' and (age is not None and (age < 65)):
        recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Postmenopausal women younger than 65 years at increased risk of osteoporosis")
    if sex == 'female' and (age is not None and (age >= 65)):
        recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Women 65 years and older")
    if (sex == 'pregnant' or (pregnant is not None and pregnant) or (new_mother is not None)):
        recommendations.append("Depression Questionnaire: Perinatal Depression: Preventive Interventions -- Pregnant and postpartum persons")
    if age is not None and (age >= 35 and age <= 70):
        recommendations.append("Fasting Blood Glucose: Prediabetes and Type 2 Diabetes: Screening -- Asymptomatic adults aged 35 to 70 years who have overweight or obesity")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Bp, Questionnaire and Urine test: Preeclampsia: Screening -- Pregnant woman")
    if age is not None and (age < 5):
        recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
    # NOTE(review): this appends the same "Oral Exam" text as the branch above
    # under a female/pregnant/new-mother condition — duplicate string; verify
    # whether a different (maternal) recommendation was intended here.
    if (sex == 'female' or (pregnant is not None and pregnant)) or (new_mother is not None):
        recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (rh_d_negative is not None):
        recommendations.append("Rh Blood Test: Rh(D) Incompatibility especially with Rh negative: Screening -- Unsensitized Rh(D)-negative pregnant women")
    if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant) or (new_mother is not None and new_mother)):
        recommendations.append("Depression Questionnaire: Screening for Depression in Adults -- General adult population")
    if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant)) or (new_mother is not None):
        recommendations.append("Sti Screen: Sexually Transmitted Infections: Behavioral Counseling -- Sexually active adolescents and adults at increased risk")
    if (age is not None and (age >= 25)) or (new_mother is not None) or (sex == 'male' and (substance_abuse_risk is not None)):
        recommendations.append("Skin Exam: Skin Cancer Prevention: Behavioral Counseling -- Adults, Young adults, adolescents, children, and parents of young children")
    if (age is not None and (age >= 40 and age <= 75)) and (cardiovascular_risk is not None) and (cardiovascular_risk_7_5_to_10 is not None):
        recommendations.append("Heart Disease Questionnaire: Screen for CV risk and consider Statin Use for the Primary Prevention of Cardiovascular Disease in Adults: Preventive Medication -- Adults aged 40 to 75 years who have 1 or more cardiovascular risk factors and an estimated 10-year cardiovascular disease (CVD) risk of 10% or greater")
    if sex == 'female' and (pregnant is not None and pregnant) and (ancestry is not None and ancestry == 'BRCA1/2 gene mutation'):
        recommendations.append("Family History and Brca Test: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if (age is not None and (age >= 6 and age <= 18)) or (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Tobacco Questionnaire: Tobacco Use in Children and Adolescents: Primary Care Interventions -- School-aged children and adolescents who have not started to use tobacco")
    if age is not None and (age >= 18) and (substance_abuse_risk is not None):
        recommendations.append("Alcohol Questionnaire: Unhealthy Alcohol Use in Adolescents and Adults: Screening and Behavioral Counseling Interventions -- Adults 18 years or older, including pregnant women")
    if age is not None and (age >= 13):
        recommendations.append("Drug Abuse Questionnaire: Unhealthy Drug Use: Screening -- Adults age 13 years or older")
    if age is not None and (age > 2 and age < 24) and skin_type is not None:
        recommendations.append("Skin Exam: Skin Cancer: Counseling -- Fair-skinned individuals aged 6 months to 24 years with a family history of skin cancer or personal history of skin cancer, or who are at increased risk of skin cancer")
    return recommendations


def generate_recommendations(age=None, sex=None):
    """Build the recommendation list for (age, sex) and persist it.

    Writes the list both as plain text to
    <ocr_files>/Darna_tesseract/USPTF_Intent.txt and as a one-page PDF to
    <ocr_files>/USPTF.pdf.  Returns None (the caller's `recommendations =`
    binding in run_analyzer receives None).
    """
    age = f"{age}"
    try:
        age = int(age)
    except ValueError:
        # NOTE(review): on failure `age` stays a string, and the bare
        # comparisons in get_recommendations will then raise — confirm this
        # path can't be reached with non-numeric input.
        print("Invalid age value. Age must be a valid integer.")
    sex = f"{sex}"
    recommendations = get_recommendations(age, sex)
    # Adding subheading
    # NOTE(review): the second assignment immediately overwrites the first, so
    # the age/sex subheading is never emitted — dead statement; verify intent.
    subheading = f"The USPTF recommendations for {age}/{sex} are:"
    subheading = f"RECOMMENDATIONS:"
    recommendations_with_subheading = [subheading] + recommendations
    with open(f'{ocr_files}/Darna_tesseract/USPTF_Intent.txt', 'w') as file:
        file.write('\n\n\n'.join(recommendations_with_subheading))
    doc = fitz.open()  # Create a new PDF
    page = doc.new_page()
    text = "\n\n\n".join(recommendations_with_subheading)
    page.insert_text((72, 72), text)
    doc.save(f'{ocr_files}/USPTF.pdf')  # Save the PDF
    doc.close()


#extract data from the updated fhir file
def extract_lforms_data(json_data):
    """Pull DOB, sex, allergies, PMH and medications out of an LForms chart.

    `json_data` may be a JSON string or an already-parsed dict; returns a dict
    with keys date_of_birth, sex, allergies, past_medical_history, medications.
    The item/question names matched below are the LForms section titles used
    by this project's chart.json.
    """
    if isinstance(json_data, str):
        data = json.loads(json_data)
    else:
        data = json_data
    extracted_info = {
        "date_of_birth": None,
        "sex": None,
        "allergies": [],
        "past_medical_history": [],
        "medications": []
    }
    for item in data.get("items", []):
        if item.get("question") == "ABOUT ME":
            for subitem in item.get("items", []):
                if subitem.get("question") == "DATE OF BIRTH":
                    extracted_info["date_of_birth"] = subitem.get("value")
                elif subitem.get("question") == "BIOLOGICAL SEX":
                    extracted_info["sex"] = subitem.get("value", {}).get("text")
        elif item.get("question") == "ALLERGIES":
            for allergy_item in item.get("items", []):
                if allergy_item.get("question") == "Allergies and Other Dangerous Reactions":
                    for subitem in allergy_item.get("items", []):
                        if subitem.get("question") == "Name" and "value" in subitem:
                            # NOTE(review): assumes value is a dict with a
                            # "text" key here (unlike extract_condition, which
                            # also handles plain strings) — confirm schema.
                            extracted_info["allergies"].append(subitem["value"]["text"])
        elif item.get("question") == "PAST MEDICAL HISTORY:":
            for condition_item in item.get("items", []):
                if condition_item.get("question") == "PAST MEDICAL HISTORY" and "value" in condition_item:
                    condition = extract_condition(condition_item)
                    if condition:
                        extracted_info["past_medical_history"].append(condition)
        elif item.get("question") == "MEDICATIONS:":
            # One medication dict per MEDICATIONS: section; the fields of all
            # med_items in the section are merged into it.
            medication = {}
            for med_item in item.get("items", []):
                if med_item.get("question") == "MEDICATIONS":
                    medication["name"] = extract_med_value(med_item)
                elif med_item.get("question") == "Strength":
                    medication["strength"] = extract_med_value(med_item)
                elif med_item.get("question") == "Instructions":
                    medication["instructions"] = extract_med_value(med_item)
            if medication:
                extracted_info["medications"].append(medication)
    return extracted_info


def extract_condition(condition_item):
    """Return the condition text from an LForms item value (dict or str)."""
    if isinstance(condition_item.get("value"), dict):
        return condition_item["value"].get("text", "")
    elif isinstance(condition_item.get("value"), str):
        return condition_item["value"]
    return ""


def extract_med_value(med_item):
    """Return the medication field text from an LForms item value (dict or str)."""
    if "value" not in med_item:
        return ""
    value = med_item["value"]
    if isinstance(value, str):
        return value
    elif isinstance(value, dict):
        return value.get("text", "")
    return ""


#######
###nlp_process.py functions
# NOTE(review): this section was inlined from nlp_process.py; the imports
# below partially duplicate the top-of-file imports (json, re, os).
import json
import nltk
import re, os
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Ensure NLTK components are downloaded
#nltk.download('punkt')
#nltk.download('stopwords')


#convert text to lowercase and remove fillers
def normalize_text(text):
    # Convert text to lowercase and remove ':' and '-'
    # (also removes spaces — the character class is [colon, space, hyphen])
    return re.sub('[: -]', '', text.lower())


def condense_summary_to_tokens(text, token_limit=300):
    """Truncate `text` to its first `token_limit` NLTK word tokens."""
    tokens = word_tokenize(text)
    # Select the first 'token_limit' tokens
    limited_tokens = tokens[:token_limit]
    # Reconstruct the text from these tokens
    condensed_text = ' '.join(limited_tokens)
    return condensed_text


#write all to a json summary file
def wordcloud_summary(keys, texts, directory):
    """Merge key->text pairs into wordcloud_summary.json and render a PNG
    word cloud per non-empty text under <directory>/wordclouds/."""
    output_file = f'{directory}/wordcloud_summary.json'
    wordcloud_dir = f'{directory}/wordclouds'
    try:
        with open(output_file, 'r', encoding='utf-8') as file:
            existing_data = json.load(file)
    except FileNotFoundError:
        existing_data = {}
    # Ensure the directories exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    os.makedirs(wordcloud_dir, exist_ok=True)
    for i, key in enumerate(keys):
        if i < len(texts):
            text = texts[i]
            # Check if the text contains any words
            if text.strip():
                existing_data[key] = text
                # Attempt to generate word cloud
                try:
                    # Split the text into words
                    words = text.split()
                    # Check if there are enough words
                    if len(words) > 1:
                        wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
                        # Save the word cloud
                        plt.figure(figsize=(10, 5))
                        plt.imshow(wordcloud, interpolation='bilinear')
                        plt.axis('off')
                        plt.title(f'Word Cloud for {key}')
                        plt.savefig(f'{wordcloud_dir}/{key}_wordcloud.png')
                        plt.close()
                        print(f"Generated word cloud for key: {key}")
                    else:
                        print(f"Not enough words to generate word cloud for key: {key}")
                except Exception as e:
                    print(f"Error generating word cloud for key {key}: {str(e)}")
            else:
                print(f"Skipping empty text for key: {key}")
        else:
            print(f"No text available for key: {key}")
    with open(output_file, 'w', encoding='utf-8') as file:
        json.dump(existing_data, file, indent=4, ensure_ascii=False)


#generate list of meds from the files
# NOTE(review): load_text_from_json_meds / _pmh / _summary are identical
# except for the key list passed by the caller — candidates for one shared
# helper.
def load_text_from_json_meds(json_file_path, keys):
    """Collect values from the JSON file whose (normalized) keys contain any
    of the normalized `keys` substrings; return them condensed to 300 tokens."""
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)
    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))
    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text


#generate a list of past medical history from the files
def load_text_from_json_pmh(json_file_path, keys):
    """Same matching logic as load_text_from_json_meds, used for PMH keys."""
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)
    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))
    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text


#generate a list of screening items from the USPTF file
def load_text_from_json_screening(json_file_path, keys):
    """Like the loaders above but without token condensing; assumes matched
    values are strings (joins them directly)."""
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)
    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            text.append(value)
    combined_text_screening = ' '.join(text)
    #print (combined_text_screening)
    return combined_text_screening


def load_text_from_json_summary(json_file_path, keys):
    """Same matching logic as load_text_from_json_meds, used for summary keys."""
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)
    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))
    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text


#iterate json files in directory and call function above
def process_directory_summary(directory, keys):
    """Run load_text_from_json_summary over every .json in `directory` and
    join the non-empty results into one string."""
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_summary(file_path, keys)
            if combined_text:  # Only add non-empty strings
                combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text


#iterate json files in directory and summarize meds
def process_directory_meds(directory, keys):
    """Run load_text_from_json_meds over every .json in `directory`."""
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_meds(file_path, keys)
            combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text


#iterate json files in directory and summarize past medical
def process_directory_pmh(directory, keys):
    """Run load_text_from_json_pmh over every .json in `directory`."""
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_pmh(file_path, keys)
            combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text


def preprocess_and_create_wordcloud(text, directory):
    """Stopword-filter `text` and save a square word-cloud PNG.

    NOTE(review): the output path is f'{directory}darnahi_ocr.png' with no
    separator — callers must pass a trailing-slash directory (run_analyzer's
    `final_directory` does).
    """
    # Tokenize and remove stopwords
    stop_words = set(stopwords.words('english'))
    words = word_tokenize(text)
    filtered_words = [word for word in words if word.isalnum() and word.lower() not in stop_words]
    # Check if there are any words left after filtering
    if not filtered_words:
        print("No words left after preprocessing. Skipping word cloud creation.")
        return
    processed_text = ' '.join(filtered_words)
    # Create and display the word cloud
    wordcloud = WordCloud(width=800, height=800, background_color='white').generate(processed_text)
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.tight_layout(pad=0)
    # NOTE(review): duplicate tight_layout call — harmless but redundant.
    plt.tight_layout(pad = 0)
    # Display the word cloud
    #plt.show()
    # Save the word cloud image
    plt.savefig(f'{directory}darnahi_ocr.png')


#############
# Citation-matching regex (appears unused in this file — confirm before
# removing).  Matches "N. ... 2023;12(3):45–67. DOI: ..." style references.
pattern = r"\d+\..+?(\d{4};\d+\(\d+\):\d+–\d+\. DOI: .+?\.|.+?ed\., .+?: .+?; \d{4}\. \d+–\d+\.)"


class Document:
    """Minimal container pairing a text snippet with its metadata dict
    (mirrors the LangChain Document shape)."""

    def __init__(self, page_content, metadata):
        self.page_content = page_content
        self.metadata = metadata


def process_pdf(file_path, chunk_size=350):
    """Extract text from a PDF and return it split into fixed-size chunks.

    NOTE(review): PyMuPDF "dict" blocks carry their text under
    block['lines'][..]['spans'][..]['text'], not block['text'], so the
    `if 'text' in block` test below likely never matches and full_text stays
    empty — verify against the PyMuPDF textpage docs.
    """
    try:
        doc = fitz.open(file_path)
        full_text = ""
        for page in doc:
            text_blocks = page.get_text("dict")["blocks"]
            for block in text_blocks:
                if 'text' in block:
                    text = block['text'].strip()
                    if text:
                        full_text += text + "\n"
        chunks = [full_text[i:i+chunk_size] for i in range(0, len(full_text), chunk_size)]
        return chunks
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []


def process_json(input_file):
    """Wrap each heading->content pair of a JSON file in a Document."""
    try:
        with open(input_file, 'r', encoding='utf-8') as file:
            existing_data = json.load(file)
    except FileNotFoundError:
        print("File not found.")
        return []
    semantic_snippets = []
    for heading, content in existing_data.items():
        metadata = {'heading': heading, 'file': input_file}
        doc = Document(page_content=content, metadata=metadata)
        semantic_snippets.append(doc)
    return semantic_snippets


def process_files(directory):
    """Collect snippets from all .pdf (str chunks) and .json (Document
    objects) files directly inside `directory` — a heterogeneous list."""
    all_semantic_snippets = []
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if filename.endswith('.pdf'):
            snippets = process_pdf(file_path)
            all_semantic_snippets.extend(snippets)
        elif filename.endswith('.json'):
            semantic_snippets = process_json(file_path)
            all_semantic_snippets.extend(semantic_snippets)
    return all_semantic_snippets


def chromadb_embed(directory, collection_name="documents_collection"):
    """Add the directory's snippets to a persistent ChromaDB collection under
    <directory>/Darna_tesseract/chroma_storage."""
    persist_directory = os.path.join(directory, 'Darna_tesseract', 'chroma_storage')
    os.makedirs(persist_directory, exist_ok=True)
    # NOTE(review): BUG — str() turns the snippet LIST into one string like
    # "['chunk', <Document ...>]", so the batching below iterates CHARACTERS
    # (isinstance(s, str) is then always true and each "document" is a single
    # character).  Almost certainly should be:
    #   all_semantic_snippets = process_files(directory)
    all_semantic_snippets = str(process_files(directory))
    client = chromadb.PersistentClient(path=persist_directory)
    collection = client.get_or_create_collection(name=collection_name)
    count = collection.count()
    print(f"Collection already contains {count} documents")
    # Continue ids from the current collection size to avoid collisions.
    ids = [str(i) for i in range(count, count + len(all_semantic_snippets))]
    for i in tqdm(range(0, len(all_semantic_snippets), 100), desc="Adding documents"):
        batch_snippets = all_semantic_snippets[i:i+100]
        batch_metadatas = []
        for snippet in batch_snippets:
            # Plain-string chunks (from PDFs) get placeholder metadata;
            # Document objects carry their own.
            metadata = {"filename": "summary", "heading": "summary_heading"} if not isinstance(snippet, Document) else snippet.metadata
            batch_metadatas.append(metadata)
        collection.add(ids=ids[i:i+100], documents=[s if isinstance(s, str) else s.page_content for s in batch_snippets], metadatas=batch_metadatas)
    new_count = collection.count()
    print(f"Added {new_count - count} documents")


#######################################
#########pdf_sectionreader.py
# NOTE(review): inlined from pdf_sectionreader.py; os/fitz/json re-imports
# duplicate the top-of-file imports.
import os
import fitz
import pandas as pd
import json
from unidecode import unidecode

global_heading_content_dict = {}  # Global dictionary to accumulate data


def process_pdf_files(directory):
    """Section every .pdf in `directory` into the global heading->content
    dict, then write the combined JSON output."""
    for filename in os.listdir(directory):
        if filename.endswith('.pdf'):
            file_path = os.path.join(directory, filename)
            with fitz.open(file_path) as doc:
                # NOTE(review): "(unknown)" looks like a mangled placeholder —
                # probably meant f"Processing {filename}..."; confirm upstream.
                print(f"Processing (unknown)...")
                extract_and_tag_text(doc)
    # Generate and save output after processing all files
    generate_output(global_heading_content_dict, directory)


def extract_and_tag_text(doc):
    """Flatten a PyMuPDF document into span rows, tag each span h1/h2/p by
    font heuristics, and fold the result into the global heading dict."""
    block_dict, page_num = {}, 1
    for page in doc:
        file_dict = page.get_text('dict')
        block = file_dict['blocks']
        block_dict[page_num] = block
        page_num += 1
    rows = []
    for page_num, blocks in block_dict.items():
        for block in blocks:
            if block['type'] == 0:  # type 0 == text block in PyMuPDF
                for line in block['lines']:
                    for span in line['spans']:
                        xmin, ymin, xmax, ymax = list(span['bbox'])
                        font_size = span['size']
                        text = unidecode(span['text'])
                        span_font = span['font']
                        is_upper = text.isupper()
                        is_bold = "bold" in span_font.lower()
                        if text.strip() != "":
                            rows.append((xmin, ymin, xmax, ymax, text, is_upper, is_bold, span_font, font_size))
    span_df = pd.DataFrame(rows, columns=['xmin', 'ymin', 'xmax', 'ymax', 'text', 'is_upper', 'is_bold', 'span_font', 'font_size'])
    # The document's dominant font size is the baseline for heading detection.
    # NOTE(review): .mode().iloc[0] raises IndexError on a PDF with no text
    # rows — confirm empty PDFs cannot reach here.
    common_font_size = span_df['font_size'].mode().iloc[0]
    span_df['tag'] = span_df.apply(assign_tag, axis=1, common_font_size=common_font_size)
    update_global_dict(span_df)


def assign_tag(row, common_font_size):
    """Heuristic span tagger: digits force 'p'; larger+bold+upper is 'h1';
    any of bold/upper/larger is 'h2'; otherwise 'p'."""
    if any(char.isdigit() for char in row['text']):
        return 'p'
    elif row['font_size'] > common_font_size and row['is_bold'] and row['is_upper']:
        return 'h1'
    elif row['is_bold'] or row['is_upper'] or row['font_size'] > common_font_size:
        return 'h2'
    else:
        return 'p'


def update_global_dict(span_df):
    """Accumulate tagged spans into global_heading_content_dict: each h* span
    starts a new heading; following p spans become its content (appended to
    any content an identical heading collected earlier)."""
    tmp = []
    current_heading = None
    for index, span_row in span_df.iterrows():
        text, tag = span_row.text.strip(), span_row.tag
        if 'h' in tag:
            if current_heading is not None:
                existing_text = global_heading_content_dict.get(current_heading, "")
                global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()
            current_heading = text
            tmp = []
        else:
            tmp.append(text)
    # Flush the trailing section (spans after the last heading).
    if current_heading is not None:
        existing_text = global_heading_content_dict.get(current_heading, "")
        global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()


def generate_output(heading_content_dict, directory):
    """Write the heading->content dict as JSON to both the Darna_tesseract
    subfolder and the directory root (the root copy feeds the NLP loaders)."""
    text_df = pd.DataFrame(list(heading_content_dict.items()), columns=['heading', 'content'])
    #text_df.to_excel(f'{directory}/combined_output.xlsx', index=False, engine='openpyxl')
    json_data = json.dumps(heading_content_dict, indent=4, ensure_ascii=False)
    with open(f'{directory}/Darna_tesseract/combined_output.json', 'w', encoding='utf-8') as f:
        f.write(json_data)
    with open(f'{directory}/combined_output.json', 'w', encoding='utf-8') as f:
        f.write(json_data)


###########################################
#write files to pdf
def write_text_to_pdf(directory, text):
    """Write `text` onto a single new PDF page at <directory>/fhir_data.pdf."""
    doc = fitz.open()  # Create a new PDF
    page = doc.new_page()  # Add a new page
    page.insert_text((72, 72), text)  # Position (x, y) and text
    doc.save(f'{directory}/fhir_data.pdf')  # Save the PDF
    doc.close()


def run_analyzer(age, sex, ocr_files, formatted_ignore_words):
    """Top-level pipeline: OCR -> de-identify -> recommendations -> FHIR
    extraction -> sectioning -> NLP word clouds -> Chroma embedding -> cleanup.

    NOTE(review): the single try/except around the whole pipeline swallows
    every failure with just a print — any mid-pipeline error silently leaves
    partial output.  Also, process_ocr_files / collate_images /
    deidentify_records are called here but their definitions are missing from
    this (truncated) copy of the file.
    """
    try:
        # Process OCR files with provided input
        print("Processing OCR files")
        process_ocr_files(ocr_files, age)
        # Create collated file
        collate_images(ocr_files, f"{ocr_files}/Darna_tesseract")
        # Deidentify records
        print("Deidentifying records")
        deidentify_records(ocr_files, formatted_ignore_words)
        # Generate recommendations with provided age and sex
        # NOTE(review): generate_recommendations returns None (it persists to
        # disk instead), so this binding is unused.
        print("Generating recommendations")
        recommendations = generate_recommendations(age=age, sex=sex)
        # Extract data from FHIR file and create PDF
        directory = ocr_files
        #folderpath is global directory
        with open(f'{folderpath}/summary/chart.json', 'r') as file:
            json_data = json.load(file)
        # Round-trips through dumps because extract_lforms_data accepts either
        # a JSON string or a dict — passing json_data directly would also work.
        extracted_info = extract_lforms_data(json.dumps(json_data))
        print(extracted_info)
        json_output = json.dumps(extracted_info, indent=4)
        write_text_to_pdf(directory, str(extracted_info))
        final_directory = f'{directory}/Darna_tesseract/'
        # Process PDF files
        process_pdf_files(directory)
        # Write the JSON output to a file
        with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f:
            f.write(json_output)
        # NLP Processing for summary, past medical history, medications, and screening
        json_file_path = f'{directory}/combined_output.json'
        keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past']
        keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds']
        keys_summary = ['HPI', 'history', 'summary']
        keys_screening = ['RECS', 'RECOMMENDATIONS']
        # Process text data and create word clouds
        text_summary = process_directory_summary(directory, keys_summary)
        preprocess_and_create_wordcloud(text_summary, final_directory)
        text_meds = process_directory_meds(directory, keys_meds)
        text_screening = load_text_from_json_screening(json_file_path, keys_screening)
        text_pmh = process_directory_pmh(directory, keys_pmh)
        # Write processed texts to JSON
        keys = ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening")
        texts = (text_summary, text_pmh, text_meds, text_screening)
        wordcloud_summary(keys, texts, final_directory)
        # CHROMA embedding
        chromadb_embed(directory)
        # Cleanup OCR files, but leave Darna_tesseract files
        # NOTE(review): shell=True with `directory` interpolated into the
        # command is shell-injection-prone if the path ever contains spaces or
        # metacharacters — prefer subprocess.run([...], shell=False) or
        # os.remove over os.listdir; flagged, not changed here.
        subprocess.run(f'find {directory} -maxdepth 1 -type f -exec rm {{}} +', shell=True)
    except Exception as e:
        print(f"Error during processing: {e}")


##CALL ANALYZER
# Module-level side effect: the whole pipeline runs on import/execution.
run_analyzer(age, sex, ocr_files, formatted_ignore_words)

# NOTE(review): the triple-quoted block below is dead code — an unassigned
# string literal holding the pre-run_analyzer version of the pipeline.  It is
# never used at runtime; candidate for deletion once confirmed obsolete.
"""
# Process OCR files with provided input
print("process ocr files")
process_ocr_files(ocr_files, age) #doesnt work
#create collated file
collate_images(ocr_files, f"{ocr_files}/Darna_tesseract")
# Deidentify records
print("debug deidentify records")
deidentify_records()
# Generate recommendations with provided age and sex
print("debug generate records")
recommendations = generate_recommendations(age=age, sex=sex)
#extract data from fhir file and make pdf
directory = ocr_files
with open(f'{folderpath}/summary/chart.json', 'r') as file:
    json_data = json.load(file)
# Extract information using function above from fhir document and write to pdf and json file
extracted_info = extract_lforms_data(json.dumps(json_data))
print(extracted_info)
#extracted_info = extract_info(json_data)
json_output = json.dumps(extracted_info, indent=4)
#extracted_info = extract_info(json_data)
write_text_to_pdf(directory, str(extracted_info))
final_directory= f'{directory}/Darna_tesseract/'
#calls the CALL_FILE pdf_sectionreader
process_pdf_files(directory)
# Write the JSON output to a file and pdf file (2 lines above)
with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f:
    f.write(json_output)
#CALL FILE NLP_PROCESS
# Usage nlp_process
json_file_path = f'{directory}/combined_output.json'
#json_file_path = 'processed_data2.json'
#keys_summary = ['HPI', 'History of presenting illness', 'History of', 'summary']
keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past'] #extracts past medical history
keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds'] #extracts medications
keys_summary = ['HPI', 'history', 'summary']
keys_screening= ['RECS', 'RECOMMENDATIONS']
#call functions and write to wordcloud and creat wordcloud.png file
text_summary = process_directory_summary(directory, keys_summary) #creates wordcloud of uploaded files
preprocess_and_create_wordcloud(text_summary, final_directory)
text_meds = process_directory_meds(directory, keys_meds)#saves to medications in json
text_screening = load_text_from_json_screening(json_file_path, keys_screening)#saves to screening in json
text_pmh = process_directory_pmh(directory, keys_pmh)#saves to past history in json
#write to json using "keys":"texts"
keys= ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening")
texts= (text_summary, text_pmh, text_meds, text_screening)
wordcloud_summary(keys, texts, final_directory)
#CHROMA MINER
# Adjust this path to your directory
chromadb_embed(directory)
#remove files from ocr_files- cleanup but leave Darna_tesseract files
subprocess.run(f'find {directory} -maxdepth 1 -type f -exec rm {{}} +', shell=True)
"""