gapguide-api / apps /roles /admin.py
arifRB's picture
Deploy GapGuide backend (Docker)
ffd36e0 verified
Raw
History Blame Contribute Delete
9.75 kB
from pathlib import Path
import yaml
from django.conf import settings
from django.contrib import admin, messages
from django.contrib.admin.models import ADDITION, CHANGE, LogEntry
from django.db import transaction
from apps.skills.models import Skill
from .models import Role, RoleSkill, UserTargetRole, normalize_roleskill_fields
# Location of the curated O*NET roles file, resolved under settings.BASE_DIR.
# It is read by _load_curated_yaml() for the "Import skills from O*NET" action.
CURATED_YAML = Path(settings.BASE_DIR) / "seed_data" / "onet_roles_curated.yaml"
class RoleSkillInline(admin.TabularInline):
    """Tabular inline for editing RoleSkill rows on the parent Role page."""

    model = RoleSkill
    extra = 0
    fields = ('skill', 'required_level', 'weight', 'is_mandatory')
    readonly_fields = ()
    autocomplete_fields = ['skill']

    def has_add_permission(self, request, obj=None):
        """Return whether the current user may add RoleSkill rows inline.

        ``obj`` is the parent object being edited, or ``None`` while the
        parent is still being created.
        """
        # Disallow adding RoleSkills while the parent Role is still being
        # created — FK integrity forces the Role to be saved first.
        if obj is None:
            return False
        # Still honour the user's model-level "add" permission: returning a
        # bare True here would let any user who can open the Role change
        # page create RoleSkill rows regardless of their permissions.
        return super().has_add_permission(request, obj)
def _log_admin_action(request, objects, action_flag, message_per_object):
    """Write one admin LogEntry per object touched by a bulk admin action.

    Bulk actions do not get the LogEntry rows that Django's standard
    change_view creates automatically; without this helper the Module 7
    admin CRUD audit trail would miss the O*NET import and role
    duplication flows.

    ``message_per_object`` is called with each object and its return value
    is stored as that entry's change message.
    """
    for target in objects:
        note = message_per_object(target)
        # Each object is logged on its own (single-item list plus
        # single_object=True) so every entry carries its own message.
        LogEntry.objects.log_actions(
            request.user.pk,
            [target],
            action_flag,
            note,
            single_object=True,
        )
def _load_curated_yaml():
    """Parse the curated O*NET roles file at CURATED_YAML.

    Returns ``None`` when the file does not exist, so callers can tell
    "not generated yet" apart from "present but empty". Otherwise returns
    the parsed document, or ``{}`` when parsing yields a falsy value.
    """
    if CURATED_YAML.exists():
        with CURATED_YAML.open(encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle)
        return parsed or {}
    return None
