# upload_ingest.py
import csv
import json
import os
from typing import Any, Callable, Dict, List, Tuple

import pandas as pd

# The PDF/DOCX parsers are treated as optional so that a missing parser only
# affects uploads of that type instead of making the whole module unimportable.
try:
    import PyPDF2
except ImportError:
    PyPDF2 = None

try:
    import docx
except ImportError:
    docx = None

# Number of characters of extracted text echoed into the "preview" chunk.
_PREVIEW_CHARS = 500
# Number of leading rows stored as the "sample" of a tabular artifact.
_SAMPLE_ROWS = 3

# Substrings looked for (case-insensitively) in column names.
_FACILITY_INDICATORS = ('facility', 'hospital', 'clinic', 'center', 'site', 'name')
_TYPE_INDICATORS = ('type', 'category', 'class')
_BED_INDICATORS = ('bed', 'capacity', 'occupancy')
_CURRENT_INDICATORS = ('current', '2023', '2024')
_PREVIOUS_INDICATORS = ('prev', '2022', 'previous')


def extract_text_from_pdf(file_path: str) -> str:
    """Return the concatenated text of every page of the PDF at *file_path*.

    Raises:
        ImportError: if PyPDF2 is not installed.
    """
    if PyPDF2 is None:
        raise ImportError("PyPDF2 is required to extract text from PDF files")
    with open(file_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        # Pages are read while the file is still open. ``or ""`` guards
        # against a page whose extraction yields None instead of a string.
        return "".join(page.extract_text() or "" for page in reader.pages)


def extract_text_from_docx(file_path: str) -> str:
    """Return the text of the DOCX at *file_path*, one paragraph per line.

    Every paragraph, including the last one, is followed by a newline.

    Raises:
        ImportError: if python-docx is not installed.
    """
    if docx is None:
        raise ImportError("python-docx is required to extract text from DOCX files")
    doc = docx.Document(file_path)
    return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)


def _columns_matching(df: pd.DataFrame, indicators: Tuple[str, ...]) -> List[Any]:
    """Return the columns of *df* whose lower-cased label contains any indicator."""
    # Labels are passed through str() because they are not guaranteed to be
    # strings (e.g. an integer header such as 2023).
    return [
        col for col in df.columns
        if any(indicator in str(col).lower() for indicator in indicators)
    ]


def _add_bed_change_metrics(
    df: pd.DataFrame,
    current_col: Any,
    prev_col: Any,
    healthcare_info: Dict[str, Any],
) -> None:
    """Add ``bed_change`` / ``percent_change`` to *df* and summarise them.

    Nothing is written to *df* or *healthcare_info* when the two columns do
    not support arithmetic (for example, text columns).
    """
    try:
        previous = df[prev_col]
        change = df[current_col] - previous
        total = change.sum()
        nonzero_prev = previous != 0
        # Mask zero denominators before dividing, then report 0 for them.
        percent = (change / previous.where(nonzero_prev) * 100).where(nonzero_prev, 0)
    except (TypeError, ValueError):
        # The matched columns are not numeric; skip the derived metrics
        # rather than failing detection for the whole file.
        return

    df['bed_change'] = change
    # Store a plain Python number rather than a numpy scalar so the summary
    # can be serialised with the standard json module.
    healthcare_info['total_change'] = total.item() if hasattr(total, 'item') else total
    df['percent_change'] = percent
    healthcare_info['has_derived_metrics'] = True


def detect_healthcare_data_type(df: pd.DataFrame) -> Dict[str, Any]:
    """Classify *df* as facility and/or bed data from its column names.

    Matching is by case-insensitive substring on the column labels.

    Args:
        df: The table to inspect. NOTE: when bed data with both a "current"
            and a "previous" column is found, ``bed_change`` and
            ``percent_change`` columns are added to *df* in place.

    Returns:
        A dict that is empty when nothing matched, otherwise containing:
        ``type`` (``'facility_data'`` or ``'bed_data'``; bed data wins when
        both match), optionally ``facility_types`` (value counts of the first
        type-like column), and optionally ``total_change`` and
        ``has_derived_metrics``.
    """
    healthcare_info: Dict[str, Any] = {}

    if _columns_matching(df, _FACILITY_INDICATORS):
        healthcare_info['type'] = 'facility_data'
        type_cols = _columns_matching(df, _TYPE_INDICATORS)
        if type_cols:
            healthcare_info['facility_types'] = df[type_cols[0]].value_counts().to_dict()

    if _columns_matching(df, _BED_INDICATORS):
        healthcare_info['type'] = 'bed_data'

        # Try to calculate changes if we have current and previous data.
        current_cols = _columns_matching(df, _CURRENT_INDICATORS)
        prev_cols = _columns_matching(df, _PREVIOUS_INDICATORS)
        if current_cols and prev_cols:
            _add_bed_change_metrics(df, current_cols[0], prev_cols[0], healthcare_info)

    return healthcare_info


