Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException | |
| import pandas as pd | |
| from database import fetch_data | |
| from utils.data_processing import preprocess_data | |
| from logger import logger | |
# Initialize router
# NOTE(review): no function visible in this file is registered on this router
# (there are no ``@router.get(...)``-style decorators) — confirm whether the
# decorators were lost or the routes are attached elsewhere.
router = APIRouter()
def get_filtered_data(columns):
    """Fetch the dataset, preprocess it, and keep only the requested columns.

    Args:
        columns: Column labels that must exist in the preprocessed frame.

    Returns:
        A DataFrame restricted to ``columns``, with every row dropped that
        has a missing value in any of those columns.

    Raises:
        HTTPException: 400 when one or more requested columns are absent;
            500 when fetching, preprocessing, or filtering fails otherwise.
    """
    try:
        raw_data = fetch_data()
        df = preprocess_data(pd.DataFrame(raw_data))
        missing_cols = [col for col in columns if col not in df.columns]
        if missing_cols:
            logger.error(f"Missing columns: {missing_cols}")
            raise HTTPException(status_code=400, detail=f"Missing columns: {missing_cols}")
        return df[columns].dropna()
    except HTTPException:
        # The 400 above is deliberate; without this clause the generic
        # handler below would catch it and rewrite it as a 500.
        raise
    except Exception as e:
        logger.error(f"Error fetching data: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def satisfaction_analysis():
    """Return the mean ``Satisfaction Score`` for each ``DepartmentType``.

    Returns:
        A list of records, one per department type, each holding the
        department and the mean of its satisfaction scores.

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Satisfaction Score', 'DepartmentType'])
        result = data.groupby("DepartmentType")["Satisfaction Score"].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in satisfaction_analysis: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def department_performance():
    """Return mean performance and employee rating per ``DepartmentType``.

    Returns:
        A list of records, one per department type, each holding the
        department plus the means of ``Performance Score`` and
        ``Current Employee Rating``.

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    metric_columns = ["Performance Score", "Current Employee Rating"]
    try:
        data = get_filtered_data(["Performance Score", "Current Employee Rating", "DepartmentType"])
        result = data.groupby("DepartmentType")[metric_columns].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in department_performance: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def training_analytics():
    """Return the share of each ``Training Outcome`` within each ``Training Cost``.

    Returns:
        A nested dict keyed first by training outcome and then by training
        cost, whose values are the normalized frequency of that outcome
        among rows with that cost (0 where the combination never occurs).

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Training Outcome', 'Training Cost'])
        # NOTE(review): grouping by the cost value itself yields one group per
        # distinct cost — confirm this is the intended grouping key.
        completion_rates = data.groupby("Training Cost")['Training Outcome'].value_counts(normalize=True).unstack(fill_value=0)
        return completion_rates.to_dict()
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in training_analytics: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def engagement_performance_correlation():
    """Return the correlation between engagement and performance scores.

    Returns:
        ``{"correlation_coefficient": value}`` where ``value`` is the
        off-diagonal entry of the two-column correlation matrix as a plain
        float, or ``None`` when the correlation is undefined (NaN).

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Engagement Score', 'Performance Score'])
        correlation = data[['Engagement Score', 'Performance Score']].corr().iloc[0, 1]
        # corr() yields NaN for constant columns or too few rows; NaN is not
        # valid JSON, so report an undefined correlation as null instead.
        coefficient = None if pd.isna(correlation) else float(correlation)
        return {"correlation_coefficient": coefficient}
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in engagement_performance_correlation: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def cost_benefit_analysis():
    """Return a per-department ratio of mean performance to total training cost.

    For each ``DepartmentType`` the ``ROI`` value is the mean of
    ``Performance Score`` divided by the sum of ``Training Cost``.

    Returns:
        A list of records with keys ``DepartmentType`` and ``ROI``. ``ROI``
        is ``None`` for a department whose total training cost is zero,
        because the ratio is undefined there.

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Training Cost', 'Performance Score', 'DepartmentType'])
        grouped = data.groupby("DepartmentType")
        mean_performance = grouped["Performance Score"].mean()
        total_cost = grouped["Training Cost"].sum()
        # Mask zero-cost departments: dividing by zero would produce inf/NaN,
        # neither of which can be emitted as JSON.
        roi = (mean_performance / total_cost).where(total_cost != 0)
        records = roi.reset_index(name="ROI").to_dict(orient="records")
        return [
            {**record, "ROI": None if pd.isna(record["ROI"]) else record["ROI"]}
            for record in records
        ]
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in cost_benefit_analysis: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def training_effectiveness():
    """Return the mean ``Performance Score`` for each ``Training Program Name``.

    Returns:
        A list of records, one per training program, each holding the
        program name and the mean performance score of its rows.

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        # 'Training Outcome' is not aggregated below; requesting it only means
        # rows lacking an outcome are dropped by get_filtered_data.
        # NOTE(review): confirm that filtering effect is intended.
        data = get_filtered_data(['Training Outcome', 'Performance Score', 'Training Program Name'])
        result = data.groupby("Training Program Name")["Performance Score"].mean().reset_index()
        return result.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in training_effectiveness: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def diversity_dashboard():
    """Return the share of each ``GenderCode`` within each ``DepartmentType``.

    Returns:
        A nested dict keyed first by gender code and then by department
        type, whose values are the normalized frequency of that gender code
        in that department (0 where the combination never occurs).

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        # 'RaceDesc' is not aggregated below; requesting it only means rows
        # lacking a value for it are dropped by get_filtered_data.
        # NOTE(review): confirm whether race metrics were meant to be included.
        data = get_filtered_data(['GenderCode', 'RaceDesc', 'DepartmentType'])
        diversity_metrics = data.groupby("DepartmentType")['GenderCode'].value_counts(normalize=True).unstack(fill_value=0)
        return diversity_metrics.to_dict()
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in diversity_dashboard: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def worklife_balance_impact():
    """Return the correlation between work-life balance and performance scores.

    Returns:
        ``{"correlation_coefficient": value}`` where ``value`` is the
        off-diagonal entry of the two-column correlation matrix as a plain
        float, or ``None`` when the correlation is undefined (NaN).

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Work-Life Balance Score', 'Performance Score'])
        correlation = data[['Work-Life Balance Score', 'Performance Score']].corr().iloc[0, 1]
        # corr() yields NaN for constant columns or too few rows; NaN is not
        # valid JSON, so report an undefined correlation as null instead.
        coefficient = None if pd.isna(correlation) else float(correlation)
        return {"correlation_coefficient": coefficient}
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in worklife_balance_impact: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e
def career_development():
    """Return the number of ``StartDate`` entries recorded per ``Employee ID``.

    Returns:
        A list of records, one per employee ID, each holding the ID and a
        ``Career Movements`` count of that employee's non-missing start
        dates.

    Raises:
        HTTPException: Propagated unchanged when raised by
            ``get_filtered_data``; any other failure becomes a 500.
    """
    try:
        data = get_filtered_data(['Employee ID', 'StartDate'])
        career_progress = data.groupby("Employee ID")["StartDate"].count().reset_index(name="Career Movements")
        return career_progress.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping it would nest the
        # message and overwrite its status code.
        raise
    except Exception as e:
        logger.error(f"Error in career_development: {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {e}") from e