Spaces:
Sleeping
Sleeping
| """Parse the O*NET v30.2 bulk database into a normalized seed YAML. | |
| Reads: | |
| backend/seed_data/onet_dump/db_30_2_text/Technology Skills.txt | |
| backend/seed_data/onet_dump/db_30_2_text/Occupation Data.txt | |
| backend/seed_data/role_soc_mapping.yaml | |
| backend/seed_data/manual_skill_augmentation.yaml | |
| Writes: | |
| backend/seed_data/onet_roles_raw.yaml | |
| Filter rule: keep technology_skills where Hot Technology=Y AND In Demand=Y. | |
| That intersection produces 11-35 skills per role (verified) — the right granularity | |
| for a usable RoleSkill list. Manual augmentation adds modern frameworks O*NET misses. | |
| Run from repo root: | |
| cd backend && python scripts/parse_onet_dump.py | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import sys | |
| from pathlib import Path | |
| import yaml | |
| REPO_ROOT = Path(__file__).resolve().parent.parent | |
| SEED_DIR = REPO_ROOT / "seed_data" | |
| ONET_DIR = SEED_DIR / "onet_dump" / "db_30_2_text" | |
| ROLE_MAPPING_FILE = SEED_DIR / "role_soc_mapping.yaml" | |
| AUGMENTATION_FILE = SEED_DIR / "manual_skill_augmentation.yaml" | |
| OUTPUT_FILE = SEED_DIR / "onet_roles_raw.yaml" | |
| def load_occupation_data() -> dict[str, dict[str, str]]: | |
| """SOC code -> {title, description}.""" | |
| path = ONET_DIR / "Occupation Data.txt" | |
| out: dict[str, dict[str, str]] = {} | |
| with path.open(encoding="utf-8") as f: | |
| reader = csv.DictReader(f, delimiter="\t") | |
| for row in reader: | |
| out[row["O*NET-SOC Code"]] = { | |
| "title": row["Title"], | |
| "description": row["Description"], | |
| } | |
| return out | |
| def load_tech_skills_for_socs(socs: set[str]) -> dict[str, list[dict]]: | |
| """SOC code -> [{example, commodity_title, hot, in_demand}]. | |
| Only returns rows where Hot Technology=Y AND In Demand=Y. | |
| """ | |
| path = ONET_DIR / "Technology Skills.txt" | |
| out: dict[str, list[dict]] = {soc: [] for soc in socs} | |
| with path.open(encoding="utf-8") as f: | |
| reader = csv.DictReader(f, delimiter="\t") | |
| for row in reader: | |
| soc = row["O*NET-SOC Code"] | |
| if soc not in socs: | |
| continue | |
| if row["Hot Technology"] != "Y" or row["In Demand"] != "Y": | |
| continue | |
| out[soc].append({ | |
| "example": row["Example"], | |
| "commodity_title": row["Commodity Title"], | |
| "hot": True, | |
| "in_demand": True, | |
| }) | |
| return out | |
| def normalize_skill_name(raw: str) -> str: | |
| """Map O*NET's verbose skill names to cleaner display names. | |
| Examples: | |
| "Structured query language SQL" -> "SQL" | |
| "Amazon Web Services AWS software" -> "AWS" | |
| "Hypertext markup language HTML" -> "HTML" | |
| "Cascading style sheets CSS" -> "CSS" | |
| "Microsoft Azure software" -> "Microsoft Azure" | |
| "Oracle Java" -> "Java" | |
| "JavaScript Object Notation JSON" -> "JSON" | |
| "Extensible markup language XML" -> "XML" | |
| "Microsoft .NET Framework" -> ".NET" | |
| "Spring Framework" / "Spring Boot" -> kept as-is | |
| "The MathWorks MATLAB" -> "MATLAB" | |
| "Google Angular" -> "Angular" | |
| "IBM Terraform" -> "Terraform" | |
| Cleaner names = better matches against manual augmentation entries and | |
| against terms a student would actually type. | |
| """ | |
| s = raw.strip() | |
| # Strip trailing " software" | |
| if s.endswith(" software"): | |
| s = s[: -len(" software")] | |
| # Common acronym extractions: take the trailing all-caps token if the rest | |
| # is a verbose expansion of it. | |
| aliases = { | |
| "Structured query language SQL": "SQL", | |
| "Amazon Web Services AWS": "AWS", | |
| "Hypertext markup language HTML": "HTML", | |
| "Cascading style sheets CSS": "CSS", | |
| "JavaScript Object Notation JSON": "JSON", | |
| "Extensible markup language XML": "XML", | |
| "The MathWorks MATLAB": "MATLAB", | |
| "Google Angular": "Angular", | |
| "IBM Terraform": "Terraform", | |
| "Oracle Java": "Java", | |
| "Microsoft .NET Framework": ".NET", | |
| "Apache Hive": "Apache Hive", | |
| "Apache Hadoop": "Apache Hadoop", | |
| "Apache Spark": "Apache Spark", | |
| "Apache Kafka": "Apache Kafka", | |
| "Apache Airflow": "Apache Airflow", | |
| "Apache Cassandra": "Apache Cassandra", | |
| "Atlassian JIRA": "Jira", | |
| "Atlassian Confluence": "Confluence", | |
| "Microsoft Azure": "Microsoft Azure", | |
| "Microsoft Excel": "Microsoft Excel", | |
| "Microsoft Power BI": "Microsoft Power BI", | |
| "Microsoft PowerPoint": "Microsoft PowerPoint", | |
| "Microsoft Access": "Microsoft Access", | |
| "Jenkins CI": "Jenkins", | |
| "Node.js": "Node.js", | |
| } | |
| return aliases.get(s, s) | |
| def categorize_skill(name: str, commodity_title: str) -> str: | |
| """Best-effort skill category from O*NET commodity title or exact name match. | |
| Order matters: most specific buckets first (BI / Cloud / ML / Infra / Framework | |
| / Database / Language) before generic Tools fallback. Match on EXACT skill | |
| names where possible to avoid substring false positives (e.g., the single | |
| letter "r" matching "PowerPoint"). | |
| """ | |
| n = name.strip() | |
| n_lower = n.lower() | |
| c_lower = commodity_title.lower() if commodity_title else "" | |
| # Data/Analytics tools — check before BI because commodity_title for many | |
| # data tools (Spark, Hadoop) is "Business intelligence and data analysis software" | |
| data_names = {"Apache Spark", "Apache Hadoop", "Apache Hive", "Apache Kafka", | |
| "Apache Airflow", "Apache Cassandra", "dbt", "Pandas", "NumPy", | |
| "Matplotlib", "Seaborn", "Alteryx software", "Alteryx"} | |
| if n in data_names: | |
| return "Data" | |
| # Most specific first | |
| bi_names = {"Microsoft Power BI", "Tableau", "Looker", "Microsoft Excel"} | |
| if n in bi_names or "business intelligence" in c_lower: | |
| return "BI" | |
| cloud_names = {"AWS", "Microsoft Azure", "Google Cloud Platform", "Snowflake", | |
| "Google BigQuery", "Amazon Redshift", "Amazon Elastic Compute Cloud EC2", | |
| "Amazon Simple Storage Service S3"} | |
| if n in cloud_names or "cloud-based" in c_lower: | |
| return "Cloud" | |
| ml_names = {"PyTorch", "TensorFlow", "Scikit-learn", "HuggingFace Transformers", | |
| "Apache MXNet", "MLflow", "Sentence Transformers", "ONNX"} | |
| if n in ml_names: | |
| return "ML" | |
| infra_names = {"Docker", "Kubernetes", "Terraform", "Ansible", "Linux", | |
| "Linux Administration", "Bash Scripting", "Docker Compose"} | |
| if n in infra_names: | |
| return "Infra" | |
| framework_names = {"React", "Angular", "Vue.js", "Django", "Flask", "FastAPI", | |
| "Spring Boot", "Spring Framework", "Next.js", "Node.js", | |
| "Tailwind CSS", "TanStack Query", "Express", "LangChain", | |
| "LlamaIndex"} | |
| if n in framework_names: | |
| return "Framework" | |
| cicd_names = {"Jenkins", "GitHub Actions", "Atlassian JIRA", "Jira", | |
| "Atlassian Confluence", "Confluence"} | |
| if n in cicd_names: | |
| return "CI/CD" | |
| db_names = {"PostgreSQL", "MySQL", "MongoDB", "Redis", "Elasticsearch", | |
| "NoSQL", "pgvector", "Pinecone"} | |
| if n in db_names or ("data base" in c_lower and n not in cloud_names): | |
| return "Database" | |
| # Languages — match on EXACT name only to avoid "r" matching "PowerPoint" | |
| language_names = {"Python", "Java", "C", "C#", "C++", "JavaScript", "TypeScript", | |
| "Go", "Ruby", "Swift", "Kotlin", "Bash", "MATLAB", "R", "SAS", | |
| "PHP", "Rust", "Scala", "HTML", "CSS", "XML", "JSON", ".NET"} | |
| if n in language_names: | |
| return "Language" | |
| if n in ("Git", "GitHub", "Vite", "Jupyter Notebook"): | |
| return "Tools" | |
| if "version" in c_lower or "monitoring" in c_lower: | |
| return "Tools" | |
| return "Tools" | |
| def main() -> int: | |
| if not ONET_DIR.exists(): | |
| print(f"ERROR: O*NET dump not found at {ONET_DIR}", file=sys.stderr) | |
| print("Download db_30_2_text.zip from " | |
| "https://www.onetcenter.org/dl_files/database/db_30_2_text.zip " | |
| "and extract under backend/seed_data/onet_dump/", file=sys.stderr) | |
| return 1 | |
| role_mapping = yaml.safe_load(ROLE_MAPPING_FILE.read_text(encoding="utf-8")) | |
| augmentation = yaml.safe_load(AUGMENTATION_FILE.read_text(encoding="utf-8")) | |
| # Collect all SOCs we need to query (across composite roles) | |
| all_socs: set[str] = set() | |
| for role in role_mapping["roles"]: | |
| all_socs.update(role["socs"]) | |
| occupation_data = load_occupation_data() | |
| tech_skills = load_tech_skills_for_socs(all_socs) | |
| # Verify every SOC was found | |
| missing = [soc for soc in all_socs if soc not in occupation_data] | |
| if missing: | |
| print(f"ERROR: SOC codes not found in Occupation Data.txt: {missing}", | |
| file=sys.stderr) | |
| return 1 | |
| output = {"roles": []} | |
| for role in role_mapping["roles"]: | |
| name = role["name"] | |
| socs = role["socs"] | |
| # Pick the title/description from the first SOC, unless overridden | |
| primary_soc = socs[0] | |
| title_source = occupation_data[primary_soc] | |
| description = role.get("description_override") or title_source["description"] | |
| # Union tech skills across composite SOCs, dedupe by normalized name | |
| seen_names: set[str] = set() | |
| skills: list[dict] = [] | |
| for soc in socs: | |
| for entry in tech_skills.get(soc, []): | |
| normalized = normalize_skill_name(entry["example"]) | |
| if normalized in seen_names: | |
| continue | |
| seen_names.add(normalized) | |
| skills.append({ | |
| "skill_name": normalized, | |
| "category": categorize_skill(normalized, entry["commodity_title"]), | |
| "source": "onet", | |
| "source_soc": soc, | |
| "is_mandatory": True, # Hot+InDemand baseline | |
| "weight": 1.0, # Admin can override | |
| "required_level": "INTERMEDIATE", # Admin can override | |
| }) | |
| # Apply manual augmentation | |
| for aug in augmentation.get("augmentation", {}).get(name, []): | |
| normalized = aug["skill"].strip() | |
| if normalized in seen_names: | |
| # Augmentation upgrades the existing entry's flags/weight | |
| for s in skills: | |
| if s["skill_name"] == normalized: | |
| s["category"] = aug.get("category", s["category"]) | |
| s["is_mandatory"] = aug["is_mandatory"] | |
| s["weight"] = aug["weight"] | |
| s["required_level"] = aug["required_level"] | |
| s["source"] = "onet+manual" | |
| break | |
| else: | |
| seen_names.add(normalized) | |
| skills.append({ | |
| "skill_name": normalized, | |
| "category": aug.get("category", "Tools"), | |
| "source": "manual", | |
| "source_soc": None, | |
| "is_mandatory": aug["is_mandatory"], | |
| "weight": aug["weight"], | |
| "required_level": aug["required_level"], | |
| }) | |
| output["roles"].append({ | |
| "name": name, | |
| "industry": role["industry"], | |
| "description": description, | |
| "primary_soc": primary_soc, | |
| "all_socs": socs, | |
| "skills": skills, | |
| }) | |
| print(f" {name}: {len(skills)} skills " | |
| f"({sum(1 for s in skills if s['source'].startswith('onet'))} from O*NET, " | |
| f"{sum(1 for s in skills if s['source'] == 'manual')} manual)") | |
| OUTPUT_FILE.write_text( | |
| yaml.safe_dump(output, sort_keys=False, allow_unicode=True, width=200), | |
| encoding="utf-8", | |
| ) | |
| print(f"\nWrote {OUTPUT_FILE}") | |
| print(f" {len(output['roles'])} roles, " | |
| f"{sum(len(r['skills']) for r in output['roles'])} total RoleSkill entries") | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |