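"""Collect job postings for the curated company list.

Resolves each company's careers URL to a live jobs page, detects the ATS
(applicant tracking system) behind it, runs the matching connector, and
merges the normalized postings into the on-disk jobs cache.
"""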
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional
from src import cache
from src.collectors import generic, greenhouse, lever, smartrecruiters, workday
from src.collectors.common import dedupe_jobs
from src.company_loader import load_company_records
from src.detectors.ats_detector import detect_ats_type, extract_ats_identifier
from src.jobs.debug_utils import save_debug_html
from src.models import CompanyRecord, JobPosting
from src.resolver.jobs_page_resolver import ResolvedJobsPage, resolve_real_jobs_page

LOGGER = logging.getLogger("career_fair_matcher.collect_jobs")
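
# Dispatch table: detected ATS type -> collector; "generic" is the catch-all.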
CONNECTOR_REGISTRY = {
"greenhouse": greenhouse.collect,
"lever": lever.collect,
"workday": workday.collect,
"smartrecruiters": smartrecruiters.collect,
"generic": generic.collect,
}


@dataclass
class CompanyCollectionOutcome:
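    """Summary of one company's collection run, serialized into the JSON log."""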
company: str
original_url: str
resolved_url: str
fetch_method: str
ats_detected: str
connector_used: str
jobs_collected: int
failure_reason: str = ""


def configure_logging(level: int = logging.INFO) -> None:
    """Configure root logging for JSON-line output; safe to call repeatedly."""
    # Check the root logger: basicConfig attaches handlers there, not to LOGGER,
    # so guarding on LOGGER.handlers would never trip.
    if logging.getLogger().handlers:
        return
    logging.basicConfig(level=level, format="%(message)s")


def _log_company_event(outcome: CompanyCollectionOutcome) -> None:
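    """Emit one structured JSON line summarizing a company's collection run."""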
LOGGER.info(json.dumps({
"company": outcome.company,
"original_careers_url": outcome.original_url,
"resolved_url": outcome.resolved_url,
"fetch_method": outcome.fetch_method,
"ats_detected": outcome.ats_detected,
"connector_used": outcome.connector_used,
"jobs_collected": outcome.jobs_collected,
"failure_reason": outcome.failure_reason,
}))


def _save_resolution_snapshots(debug_dir: Path, company: CompanyRecord, resolved_page: ResolvedJobsPage) -> None:
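    """Write every intermediate HTML snapshot plus the final resolved page to debug_dir."""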
for stage, html in resolved_page.html_snapshots.items():
save_debug_html(company.company, html, stage, debug_dir)
save_debug_html(company.company, resolved_page.html, "resolved", debug_dir)


def _collect_from_connector(company: CompanyRecord, resolved_page: ResolvedJobsPage, ats_type: str) -> tuple[List[JobPosting], str, str]:
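    """Run the ATS-specific connector, falling back to the generic scraper.

    Returns (deduped jobs, connector actually used, failure reason or "").
    """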
connector_name = ats_type if ats_type in CONNECTOR_REGISTRY else "generic"
connector = CONNECTOR_REGISTRY[connector_name]
jobs = connector(company, resolved_page)
    failure_reason = ""
    if not jobs and connector_name != "generic":
        # The dedicated connector came up empty; retry with the generic scraper.
        jobs = generic.collect(company, resolved_page)
        failure_reason = "ATS_PARSE_FAILED" if not jobs else ""
        connector_name = "generic"
    if not jobs:
        # Prefer the ATS parse failure over the resolver's reason so a failed
        # fallback is not masked by a blanket NO_JOBS_FOUND.
        return [], connector_name, failure_reason or resolved_page.failure_reason or "NO_JOBS_FOUND"
return dedupe_jobs(jobs), connector_name, failure_reason


def collect_jobs_for_company(company: CompanyRecord, debug_dir: Path) -> tuple[List[JobPosting], CompanyCollectionOutcome]:
"""Resolve, detect, collect, normalize, and log a single curated company."""
resolved_page = resolve_real_jobs_page(company.careers_url)
_save_resolution_snapshots(debug_dir, company, resolved_page)
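    # The CSV's ats_type (if any) is passed through as a hint to detection.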
ats_type = detect_ats_type(resolved_page.url, resolved_page.html, company.ats_type)
if not company.ats_identifier:
company.ats_identifier = extract_ats_identifier(ats_type, resolved_page.url, resolved_page.html)
jobs, connector_used, failure_reason = _collect_from_connector(company, resolved_page, ats_type)
    # Normalize connector output: coerce raw dicts into JobPosting instances
    # (assumes a connector may yield plain dicts whose keys match JobPosting's fields).
    normalized_jobs = [
        job if isinstance(job, JobPosting) else JobPosting(**job)
        for job in jobs
    ]
outcome = CompanyCollectionOutcome(
company=company.company,
original_url=company.careers_url,
resolved_url=resolved_page.url,
fetch_method=resolved_page.fetch_method,
ats_detected=ats_type,
connector_used=connector_used,
jobs_collected=len(normalized_jobs),
failure_reason=failure_reason,
)
_log_company_event(outcome)
return normalized_jobs, outcome


def refresh_selected_companies(
*,
company_csv: str | Path,
cache_path: str | Path,
debug_dir: str | Path,
selected_companies: Optional[Iterable[str]] = None,
limit: Optional[int] = None,
) -> tuple[List[JobPosting], List[CompanyCollectionOutcome], Path]:
"""Refresh cached jobs for the curated list, preserving partial results on failure."""
configure_logging()
debug_path = Path(debug_dir)
companies = load_company_records(company_csv, limit=limit, selected_companies=set(selected_companies or []))
all_jobs: List[JobPosting] = []
outcomes: List[CompanyCollectionOutcome] = []
for company in companies:
try:
company_jobs, outcome = collect_jobs_for_company(company, debug_path)
all_jobs.extend(company_jobs)
outcomes.append(outcome)
except Exception as exc:
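            # Broad catch is deliberate: one failing company must not abort the
            # batch; record the failure and keep the partial results.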
outcome = CompanyCollectionOutcome(
company=company.company,
original_url=company.careers_url,
resolved_url=company.careers_url,
fetch_method="requests",
ats_detected=company.ats_type or "generic",
connector_used="generic",
jobs_collected=0,
failure_reason=str(exc) or "REQUEST_FAILED",
)
outcomes.append(outcome)
_log_company_event(outcome)
refreshed_companies = [company.company for company in companies]
cache_file = cache.merge_cached_jobs(cache_path, dedupe_jobs(all_jobs), refreshed_companies)
return dedupe_jobs(all_jobs), outcomes, cache_file
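
# Example: refresh a subset of companies (the names below are illustrative)
# while leaving the rest of the cache untouched:
#
#   jobs, outcomes, cache_file = refresh_selected_companies(
#       company_csv="data/nsbe_companies.csv",
#       cache_path="data/cached_jobs.json",
#       debug_dir="debug_html/collect_jobs",
#       selected_companies=["Acme Corp", "Globex"],
#   )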


def main() -> None:
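    """Run a full refresh using the repository's default data locations."""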
base_dir = Path(__file__).resolve().parent.parent
company_csv = base_dir / "data" / "nsbe_companies.csv"
cache_path = base_dir / "data" / "cached_jobs.json"
debug_dir = base_dir / "debug_html" / "collect_jobs"
refresh_selected_companies(company_csv=company_csv, cache_path=cache_path, debug_dir=debug_dir)


if __name__ == "__main__":
main()