def _read_json_frame(file_path: str) -> pd.DataFrame:
    """Load a JSON file and flatten it into a DataFrame."""
    # Binary mode lets json.load detect the encoding (UTF-8/16/32) itself
    # instead of depending on the platform's default text encoding.
    with open(file_path, 'rb') as f:
        return pd.json_normalize(json.load(f))


def _read_text_file(file_path: str) -> str:
    """Return the full contents of a UTF-8 text file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


# extension -> (label used in the chunk, artifact "type", loader)
_TABULAR_FORMATS: Dict[str, Tuple[str, str, Callable[[str], pd.DataFrame]]] = {
    '.csv': ('CSV', 'csv', pd.read_csv),
    '.xlsx': ('Excel', 'excel', pd.read_excel),
    '.xls': ('Excel', 'excel', pd.read_excel),
    '.json': ('JSON', 'json', _read_json_frame),
    '.parquet': ('Parquet', 'parquet', pd.read_parquet),
}

# extension -> (label used in the chunk, artifact "type", preview label, loader)
_DOCUMENT_FORMATS: Dict[str, Tuple[str, str, str, Callable[[str], str]]] = {
    '.pdf': ('PDF', 'pdf', 'Extracted text preview', extract_text_from_pdf),
    '.docx': ('DOCX', 'docx', 'Extracted text preview', extract_text_from_docx),
    '.txt': ('Text', 'txt', 'Content preview', _read_text_file),
}


def _ingest_file(file_path: str, result: Dict[str, Any]) -> None:
    """Process one file and append its chunks/artifacts to *result*."""
    file_name = os.path.basename(file_path)
    file_ext = os.path.splitext(file_name)[1].lower()

    if file_ext in _TABULAR_FORMATS:
        label, artifact_type, read_frame = _TABULAR_FORMATS[file_ext]
        df = read_frame(file_path)
        result["chunks"].append(f"{label} file: {file_name}")

        # Dynamic healthcare data detection. This may add derived columns to
        # df, which then show up in the sample below.
        healthcare_info = detect_healthcare_data_type(df)
        if healthcare_info:
            result["healthcare_data"][file_name] = healthcare_info

        result["artifacts"].append({
            "file": file_name,
            "type": artifact_type,
            "sample": df.head(_SAMPLE_ROWS).to_dict('records'),
        })
    elif file_ext in _DOCUMENT_FORMATS:
        label, artifact_type, preview_label, read_text = _DOCUMENT_FORMATS[file_ext]
        text = read_text(file_path)
        result["chunks"].append(f"{label} file: {file_name}")
        result["chunks"].append(f"{preview_label}: {text[:_PREVIEW_CHARS]}...")
        result["artifacts"].append({
            "file": file_name,
            "type": artifact_type,
            "text": text,
        })
    else:
        result["chunks"].append(f"Unsupported file type: {file_ext}")


def extract_text_from_files(file_paths: List[str]) -> Dict[str, Any]:
    """Extract text and data from uploaded files dynamically.

    The handler is chosen from the file extension (case-insensitive):
    ``.csv``, ``.xlsx``/``.xls``, ``.json`` and ``.parquet`` are loaded as
    tables; ``.pdf``, ``.docx`` and ``.txt`` are loaded as text.

    Args:
        file_paths: Paths of the files to process, handled in order.

    Returns:
        A dict with three keys:
        ``chunks`` - human-readable strings describing each file (including
        "Unsupported file type" and "Error processing" notices);
        ``artifacts`` - one dict per successfully loaded file holding either
        a ``sample`` of the first rows or the full ``text``;
        ``healthcare_data`` - file name -> non-empty result of
        ``detect_healthcare_data_type``.

    A failure while processing one file is recorded as a chunk and does not
    stop the remaining files from being processed.
    """
    result: Dict[str, Any] = {
        "chunks": [],
        "artifacts": [],
        "healthcare_data": {},
    }

    for file_path in file_paths:
        try:
            _ingest_file(file_path, result)
        except Exception as e:
            # Deliberate best-effort boundary: report the failure and move on.
            result["chunks"].append(f"Error processing {file_path}: {str(e)}")

    return result