Spaces:
Running
Running
| import pandas as pd | |
| from sentence_transformers import SentenceTransformer | |
| import os | |
| import sys | |
| import json | |
| import openpyxl | |
| from openpyxl.styles import PatternFill, Font | |
| from openpyxl.utils import get_column_letter | |
| from openpyxl.worksheet.datavalidation import DataValidation | |
| from openpyxl.workbook.defined_name import DefinedName | |
| from src.config import parse_cli_args, GROQ_API_KEY, AVAILABLE_MODELS, DEFAULT_SIMILARITY_THRESHOLD | |
| from src.llm_router import GroqRouter | |
| from src.data_pipeline import process_column, cluster_degrees_by_institution | |
| from src.utils import prune_manual_refs_against_official | |
# Split patterns are intentionally conservative and defined per cleaned column.
# Words such as "and" are never treated as delimiters because they can be part
# of an official country name.
_MULTI_VALUE_SPLIT = r',|;|\n|/'
_LINE_OR_SEMICOLON_SPLIT = r'\n|;'

# Maps each column to clean -> regex used to split a cell into single values.
# Insertion order matters: the main script iterates this mapping in order.
COLUMNS_CONFIG = dict(
    Country=_MULTI_VALUE_SPLIT,
    Institution=r'[,/;|\n]',
    Continent=_MULTI_VALUE_SPLIT,
    City=_MULTI_VALUE_SPLIT,
    Level=_LINE_OR_SEMICOLON_SPLIT,
    Language=_MULTI_VALUE_SPLIT,
    Tags=_MULTI_VALUE_SPLIT,
    Degree=_LINE_OR_SEMICOLON_SPLIT,
)

# Module-level cache object handed to the pipeline functions on every call.
master_cache = dict()
def load_json_safe(filepath):
    """Read a reference JSON file and return the decoded object.

    The file is decoded as UTF-8; a leading byte-order mark, if present, is
    dropped before parsing.
    """
    # 'utf-8-sig' behaves like plain UTF-8 but strips a BOM at the start.
    with open(filepath, mode='r', encoding='utf-8-sig') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def validate_official_refs(official_refs):
    """Abort early when a required reference bucket is absent or empty.

    Every column in COLUMNS_CONFIG except "Degree" must map to a non-empty
    list or dict in ``official_refs``.

    Raises:
        ValueError: naming every column whose bucket is missing, empty, or of
            another type.
    """
    def _has_entries(bucket):
        # Only non-empty lists/dicts count as usable reference data.
        return isinstance(bucket, (list, dict)) and len(bucket) != 0

    missing = [
        name
        for name in COLUMNS_CONFIG
        if name != "Degree" and not _has_entries(official_refs.get(name))
    ]
    if not missing:
        return

    raise ValueError(
        "Official references are missing or empty for: "
        + ", ".join(missing)
        + ". Refusing to run because this would send too many values to Groq."
    )
