it_value_check / core /core.py
5minbetter's picture
Deploy to Hugging Face
cc7330c
import pandas as pd
import numpy as np
import math
import re
import pickle
import os
# --- 데이터 경로 설정 ---
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, "core", "data")
# --- 전역 데이터 로드 ---
try:
with open(os.path.join(DATA_DIR, 'sw_competency.pkl'), 'rb') as f:
sw_competency = pickle.load(f)
job_def = sw_competency['직무레벨']
factor_def = sw_competency['평가요소']
factors = factor_def.columns.to_list()[2:]
bars_df = sw_competency['평가지표']
with open(os.path.join(DATA_DIR, 'sw_wage.pkl'), 'rb') as f:
sw_wage = pickle.load(f)
raw_df = sw_wage['raw']
avg_df = sw_wage['avg']
conds_df = sw_wage['conds']
except Exception as e:
print(f"Warning: Could not load data files: {e}")
# --- 핵심 비즈니스 로직 ---
def get_options(df, selected, target_col):
# 1. 불필요한 copy() 제거
filtered = df
for col, val in selected.items():
if val is not None and val != "":
filtered = filtered[filtered[col] == val]
options = filtered[target_col].dropna().drop_duplicates().tolist()
# 2. 콘크리트 옵션들만 추출
concrete_options = [x for x in options if x != '전체']
if '전체' in options:
if len(concrete_options) > 1:
options = ['전체'] + concrete_options
else:
# 1개만 남았을 때도 구체적인 옵션을 보여주고 싶다면 return concrete_options
options = ['전체']
else:
options = concrete_options
# 4. 정렬 방식 개선
if target_col == '직원규모':
sort_map = {'전체': 0, '300~999인': 1, '100~299인': 2, '50~99인': 3, '49인 이하': 4}
options.sort(key=lambda x: sort_map.get(x, 999))
return options
def get_step_options(df, job=None, bm=None, sales=None):
result = {'job_options': df['ITSQF 직무(변환)'].dropna().drop_duplicates().tolist()}
if job:
result['bm_options'] = get_options(df, {'ITSQF 직무(변환)': job}, 'BM')
if job and bm:
result['sales_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm}, '매출규모')
if job and bm and sales:
result['emp_options'] = get_options(df, {'ITSQF 직무(변환)': job, 'BM': bm, '매출규모': sales}, '직원규모')
result['base_options'] = ['지급총액', '고정급']
return result
def get_wage_result(df, job, bm, sales='전체', emp='전체'):
mask = (
(df['ITSQF 직무(변환)'] == job) &
(df['BM'] == bm) &
(df['매출규모'] == sales) &
(df['직원규모'] == emp)
)
return df[mask].copy()
def apply_option_to_wage(avg_df, job, bm, sales, emp, w_base=0.5):
selected_df = get_wage_result(avg_df, job=job, bm=bm, sales=sales, emp=emp)
base_df = get_wage_result(avg_df, job=job, bm=bm)
final_df = pd.concat([base_df, selected_df], ignore_index=True)
w_opt = 1 - w_base
dfs = []
for base_type in ['고정급', '지급총액']:
df = final_df.pivot_table(
index='ITSQF 수준',
values=base_type,
aggfunc='mean'
).reset_index()
base_val = base_df.pivot_table(
index='ITSQF 수준',
values=base_type,
aggfunc='mean'
).reset_index().rename(columns={base_type: '전체값'})
opt_val = selected_df.pivot_table(
index='ITSQF 수준',
values=base_type,
aggfunc='mean'
).reset_index().rename(columns={base_type: '옵션값'})
df = df[['ITSQF 수준']].drop_duplicates()
df = df.merge(base_val, on='ITSQF 수준', how='left')
df = df.merge(opt_val, on='ITSQF 수준', how='left')
df['평균연봉'] = df['전체값'] * w_base + df['옵션값'] * w_opt
df['기준'] = base_type
dfs.append(df)
job_wage_df = pd.concat(dfs, ignore_index=True)
num_cols = ['전체값', '옵션값', '평균연봉']
existing_num_cols = [c for c in num_cols if c in job_wage_df.columns]
job_wage_df[existing_num_cols] = (
job_wage_df[existing_num_cols].astype(float).round(-3) / 10000
)
return job_wage_df
def get_levels_by_wage(avg_df, job, bm, sales, emp, base_type, target_wage, w_base = 0.5, top_n=2):
job_wage_df = apply_option_to_wage(avg_df, job, bm, sales, emp, w_base)
df = job_wage_df[job_wage_df["기준"] == base_type].copy()
if df.empty:
return []
df["편차"] = (df["평균연봉"] - target_wage).abs()
# 편차 기준 오름차순 정렬 후 상위 2개
cands = df.sort_values("편차").head(top_n)["ITSQF 수준"].tolist()
def level_num(level: str) -> int:
m = re.search(r'(\d+)', str(level))
return int(m.group(1)) if m else -1
# 정렬(낮은 레벨 -> 높은 레벨)
return sorted(cands, key=level_num)
def make_bars_table(bars_df, job, levels):
target_df = bars_df[(bars_df['직무']==job) & (bars_df['레벨'].isin(levels))]
target_table = target_df.pivot_table(index='평가요소', columns='레벨', values='지표정의', aggfunc='sum').reset_index()
# 평가요소 순서대로 정렬
target_table['평가요소'] = pd.Categorical(
target_table['평가요소'],
categories=factors,
ordered=True
)
target_table = target_table.sort_values('평가요소').reset_index(drop=True)
ppt_tables = []
for level in levels:
table = target_table[['평가요소', level]].copy()
cols = ["평가요소", f"{level} 수준"]
table.columns = cols
ppt_tables.append(table)
target_table = target_table.rename(columns={levels[0]:'2수준', levels[1]:'4수준'})
bars_indicator = target_table.copy()
bars_indicator = bars_indicator.reset_index(names='id')
bars_indicator['id'] = bars_indicator['id'] + 1
bars_indicator['평가요소'] = bars_indicator['id'].astype(str).str.zfill(2) + '. ' + bars_indicator['평가요소'].astype(str)
for col in ['2수준', '4수준']:
bars_indicator[col] = bars_indicator[col].apply(lambda x: x.split('\n'))
bars_indicator.columns = ['id', 'title', 'level2', 'level4']
return bars_indicator, ppt_tables
def get_levels_definition(job, levels, job_def):
job_pool = job_def[job_def['직무'] == job]
texts = []
for level in levels:
level_text = job_pool[job_pool['수준'] == level]['수준 정의'].values[0]
texts.append(level_text)
return texts
def format_text_wrap(text: str, max_len: int = 45, delimiter: str = " ") -> str:
if not text: return ""
lines = []
for paragraph in text.split("\n"):
paragraph = paragraph.strip()
while len(paragraph) > max_len:
# max_len 안에서 가장 마지막 공백 위치 찾기
split_pos = paragraph.rfind(delimiter, 0, max_len)
# delimiter가 없으면 그냥 max_len에서 자름 (예외 케이스)
if split_pos == -1:
split_pos = max_len
else:
split_pos += len(delimiter) # delimiter 포함해서 자르기
lines.append(paragraph[:split_pos].strip())
paragraph = paragraph[split_pos:].strip()
if paragraph:
lines.append(paragraph)
return "\n".join(lines)
def judge_level(user_scores, factors, levels, levels_def, low_cut=2, high_cut=4):
"""
단일 레벨 판정(Yes/No 성격):
- 총점이 36점이상이면 high_level
- 그 외, low_level로 보되 '보완 필요' 상태(낙인 표현 지양)
개선항목:
- low_set: low_cut 미만 항목
- middle_set: low_cut 이상 high_cut 미만 항목
- high_set:또는 high_cut이상 항목
"""
s = pd.Series(user_scores, dtype="float")
low_level, high_level = levels[0], levels[1]
level_def_map = {low_level: levels_def[0], high_level: levels_def[1]}
low_set = s[s < low_cut].sort_values().index.tolist()
middle_set = s[(s >= low_cut) & (s < high_cut)].sort_values().index.tolist()
high_set = s[s >= high_cut].sort_values(ascending=False).index.tolist()
def trim_items(index_set, max_item=3):
if len(index_set) == 0:
text = "-"
elif len(index_set) > max_item:
text = ", ".join([factors[i] for i in index_set[:max_item]]) + " 등"
else:
text = ", ".join([factors[i] for i in index_set])
return format_text_wrap(text, max_len=33, delimiter=",")
final_level = high_level if sum(user_scores) >= 36 else low_level
if final_level == high_level:
output = {
'하위 레벨' : [
f"하위 레벨: {low_level}", level_def_map.get(low_level, "-"),
"아래 역량은 현재 레벨 안착을 위해 보완해보면 좋겠습니다.",
trim_items(middle_set),
],
'현재 레벨' : [
f"현재 레벨: {high_level}", level_def_map.get(high_level, "-"),
"다음 역량은 현재 안정적으로 발휘되고 있는 강점입니다.",
trim_items(high_set)
]
}
else:
output = {
'현재 레벨' :[
f"현재 레벨: {low_level}", level_def_map.get(low_level, "-"),
"아래 역량은 현재 레벨 기준에 비추어 보완해보면 좋겠습니다.",
trim_items(low_set)
],
'상위 레벨' :[
f"상위 레벨: {high_level}", level_def_map.get(high_level, "-"),
"다음 역량을 강화하면 Level-Up 성장을 기대할 수 있습니다.",
trim_items(middle_set)
],
}
return final_level, pd.DataFrame(output, index=["title", "definition", "guide", "items"])
def describe_percentile(p):
p = max(0, min(100, float(p)))
p = round(p, 1)
top = round(100 - p, 1)
if top > 60:
pos = f"하위 {int(math.ceil(p / 10.0) * 10)}% 이내"
pos_text = f"하위 {p:.1f}% 수준"
else:
pos = f"상위 {int(math.ceil(top / 10.0) * 10)}% 이내" if top > 5 else f"상위 {int(top)}% 이내"
pos_text = f"상위 {top:.1f}% 수준"
if p >= 70: desc = "높은"
elif p >= 60: desc = "평균 이상"
elif p >= 40: desc = "평균"
elif p >= 20: desc = "다소 낮은"
else: desc = "낮은"
return pos, desc, pos_text
def judge_wage(
raw_df: pd.DataFrame,
job: str,
bm: str,
sales: str,
emp: str,
final_level: str,
base_type: str,
target_wage: int,
k_std: float = 20.0,
k_shrink: float = 20.0,
z_clip: float = 2.5,
n_switch: int = 20,
alpha_denominator: int = 30, # n>=n_switch일 때 raw percentile 비중 증가 속도 (추천 30)
):
# 입력 파싱: 만원 -> 원
x = float(target_wage) * 10000.0
# 1) job_pool (직무 + 레벨 필터)
job_pool_df = raw_df[(raw_df['ITSQF 직무(변환)'] == job)&(raw_df['ITSQF 수준'] == final_level)]
job_pool_vals = pd.to_numeric(job_pool_df[base_type], errors="coerce").dropna().to_numpy(dtype=float)
std_pool = float(np.std(job_pool_vals, ddof=1))
mean_job = float(np.mean(job_pool_vals))
# 2) cohort (직무 + 레벨 + 옵션 필터)
def get_cohort_df(df, bm, sales, emp):
mask = (df['BM'] == bm)
if sales != '전체':
mask = mask & (df['매출규모'] == sales)
if emp != '전체':
mask = mask & (df['직원규모'] == emp)
return df[mask].copy()
cohort_df = get_cohort_df(job_pool_df, bm, sales, emp)
cohort_vals = pd.to_numeric(cohort_df[base_type], errors="coerce").dropna().to_numpy(dtype=float)
n = int(cohort_vals.size)
mean_cohort = float(np.mean(cohort_vals)) if n >= 1 else mean_job
std_cohort = float(np.std(cohort_vals, ddof=1)) if n >= 2 else 0.0
# 3) std 풀링 (분산 기준 혼합)
w_std = (n / (n + k_std)) if n > 0 else 0.0
var_eff = w_std * (std_cohort ** 2) + (1.0 - w_std) * (std_pool ** 2)
std_eff = math.sqrt(max(var_eff, 1e-9)) # 0 방어
# 4) z-score 보정
z_raw = (x - mean_cohort) / std_eff
w_n = (n / (n + k_shrink)) if n > 0 else 0.0 # 표본 수 수축
z_adj = float(np.clip(w_n * z_raw, -z_clip, z_clip)) # 클리핑
def normal_cdf(z: float) -> float:
return 0.5 * (1.0 + math.erf(z / math.sqrt(2.0)))
def percentile_of_score(arr: np.ndarray, x: float) -> float:
"""퍼센타일: arr 중 x 이하 비율 * 100"""
if arr.size == 0:
return float("nan")
return float((arr <= x).mean() * 100.0)
# 5) 퍼센타일 (z 기반 + 필요시 raw와 블렌딩)
p_z = normal_cdf(z_adj) * 100.0
p_raw = percentile_of_score(cohort_vals, x) if n >= 3 else float("nan")
# n이 커질수록 raw에 더 무게, 완만한 전이를 위해 alpha_denominator 추천 30
if n >= n_switch and not math.isnan(p_raw):
alpha = min(1.0, n / float(alpha_denominator))
p_final = alpha * p_raw + (1.0 - alpha) * p_z
else:
p_final = p_z
p = round(float(p_final), 1)
pos, desc, pos_text = describe_percentile(p)
head_message = f"""진단 결과, 현재 귀하의 직무 역량 수준은 {final_level}에 가장 가까운 것으로 보입니다.
동일 직무 레벨 및 조건 대비 보상 경쟁력은 {pos}{desc} 수준입니다."""
# 평균 대비 차이 (cohort 평균 기준; cohort가 비면 job 평균)
diff = x - mean_cohort
sign = "+" if diff >= 0 else "-"
gap = f"{sign}{abs(diff)/10000:,.0f}"
if diff == 0:
comp_text = "시장 평균과 동일한 수준으로 나타났습니다."
else:
direction = "더 높게" if diff > 0 else "더 낮게"
comp_text = f"시장 평균 대비 {gap}만원 {direction} 나타났습니다."
block1 = f"""현재 보상 경쟁력은 시장 {pos_text}으로,
{comp_text}"""
block2 = f"""직무: {job}
레벨: {final_level}
보상수준: ({base_type}) {target_wage:,.0f}만원
준거집단:
- (BM) {bm}
- (매출규모) {sales}
- (직원규모) {emp}
"""
table3 = pd.DataFrame({
"user": f"{target_wage:,.0f}",
"marketAverage": f"{mean_cohort/10000:,.0f}",
"gap": gap
}, index=["값"])
def make_guage_chart(p):
value = round(p / 100 * 180, 1)
guage = [180, value, 180 - value]
return pd.DataFrame(
{"값": guage},
index=["항목1", "항목2", "항목3"]
)
chart3 = make_guage_chart(p)
return head_message, block1, block2, p, chart3, table3
def format_table(table, factors, user_scores, cut=2):
label = table.columns[1]
s = pd.DataFrame({
"평가요소": factors,
label: [3]*10,
"User": user_scores
})
s['User'] = s['User'].apply(lambda x: 2 if x < cut else (3 if x == cut else 4))
s['부족'] = s['User'].apply(lambda x : "●" if x == 2 else "")
s['충족'] = s['User'].apply(lambda x : "●" if x == 3 else "")
s['초과'] = s['User'].apply(lambda x : "●" if x == 4 else "")
table = table.merge(s[['평가요소', '부족', '충족', '초과']], on='평가요소', how='left')
chart = s[['평가요소', label, "User"]]
return [table, chart]