import pandas as pd import numpy as np import math import re import pickle import os # --- 데이터 경로 설정 --- BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DATA_DIR = os.path.join(BASE_DIR, "core", "data") # --- 전역 데이터 로드 --- try: with open(os.path.join(DATA_DIR, 'sw_competency.pkl'), 'rb') as f: sw_competency = pickle.load(f) job_def = sw_competency['직무레벨'] factor_def = sw_competency['평가요소'] factors = factor_def.columns.to_list()[2:] bars_df = sw_competency['평가지표'] with open(os.path.join(DATA_DIR, 'sw_wage.pkl'), 'rb') as f: sw_wage = pickle.load(f) raw_df = sw_wage['raw'] avg_df = sw_wage['avg'] conds_df = sw_wage['conds'] except Exception as e: print(f"Warning: Could not load data files: {e}") # --- 핵심 비즈니스 로직 --- def get_options(df, selected, target_col): # 1. 불필요한 copy() 제거 filtered = df for col, val in selected.items(): if val is not None and val != "": filtered = filtered[filtered[col] == val] options = filtered[target_col].dropna().drop_duplicates().tolist() # 2. 콘크리트 옵션들만 추출 concrete_options = [x for x in options if x != '전체'] if '전체' in options: if len(concrete_options) > 1: options = ['전체'] + concrete_options else: # 1개만 남았을 때도 구체적인 옵션을 보여주고 싶다면 return concrete_options options = ['전체'] else: options = concrete_options # 4. 정렬 방식 개선 if target_col == '직원규모': sort_map = {'전체': 0, '300~999인': 1, '100~299인': 2, '50~99인': 3, '49인 이하': 4} options.sort(key=lambda x: sort_map.get(x, 999)) return options def get_step_options(df, job=None, bm=None, sales=None): result = {'job_options': df['ITSQF 직무(변환)'].dropna().drop_duplicates().tolist()} if job: result['bm_options'] = get_options(df, {'ITSQF 직무(변환)': job}, 'BM') if job and bm: result['sales_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm}, '매출규모') if job and bm and sales: result['emp_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm, '매출규모': sales}, '직원규모') result['base_options'] = ['지급총액', '고정급'] return result def get_wage_result(df, job, bm, sales='전체', emp='전체'): mask = ( (df['ITSQF 직무(변환)'] == job) & (df['BM'] == bm) & (df['매출규모'] == sales) & (df['직원규모'] == emp) ) return df[mask].copy() def apply_option_to_wage(avg_df, job, bm, sales, emp, w_base=0.5): selected_df = get_wage_result(avg_df, job=job, bm=bm, sales=sales, emp=emp) base_df = get_wage_result(avg_df, job=job, bm=bm) final_df = pd.concat([base_df, selected_df], ignore_index=True) w_opt = 1 - w_base dfs = [] for base_type in ['고정급', '지급총액']: df = final_df.pivot_table( index='ITSQF 수준', values=base_type, aggfunc='mean' ).reset_index() base_val = base_df.pivot_table( index='ITSQF 수준', values=base_type, aggfunc='mean' ).reset_index().rename(columns={base_type: '전체값'}) opt_val = selected_df.pivot_table( index='ITSQF 수준', values=base_type, aggfunc='mean' ).reset_index().rename(columns={base_type: '옵션값'}) df = df[['ITSQF 수준']].drop_duplicates() df = df.merge(base_val, on='ITSQF 수준', how='left') df = df.merge(opt_val, on='ITSQF 수준', how='left') df['평균연봉'] = df['전체값'] * w_base + df['옵션값'] * w_opt df['기준'] = base_type dfs.append(df) job_wage_df = pd.concat(dfs, ignore_index=True) num_cols = ['전체값', '옵션값', '평균연봉'] existing_num_cols = [c for c in num_cols if c in job_wage_df.columns] job_wage_df[existing_num_cols] = ( job_wage_df[existing_num_cols].astype(float).round(-3) / 10000 ) return job_wage_df def get_levels_by_wage(avg_df, job, bm, sales, emp, base_type, target_wage, w_base = 0.5, top_n=2): job_wage_df = apply_option_to_wage(avg_df, job, bm, sales, emp, w_base) df = job_wage_df[job_wage_df["기준"] == base_type].copy() if df.empty: return [] df["편차"] = (df["평균연봉"] - target_wage).abs() # 편차 기준 오름차순 정렬 후 상위 2개 cands = df.sort_values("편차").head(top_n)["ITSQF 수준"].tolist() def level_num(level: str) -> int: m = re.search(r'(\d+)', str(level)) return int(m.group(1)) if m else -1 # 정렬(낮은 레벨 -> 높은 레벨) return sorted(cands, key=level_num) def make_bars_table(bars_df, job, levels): target_df = bars_df[(bars_df['직무']==job) & (bars_df['레벨'].isin(levels))] target_table = target_df.pivot_table(index='평가요소', columns='레벨', values='지표정의', aggfunc='sum').reset_index() # 평가요소 순서대로 정렬 target_table['평가요소'] = pd.Categorical( target_table['평가요소'], categories=factors, ordered=True ) target_table = target_table.sort_values('평가요소').reset_index(drop=True) ppt_tables = [] for level in levels: table = target_table[['평가요소', level]].copy() cols = ["평가요소", f"{level} 수준"] table.columns = cols ppt_tables.append(table) target_table = target_table.rename(columns={levels[0]:'2수준', levels[1]:'4수준'}) bars_indicator = target_table.copy() bars_indicator = bars_indicator.reset_index(names='id') bars_indicator['id'] = bars_indicator['id'] + 1 bars_indicator['평가요소'] = bars_indicator['id'].astype(str).str.zfill(2) + '. ' + bars_indicator['평가요소'].astype(str) for col in ['2수준', '4수준']: bars_indicator[col] = bars_indicator[col].apply(lambda x: x.split('\n')) bars_indicator.columns = ['id', 'title', 'level2', 'level4'] return bars_indicator, ppt_tables def get_levels_definition(job, levels, job_def): job_pool = job_def[job_def['직무'] == job] texts = [] for level in levels: level_text = job_pool[job_pool['수준'] == level]['수준 정의'].values[0] texts.append(level_text) return texts def format_text_wrap(text: str, max_len: int = 45, delimiter: str = " ") -> str: if not text: return "" lines = [] for paragraph in text.split("\n"): paragraph = paragraph.strip() while len(paragraph) > max_len: # max_len 안에서 가장 마지막 공백 위치 찾기 split_pos = paragraph.rfind(delimiter, 0, max_len) # delimiter가 없으면 그냥 max_len에서 자름 (예외 케이스) if split_pos == -1: split_pos = max_len else: split_pos += len(delimiter) # delimiter 포함해서 자르기 lines.append(paragraph[:split_pos].strip()) paragraph = paragraph[split_pos:].strip() if paragraph: lines.append(paragraph) return "\n".join(lines) def judge_level(user_scores, factors, levels, levels_def, low_cut=2, high_cut=4): """ 단일 레벨 판정(Yes/No 성격): - 총점이 36점이상이면 high_level - 그 외, low_level로 보되 '보완 필요' 상태(낙인 표현 지양) 개선항목: - low_set: low_cut 미만 항목 - middle_set: low_cut 이상 high_cut 미만 항목 - high_set:또는 high_cut이상 항목 """ s = pd.Series(user_scores, dtype="float") low_level, high_level = levels[0], levels[1] level_def_map = {low_level: levels_def[0], high_level: levels_def[1]} low_set = s[s < low_cut].sort_values().index.tolist() middle_set = s[(s >= low_cut) & (s < high_cut)].sort_values().index.tolist() high_set = s[s >= high_cut].sort_values(ascending=False).index.tolist() def trim_items(index_set, max_item=3): if len(index_set) == 0: text = "-" elif len(index_set) > max_item: text = ", ".join([factors[i] for i in index_set[:max_item]]) + " 등" else: text = ", ".join([factors[i] for i in index_set]) return format_text_wrap(text, max_len=33, delimiter=",") final_level = high_level if sum(user_scores) >= 36 else low_level if final_level == high_level: output = { '하위 레벨' : [ f"하위 레벨: {low_level}", level_def_map.get(low_level, "-"), "아래 역량은 현재 레벨 안착을 위해 보완해보면 좋겠습니다.", trim_items(middle_set), ], '현재 레벨' : [ f"현재 레벨: {high_level}", level_def_map.get(high_level, "-"), "다음 역량은 현재 안정적으로 발휘되고 있는 강점입니다.", trim_items(high_set) ] } else: output = { '현재 레벨' :[ f"현재 레벨: {low_level}", level_def_map.get(low_level, "-"), "아래 역량은 현재 레벨 기준에 비추어 보완해보면 좋겠습니다.", trim_items(low_set) ], '상위 레벨' :[ f"상위 레벨: {high_level}", level_def_map.get(high_level, "-"), "다음 역량을 강화하면 Level-Up 성장을 기대할 수 있습니다.", trim_items(middle_set) ], } return final_level, pd.DataFrame(output, index=["title", "definition", "guide", "items"]) def describe_percentile(p): p = max(0, min(100, float(p))) p = round(p, 1) top = round(100 - p, 1) if top > 60: pos = f"하위 {int(math.ceil(p / 10.0) * 10)}% 이내" pos_text = f"하위 {p:.1f}% 수준" else: pos = f"상위 {int(math.ceil(top / 10.0) * 10)}% 이내" if top > 5 else f"상위 {int(top)}% 이내" pos_text = f"상위 {top:.1f}% 수준" if p >= 70: desc = "높은" elif p >= 60: desc = "평균 이상" elif p >= 40: desc = "평균" elif p >= 20: desc = "다소 낮은" else: desc = "낮은" return pos, desc, pos_text def judge_wage( raw_df: pd.DataFrame, job: str, bm: str, sales: str, emp: str, final_level: str, base_type: str, target_wage: int, k_std: float = 20.0, k_shrink: float = 20.0, z_clip: float = 2.5, n_switch: int = 20, alpha_denominator: int = 30, # n>=n_switch일 때 raw percentile 비중 증가 속도 (추천 30) ): # 입력 파싱: 만원 -> 원 x = float(target_wage) * 10000.0 # 1) job_pool (직무 + 레벨 필터) job_pool_df = raw_df[(raw_df['ITSQF 직무(변환)'] == job)&(raw_df['ITSQF 수준'] == final_level)] job_pool_vals = pd.to_numeric(job_pool_df[base_type], errors="coerce").dropna().to_numpy(dtype=float) std_pool = float(np.std(job_pool_vals, ddof=1)) mean_job = float(np.mean(job_pool_vals)) # 2) cohort (직무 + 레벨 + 옵션 필터) def get_cohort_df(df, bm, sales, emp): mask = (df['BM'] == bm) if sales != '전체': mask = mask & (df['매출규모'] == sales) if emp != '전체': mask = mask & (df['직원규모'] == emp) return df[mask].copy() cohort_df = get_cohort_df(job_pool_df, bm, sales, emp) cohort_vals = pd.to_numeric(cohort_df[base_type], errors="coerce").dropna().to_numpy(dtype=float) n = int(cohort_vals.size) mean_cohort = float(np.mean(cohort_vals)) if n >= 1 else mean_job std_cohort = float(np.std(cohort_vals, ddof=1)) if n >= 2 else 0.0 # 3) std 풀링 (분산 기준 혼합) w_std = (n / (n + k_std)) if n > 0 else 0.0 var_eff = w_std * (std_cohort ** 2) + (1.0 - w_std) * (std_pool ** 2) std_eff = math.sqrt(max(var_eff, 1e-9)) # 0 방어 # 4) z-score 보정 z_raw = (x - mean_cohort) / std_eff w_n = (n / (n + k_shrink)) if n > 0 else 0.0 # 표본 수 수축 z_adj = float(np.clip(w_n * z_raw, -z_clip, z_clip)) # 클리핑 def normal_cdf(z: float) -> float: return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0))) def percentile_of_score(arr: np.ndarray, x: float) -> float: """퍼센타일: arr 중 x 이하 비율 * 100""" if arr.size == 0: return float("nan") return float((arr <= x).mean() * 100.0) # 5) 퍼센타일 (z 기반 + 필요시 raw와 블렌딩) p_z = normal_cdf(z_adj) * 100.0 p_raw = percentile_of_score(cohort_vals, x) if n >= 3 else float("nan") # n이 커질수록 raw에 더 무게, 완만한 전이를 위해 alpha_denominator 추천 30 if n >= n_switch and not math.isnan(p_raw): alpha = min(1.0, n / float(alpha_denominator)) p_final = alpha * p_raw + (1.0 - alpha) * p_z else: p_final = p_z p = round(float(p_final), 1) pos, desc, pos_text = describe_percentile(p) head_message = f"""진단 결과, 현재 귀하의 직무 역량 수준은 {final_level}에 가장 가까운 것으로 보입니다. 동일 직무 레벨 및 조건 대비 보상 경쟁력은 {pos}로 {desc} 수준입니다.""" # 평균 대비 차이 (cohort 평균 기준; cohort가 비면 job 평균) diff = x - mean_cohort sign = "+" if diff >= 0 else "-" gap = f"{sign}{abs(diff)/10000:,.0f}" if diff == 0: comp_text = "시장 평균과 동일한 수준으로 나타났습니다." else: direction = "더 높게" if diff > 0 else "더 낮게" comp_text = f"시장 평균 대비 {gap}만원 {direction} 나타났습니다." block1 = f"""현재 보상 경쟁력은 시장 {pos_text}으로, {comp_text}""" block2 = f"""직무: {job} 레벨: {final_level} 보상수준: ({base_type}) {target_wage:,.0f}만원 준거집단: - (BM) {bm} - (매출규모) {sales} - (직원규모) {emp} """ table3 = pd.DataFrame({ "user": f"{target_wage:,.0f}", "marketAverage": f"{mean_cohort/10000:,.0f}", "gap": gap }, index=["값"]) def make_guage_chart(p): value = round(p / 100 * 180, 1) guage = [180, value, 180 - value] return pd.DataFrame( {"값": guage}, index=["항목1", "항목2", "항목3"] ) chart3 = make_guage_chart(p) return head_message, block1, block2, p, chart3, table3 def format_table(table, factors, user_scores, cut=2): label = table.columns[1] s = pd.DataFrame({ "평가요소": factors, label: [3]*10, "User": user_scores }) s['User'] = s['User'].apply(lambda x: 2 if x < cut else (3 if x == cut else 4)) s['부족'] = s['User'].apply(lambda x : "●" if x == 2 else "") s['충족'] = s['User'].apply(lambda x : "●" if x == 3 else "") s['초과'] = s['User'].apply(lambda x : "●" if x == 4 else "") table = table.merge(s[['평가요소', '부족', '충족', '초과']], on='평가요소', how='left') chart = s[['평가요소', label, "User"]] return [table, chart]