Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| import yaml | |
| from django.conf import settings | |
| from django.contrib import admin, messages | |
| from django.contrib.admin.models import ADDITION, CHANGE, LogEntry | |
| from django.db import transaction | |
| from apps.skills.models import Skill | |
| from .models import Role, RoleSkill, UserTargetRole, normalize_roleskill_fields | |
| CURATED_YAML = Path(settings.BASE_DIR) / "seed_data" / "onet_roles_curated.yaml" | |
| class RoleSkillInline(admin.TabularInline): | |
| model = RoleSkill | |
| extra = 0 | |
| fields = ('skill', 'required_level', 'weight', 'is_mandatory') | |
| readonly_fields = () | |
| autocomplete_fields = ['skill'] | |
| def has_add_permission(self, request, obj=None): | |
| # Disallow adding RoleSkills while the parent Role is still being | |
| # created — FK integrity forces the Role to be saved first. | |
| return obj is not None | |
| def _log_admin_action(request, objects, action_flag, message_per_object): | |
| """Emit admin.LogEntry rows for a bulk admin action. | |
| Django's standard change_view auto-creates LogEntry, but bulk actions do | |
| not — without this, Module 7 admin CRUD has a gap in its audit trail for | |
| the O*NET import and role duplication flows. | |
| """ | |
| for obj in objects: | |
| LogEntry.objects.log_actions( | |
| user_id=request.user.pk, | |
| queryset=[obj], | |
| action_flag=action_flag, | |
| change_message=message_per_object(obj), | |
| single_object=True, | |
| ) | |
| def _load_curated_yaml(): | |
| if not CURATED_YAML.exists(): | |
| return None | |
| with CURATED_YAML.open(encoding="utf-8") as f: | |
| return yaml.safe_load(f) or {} | |
| def _match_curated_role(role, curated): | |
| """Return the curated YAML entry that matches the given Role instance. | |
| Match precedence: (1) primary_soc == role.onet_soc_code, | |
| (2) role.onet_soc_code in all_socs, (3) case-insensitive name match. | |
| """ | |
| if not curated: | |
| return None | |
| entries = curated.get("roles") or [] | |
| soc = (role.onet_soc_code or "").strip() | |
| if soc: | |
| for entry in entries: | |
| if (entry.get("primary_soc") or "").strip() == soc: | |
| return entry | |
| for entry in entries: | |
| if soc in (entry.get("all_socs") or []): | |
| return entry | |
| name = (role.role_name or "").strip().lower() | |
| for entry in entries: | |
| if (entry.get("name") or "").strip().lower() == name: | |
| return entry | |
| return None | |
| class RoleAdmin(admin.ModelAdmin): | |
| list_display = ('role_name', 'industry', 'onet_soc_code', 'is_active') | |
| list_filter = ('industry', 'is_active') | |
| search_fields = ('role_name', 'description', 'onet_soc_code') | |
| inlines = [RoleSkillInline] | |
| actions = ['import_skills_from_onet', 'duplicate_role_with_skills'] | |
| def import_skills_from_onet(self, request, queryset): | |
| curated = _load_curated_yaml() | |
| if curated is None: | |
| self.message_user( | |
| request, | |
| f"Curated O*NET YAML not found at {CURATED_YAML}. " | |
| "Run scripts/parse_onet_dump.py + curate_roles.py first.", | |
| level=messages.ERROR, | |
| ) | |
| return | |
| roles_matched = 0 | |
| skills_created = 0 | |
| skills_skipped_existing = 0 | |
| skills_missing_catalog = [] | |
| skills_invalid = [] | |
| roles_unmatched = [] | |
| per_role_added: dict[int, int] = {} | |
| with transaction.atomic(): | |
| for role in queryset: | |
| entry = _match_curated_role(role, curated) | |
| if not entry: | |
| roles_unmatched.append(role.role_name) | |
| continue | |
| roles_matched += 1 | |
| added_this_role = 0 | |
| for skill_entry in entry.get("skills") or []: | |
| name = skill_entry.get("skill_name") | |
| if not name: | |
| continue | |
| try: | |
| skill = Skill.objects.get(skill_name=name) | |
| except Skill.DoesNotExist: | |
| skills_missing_catalog.append(f"{role.role_name}:{name}") | |
| continue | |
| is_mandatory = bool(skill_entry.get("is_mandatory", False)) | |
| level, weight, error = normalize_roleskill_fields( | |
| skill_entry.get("required_level"), | |
| skill_entry.get("weight", 1.0), | |
| is_mandatory, | |
| ) | |
| if error: | |
| # Skip+report rather than full_clean inside the atomic | |
| # block — one bad row must not abort the whole import. | |
| skills_invalid.append(f"{role.role_name}:{name} ({error})") | |
| continue | |
| _, created = RoleSkill.objects.get_or_create( | |
| role=role, | |
| skill=skill, | |
| defaults={ | |
| "required_level": level, | |
| "weight": weight, | |
| "is_mandatory": is_mandatory, | |
| }, | |
| ) | |
| if created: | |
| skills_created += 1 | |
| added_this_role += 1 | |
| else: | |
| skills_skipped_existing += 1 | |
| if added_this_role: | |
| per_role_added[role.pk] = added_this_role | |
| _log_admin_action( | |
| request, queryset, | |
| action_flag=CHANGE, | |
| message_per_object=lambda r: ( | |
| f"O*NET import: {per_role_added.get(r.pk, 0)} RoleSkill row(s) added." | |
| if r.pk in per_role_added | |
| else "O*NET import: no new RoleSkill rows (already present or unmatched)." | |
| ), | |
| ) | |
| self.message_user( | |
| request, | |
| f"O*NET import: {roles_matched} role(s) matched, " | |
| f"{skills_created} RoleSkill row(s) created, " | |
| f"{skills_skipped_existing} already present.", | |
| level=messages.SUCCESS if roles_matched else messages.WARNING, | |
| ) | |
| if roles_unmatched: | |
| self.message_user( | |
| request, | |
| "No curated entry for: " + ", ".join(roles_unmatched) + | |
| " (check onet_soc_code or role_name).", | |
| level=messages.WARNING, | |
| ) | |
| if skills_missing_catalog: | |
| self.message_user( | |
| request, | |
| "Skills missing from catalog (run seed_initial_skills): " + | |
| ", ".join(skills_missing_catalog[:20]) + | |
| ("..." if len(skills_missing_catalog) > 20 else ""), | |
| level=messages.WARNING, | |
| ) | |
| if skills_invalid: | |
| self.message_user( | |
| request, | |
| "Skipped invalid RoleSkill rows: " + | |
| ", ".join(skills_invalid[:20]) + | |
| ("..." if len(skills_invalid) > 20 else ""), | |
| level=messages.WARNING, | |
| ) | |
| def duplicate_role_with_skills(self, request, queryset): | |
| created_names = [] | |
| new_roles = [] | |
| with transaction.atomic(): | |
| for role in queryset: | |
| source_skills = list(role.role_skills.all()) | |
| base_name = f"{role.role_name} (copy)" | |
| new_name = base_name | |
| suffix = 2 | |
| while Role.objects.filter(role_name=new_name).exists(): | |
| new_name = f"{base_name} {suffix}" | |
| suffix += 1 | |
| new_role = Role.objects.create( | |
| role_name=new_name, | |
| description=role.description, | |
| industry=role.industry, | |
| is_active=role.is_active, | |
| onet_soc_code=role.onet_soc_code, | |
| ) | |
| RoleSkill.objects.bulk_create([ | |
| RoleSkill( | |
| role=new_role, | |
| skill=rs.skill, | |
| required_level=rs.required_level, | |
| weight=rs.weight, | |
| is_mandatory=rs.is_mandatory, | |
| ) | |
| for rs in source_skills | |
| ]) | |
| created_names.append(new_name) | |
| new_roles.append((new_role, role, len(source_skills))) | |
| _log_admin_action( | |
| request, [nr for nr, _, _ in new_roles], | |
| action_flag=ADDITION, | |
| message_per_object=lambda nr: next( | |
| f"Duplicated from '{src.role_name}' with {n} RoleSkill row(s)." | |
| for new, src, n in new_roles if new.pk == nr.pk | |
| ), | |
| ) | |
| if created_names: | |
| self.message_user( | |
| request, | |
| f"Duplicated {len(created_names)} role(s): " + ", ".join(created_names), | |
| level=messages.SUCCESS, | |
| ) | |
| else: | |
| self.message_user(request, "No roles selected.", level=messages.WARNING) | |
| class UserTargetRoleAdmin(admin.ModelAdmin): | |
| list_display = ('user', 'role', 'selected_at', 'is_active') | |
| list_filter = ('is_active', 'selected_at') | |
| search_fields = ('user__email', 'role__role_name') | |
| readonly_fields = ('selected_at',) | |