Spaces:
Sleeping
Sleeping
| # upload_ingest.py | |
| import pandas as pd | |
| import os | |
| import json | |
| from typing import Dict, List, Any | |
| import PyPDF2 | |
| import docx | |
| import csv | |
def _read_json_frame(file_path: str) -> pd.DataFrame:
    """Load a JSON file and flatten it into a DataFrame."""
    # JSON text is UTF-8 by specification; 'utf-8-sig' additionally accepts a
    # leading BOM instead of relying on the platform's default encoding.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)
    return pd.json_normalize(data)


# extension -> (artifact "type", label used in the chunk line, loader)
_TABULAR_FORMATS = {
    '.csv': ('csv', 'CSV', pd.read_csv),
    '.xlsx': ('excel', 'Excel', pd.read_excel),
    '.xls': ('excel', 'Excel', pd.read_excel),
    '.json': ('json', 'JSON', _read_json_frame),
    '.parquet': ('parquet', 'Parquet', pd.read_parquet),
}

# extension -> (artifact "type", chunk label, label of the preview line)
_DOCUMENT_FORMATS = {
    '.pdf': ('pdf', 'PDF file', 'Extracted text preview'),
    '.docx': ('docx', 'DOCX file', 'Extracted text preview'),
    '.txt': ('txt', 'Text file', 'Content preview'),
}


def _ingest_table(result: Dict[str, Any], file_path: str, file_name: str, file_ext: str) -> None:
    """Load one tabular file and record its chunk, healthcare info and sample."""
    artifact_type, label, loader = _TABULAR_FORMATS[file_ext]
    df = loader(file_path)
    result["chunks"].append(f"{label} file: {file_name}")
    # Dynamic healthcare data detection; runs before the sample is taken, so
    # any columns the detector adds to df show up in the sample as well.
    healthcare_info = detect_healthcare_data_type(df)
    if healthcare_info:
        result["healthcare_data"][file_name] = healthcare_info
    result["artifacts"].append({
        "file": file_name,
        "type": artifact_type,
        "sample": df.head(3).to_dict('records')
    })


def _ingest_document(result: Dict[str, Any], file_path: str, file_name: str, file_ext: str) -> None:
    """Extract the text of one document and record its chunks and artifact."""
    artifact_type, label, preview_label = _DOCUMENT_FORMATS[file_ext]
    if file_ext == '.pdf':
        text = extract_text_from_pdf(file_path)
    elif file_ext == '.docx':
        text = extract_text_from_docx(file_path)
    else:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    result["chunks"].append(f"{label}: {file_name}")
    result["chunks"].append(f"{preview_label}: {text[:500]}...")
    result["artifacts"].append({
        "file": file_name,
        "type": artifact_type,
        "text": text
    })


def extract_text_from_files(file_paths: List[str]) -> Dict[str, Any]:
    """Extract text and data from uploaded files dynamically.

    The handler is chosen from the (case-insensitive) file extension:
    .csv/.xlsx/.xls/.json/.parquet are loaded into a DataFrame, while
    .pdf/.docx/.txt are read as plain text.

    Args:
        file_paths: Paths of the files to ingest.

    Returns:
        A dict with three keys:
        "chunks" - human-readable status lines (file banner, text preview,
        "Unsupported file type" and "Error processing" messages);
        "artifacts" - one dict per successfully processed file holding
        "file", "type" and either a 3-row "sample" (tabular files) or the
        full "text" (documents);
        "healthcare_data" - file name -> non-empty result of
        detect_healthcare_data_type for tabular files.

    A failure on one file never aborts the batch: the exception is turned
    into an "Error processing ..." chunk and the next file is handled.
    """
    result = {
        "chunks": [],
        "artifacts": [],
        "healthcare_data": {}
    }
    for file_path in file_paths:
        # Deliberately broad: ingestion is best-effort per file, and every
        # failure is reported back to the caller through "chunks".
        try:
            file_name = os.path.basename(file_path)
            file_ext = os.path.splitext(file_name)[1].lower()
            if file_ext in _TABULAR_FORMATS:
                _ingest_table(result, file_path, file_name, file_ext)
            elif file_ext in _DOCUMENT_FORMATS:
                _ingest_document(result, file_path, file_name, file_ext)
            else:
                result["chunks"].append(f"Unsupported file type: {file_ext}")
        except Exception as e:
            result["chunks"].append(f"Error processing {file_path}: {str(e)}")
    return result