def _match_curated_role(role, curated):
"""Return the curated YAML entry that matches the given Role instance.
Match precedence: (1) primary_soc == role.onet_soc_code,
(2) role.onet_soc_code in all_socs, (3) case-insensitive name match.
"""
if not curated:
return None
entries = curated.get("roles") or []
soc = (role.onet_soc_code or "").strip()
if soc:
for entry in entries:
if (entry.get("primary_soc") or "").strip() == soc:
return entry
for entry in entries:
if soc in (entry.get("all_socs") or []):
return entry
name = (role.role_name or "").strip().lower()
for entry in entries:
if (entry.get("name") or "").strip().lower() == name:
return entry
return None
@admin.register(Role)
class RoleAdmin(admin.ModelAdmin):
    """Admin for Role, with inline RoleSkill editing and two bulk actions.

    Actions:
      * import_skills_from_onet — pre-fill RoleSkill rows from the curated
        O*NET YAML file.
      * duplicate_role_with_skills — copy each selected Role together with
        its RoleSkill rows.
    """

    list_display = ('role_name', 'industry', 'onet_soc_code', 'is_active')
    list_filter = ('industry', 'is_active')
    search_fields = ('role_name', 'description', 'onet_soc_code')
    inlines = [RoleSkillInline]
    actions = ['import_skills_from_onet', 'duplicate_role_with_skills']

    def _warn_truncated(self, request, prefix, items, limit=20):
        """Show one WARNING message listing at most ``limit`` of ``items``.

        An ellipsis is appended when the list was cut, so a long report
        does not flood the admin message area.
        """
        self.message_user(
            request,
            prefix + ", ".join(items[:limit]) + ("..." if len(items) > limit else ""),
            level=messages.WARNING,
        )

    @admin.action(description="Import skills from O*NET (pre-fill RoleSkills)")
    def import_skills_from_onet(self, request, queryset):
        """Create missing RoleSkill rows for the selected roles from the YAML.

        Each selected Role is matched to a curated entry; for every skill
        listed there a RoleSkill is created unless one already exists for
        that (role, skill) pair. Existing rows are never modified. Roles
        without a curated entry, skills absent from the Skill catalog and
        rows rejected by normalize_roleskill_fields are skipped and
        reported as warnings.
        """
        curated = _load_curated_yaml()
        if curated is None:
            self.message_user(
                request,
                f"Curated O*NET YAML not found at {CURATED_YAML}. "
                "Run scripts/parse_onet_dump.py + curate_roles.py first.",
                level=messages.ERROR,
            )
            return

        roles_matched = 0
        skills_created = 0
        skills_skipped_existing = 0
        skills_missing_catalog = []
        skills_invalid = []
        roles_unmatched = []
        per_role_added: dict[int, int] = {}

        def import_message(r):
            # Per-role audit text: the number of rows added, or a note that
            # nothing changed for this role.
            if r.pk in per_role_added:
                return f"O*NET import: {per_role_added.get(r.pk, 0)} RoleSkill row(s) added."
            return "O*NET import: no new RoleSkill rows (already present or unmatched)."

        with transaction.atomic():
            for role in queryset:
                entry = _match_curated_role(role, curated)
                if not entry:
                    roles_unmatched.append(role.role_name)
                    continue
                roles_matched += 1
                added_this_role = 0
                for skill_entry in entry.get("skills") or []:
                    name = skill_entry.get("skill_name")
                    if not name:
                        continue
                    try:
                        skill = Skill.objects.get(skill_name=name)
                    except Skill.DoesNotExist:
                        skills_missing_catalog.append(f"{role.role_name}:{name}")
                        continue
                    is_mandatory = bool(skill_entry.get("is_mandatory", False))
                    level, weight, error = normalize_roleskill_fields(
                        skill_entry.get("required_level"),
                        skill_entry.get("weight", 1.0),
                        is_mandatory,
                    )
                    if error:
                        # Skip+report rather than full_clean inside the atomic
                        # block — one bad row must not abort the whole import.
                        skills_invalid.append(f"{role.role_name}:{name} ({error})")
                        continue
                    # get_or_create keeps the action re-runnable: rows that
                    # already exist are counted, not overwritten.
                    _, created = RoleSkill.objects.get_or_create(
                        role=role,
                        skill=skill,
                        defaults={
                            "required_level": level,
                            "weight": weight,
                            "is_mandatory": is_mandatory,
                        },
                    )
                    if created:
                        skills_created += 1
                        added_this_role += 1
                    else:
                        skills_skipped_existing += 1
                if added_this_role:
                    per_role_added[role.pk] = added_this_role

            # Logged inside the transaction so the audit rows commit or roll
            # back together with the RoleSkill rows they describe.
            _log_admin_action(
                request, queryset,
                action_flag=CHANGE,
                message_per_object=import_message,
            )

        self.message_user(
            request,
            f"O*NET import: {roles_matched} role(s) matched, "
            f"{skills_created} RoleSkill row(s) created, "
            f"{skills_skipped_existing} already present.",
            level=messages.SUCCESS if roles_matched else messages.WARNING,
        )
        if roles_unmatched:
            self.message_user(
                request,
                "No curated entry for: " + ", ".join(roles_unmatched) +
                " (check onet_soc_code or role_name).",
                level=messages.WARNING,
            )
        if skills_missing_catalog:
            self._warn_truncated(
                request,
                "Skills missing from catalog (run seed_initial_skills): ",
                skills_missing_catalog,
            )
        if skills_invalid:
            self._warn_truncated(
                request,
                "Skipped invalid RoleSkill rows: ",
                skills_invalid,
            )

    @admin.action(description="Duplicate role + skills")
    def duplicate_role_with_skills(self, request, queryset):
        """Copy each selected Role and its RoleSkill rows under a new name.

        The copy is named "<name> (copy)"; when that name is taken a
        numeric suffix starting at 2 is appended until a free name is
        found. One ADDITION LogEntry is written per new Role.
        """
        created_names = []
        new_roles = []
        # Audit message per new Role pk, recorded while the source role is
        # at hand so no search over the results is needed afterwards.
        audit_messages = {}

        with transaction.atomic():
            for role in queryset:
                source_skills = list(role.role_skills.all())
                base_name = f"{role.role_name} (copy)"
                new_name = base_name
                suffix = 2
                while Role.objects.filter(role_name=new_name).exists():
                    new_name = f"{base_name} {suffix}"
                    suffix += 1
                new_role = Role.objects.create(
                    role_name=new_name,
                    description=role.description,
                    industry=role.industry,
                    is_active=role.is_active,
                    onet_soc_code=role.onet_soc_code,
                )
                RoleSkill.objects.bulk_create([
                    RoleSkill(
                        role=new_role,
                        # Copy the raw FK value: reading rs.skill would load
                        # the Skill row with one extra query per RoleSkill.
                        skill_id=rs.skill_id,
                        required_level=rs.required_level,
                        weight=rs.weight,
                        is_mandatory=rs.is_mandatory,
                    )
                    for rs in source_skills
                ])
                created_names.append(new_name)
                new_roles.append(new_role)
                audit_messages[new_role.pk] = (
                    f"Duplicated from '{role.role_name}' with "
                    f"{len(source_skills)} RoleSkill row(s)."
                )

            # Logged inside the transaction so the audit rows commit or roll
            # back together with the duplicated roles.
            _log_admin_action(
                request, new_roles,
                action_flag=ADDITION,
                message_per_object=lambda nr: audit_messages[nr.pk],
            )

        if created_names:
            self.message_user(
                request,
                f"Duplicated {len(created_names)} role(s): " + ", ".join(created_names),
                level=messages.SUCCESS,
            )
        else:
            self.message_user(request, "No roles selected.", level=messages.WARNING)
@admin.register(UserTargetRole)
class UserTargetRoleAdmin(admin.ModelAdmin):
    """Admin list/search configuration for UserTargetRole rows."""

    # selected_at is shown on the form but cannot be edited there.
    readonly_fields = ('selected_at',)

    # Changelist columns, sidebar filters and search box targets.
    list_display = ('user', 'role', 'selected_at', 'is_active')
    list_filter = ('is_active', 'selected_at')
    search_fields = ('user__email', 'role__role_name')