def inject_searchable_dropdowns(blueprint_path, master_unique_lists):
    """Add hidden reference lists and a dependent dropdown to the Blueprint.

    For every entry of ``master_unique_lists`` a column is written on a
    "Reference_Lists" sheet (header in row 1, sorted values from row 2) and,
    when the column has at least one value, a workbook-level named range is
    defined over those values. The "Human_Override" column of the active
    sheet then gets a list validation that resolves the named range from the
    value in the same row's "Column" cell. The workbook is saved in place.

    Args:
        blueprint_path: Path of the .xlsx workbook to modify and overwrite.
        master_unique_lists: Mapping of column name -> iterable of candidate
            values; falsy and non-string values are ignored.
    """
    print("Injecting static searchable dropdowns into Blueprint...")
    wb = openpyxl.load_workbook(blueprint_path)
    main_sheet = wb.active

    # Store all dropdown values on a hidden sheet so Excel can reference them.
    # If a sheet with this title already exists (e.g. the function ran on this
    # workbook before), create_sheet() would pick a different title while the
    # named ranges below keep pointing at the stale sheet, so drop it first.
    ref_title = "Reference_Lists"
    if ref_title in wb.sheetnames and wb[ref_title] is not main_sheet:
        del wb[ref_title]
    ref_sheet = wb.create_sheet(title=ref_title)

    for col_idx, (column_name, unique_items) in enumerate(master_unique_lists.items(), start=1):
        # Named ranges cannot contain spaces; the validation formula applies
        # the same substitution when resolving the name.
        safe_name = column_name.replace(" ", "_")
        ref_sheet.cell(row=1, column=col_idx, value=safe_name)

        # Clean and alphabetize the list for a better review experience.
        valid_items = sorted(item for item in unique_items if item and isinstance(item, str))
        for row_idx, item in enumerate(valid_items, start=2):
            ref_sheet.cell(row=row_idx, column=col_idx, value=item)

        # An empty column gets a header but no named range.
        if not valid_items:
            continue

        # Named ranges let data validation reference long lists safely.
        letter = get_column_letter(col_idx)
        range_str = f"{ref_title}!${letter}$2:${letter}${len(valid_items) + 1}"
        wb.defined_names.add(DefinedName(name=safe_name, attr_text=range_str))

    # The override dropdown changes based on the row's target column, so find
    # the letters of both header cells in row 1.
    target_col_letter = None
    override_col_letter = None
    for cell in main_sheet[1]:
        if cell.value == "Column":
            target_col_letter = get_column_letter(cell.column)
        elif cell.value == "Human_Override":
            override_col_letter = get_column_letter(cell.column)

    # A header-only sheet would produce the inverted range "X2:X1", so the
    # validation is only attached when there is at least one data row.
    last_row = main_sheet.max_row
    if target_col_letter and override_col_letter and last_row >= 2:
        dv = DataValidation(
            type="list",
            formula1=f'=INDIRECT(SUBSTITUTE(${target_col_letter}2, " ", "_"))',
            allowBlank=True,
            showErrorMessage=False  # CRITICAL: This allows the user to manually type an override!
        )
        dv.add(f"{override_col_letter}2:{override_col_letter}{last_row}")
        main_sheet.add_data_validation(dv)

    ref_sheet.sheet_state = 'hidden'
    wb.save(blueprint_path)
    print("Dropdowns successfully injected!")
