Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| orchestrate_cr.py β Fully automated CR application pipeline. | |
| Reads an Excel contribution list, downloads all Accepted CRs and their target | |
| TSs, parses tracked changes from each CR, applies them to the TS, and | |
| finalises the document metadata β all without any per-CR manual scripting. | |
| Usage: | |
| python3 orchestrate_cr.py <excel_path> [person_name] [--output-dir DIR] [--author NAME] | |
| Arguments: | |
| excel_path Path to .xls or .xlsx contribution list (Windows paths OK) | |
| person_name Name to match in SubmittedBy column (default: "Ly Thanh PHAN") | |
| Options: | |
| --output-dir Base output folder (default: ~/CR_Processing) | |
| --author Tracked-change author name (default: "CR Application") | |
| --retry-mode Skip steps 1-4; apply CRs listed in failed_ts.json | |
| --ts-mode Apply all CRs for a given spec number across all versions | |
| --ts-id Spec number to process in ts-mode (e.g. "102 267") | |
| --excel-hash Excel hash used to filter the HF index in ts-mode | |
| --hf-repo HuggingFace dataset repo containing the CR index | |
| """ | |
| import argparse | |
| import contextlib | |
| import datetime | |
| import io | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import docx as docx_lib | |
| # ββ sys.path setup ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SCRIPT_DIR = Path(__file__).parent | |
| FETCH_SCRIPTS = SCRIPT_DIR.parent.parent / 'fetch-crs' / 'scripts' | |
| sys.path.insert(0, str(SCRIPT_DIR)) | |
| sys.path.insert(0, str(FETCH_SCRIPTS)) | |
| from fetch_crs import parse_excel, download_cr, parse_cr_cover, download_ts, wsl_path | |
| from cr_parser import parse_cr | |
| from ts_applicator import apply_manifest | |
| from finalize_ts import ( | |
| extract_cr_metadata, | |
| compute_pub_date, | |
| derive_new_version, | |
| update_change_history_table, | |
| update_history_table, | |
| update_title_para, | |
| NoChangeHistoryTable, | |
| ) | |
| from docx_helpers import RevCounter, AUTHOR as DEFAULT_AUTHOR, DATE as DEFAULT_DATE | |
| # ββ Display / logging helpers βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _section(title): | |
| bar = '=' * 60 | |
| print(f'\n{bar}') | |
| print(f' {title}') | |
| print(bar) | |
| class _TeeWriter: | |
| """Writes to both real stdout and a StringIO buffer simultaneously.""" | |
| def __init__(self, real, buf): | |
| self._real = real | |
| self._buf = buf | |
| def write(self, s): | |
| self._real.write(s) | |
| self._buf.write(s) | |
| def flush(self): | |
| self._real.flush() | |
| # ββ Small report / cache helpers βββββββββββββββββββββββββββββββββββββββββββββ | |
| def _print_report(report, *, detailed=True): | |
| """Print per-TS result lines from a report list.""" | |
| for ts_key, n_ok, n_skip, n_crs, out_path, log_path, errors in report: | |
| status = 'OK' if out_path and not errors else ('WARN' if out_path else 'FAIL') | |
| print(f' [{status}] {ts_key}') | |
| if detailed: | |
| print(f' CRs: {n_crs} | Body changes applied: {n_ok} | Skipped: {n_skip}') | |
| if out_path: | |
| print(f' Output: {out_path.parent.name}/{out_path.name}') | |
| if log_path and log_path.exists(): | |
| print(f' Log: {log_path.parent.name}/{log_path.name}') | |
| for err in errors: | |
| print(f' ! {err}') | |
| def _copy_cr_cache_if_needed(cr_paths, cr_dir, output_dir): | |
| """Copy downloaded CRs into output_dir/CRs when a shared cache is used.""" | |
| run_cr_dir = output_dir / 'CRs' | |
| if cr_dir.resolve() != run_cr_dir.resolve(): | |
| run_cr_dir.mkdir(parents=True, exist_ok=True) | |
| for p in cr_paths.values(): | |
| if p.exists(): | |
| shutil.copy2(p, run_cr_dir / p.name) | |
| # ββ Per-TS-group apply helper βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _apply_ts_group(spec_number, version, uids, ts_paths, cr_paths, spec_dir, | |
| author, tc_date, log_label='Pipeline Log'): | |
| """Parse, apply, and finalise one TS group. Returns one report tuple.""" | |
| ts_key = f'TS {spec_number} v{version}' | |
| spec_compact = spec_number.replace(' ', '') | |
| new_v = derive_new_version(version) | |
| stem = f'ts_{spec_compact}_v{new_v}_was_v{version}' | |
| ts_applied = spec_dir / f'ts_{spec_compact}_v{version}_applied.docx' | |
| ts_final = spec_dir / f'{stem}.docx' | |
| log_path = spec_dir / f'{stem}.log' | |
| errors = [] | |
| print(f'\n-- {ts_key} ({len(uids)} CR(s): {", ".join(uids)}) --') | |
| if (spec_number, version) not in ts_paths: | |
| msg = 'TS download failed β skipping' | |
| print(f' SKIP: {msg}') | |
| return (ts_key, 0, 0, len(uids), None, log_path, [msg]) | |
| ts_in = ts_paths[(spec_number, version)] | |
| log_buf = io.StringIO() | |
| tee = _TeeWriter(sys.stdout, log_buf) | |
| with contextlib.redirect_stdout(tee): | |
| log_header = ( | |
| f'{log_label}\n' | |
| f'TS: {spec_number} v{version} -> v{new_v}\n' | |
| f'CRs: {", ".join(uids)}\n' | |
| f'Date: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n' | |
| f'{"=" * 60}\n' | |
| ) | |
| print(log_header, end='') | |
| combined_manifest = [] | |
| participating_uids = [] | |
| for uid in uids: | |
| if uid not in cr_paths: | |
| errors.append(f'[{uid}] CR download had failed β skipped') | |
| continue | |
| print(f' Parsing {uid}... ', end='', flush=True) | |
| try: | |
| changes = parse_cr(cr_paths[uid]) | |
| combined_manifest.extend(changes) | |
| participating_uids.append(uid) | |
| print(f'{len(changes)} change(s)') | |
| except Exception as e: | |
| errors.append(f'[{uid}] parse ERROR: {e}') | |
| print(f'ERROR: {e}') | |
| if not combined_manifest: | |
| print(' No changes parsed β skipping apply step.') | |
| log_path.write_text(log_buf.getvalue(), encoding='utf-8') | |
| return (ts_key, 0, 0, len(uids), None, log_path, | |
| errors + ['No changes parsed']) | |
| print(f' Applying {len(combined_manifest)} change(s) to {ts_in.name}...') | |
| try: | |
| n_ok, n_skip, log_lines, n_parsed, n_merged = apply_manifest( | |
| ts_in, combined_manifest, ts_applied, author=author, date=tc_date | |
| ) | |
| except Exception as e: | |
| errors.append(f'apply_manifest ERROR: {e}') | |
| print(f' ERROR: {e}') | |
| log_path.write_text(log_buf.getvalue(), encoding='utf-8') | |
| return (ts_key, 0, 0, len(uids), None, log_path, errors) | |
| for line in log_lines: | |
| print(f' {line}') | |
| for line in log_lines: | |
| if line.strip().startswith('ERROR'): | |
| errors.append(line.strip()) | |
| print(f' Parsed: {n_parsed} body changes (merged to {n_merged} groups)' | |
| f' β Applied: {n_ok} Skipped: {n_skip}') | |
| print(' Finalising metadata...') | |
| ts_final_or_applied = ts_applied # fallback if finalise raises | |
| try: | |
| ts_doc = docx_lib.Document(str(ts_applied)) | |
| rev = RevCounter(ts_doc) | |
| pub_ym, pub_month_year = compute_pub_date() | |
| old_v = version | |
| title_text = ts_doc.paragraphs[0].text | |
| date_match = re.search(r'\((\d{4}-\d{2})\)', title_text) | |
| old_date_str = date_match.group(1) if date_match else '' | |
| print(f' Version: {old_v} -> {new_v}') | |
| print(f' Publication: {pub_month_year} ({pub_ym})') | |
| for uid in participating_uids: | |
| try: | |
| meta = extract_cr_metadata(str(cr_paths[uid])) | |
| ch_cells = update_change_history_table( | |
| ts_doc, meta, pub_ym, old_v, new_v, rev, author, tc_date | |
| ) | |
| print(f' [Change History] {uid}: {ch_cells}') | |
| except NoChangeHistoryTable: | |
| print(f' [Change History] {uid}: NOT PRESENT β this document has no Change History table (History table only)') | |
| except Exception as e: | |
| errors.append(f'[{uid}] Change History ERROR: {e}') | |
| print(f' [Change History] {uid}: ERROR β {e}') | |
| try: | |
| h_cells = update_history_table( | |
| ts_doc, new_v, pub_month_year, rev, author, tc_date | |
| ) | |
| print(f' [History] {h_cells}') | |
| except Exception as e: | |
| errors.append(f'History table ERROR: {e}') | |
| print(f' [History] ERROR β {e}') | |
| if old_date_str: | |
| try: | |
| update_title_para( | |
| ts_doc, old_v, new_v, old_date_str, pub_ym, rev, author, tc_date | |
| ) | |
| print(f' [Title] V{old_v} -> V{new_v}, ({old_date_str}) -> ({pub_ym})') | |
| except Exception as e: | |
| errors.append(f'Title update ERROR: {e}') | |
| print(f' [Title] ERROR β {e}') | |
| else: | |
| print(f' [Title] SKIP β no (YYYY-MM) pattern in: {title_text!r}') | |
| ts_doc.save(str(ts_final)) | |
| print(f' Saved: {spec_compact}/{ts_final.name}') | |
| print(f' Log: {spec_compact}/{log_path.name}') | |
| ts_final_or_applied = ts_final | |
| except Exception as e: | |
| errors.append(f'Finalisation ERROR: {e}') | |
| print(f' Finalisation ERROR: {e}') | |
| log_path.write_text(log_buf.getvalue(), encoding='utf-8') | |
| return (ts_key, n_ok, n_skip, len(uids), ts_final_or_applied, log_path, errors) | |
| # ββ Shared Steps 2, 4, 5, 6 ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _run_steps_2_to_6(cr_list, ts_groups, output_dir, cr_dir, ts_dir, | |
| eol_user, eol_password, author, tc_date): | |
| """ | |
| Execute steps 2 (download CRs), 4 (download TSs), 5 & 6 (apply + finalise). | |
| cr_list : list of (uid, title) | |
| ts_groups : dict {(spec_number, version): [uid, ...]} β may be pre-built | |
| (ts-mode) or None to trigger Step 3 (cover page parse). | |
| """ | |
| # ββ Step 2: Download CR DOCXs βββββββββββββββββββββββββββββββββββββββββββββ | |
| _section('Step 2 β Downloading CR DOCXs') | |
| cr_paths = {} # uid -> Path | |
| for uid, _ in cr_list: | |
| docx_path, note = download_cr(uid, cr_dir, eol_user, eol_password) | |
| if docx_path: | |
| cr_paths[uid] = docx_path | |
| print(f' [{uid}] OK ({note}) β {docx_path.name}') | |
| else: | |
| print(f' [{uid}] FAILED β {note}') | |
| n_cr_failed = len(cr_list) - len(cr_paths) | |
| if n_cr_failed: | |
| print(f' {len(cr_paths)}/{len(cr_list)} downloaded ({n_cr_failed} failed)') | |
| else: | |
| print(f' All {len(cr_list)} CR(s) downloaded successfully') | |
| # ββ Step 3: Parse cover pages (only when ts_groups not pre-built) βββββββββ | |
| if ts_groups is None: | |
| _section('Step 3 β Parsing CR cover pages') | |
| ts_groups = {} | |
| uid_cover_failed = [] | |
| for uid in cr_paths: | |
| spec_number, version = parse_cr_cover(cr_paths[uid]) | |
| if spec_number and version: | |
| key = (spec_number, version) | |
| ts_groups.setdefault(key, []).append(uid) | |
| print(f' [{uid}] -> TS {spec_number} v{version}') | |
| else: | |
| uid_cover_failed.append(uid) | |
| print(f' [{uid}] WARNING: could not parse cover page β skipping') | |
| if not ts_groups: | |
| print('\nNo TSs identified. Nothing to apply.') | |
| return [], {}, {}, {} | |
| # ββ Step 4: Download TSs ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _section('Step 4 β Downloading TSs') | |
| ts_paths = {} # (spec_number, version) -> Path | |
| spec_dirs = {} # (spec_number, version) -> Path (per-spec subfolder) | |
| for (spec_number, version) in ts_groups: | |
| spec_compact = spec_number.replace(' ', '') | |
| spec_dir = ts_dir / spec_compact | |
| spec_dir.mkdir(parents=True, exist_ok=True) | |
| spec_dirs[(spec_number, version)] = spec_dir | |
| print(f' [TS {spec_number} v{version}] ', end='', flush=True) | |
| filename, note = None, "not attempted" | |
| for attempt in range(1, 4): | |
| filename, note = download_ts(spec_number, version, spec_dir, eol_user, eol_password) | |
| if filename: | |
| break | |
| if attempt < 3: | |
| print(f'\n [attempt {attempt}/3 failed β retrying in 5s: {note}]', flush=True) | |
| print(f' [TS {spec_number} v{version}] ', end='', flush=True) | |
| time.sleep(5) | |
| else: | |
| print(f'\n [all 3 attempts failed]', flush=True) | |
| if filename: | |
| ts_paths[(spec_number, version)] = spec_dir / filename | |
| print(f'OK ({note}) β {spec_compact}/{filename}') | |
| else: | |
| print(f'FAILED β {note}') | |
| # Write failed_ts.json | |
| failed_ts_entries = [ | |
| { | |
| "spec_number": spec_number, | |
| "version": version, | |
| "spec_compact": spec_number.replace(' ', ''), | |
| "spec_dir": str(spec_dirs[(spec_number, version)]), | |
| "expected_filename": f"ts_{spec_number.replace(' ', '')}_v{version}.docx", | |
| "cr_uids": ts_groups[(spec_number, version)], | |
| "cr_dir": str(cr_dir), | |
| } | |
| for (spec_number, version) in ts_groups | |
| if (spec_number, version) not in ts_paths | |
| ] | |
| (output_dir / "failed_ts.json").write_text( | |
| json.dumps(failed_ts_entries, indent=2) | |
| ) | |
| # Write failed_cr.json | |
| failed_cr_entries = [] | |
| for uid, _ in cr_list: | |
| if uid not in cr_paths: | |
| sn, v = next( | |
| ((sn, v) for (sn, v), uids in ts_groups.items() if uid in uids), | |
| (None, None), | |
| ) | |
| sc = sn.replace(' ', '') if sn else None | |
| sd = spec_dirs.get((sn, v)) if sn else None | |
| failed_cr_entries.append({ | |
| "uid": uid, | |
| "cr_dir": str(cr_dir), | |
| "expected_filename": f"{uid}.docx", | |
| "ts_spec_number": sn, | |
| "ts_version": v, | |
| "ts_spec_dir": str(sd) if sd else None, | |
| "ts_expected_filename": f"ts_{sc}_v{v}.docx" if sc and v else None, | |
| }) | |
| (output_dir / "failed_cr.json").write_text(json.dumps(failed_cr_entries, indent=2)) | |
| if failed_cr_entries: | |
| print(f' {len(failed_cr_entries)} failed CR download(s) written to failed_cr.json') | |
| # ββ Steps 5 & 6: Apply CRs + Finalise each TS ββββββββββββββββββββββββββββ | |
| _section('Steps 5 & 6 β Applying CRs and Finalising Metadata') | |
| report = [] # (ts_key, n_ok, n_skip, n_crs, out_path, log_path, errors) | |
| for (spec_number, version), uids in ts_groups.items(): | |
| spec_compact = spec_number.replace(' ', '') | |
| spec_dir = spec_dirs.get((spec_number, version), ts_dir / spec_compact) | |
| spec_dir.mkdir(parents=True, exist_ok=True) | |
| report.append(_apply_ts_group( | |
| spec_number, version, uids, ts_paths, cr_paths, spec_dir, author, tc_date | |
| )) | |
| return report, cr_paths, ts_paths, spec_dirs | |
| # ββ Main ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(): | |
| ap = argparse.ArgumentParser( | |
| description='Fully automated CR application pipeline.', | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| ) | |
| ap.add_argument( | |
| 'excel_path', | |
| nargs='?', | |
| default=None, | |
| help='Path to .xls or .xlsx contribution list (not required in --retry-mode or --ts-mode)', | |
| ) | |
| ap.add_argument( | |
| 'person_name', | |
| nargs='?', | |
| default='Ly Thanh PHAN', | |
| help='Name to match in SubmittedBy column (default: "Ly Thanh PHAN")', | |
| ) | |
| ap.add_argument( | |
| '--output-dir', | |
| default=str(Path.home() / 'CR_Processing'), | |
| help='Base output directory (default: ~/CR_Processing)', | |
| ) | |
| ap.add_argument( | |
| '--author', | |
| default=DEFAULT_AUTHOR, | |
| help=f'Tracked change author name (default: "{DEFAULT_AUTHOR}")', | |
| ) | |
| ap.add_argument( | |
| '--retry-mode', | |
| action='store_true', | |
| help='Skip steps 1-4; apply CRs to TSs listed in failed_ts.json that now have their DOCX on disk', | |
| ) | |
| ap.add_argument( | |
| '--ts-mode', | |
| action='store_true', | |
| help='Apply all CRs for a given spec number across all versions (uses HF index)', | |
| ) | |
| ap.add_argument( | |
| '--ts-id', | |
| default='', | |
| help='Spec number to process in ts-mode, e.g. "102 267"', | |
| ) | |
| ap.add_argument( | |
| '--excel-hash', | |
| default='', | |
| help='Excel hash used to filter the HF index in ts-mode', | |
| ) | |
| ap.add_argument( | |
| '--hf-repo', | |
| default='OrganizedProgrammers/CR_Index', | |
| help='HuggingFace dataset repo containing the CR index', | |
| ) | |
| ap.add_argument( | |
| '--cr-cache-dir', | |
| default='', | |
| help='Shared directory for caching downloaded CR DOCXs across runs ' | |
| '(default: <output-dir>/CRs)', | |
| ) | |
| args = ap.parse_args() | |
| if args.ts_mode and not args.ts_id: | |
| ap.error('--ts-id is required when using --ts-mode') | |
| if not args.ts_mode and not args.retry_mode and not args.excel_path: | |
| ap.error('excel_path is required when not in --retry-mode or --ts-mode') | |
| eol_user = os.environ.get("EOL_USER", "") | |
| eol_password = os.environ.get("EOL_PASSWORD", "") | |
| if not eol_user or not eol_password: | |
| sys.exit("ERROR: EOL_USER and EOL_PASSWORD must be set") | |
| output_dir = Path(wsl_path(args.output_dir)).expanduser() | |
| cr_cache = args.cr_cache_dir.strip() | |
| cr_dir = Path(wsl_path(cr_cache)).expanduser() if cr_cache else output_dir / 'CRs' | |
| ts_dir = output_dir / 'TS' | |
| cr_dir.mkdir(parents=True, exist_ok=True) | |
| ts_dir.mkdir(parents=True, exist_ok=True) | |
| author = args.author | |
| tc_date = DEFAULT_DATE | |
| # ββ Retry mode β skip steps 1-4, reconstruct state from failed_ts.json βββ | |
| if args.retry_mode: | |
| failed_ts_path = output_dir / 'failed_ts.json' | |
| failed_cr_path = output_dir / 'failed_cr.json' | |
| failed_ts_entries = json.loads(failed_ts_path.read_text()) if failed_ts_path.exists() else [] | |
| failed_cr_entries = json.loads(failed_cr_path.read_text()) if failed_cr_path.exists() else [] | |
| if not failed_ts_entries and not failed_cr_entries: | |
| print('No failed TSs or CRs β nothing to retry.') | |
| return | |
| _section('Retry mode β Steps 5 & 6 only') | |
| print(f'Retrying {len(failed_ts_entries)} TS(s) from failed_ts.json') | |
| ts_groups = {} | |
| spec_dirs = {} | |
| ts_paths = {} | |
| cr_paths = {} | |
| for entry in failed_ts_entries: | |
| spec_number = entry['spec_number'] | |
| version = entry['version'] | |
| key = (spec_number, version) | |
| ts_groups[key] = entry['cr_uids'] | |
| spec_dir = Path(entry['spec_dir']) | |
| spec_dirs[key] = spec_dir | |
| expected = spec_dir / entry['expected_filename'] | |
| if expected.exists(): | |
| ts_paths[key] = expected | |
| print(f' [TS {spec_number} v{version}] DOCX found β will apply') | |
| else: | |
| print(f' [TS {spec_number} v{version}] DOCX missing β skipping') | |
| cr_entry_dir = Path(entry['cr_dir']) | |
| for uid in entry['cr_uids']: | |
| extracted = cr_entry_dir / f'{uid}_extracted.docx' | |
| plain = cr_entry_dir / f'{uid}.docx' | |
| if extracted.exists(): | |
| cr_paths[uid] = extracted | |
| elif plain.exists(): | |
| cr_paths[uid] = plain | |
| # ββ Recover CRs from failed_cr.json ββββββββββββββββββββββββββββββββββ | |
| still_failed_cr = [] | |
| for entry in failed_cr_entries: | |
| uid = entry['uid'] | |
| cr_entry_dir = Path(entry['cr_dir']) | |
| extracted = cr_entry_dir / f'{uid}_extracted.docx' | |
| plain = cr_entry_dir / entry['expected_filename'] | |
| if extracted.exists(): | |
| cr_paths[uid] = extracted | |
| elif plain.exists(): | |
| cr_paths[uid] = plain | |
| else: | |
| still_failed_cr.append(entry) | |
| continue | |
| sn, v = entry['ts_spec_number'], entry['ts_version'] | |
| if sn and v: | |
| key = (sn, v) | |
| ts_groups.setdefault(key, []) | |
| if uid not in ts_groups[key]: | |
| ts_groups[key].append(uid) | |
| if key not in spec_dirs and entry['ts_spec_dir']: | |
| spec_dirs[key] = Path(entry['ts_spec_dir']) | |
| if key not in ts_paths and entry['ts_spec_dir'] and entry['ts_expected_filename']: | |
| ts_file = Path(entry['ts_spec_dir']) / entry['ts_expected_filename'] | |
| if ts_file.exists(): | |
| ts_paths[key] = ts_file | |
| print(f' [{uid}] CR recovered β will apply to TS {sn} v{v}') | |
| else: | |
| print(f' [{uid}] CR recovered but TS group unknown β skipping apply') | |
| failed_cr_path.write_text(json.dumps(still_failed_cr, indent=2)) | |
| # ββ Steps 5 & 6 (retry mode) βββββββββββββββββββββββββββββββββββββββββ | |
| report = [] | |
| for (spec_number, version), uids in ts_groups.items(): | |
| spec_compact = spec_number.replace(' ', '') | |
| spec_dir = spec_dirs.get((spec_number, version), ts_dir / spec_compact) | |
| spec_dir.mkdir(parents=True, exist_ok=True) | |
| report.append(_apply_ts_group( | |
| spec_number, version, uids, ts_paths, cr_paths, spec_dir, author, tc_date, | |
| log_label='Pipeline Log (retry)' | |
| )) | |
| # Update failed_ts.json β remove entries that are now resolved | |
| still_failed = [ | |
| e for e in failed_ts_entries | |
| if not (Path(e['spec_dir']) / e['expected_filename']).exists() | |
| ] | |
| failed_ts_path.write_text(json.dumps(still_failed, indent=2)) | |
| _section('Retry Summary') | |
| n_success = sum(1 for r in report if r[4] is not None and not r[6]) | |
| n_partial = sum(1 for r in report if r[4] is not None and r[6]) | |
| n_failed = sum(1 for r in report if r[4] is None) | |
| n_cr_still = len(still_failed_cr) | |
| print(f'TSs processed: {n_success} fully OK, {n_partial} with warnings, {n_failed} skipped/failed') | |
| if n_cr_still: | |
| print(f'CRs still missing: {n_cr_still} (see failed_cr.json)') | |
| _print_report(report, detailed=False) | |
| return | |
| # ββ TS mode β load HF index, skip Steps 1 & 3 ββββββββββββββββββββββββββββ | |
| if args.ts_mode: | |
| hf_token = os.environ.get("HF_TOKEN", "") | |
| if not hf_token: | |
| sys.exit("ERROR: HF_TOKEN must be set in ts-mode") | |
| from hf_cr_index import load_hf_index | |
| _section(f'TS mode β spec {args.ts_id!r}') | |
| print(f'Loading HF index from {args.hf_repo}...') | |
| try: | |
| all_records = load_hf_index(hf_token, args.hf_repo) | |
| except Exception as e: | |
| sys.exit(f'ERROR loading HF index: {e}') | |
| records = [ | |
| r for r in all_records | |
| if r.get("excel_hash") == args.excel_hash | |
| and r.get("spec_number") == args.ts_id | |
| ] | |
| if not records: | |
| sys.exit( | |
| f'ERROR: no indexed CRs found for spec {args.ts_id!r} ' | |
| f'with excel_hash={args.excel_hash!r}' | |
| ) | |
| # Build ts_groups from index (bypasses Step 3) | |
| ts_groups = {} | |
| for r in records: | |
| key = (r["spec_number"], r["version"]) | |
| ts_groups.setdefault(key, []).append(r["uid"]) | |
| # Build cr_list for Step 2 download | |
| cr_list = [(r["uid"], r["title"]) for r in records] | |
| print(f'Found {len(records)} CR(s) across {len(ts_groups)} version(s):') | |
| for (spec, ver), uids in ts_groups.items(): | |
| print(f' TS {spec} v{ver}: {", ".join(uids)}') | |
| report, cr_paths, ts_paths, spec_dirs = _run_steps_2_to_6( | |
| cr_list, ts_groups, output_dir, cr_dir, ts_dir, | |
| eol_user, eol_password, author, tc_date, | |
| ) | |
| # Copy the CRs actually applied into the run output dir so the ZIP | |
| # contains exactly the CRs used for this TS (only needed when using | |
| # a shared CR cache that lives outside output_dir). | |
| _copy_cr_cache_if_needed(cr_paths, cr_dir, output_dir) | |
| _section('Final Report (TS mode)') | |
| n_success = sum(1 for r in report if r[4] is not None and not r[6]) | |
| n_partial = sum(1 for r in report if r[4] is not None and r[6]) | |
| n_failed = sum(1 for r in report if r[4] is None) | |
| print(f'Spec: {args.ts_id}') | |
| print(f'CRs found: {len(cr_list)}') | |
| print(f'TSs updated: {n_success} fully OK, {n_partial} with warnings, {n_failed} failed') | |
| print() | |
| _print_report(report) | |
| print() | |
| print(f'Output directory: {output_dir}/') | |
| return | |
| # ββ Normal mode βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| excel_path = wsl_path(args.excel_path) | |
| # ββ Step 1: Parse Excel βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _section('Step 1 β Parsing Excel') | |
| print(f'Excel: {excel_path}') | |
| print(f'Person: {args.person_name!r}') | |
| try: | |
| cr_list = parse_excel(excel_path, args.person_name) | |
| except Exception as e: | |
| sys.exit(f'ERROR parsing Excel: {e}') | |
| print(f'Found {len(cr_list)} Accepted CR(s)') | |
| if not cr_list: | |
| print('Nothing to process.') | |
| return | |
| # Steps 2, 3 (cover page parse), 4, 5, 6 | |
| report, cr_paths, ts_paths, spec_dirs = _run_steps_2_to_6( | |
| cr_list, None, output_dir, cr_dir, ts_dir, | |
| eol_user, eol_password, author, tc_date, | |
| ) | |
| # Copy the CRs actually applied into the run output dir so the ZIP | |
| # contains exactly the CRs used for this run (only needed when using | |
| # a shared CR cache that lives outside output_dir). | |
| _copy_cr_cache_if_needed(cr_paths, cr_dir, output_dir) | |
| # ββ Final Report ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _section('Final Report') | |
| n_success = sum(1 for r in report if r[4] is not None and not r[6]) | |
| n_partial = sum(1 for r in report if r[4] is not None and r[6]) | |
| n_failed = sum(1 for r in report if r[4] is None) | |
| print(f'Person: {args.person_name}') | |
| print(f'Excel: {excel_path}') | |
| print(f'CRs found: {len(cr_list)}') | |
| print(f'TSs updated: {n_success} fully OK, {n_partial} with warnings, {n_failed} failed') | |
| print() | |
| _print_report(report) | |
| print() | |
| print(f'Output directory: {output_dir}/') | |
| if __name__ == '__main__': | |
| main() | |