# Medica_DecisionSupportAI / upload_ingest.py
# Rajan Sharma
# Update upload_ingest.py
# 5cd1b74 verified
# raw
# history blame
# 7.45 kB
# upload_ingest.py
import pandas as pd
import os
import json
from typing import Dict, List, Any
import PyPDF2
import docx
import csv
def extract_text_from_files(file_paths: List[str]) -> Dict[str, Any]:
    """Ingest uploaded files and summarise what was found in each.

    The handler is chosen from the lowercased file extension:

    * ``.csv``, ``.xlsx``/``.xls``, ``.json``, ``.parquet`` are loaded into a
      DataFrame, passed to ``detect_healthcare_data_type`` and recorded with
      a three-row sample.
    * ``.pdf``, ``.docx``, ``.txt`` are read as text and recorded with the
      full text plus a preview of the first 500 characters.
    * Any other extension is reported as unsupported.

    Processing is best-effort per file: any exception raised while handling
    one file is turned into an "Error processing ..." chunk and the loop
    moves on to the next file.

    Args:
        file_paths: Paths of the files to ingest.

    Returns:
        A dict with three keys:
        ``"chunks"`` (list of human-readable status/preview strings),
        ``"artifacts"`` (one dict per successfully ingested file) and
        ``"healthcare_data"`` (file name -> non-empty detection result).
    """
    result: Dict[str, Any] = {
        "chunks": [],
        "artifacts": [],
        "healthcare_data": {}
    }

    for file_path in file_paths:
        try:
            file_name = os.path.basename(file_path)
            file_ext = os.path.splitext(file_name)[1].lower()

            if file_ext == '.csv':
                _record_table(result, file_name, "csv", "CSV",
                              pd.read_csv(file_path))
            elif file_ext in ('.xlsx', '.xls'):
                _record_table(result, file_name, "excel", "Excel",
                              pd.read_excel(file_path))
            elif file_ext == '.json':
                _record_table(result, file_name, "json", "JSON",
                              _load_json_frame(file_path))
            elif file_ext == '.parquet':
                _record_table(result, file_name, "parquet", "Parquet",
                              pd.read_parquet(file_path))
            elif file_ext == '.pdf':
                _record_document(result, file_name, "pdf", "PDF file",
                                 "Extracted text preview",
                                 extract_text_from_pdf(file_path))
            elif file_ext == '.docx':
                _record_document(result, file_name, "docx", "DOCX file",
                                 "Extracted text preview",
                                 extract_text_from_docx(file_path))
            elif file_ext == '.txt':
                _record_document(result, file_name, "txt", "Text file",
                                 "Content preview",
                                 _read_text_file(file_path))
            else:
                result["chunks"].append(f"Unsupported file type: {file_ext}")
        except Exception as e:
            # Deliberate catch-all: one unreadable upload must not abort the
            # whole batch, so the failure is reported in-band instead.
            result["chunks"].append(f"Error processing {file_path}: {str(e)}")

    return result


def _load_json_frame(file_path: str) -> pd.DataFrame:
    """Parse a JSON file and flatten it into a DataFrame."""
    # Binary mode lets json.load detect UTF-8/16/32 itself instead of
    # depending on the platform's locale encoding.
    with open(file_path, 'rb') as f:
        data = json.load(f)
    return pd.json_normalize(data)


def _read_text_file(file_path: str) -> str:
    """Return the whole content of a UTF-8 text file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


def _record_table(result: Dict[str, Any], file_name: str, kind: str,
                  heading: str, df: pd.DataFrame) -> None:
    """Add the chunk, detection result and sample artifact for a tabular file."""
    result["chunks"].append(f"{heading} file: {file_name}")

    # Dynamic healthcare data detection; only non-empty results are stored.
    healthcare_info = detect_healthcare_data_type(df)
    if healthcare_info:
        result["healthcare_data"][file_name] = healthcare_info

    result["artifacts"].append({
        "file": file_name,
        "type": kind,
        "sample": df.head(3).to_dict('records')
    })


def _record_document(result: Dict[str, Any], file_name: str, kind: str,
                     heading: str, preview_heading: str, text: str) -> None:
    """Add the chunks (name + 500-char preview) and artifact for a text file."""
    result["chunks"].append(f"{heading}: {file_name}")
    result["chunks"].append(f"{preview_heading}: {text[:500]}...")
    result["artifacts"].append({
        "file": file_name,
        "type": kind,
        "text": text
    })
def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of all pages of a PDF, concatenated in page order.

    The file is opened in binary mode and read through ``PyPDF2.PdfReader``.
    Page texts are joined directly, with no separator inserted between pages.
    """
    with open(file_path, 'rb') as pdf_stream:
        pdf_reader = PyPDF2.PdfReader(pdf_stream)
        # Extraction must happen while the underlying file is still open.
        page_texts = [pdf_page.extract_text() for pdf_page in pdf_reader.pages]
    return "".join(page_texts)
