# NOTE(review): the lines "Spaces: / Sleeping / Sleeping" were scraped Hugging Face
# Space status text, not code — preserved here as a comment so the file parses.
"""FastAPI service exposing HR-analytics endpoints backed by a Supabase table."""
from fastapi import FastAPI, HTTPException, Query
import pandas as pd
from supabase import create_client, Client
import os
from dotenv import load_dotenv

# Load environment variables from a local .env file (no-op if the file is absent).
load_dotenv()

# Read Supabase credentials from the environment.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Fail fast with a clear message instead of an opaque client-construction error.
if not SUPABASE_URL or not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in the environment")

# Initialize FastAPI and the Supabase client.
app = FastAPI()
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def fetch_hr_data() -> pd.DataFrame:
    """Fetch the full "HR analysis" table from Supabase.

    Best-effort: on any failure the error is logged and an empty frame is
    returned so the API can still start without data.
    """
    try:
        response = supabase.table("HR analysis").select("*").execute()
        return pd.DataFrame(response.data) if response.data else pd.DataFrame()
    except Exception as e:
        print(f"Error fetching data: {e}")
        return pd.DataFrame()


def prepare_hr_data(df: pd.DataFrame) -> pd.DataFrame:
    """Parse date columns, derive Age, and normalize Performance Score.

    Mutates and returns *df*.
    """
    # Parse known date columns; unparseable values become NaT.
    for col in ('Survey Date', 'StartDate', 'DOB'):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
    # Approximate age in whole years. NOTE(review): `// 365` ignores leap days,
    # so it can be off by one near a birthday for older employees — acceptable
    # for aggregate statistics, confirm if exact ages matter.
    if 'DOB' in df.columns:
        df['Age'] = (pd.to_datetime("today") - df['DOB']).dt.days // 365
    # Map textual performance ratings to a numeric 5..2 scale; unknown labels
    # become None and are coerced to NaN below.
    score_map = {"Exceeds": 5, "Fully Meets": 4, "Needs Improvement": 3, "PIP": 2}
    if 'Performance Score' in df.columns:
        df['Performance Score'] = df['Performance Score'].map(
            lambda x: score_map.get(str(x).strip(), None)
        )
        df['Performance Score'] = pd.to_numeric(df['Performance Score'], errors='coerce')
    return df