def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of every page of a PDF, concatenated in page order.

    Page texts are joined without any separator between them.
    """
    with open(file_path, 'rb') as pdf_stream:
        pdf = PyPDF2.PdfReader(pdf_stream)
        # Pages are read while the stream is still open.
        return "".join(page.extract_text() for page in pdf.pages)
def extract_text_from_docx(file_path: str) -> str:
    """Return the paragraph text of a DOCX file, one paragraph per line.

    Every paragraph, including the last one, is followed by a newline.
    """
    document = docx.Document(file_path)
    return "".join(f"{para.text}\n" for para in document.paragraphs)
def detect_healthcare_data_type(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect healthcare data type dynamically from the column names.

    Detection is a heuristic: each column label is lower-cased and searched
    for indicator substrings (e.g. 'facility', 'bed', 'current', 'prev').

    Side effect: when bed data with a numeric "current" and a numeric
    "previous" column is found, the columns ``bed_change`` and
    ``percent_change`` are added to ``df`` in place.

    Args:
        df: Table to inspect. Column labels may be of any type; they are
            matched on their string form.

    Returns:
        An empty dict when nothing is recognised, otherwise a dict with:
        'type' - 'facility_data' or 'bed_data' (bed data wins when both
        kinds of indicator are present);
        'facility_types' - value counts of the first type-like column
        (only when facility indicators matched);
        'total_change' and 'has_derived_metrics' - only when bed data has
        numeric current/previous columns.
    """
    healthcare_info: Dict[str, Any] = {}

    # Labels are not guaranteed to be strings (header-less sheets, integer
    # keys), so compare on str(col) instead of calling col.lower() directly.
    labelled = [(col, str(col).lower()) for col in df.columns]

    def matching(indicators):
        """Return the original labels containing any indicator substring."""
        return [
            col for col, lowered in labelled
            if any(indicator in lowered for indicator in indicators)
        ]

    def is_number_column(series) -> bool:
        """True for numeric, non-boolean columns that support subtraction."""
        return (
            pd.api.types.is_numeric_dtype(series)
            and not pd.api.types.is_bool_dtype(series)
        )

    # Check for facility data indicators
    if matching(('facility', 'hospital', 'clinic', 'center', 'site', 'name')):
        healthcare_info['type'] = 'facility_data'
        type_cols = matching(('type', 'category', 'class'))
        if type_cols:
            healthcare_info['facility_types'] = df[type_cols[0]].value_counts().to_dict()

    # Check for bed data indicators (overrides the facility classification)
    if matching(('bed', 'capacity', 'occupancy')):
        healthcare_info['type'] = 'bed_data'
        # Try to calculate changes if we have current and previous data
        current_cols = matching(('current', '2023', '2024'))
        prev_cols = matching(('prev', '2022'))  # 'prev' also covers 'previous'
        if current_cols and prev_cols:
            current = df[current_cols[0]]
            previous = df[prev_cols[0]]
            # Name matching can select text columns (e.g. "current_status");
            # derived metrics only make sense for numeric data.
            if is_number_column(current) and is_number_column(previous):
                change = current - previous
                df['bed_change'] = change
                total = change.sum()
                # Store a native Python number so the result stays
                # JSON-serialisable.
                healthcare_info['total_change'] = total.item() if hasattr(total, 'item') else total
                # Vectorised percentage: rows with a zero baseline get 0.
                # Unlike a row-wise apply, this also works on an empty frame.
                nonzero = previous != 0
                df['percent_change'] = (change / previous.where(nonzero) * 100).where(nonzero, 0)
                healthcare_info['has_derived_metrics'] = True
    return healthcare_info