def extract_text_from_docx(file_path: str) -> str:
    """Return the paragraph text of a DOCX file, one paragraph per line.

    Only ``Document.paragraphs`` is read; every paragraph's text is followed
    by a newline, so a non-empty result always ends with ``"\\n"``.
    """
    document = docx.Document(file_path)
    return "".join(para.text + "\n" for para in document.paragraphs)
def detect_healthcare_data_type(df: pd.DataFrame) -> Dict[str, Any]:
    """Classify a DataFrame as facility and/or bed data from its column labels.

    Detection is a case-insensitive substring match of fixed indicator words
    against each column label. Labels are converted with ``str`` first, so
    frames with non-string labels (for example integer positions) are
    accepted and simply match nothing.

    Side effect: for bed data that has both a "current" and a "previous"
    column, the columns ``bed_change`` and ``percent_change`` are added to
    ``df`` in place.

    Args:
        df: The table to inspect.

    Returns:
        An empty dict when no indicator matched, otherwise a dict with:

        * ``"type"``: ``'facility_data'`` or ``'bed_data'``; when both kinds
          of column are present the bed classification overwrites the
          facility one.
        * ``"facility_types"``: value counts of the first type-like column
          (only when facility columns and a type-like column exist).
        * ``"total_change"`` and ``"has_derived_metrics"``: only when the
          bed change could be derived; ``total_change`` is the sum of
          ``current - previous`` as a native Python number.
    """
    facility_indicators = ('facility', 'hospital', 'clinic', 'center', 'site', 'name')
    type_indicators = ('type', 'category', 'class')
    bed_indicators = ('bed', 'capacity', 'occupancy')
    current_indicators = ('current', '2023', '2024')
    prev_indicators = ('prev', '2022', 'previous')

    # Pair each original label with its lowercased text once, so matching
    # works on text while lookups still use the real label.
    labelled = [(col, str(col).lower()) for col in df.columns]

    def matching_columns(indicators):
        """Return the original labels whose text contains any indicator."""
        return [
            col for col, lowered in labelled
            if any(indicator in lowered for indicator in indicators)
        ]

    healthcare_info: Dict[str, Any] = {}

    # Facility data: optionally summarise the first type-like column.
    if matching_columns(facility_indicators):
        healthcare_info['type'] = 'facility_data'
        type_cols = matching_columns(type_indicators)
        if type_cols:
            healthcare_info['facility_types'] = df[type_cols[0]].value_counts().to_dict()

    # Bed data: takes precedence over the facility classification.
    if matching_columns(bed_indicators):
        healthcare_info['type'] = 'bed_data'

        # Try to calculate changes if we have current and previous data.
        current_cols = matching_columns(current_indicators)
        prev_cols = matching_columns(prev_indicators)
        if current_cols and prev_cols:
            previous = df[prev_cols[0]]
            change = df[current_cols[0]] - previous
            df['bed_change'] = change

            # Convert a NumPy scalar to a plain Python number so the result
            # can be serialised (e.g. with json.dumps) like the other values.
            total = change.sum()
            healthcare_info['total_change'] = total.item() if hasattr(total, 'item') else total

            # Vectorised percentage: rows whose previous value is 0 get 0
            # instead of a division by zero; missing values stay missing.
            nonzero = previous != 0
            df['percent_change'] = (change / previous.where(nonzero) * 100).where(nonzero, 0)
            healthcare_info['has_derived_metrics'] = True

    return healthcare_info