Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| import math | |
| import re | |
| import pickle | |
| import os | |
| # --- 데이터 경로 설정 --- | |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| DATA_DIR = os.path.join(BASE_DIR, "core", "data") | |
| # --- 전역 데이터 로드 --- | |
| try: | |
| with open(os.path.join(DATA_DIR, 'sw_competency.pkl'), 'rb') as f: | |
| sw_competency = pickle.load(f) | |
| job_def = sw_competency['직무레벨'] | |
| factor_def = sw_competency['평가요소'] | |
| factors = factor_def.columns.to_list()[2:] | |
| bars_df = sw_competency['평가지표'] | |
| with open(os.path.join(DATA_DIR, 'sw_wage.pkl'), 'rb') as f: | |
| sw_wage = pickle.load(f) | |
| raw_df = sw_wage['raw'] | |
| avg_df = sw_wage['avg'] | |
| conds_df = sw_wage['conds'] | |
| except Exception as e: | |
| print(f"Warning: Could not load data files: {e}") | |
| # --- 핵심 비즈니스 로직 --- | |
| def get_options(df, selected, target_col): | |
| # 1. 불필요한 copy() 제거 | |
| filtered = df | |
| for col, val in selected.items(): | |
| if val is not None and val != "": | |
| filtered = filtered[filtered[col] == val] | |
| options = filtered[target_col].dropna().drop_duplicates().tolist() | |
| # 2. 콘크리트 옵션들만 추출 | |
| concrete_options = [x for x in options if x != '전체'] | |
| if '전체' in options: | |
| if len(concrete_options) > 1: | |
| options = ['전체'] + concrete_options | |
| else: | |
| # 1개만 남았을 때도 구체적인 옵션을 보여주고 싶다면 return concrete_options | |
| options = ['전체'] | |
| else: | |
| options = concrete_options | |
| # 4. 정렬 방식 개선 | |
| if target_col == '직원규모': | |
| sort_map = {'전체': 0, '300~999인': 1, '100~299인': 2, '50~99인': 3, '49인 이하': 4} | |
| options.sort(key=lambda x: sort_map.get(x, 999)) | |
| return options | |
| def get_step_options(df, job=None, bm=None, sales=None): | |
| result = {'job_options': df['ITSQF 직무(변환)'].dropna().drop_duplicates().tolist()} | |
| if job: | |
| result['bm_options'] = get_options(df, {'ITSQF 직무(변환)': job}, 'BM') | |
| if job and bm: | |
| result['sales_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm}, '매출규모') | |
| if job and bm and sales: | |
| result['emp_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm, '매출규모': sales}, '직원규모') | |
| result['base_options'] = ['지급총액', '고정급'] | |
| return result | |
| def get_wage_result(df, job, bm, sales='전체', emp='전체'): | |
| mask = ( | |
| (df['ITSQF 직무(변환)'] == job) & | |
| (df['BM'] == bm) & | |
| (df['매출규모'] == sales) & | |
| (df['직원규모'] == emp) | |
| ) | |
| return df[mask].copy() | |
| def apply_option_to_wage(avg_df, job, bm, sales, emp, w_base=0.5): | |
| selected_df = get_wage_result(avg_df, job=job, bm=bm, sales=sales, emp=emp) | |
| base_df = get_wage_result(avg_df, job=job, bm=bm) | |
| final_df = pd.concat([base_df, selected_df], ignore_index=True) | |
| w_opt = 1 - w_base | |
| dfs = [] | |
| for base_type in ['고정급', '지급총액']: | |
| df = final_df.pivot_table( | |
| index='ITSQF 수준', | |
| values=base_type, | |
| aggfunc='mean' | |
| ).reset_index() | |
| base_val = base_df.pivot_table( | |
| index='ITSQF 수준', | |
| values=base_type, | |
| aggfunc='mean' | |
| ).reset_index().rename(columns={base_type: '전체값'}) | |
| opt_val = selected_df.pivot_table( | |
| index='ITSQF 수준', | |
| values=base_type, | |
| aggfunc='mean' | |
| ).reset_index().rename(columns={base_type: '옵션값'}) | |
| df = df[['ITSQF 수준']].drop_duplicates() | |
| df = df.merge(base_val, on='ITSQF 수준', how='left') | |
| df = df.merge(opt_val, on='ITSQF 수준', how='left') | |
| df['평균연봉'] = df['전체값'] * w_base + df['옵션값'] * w_opt | |
| df['기준'] = base_type | |
| dfs.append(df) | |
| job_wage_df = pd.concat(dfs, ignore_index=True) | |
| num_cols = ['전체값', '옵션값', '평균연봉'] | |
| existing_num_cols = [c for c in num_cols if c in job_wage_df.columns] | |
| job_wage_df[existing_num_cols] = ( | |
| job_wage_df[existing_num_cols].astype(float).round(-3) / 10000 | |
| ) | |
| return job_wage_df | |
| def get_levels_by_wage(avg_df, job, bm, sales, emp, base_type, target_wage, w_base = 0.5, top_n=2): | |
| job_wage_df = apply_option_to_wage(avg_df, job, bm, sales, emp, w_base) | |
| df = job_wage_df[job_wage_df["기준"] == base_type].copy() | |
| if df.empty: | |
| return [] | |
| df["편차"] = (df["평균연봉"] - target_wage).abs() | |
| # 편차 기준 오름차순 정렬 후 상위 2개 | |
| cands = df.sort_values("편차").head(top_n)["ITSQF 수준"].tolist() | |
| def level_num(level: str) -> int: | |
| m = re.search(r'(\d+)', str(level)) | |
| return int(m.group(1)) if m else -1 | |
| # 정렬(낮은 레벨 -> 높은 레벨) | |
| return sorted(cands, key=level_num) | |
| def make_bars_table(bars_df, job, levels): | |
| target_df = bars_df[(bars_df['직무']==job) & (bars_df['레벨'].isin(levels))] | |
| target_table = target_df.pivot_table(index='평가요소', columns='레벨', values='지표정의', aggfunc='sum').reset_index() | |
| # 평가요소 순서대로 정렬 | |
| target_table['평가요소'] = pd.Categorical( | |
| target_table['평가요소'], | |
| categories=factors, | |
| ordered=True | |
| ) | |
| target_table = target_table.sort_values('평가요소').reset_index(drop=True) | |
| ppt_tables = [] | |
| for level in levels: | |
| table = target_table[['평가요소', level]].copy() | |
| cols = ["평가요소", f"{level} 수준"] | |
| table.columns = cols | |
| ppt_tables.append(table) | |
| target_table = target_table.rename(columns={levels[0]:'2수준', levels[1]:'4수준'}) | |
| bars_indicator = target_table.copy() | |
| bars_indicator = bars_indicator.reset_index(names='id') | |
| bars_indicator['id'] = bars_indicator['id'] + 1 | |
| bars_indicator['평가요소'] = bars_indicator['id'].astype(str).str.zfill(2) + '. ' + bars_indicator['평가요소'].astype(str) | |
| for col in ['2수준', '4수준']: | |
| bars_indicator[col] = bars_indicator[col].apply(lambda x: x.split('\n')) | |
| bars_indicator.columns = ['id', 'title', 'level2', 'level4'] | |
| return bars_indicator, ppt_tables | |
| def get_levels_definition(job, levels, job_def): | |
| job_pool = job_def[job_def['직무'] == job] | |
| texts = [] | |
| for level in levels: | |
| level_text = job_pool[job_pool['수준'] == level]['수준 정의'].values[0] | |
| texts.append(level_text) | |
| return texts | |
| def format_text_wrap(text: str, max_len: int = 45, delimiter: str = " ") -> str: | |
| if not text: return "" | |
| lines = [] | |
| for paragraph in text.split("\n"): | |
| paragraph = paragraph.strip() | |
| while len(paragraph) > max_len: | |
| # max_len 안에서 가장 마지막 공백 위치 찾기 | |
| split_pos = paragraph.rfind(delimiter, 0, max_len) | |
| # delimiter가 없으면 그냥 max_len에서 자름 (예외 케이스) | |
| if split_pos == -1: | |
| split_pos = max_len | |
| else: | |
| split_pos += len(delimiter) # delimiter 포함해서 자르기 | |
| lines.append(paragraph[:split_pos].strip()) | |
| paragraph = paragraph[split_pos:].strip() | |
| if paragraph: | |
| lines.append(paragraph) | |
| return "\n".join(lines) | |
| def judge_level(user_scores, factors, levels, levels_def, low_cut=2, high_cut=4): | |
| """ | |
| 단일 레벨 판정(Yes/No 성격): | |
| - 총점이 36점이상이면 high_level | |
| - 그 외, low_level로 보되 '보완 필요' 상태(낙인 표현 지양) | |
| 개선항목: | |
| - low_set: low_cut 미만 항목 | |
| - middle_set: low_cut 이상 high_cut 미만 항목 | |
| - high_set:또는 high_cut이상 항목 | |
| """ | |
| s = pd.Series(user_scores, dtype="float") | |
| low_level, high_level = levels[0], levels[1] | |
| level_def_map = {low_level: levels_def[0], high_level: levels_def[1]} | |
| low_set = s[s < low_cut].sort_values().index.tolist() | |
| middle_set = s[(s >= low_cut) & (s < high_cut)].sort_values().index.tolist() | |
| high_set = s[s >= high_cut].sort_values(ascending=False).index.tolist() | |
| def trim_items(index_set, max_item=3): | |
| if len(index_set) == 0: | |
| text = "-" | |
| elif len(index_set) > max_item: | |
| text = ", ".join([factors[i] for i in index_set[:max_item]]) + " 등" | |
| else: | |
| text = ", ".join([factors[i] for i in index_set]) | |
| return format_text_wrap(text, max_len=33, delimiter=",") | |
| final_level = high_level if sum(user_scores) >= 36 else low_level | |
| if final_level == high_level: | |
| output = { | |
| '하위 레벨' : [ | |
| f"하위 레벨: {low_level}", level_def_map.get(low_level, "-"), | |
| "아래 역량은 현재 레벨 안착을 위해 보완해보면 좋겠습니다.", | |
| trim_items(middle_set), | |
| ], | |
| '현재 레벨' : [ | |
| f"현재 레벨: {high_level}", level_def_map.get(high_level, "-"), | |
| "다음 역량은 현재 안정적으로 발휘되고 있는 강점입니다.", | |
| trim_items(high_set) | |
| ] | |
| } | |
| else: | |
| output = { | |
| '현재 레벨' :[ | |
| f"현재 레벨: {low_level}", level_def_map.get(low_level, "-"), | |
| "아래 역량은 현재 레벨 기준에 비추어 보완해보면 좋겠습니다.", | |
| trim_items(low_set) | |
| ], | |
| '상위 레벨' :[ | |
| f"상위 레벨: {high_level}", level_def_map.get(high_level, "-"), | |
| "다음 역량을 강화하면 Level-Up 성장을 기대할 수 있습니다.", | |
| trim_items(middle_set) | |
| ], | |
| } | |
| return final_level, pd.DataFrame(output, index=["title", "definition", "guide", "items"]) | |
| def describe_percentile(p): | |
| p = max(0, min(100, float(p))) | |
| p = round(p, 1) | |
| top = round(100 - p, 1) | |
| if top > 60: | |
| pos = f"하위 {int(math.ceil(p / 10.0) * 10)}% 이내" | |
| pos_text = f"하위 {p:.1f}% 수준" | |
| else: | |
| pos = f"상위 {int(math.ceil(top / 10.0) * 10)}% 이내" if top > 5 else f"상위 {int(top)}% 이내" | |
| pos_text = f"상위 {top:.1f}% 수준" | |
| if p >= 70: desc = "높은" | |
| elif p >= 60: desc = "평균 이상" | |
| elif p >= 40: desc = "평균" | |
| elif p >= 20: desc = "다소 낮은" | |
| else: desc = "낮은" | |
| return pos, desc, pos_text | |
| def judge_wage( | |
| raw_df: pd.DataFrame, | |
| job: str, | |
| bm: str, | |
| sales: str, | |
| emp: str, | |
| final_level: str, | |
| base_type: str, | |
| target_wage: int, | |
| k_std: float = 20.0, | |
| k_shrink: float = 20.0, | |
| z_clip: float = 2.5, | |
| n_switch: int = 20, | |
| alpha_denominator: int = 30, # n>=n_switch일 때 raw percentile 비중 증가 속도 (추천 30) | |
| ): | |
| # 입력 파싱: 만원 -> 원 | |
| x = float(target_wage) * 10000.0 | |
| # 1) job_pool (직무 + 레벨 필터) | |
| job_pool_df = raw_df[(raw_df['ITSQF 직무(변환)'] == job)&(raw_df['ITSQF 수준'] == final_level)] | |
| job_pool_vals = pd.to_numeric(job_pool_df[base_type], errors="coerce").dropna().to_numpy(dtype=float) | |
| std_pool = float(np.std(job_pool_vals, ddof=1)) | |
| mean_job = float(np.mean(job_pool_vals)) | |
| # 2) cohort (직무 + 레벨 + 옵션 필터) | |
| def get_cohort_df(df, bm, sales, emp): | |
| mask = (df['BM'] == bm) | |
| if sales != '전체': | |
| mask = mask & (df['매출규모'] == sales) | |
| if emp != '전체': | |
| mask = mask & (df['직원규모'] == emp) | |
| return df[mask].copy() | |
| cohort_df = get_cohort_df(job_pool_df, bm, sales, emp) | |
| cohort_vals = pd.to_numeric(cohort_df[base_type], errors="coerce").dropna().to_numpy(dtype=float) | |
| n = int(cohort_vals.size) | |
| mean_cohort = float(np.mean(cohort_vals)) if n >= 1 else mean_job | |
| std_cohort = float(np.std(cohort_vals, ddof=1)) if n >= 2 else 0.0 | |
| # 3) std 풀링 (분산 기준 혼합) | |
| w_std = (n / (n + k_std)) if n > 0 else 0.0 | |
| var_eff = w_std * (std_cohort ** 2) + (1.0 - w_std) * (std_pool ** 2) | |
| std_eff = math.sqrt(max(var_eff, 1e-9)) # 0 방어 | |
| # 4) z-score 보정 | |
| z_raw = (x - mean_cohort) / std_eff | |
| w_n = (n / (n + k_shrink)) if n > 0 else 0.0 # 표본 수 수축 | |
| z_adj = float(np.clip(w_n * z_raw, -z_clip, z_clip)) # 클리핑 | |
| def normal_cdf(z: float) -> float: | |
| return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0))) | |
| def percentile_of_score(arr: np.ndarray, x: float) -> float: | |
| """퍼센타일: arr 중 x 이하 비율 * 100""" | |
| if arr.size == 0: | |
| return float("nan") | |
| return float((arr <= x).mean() * 100.0) | |
| # 5) 퍼센타일 (z 기반 + 필요시 raw와 블렌딩) | |
| p_z = normal_cdf(z_adj) * 100.0 | |
| p_raw = percentile_of_score(cohort_vals, x) if n >= 3 else float("nan") | |
| # n이 커질수록 raw에 더 무게, 완만한 전이를 위해 alpha_denominator 추천 30 | |
| if n >= n_switch and not math.isnan(p_raw): | |
| alpha = min(1.0, n / float(alpha_denominator)) | |
| p_final = alpha * p_raw + (1.0 - alpha) * p_z | |
| else: | |
| p_final = p_z | |
| p = round(float(p_final), 1) | |
| pos, desc, pos_text = describe_percentile(p) | |
| head_message = f"""진단 결과, 현재 귀하의 직무 역량 수준은 {final_level}에 가장 가까운 것으로 보입니다. | |
| 동일 직무 레벨 및 조건 대비 보상 경쟁력은 {pos}로 {desc} 수준입니다.""" | |
| # 평균 대비 차이 (cohort 평균 기준; cohort가 비면 job 평균) | |
| diff = x - mean_cohort | |
| sign = "+" if diff >= 0 else "-" | |
| gap = f"{sign}{abs(diff)/10000:,.0f}" | |
| if diff == 0: | |
| comp_text = "시장 평균과 동일한 수준으로 나타났습니다." | |
| else: | |
| direction = "더 높게" if diff > 0 else "더 낮게" | |
| comp_text = f"시장 평균 대비 {gap}만원 {direction} 나타났습니다." | |
| block1 = f"""현재 보상 경쟁력은 시장 {pos_text}으로, | |
| {comp_text}""" | |
| block2 = f"""직무: {job} | |
| 레벨: {final_level} | |
| 보상수준: ({base_type}) {target_wage:,.0f}만원 | |
| 준거집단: | |
| - (BM) {bm} | |
| - (매출규모) {sales} | |
| - (직원규모) {emp} | |
| """ | |
| table3 = pd.DataFrame({ | |
| "user": f"{target_wage:,.0f}", | |
| "marketAverage": f"{mean_cohort/10000:,.0f}", | |
| "gap": gap | |
| }, index=["값"]) | |
| def make_guage_chart(p): | |
| value = round(p / 100 * 180, 1) | |
| guage = [180, value, 180 - value] | |
| return pd.DataFrame( | |
| {"값": guage}, | |
| index=["항목1", "항목2", "항목3"] | |
| ) | |
| chart3 = make_guage_chart(p) | |
| return head_message, block1, block2, p, chart3, table3 | |
| def format_table(table, factors, user_scores, cut=2): | |
| label = table.columns[1] | |
| s = pd.DataFrame({ | |
| "평가요소": factors, | |
| label: [3]*10, | |
| "User": user_scores | |
| }) | |
| s['User'] = s['User'].apply(lambda x: 2 if x < cut else (3 if x == cut else 4)) | |
| s['부족'] = s['User'].apply(lambda x : "●" if x == 2 else "") | |
| s['충족'] = s['User'].apply(lambda x : "●" if x == 3 else "") | |
| s['초과'] = s['User'].apply(lambda x : "●" if x == 4 else "") | |
| table = table.merge(s[['평가요소', '부족', '충족', '초과']], on='평가요소', how='left') | |
| chart = s[['평가요소', label, "User"]] | |
| return [table, chart] | |