# Loaded once at import time; the endpoint handlers read this module-level frame.
data = prepare_hr_data(fetch_hr_data())
# Endpoint handlers with try/except error handling.
# NOTE(review): no @app.get decorators are visible in this chunk — confirm the
# routes are registered elsewhere (or were lost when the file was pasted).
def satisfaction_analysis(department: str = Query(None, description="Filter by department")):
    """Mean Satisfaction Score per department, optionally filtered by department name.

    Returns a list of records; an empty list when the filter matches nothing.
    Raises HTTPException(500) when required columns are missing or on failure.
    """
    try:
        if "DepartmentType" not in data.columns or "Satisfaction Score" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        filtered_data = data.copy()
        if department:
            # Normalize both sides so matching is case/whitespace-insensitive.
            wanted = department.strip().title()
            filtered_data = filtered_data[
                filtered_data["DepartmentType"].str.strip().str.title() == wanted
            ]
        if filtered_data.empty:
            return []  # Return empty JSON instead of an error
        result = filtered_data.groupby("DepartmentType")["Satisfaction Score"].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Re-raise deliberate HTTP errors untouched; without this the generic
        # handler below would rewrap them and garble the detail message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def department_performance():
    """Mean Performance Score and Current Employee Rating per department.

    Raises HTTPException(500) when required columns are missing or on failure.
    """
    try:
        # Guard missing columns explicitly, consistent with the other endpoints,
        # instead of surfacing a raw pandas KeyError message.
        required = {"DepartmentType", "Performance Score", "Current Employee Rating"}
        if not required.issubset(data.columns):
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        result = (
            data.groupby("DepartmentType")[["Performance Score", "Current Employee Rating"]]
            .mean()
            .reset_index()
        )
        return result.to_dict(orient="records")
    except HTTPException:
        raise  # keep deliberate HTTP errors intact
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def training_analytics(program_name: str = Query(None, description="Filter by training program name")):
    """Distribution of Training Outcome per program, optionally filtered by program name.

    Returns one record per program with a proportion column per outcome
    (each row's proportions sum to 1); an empty list when nothing matches.
    """
    try:
        # Guard missing columns explicitly, consistent with the other endpoints.
        if "Training Program Name" not in data.columns or "Training Outcome" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        filtered_data = data if program_name is None else data[data["Training Program Name"] == program_name]
        if filtered_data.empty:
            return []
        result = (
            filtered_data.groupby("Training Program Name")["Training Outcome"]
            .value_counts(normalize=True)
            .unstack(fill_value=0)
        )
        return result.reset_index().to_dict(orient="records")
    except HTTPException:
        raise  # keep deliberate HTTP errors intact
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def engagement_performance():
    """Pearson correlation between Engagement Score and Performance Score.

    Returns {"correlation_coefficient": float | None}; None when the
    correlation is undefined (empty or constant columns).
    """
    try:
        correlation = data[['Engagement Score', 'Performance Score']].corr().iloc[0, 1]
        # corr() yields NaN on empty/constant data; NaN is not valid JSON and
        # breaks response encoding, so surface it as null instead.
        if pd.isna(correlation):
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": float(correlation)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def cost_benefit_analysis():
    """Per-department ROI proxy: mean Performance Score / total Training Cost.

    Departments with zero total cost get null ROI instead of infinity
    (inf/NaN are not valid JSON and break response encoding).
    """
    try:
        grouped = data.groupby("DepartmentType")
        mean_perf = grouped["Performance Score"].mean()
        total_cost = grouped["Training Cost"].sum()
        # Mask zero costs so division yields NaN there instead of inf.
        roi = mean_perf / total_cost.where(total_cost != 0)
        result = roi.reset_index(name="ROI")
        # Convert undefined ratios to None so they serialize as JSON null.
        result["ROI"] = result["ROI"].astype(object).where(result["ROI"].notna(), None)
        return result.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def training_effectiveness():
    """Mean Performance Score achieved by each training program."""
    try:
        avg_scores = (
            data.groupby("Training Program Name")["Performance Score"]
            .mean()
            .reset_index()
        )
        return avg_scores.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def diversity_dashboard():
    """Gender distribution (as proportions) within each department.

    Returns one record per department with a proportion column per GenderCode.
    Raises HTTPException(500) when required columns are missing or on failure.
    """
    try:
        if "DepartmentType" not in data.columns or "GenderCode" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        # Per-department share of each gender code (rows sum to 1).
        diversity_metrics = (
            data.groupby("DepartmentType")["GenderCode"]
            .value_counts(normalize=True)
            .unstack(fill_value=0)
            .reset_index()
        )
        return diversity_metrics.to_dict(orient="records")
    except HTTPException:
        # Re-raise deliberate HTTP errors untouched; without this the generic
        # handler below would rewrap them and garble the detail message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def worklife_balance_impact():
    """Correlation between Work-Life Balance Score and Performance Score.

    Returns {"correlation_coefficient": float | None}, rounded to 3 decimal
    places; None when the correlation is undefined.
    """
    try:
        correlation = data[['Work-Life Balance Score', 'Performance Score']].corr().iloc[0, 1]
        # round(NaN, 3) is still NaN, which is not valid JSON and breaks the
        # response encoder — return null for undefined correlations instead.
        if pd.isna(correlation):
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": round(float(correlation), 3)}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def career_development(employee_id: str = Query(None, description="Filter by Employee ID")):
    """Count of StartDate records ("career movements") per employee.

    Optionally filtered by Employee ID (compared as a trimmed string so
    numeric IDs match string input). Returns an empty list when no records
    match; raises HTTPException(500) when required columns are missing.
    """
    try:
        if "Employee ID" not in data.columns or "StartDate" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        # (Removed the debug print that dumped every Employee ID on each
        # request — it leaked data and slowed the endpoint.)
        filtered_data = data.copy()
        if employee_id:
            filtered_data = filtered_data[
                filtered_data["Employee ID"].astype(str) == employee_id.strip()
            ]
        if filtered_data.empty:
            return []  # Return an empty list if no matching records
        career_progress = (
            filtered_data.groupby("Employee ID")["StartDate"]
            .count()
            .reset_index(name="Career Movements")
        )
        return career_progress.to_dict(orient="records")
    except HTTPException:
        # Re-raise deliberate HTTP errors untouched; without this the generic
        # handler below would rewrap them and garble the detail message.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))