if __name__ == "__main__":
    # Parse CLI/UI arguments before loading any expensive model assets.
    args = parse_cli_args()
    source_sheet_name = args.sheet
    output_sheet_name = args.output_sheet
    available_models = [m.strip() for m in args.models.split(",") if m.strip()] if args.models else AVAILABLE_MODELS

    # Cheap argument checks run first so a bad invocation fails before the
    # model is loaded, any LLM call is made, or any file is written.
    if source_sheet_name == output_sheet_name:
        raise ValueError("Output sheet name cannot match the source sheet name.")
    if not os.path.exists(args.refs):
        raise FileNotFoundError(f"Official references file not found: {args.refs}")

    print("Loading AI Model (this may take a few seconds)...")
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # The router owns Groq fallback order and rate-limit switching.
    router = GroqRouter(api_key=GROQ_API_KEY, available_models=available_models)

    # Create an empty manual-reference file on first run.
    if not os.path.exists(args.manual_refs):
        manual_refs_dir = os.path.dirname(args.manual_refs)
        # dirname is "" for a bare filename and os.makedirs("") raises, so
        # only create the directory when the path actually has one.
        if manual_refs_dir:
            os.makedirs(manual_refs_dir, exist_ok=True)
        with open(args.manual_refs, 'w', encoding='utf-8') as f:
            json.dump({}, f)

    official_refs = load_json_safe(args.refs)
    manual_refs = load_json_safe(args.manual_refs)
    validate_official_refs(official_refs)

    # Manual memory should only contain values not already covered by official refs.
    memory_pruned = prune_manual_refs_against_official(manual_refs, official_refs)
    if memory_pruned:
        print(f"[INFO] Removed {memory_pruned} manual reference duplicate(s) already covered by official refs.")

    print(f"Loading Excel dataset from {args.input}, sheet '{source_sheet_name}'...")
    # skiprows=[1] drops the spreadsheet's second row, so DataFrame row 0
    # corresponds to Excel row 3 (see the write-back loop at the end).
    data = pd.read_excel(args.input, sheet_name=source_sheet_name, skiprows=[1])

    # Every uncertain or changed value is logged here for human review.
    blueprint_records = []

    # Run each configured column through the normalization pipeline. Degree
    # values are clustered within each institution instead of matched to refs.
    for col, pattern in COLUMNS_CONFIG.items():
        if col == "Degree":
            # Prefer the cleaned institution column when an earlier pass produced it.
            inst_col = 'Cleaned_Institution' if 'Cleaned_Institution' in data.columns else 'Institution'
            data = cluster_degrees_by_institution(
                df=data, degree_col=col, inst_col=inst_col, model=model,
                master_cache=master_cache, blueprint_data=blueprint_records,
                threshold=DEFAULT_SIMILARITY_THRESHOLD
            )
        else:
            data = process_column(
                df=data, column_name=col, model=model, groq_router=router,
                official_refs=official_refs, manual_refs=manual_refs, master_cache=master_cache,
                split_pattern=pattern, blueprint_data=blueprint_records
            )

    print("\nSaving all memory files...")
    with open(args.manual_refs, 'w', encoding='utf-8') as f:
        json.dump(manual_refs, f, indent=4, ensure_ascii=False)

    # Export the review workbook only when there is something to inspect.
    if blueprint_records:
        bp_df = pd.DataFrame(blueprint_records)
        bp_df.to_excel(args.blueprint, index=False)

        # Basic formatting helps reviewers scan confidence levels quickly.
        bp_wb = openpyxl.load_workbook(args.blueprint)
        bp_sheet = bp_wb.active
        header_fill = PatternFill(start_color="1F4E78", end_color="1F4E78", fill_type="solid")
        header_font = Font(color="FFFFFF", bold=True)
        high_fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
        med_fill = PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")
        low_fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")

        # Style the header row, widen every column, and locate "Confidence".
        conf_col_idx = None
        for col_idx in range(1, bp_sheet.max_column + 1):
            cell = bp_sheet.cell(row=1, column=col_idx)
            cell.fill = header_fill
            cell.font = header_font
            if cell.value == "Confidence":
                conf_col_idx = col_idx
            bp_sheet.column_dimensions[get_column_letter(col_idx)].width = 30

        # Colour each confidence cell by the first level name it contains.
        if conf_col_idx:
            for row_idx in range(2, bp_sheet.max_row + 1):
                cell = bp_sheet.cell(row=row_idx, column=conf_col_idx)
                val = str(cell.value).upper()
                if "HIGH" in val:
                    cell.fill = high_fill
                elif "MEDIUM" in val:
                    cell.fill = med_fill
                elif "LOW" in val:
                    cell.fill = low_fill

        bp_wb.save(args.blueprint)
        print(f"[!] Saved and formatted {len(bp_df)} rows requiring review to {args.blueprint}")

        def extract_uniques(ref_data):
            """Extract display values from list-style or dict-style references."""
            if isinstance(ref_data, dict):
                return list(ref_data.values())
            if isinstance(ref_data, list):
                return ref_data
            return []

        master_lists = {}
        for category in COLUMNS_CONFIG:
            off_items = extract_uniques(official_refs.get(category, []))
            man_items = extract_uniques(manual_refs.get(category, []))
            # Merge official and manual values for the Blueprint dropdowns;
            # falsy entries are dropped and duplicates collapsed.
            master_lists[category] = list({x for x in (off_items + man_items) if x})

        inject_searchable_dropdowns(args.blueprint, master_lists)
    else:
        print("[!] No blueprint generated. All matches were HIGH confidence!")

    # Copy the source sheet to preserve formatting, then overwrite cleaned columns.
    print("\nOpening original Excel file to preserve formatting...")
    wb = openpyxl.load_workbook(args.input)
    new_sheet_name = output_sheet_name
    source_sheet = wb[source_sheet_name]
    # Replace any output sheet left over from a previous run.
    if new_sheet_name in wb.sheetnames:
        del wb[new_sheet_name]
    new_sheet = wb.copy_worksheet(source_sheet)
    new_sheet.title = new_sheet_name

    # Header text (row 1) -> 1-based column index; blank headers are skipped.
    col_name_to_idx = {}
    for c in range(1, new_sheet.max_column + 1):
        header_value = new_sheet.cell(row=1, column=c).value
        if header_value:
            col_name_to_idx[header_value] = c

    # Resolve once which cleaned columns exist and where they are written,
    # instead of re-checking for every row.
    writeback_targets = [
        (f"Cleaned_{col_name}", col_name_to_idx[col_name])
        for col_name in COLUMNS_CONFIG
        if f"Cleaned_{col_name}" in data.columns and col_name in col_name_to_idx
    ]

    for row_idx, (_, row_data) in enumerate(data.iterrows()):
        # +3: Excel rows are 1-based, row 1 is the header, and row 2 was
        # skipped when the sheet was read.
        excel_row = row_idx + 3
        for cleaned_col_name, target_col in writeback_targets:
            new_value = row_data[cleaned_col_name]
            new_sheet.cell(row=excel_row, column=target_col).value = None if pd.isna(new_value) else new_value

    # The input workbook is overwritten in place with the new sheet added.
    wb.save(args.input)
    print(f"\nSuccess! Initial pass saved. Please review {args.blueprint}.")