# FastAPI service exposing HR analytics endpoints backed by a Supabase "HR analysis" table.
from fastapi import FastAPI, HTTPException,Query
import pandas as pd
from supabase import create_client, Client
import os
from dotenv import load_dotenv
# Load environment variables from a local .env file so credentials stay out of source control.
load_dotenv()
# Read Supabase credentials (None when unset; create_client will then fail at startup).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# Initialize the FastAPI app and the Supabase client used for the one-time data fetch below.
app = FastAPI()
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# Fetch the entire "HR analysis" table once at import time; every endpoint serves
# analytics from this in-memory snapshot (no per-request database queries).
try:
    response = supabase.table("HR analysis").select("*").execute()
    data = pd.DataFrame(response.data) if response.data else pd.DataFrame()
except Exception as e:
    # Deliberate best-effort: log and fall back to an empty frame so the app still starts.
    print(f"Error fetching data: {e}")
    data = pd.DataFrame()
# Convert date columns; unparseable values become NaT rather than raising.
for col in ['Survey Date', 'StartDate', 'DOB']:
    if col in data.columns:
        data[col] = pd.to_datetime(data[col], errors='coerce')
# Calculate Age in whole years (approximate: fixed 365-day years, ignores leap days).
if 'DOB' in data.columns:
    data['Age'] = (pd.to_datetime("today") - data['DOB']).dt.days // 365
# Map categorical Performance Score labels onto a 2-5 numeric scale; labels outside
# the map become None, then pd.to_numeric coerces the column to numeric with NaN.
score_map = {"Exceeds": 5, "Fully Meets": 4, "Needs Improvement": 3, "PIP": 2}
if 'Performance Score' in data.columns:
    data['Performance Score'] = data['Performance Score'].map(lambda x: score_map.get(str(x).strip(), None))
    data['Performance Score'] = pd.to_numeric(data['Performance Score'], errors='coerce')
# Endpoints with try-except handling
@app.get("/satisfaction-analysis")
def satisfaction_analysis(department: str = Query(None, description="Filter by department")):
    """Mean Satisfaction Score per department, optionally filtered by department name.

    Returns a list of {"DepartmentType", "Satisfaction Score"} records,
    or an empty list when no rows match the filter.
    Raises HTTPException(500) when required columns are missing or on any other failure.
    """
    try:
        if "DepartmentType" not in data.columns or "Satisfaction Score" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        filtered_data = data.copy()
        if department:
            department = department.strip().title()  # Normalize input for case-insensitive match
            filtered_data = filtered_data[
                filtered_data["DepartmentType"].str.strip().str.title() == department
            ]
        if filtered_data.empty:
            return []  # Return empty JSON instead of an error
        result = filtered_data.groupby("DepartmentType")["Satisfaction Score"].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Bug fix: HTTPException is an Exception subclass, so the broad handler below
        # used to re-wrap it and mangle the detail into "500: Required columns missing...".
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/department-performance")
def department_performance():
    """Mean Performance Score and Current Employee Rating per department.

    Raises HTTPException(500) with a clear message when required columns are
    missing (previously a raw KeyError surfaced as a cryptic 500 detail).
    """
    required = ("DepartmentType", "Performance Score", "Current Employee Rating")
    try:
        if any(col not in data.columns for col in required):
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        result = data.groupby("DepartmentType")[["Performance Score", "Current Employee Rating"]].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Re-raise untouched so the broad handler below doesn't re-wrap the detail.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/training-analytics")
def training_analytics(program_name: str = Query(None, description="Filter by training program name")):
    """Per-program distribution of Training Outcome values as normalized proportions."""
    try:
        if program_name is None:
            subset = data
        else:
            subset = data[data["Training Program Name"] == program_name]
        if subset.empty:
            return []
        outcome_share = (
            subset.groupby("Training Program Name")["Training Outcome"]
            .value_counts(normalize=True)
            .unstack(fill_value=0)
        )
        return outcome_share.reset_index().to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/engagement-performance")
def engagement_performance():
    """Pearson correlation between Engagement Score and Performance Score.

    Returns {"correlation_coefficient": float | None}; None when the
    correlation is undefined (empty data or constant columns), since
    NaN is not valid JSON.
    """
    try:
        correlation = data[['Engagement Score', 'Performance Score']].corr().iloc[0, 1]
        if pd.isna(correlation):
            # corr() yields NaN on empty/constant input; NaN cannot be serialized to JSON.
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": correlation}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/cost-benefit-analysis")
def cost_benefit_analysis():
    """Crude training ROI per department: mean Performance Score / total Training Cost.

    Departments whose total Training Cost is zero or NaN get ROI = None
    (previously inf/NaN, which is not valid JSON).
    """
    try:
        def _roi(group):
            # One ROI value per department; guard the divisor explicitly.
            total_cost = group['Training Cost'].sum()
            if pd.isna(total_cost) or total_cost == 0:
                return None
            return group['Performance Score'].mean() / total_cost
        result = data.groupby("DepartmentType").apply(_roi).reset_index(name="ROI")
        return result.to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/training-effectiveness")
def training_effectiveness():
    """Average Performance Score achieved by each training program."""
    try:
        per_program = data.groupby("Training Program Name")["Performance Score"].mean()
        return per_program.reset_index().to_dict(orient="records")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/diversity-inclusion")
def diversity_dashboard():
    """Gender distribution (normalized proportions of GenderCode) per department.

    Raises HTTPException(500) when required columns are missing or on any other failure.
    """
    try:
        if "DepartmentType" not in data.columns or "GenderCode" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        # Compute gender distribution by department
        diversity_metrics = data.groupby("DepartmentType")["GenderCode"].value_counts(normalize=True).unstack(fill_value=0).reset_index()
        return diversity_metrics.to_dict(orient="records")
    except HTTPException:
        # Bug fix: the broad handler below used to catch our own HTTPException
        # and re-wrap it, turning the detail into "500: Required columns missing...".
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/work-life-balance")
def worklife_balance_impact():
    """Correlation between Work-Life Balance Score and Performance Score, rounded to 3 dp.

    Returns {"correlation_coefficient": float | None}; None when the correlation
    is undefined (round(NaN) is still NaN, which is not valid JSON).
    """
    try:
        correlation = data[['Work-Life Balance Score', 'Performance Score']].corr().iloc[0, 1]
        if pd.isna(correlation):
            return {"correlation_coefficient": None}
        return {"correlation_coefficient": round(correlation, 3)}  # Return as a JSON object
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/career-development")
def career_development(employee_id: str = Query(None, description="Filter by Employee ID")):
    """Count of StartDate records ("Career Movements") per employee, optionally filtered.

    Returns a list of {"Employee ID", "Career Movements"} records, or an empty
    list when no rows match. Raises HTTPException(500) when required columns
    are missing or on any other failure.
    """
    try:
        if "Employee ID" not in data.columns or "StartDate" not in data.columns:
            raise HTTPException(status_code=500, detail="Required columns missing in dataset")
        # Bug fix: removed leftover debug print that dumped every Employee ID
        # to stdout on each request.
        filtered_data = data.copy()
        if employee_id:
            employee_id = employee_id.strip()  # Remove leading/trailing spaces
            filtered_data = filtered_data[filtered_data["Employee ID"].astype(str) == employee_id]
        if filtered_data.empty:
            return []  # Return an empty list if no matching records
        career_progress = filtered_data.groupby("Employee ID")["StartDate"].count().reset_index(name="Career Movements")
        return career_progress.to_dict(orient="records")
    except HTTPException:
        # Re-raise untouched so the broad handler below doesn't re-wrap the detail.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))