import pandas as pd
from sentence_transformers import SentenceTransformer
import os
import sys
import json
import openpyxl
from openpyxl.styles import PatternFill, Font
from openpyxl.utils import get_column_letter
from openpyxl.worksheet.datavalidation import DataValidation
from openpyxl.workbook.defined_name import DefinedName

from src.config import parse_cli_args, GROQ_API_KEY, AVAILABLE_MODELS, DEFAULT_SIMILARITY_THRESHOLD
from src.llm_router import GroqRouter
from src.data_pipeline import process_column, cluster_degrees_by_institution
from src.utils import prune_manual_refs_against_official

# Each cleaned column has its own conservative split pattern. Avoid splitting
# on words like "and" because they can be part of official country names.
COLUMNS_CONFIG = {
    "Country": r',|;|\n|/',
    "Institution": r'[,/;|\n]',
    "Continent": r',|;|\n|/',
    "City": r',|;|\n|/',
    "Level": r'\n|;',
    "Language": r',|;|\n|/',
    "Tags": r',|;|\n|/',
    "Degree": r'\n|;',
}

# Shared cache object handed to every pipeline call in main().
master_cache = {}

# The dataset is read with skiprows=[1], i.e. the sheet row directly below the
# header is not part of the DataFrame. DataFrame row 0 is therefore sheet row 3.
_FIRST_DATA_ROW = 3


def load_json_safe(filepath):
    """Load reference JSON files, accepting UTF-8 files with or without a BOM.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON document.
    """
    # 'utf-8-sig' transparently strips a leading BOM when one is present.
    with open(filepath, 'r', encoding='utf-8-sig') as f:
        return json.load(f)


def validate_official_refs(official_refs):
    """Fail early if required reference buckets are missing or empty.

    Every key of COLUMNS_CONFIG except "Degree" must map to a non-empty list
    or dict in ``official_refs``. "Degree" is exempt because degrees are
    clustered per institution rather than matched against references.

    Args:
        official_refs: Mapping of column name to its official reference data.

    Raises:
        ValueError: If at least one required bucket is absent, empty, or not
            a list/dict. The message names every offending column.
    """
    missing = []
    for column_name in COLUMNS_CONFIG:
        if column_name == "Degree":
            continue
        ref_data = official_refs.get(column_name)
        if not isinstance(ref_data, (list, dict)) or not ref_data:
            missing.append(column_name)

    if missing:
        raise ValueError(
            "Official references are missing or empty for: "
            + ", ".join(missing)
            + ". Refusing to run because this would send too many values to Groq."
        )


def inject_searchable_dropdowns(blueprint_path, master_unique_lists):
    """Add hidden reference lists and dropdowns to the generated Blueprint.

    A hidden "Reference_Lists" sheet receives one column per entry of
    ``master_unique_lists`` (header in row 1, sorted values below). Each
    non-empty column is exposed as a workbook-level named range, and the
    "Human_Override" column of the active sheet gets a list validation that
    resolves the named range from the row's "Column" cell via INDIRECT.

    Args:
        blueprint_path: Path of the Blueprint workbook; it is modified and
            saved in place.
        master_unique_lists: Mapping of column name to an iterable of
            candidate values. Falsy and non-string items are ignored.
    """
    print("Injecting static searchable dropdowns into Blueprint...")
    wb = openpyxl.load_workbook(blueprint_path)
    main_sheet = wb.active

    # Store all dropdown values on a hidden sheet so Excel can reference them.
    ref_sheet = wb.create_sheet(title="Reference_Lists")

    for col_idx, (column_name, unique_items) in enumerate(master_unique_lists.items(), start=1):
        # The name is looked up through SUBSTITUTE(" ", "_") in the validation
        # formula below, so spaces must be replaced the same way here.
        safe_name = column_name.replace(" ", "_")
        ref_sheet.cell(row=1, column=col_idx, value=safe_name)

        # Clean and alphabetize the list for a better review experience.
        valid_items = sorted(item for item in unique_items if item and isinstance(item, str))
        for row_idx, item in enumerate(valid_items, start=2):
            ref_sheet.cell(row=row_idx, column=col_idx, value=item)

        # Named ranges let data validation reference long lists safely.
        if valid_items:
            letter = get_column_letter(col_idx)
            range_str = f"Reference_Lists!${letter}$2:${letter}${len(valid_items) + 1}"
            wb.defined_names.add(DefinedName(name=safe_name, attr_text=range_str))

    # The override dropdown changes based on the row's target column.
    target_col_letter = None
    override_col_letter = None
    for cell in main_sheet[1]:
        if cell.value == "Column":
            target_col_letter = get_column_letter(cell.column)
        elif cell.value == "Human_Override":
            override_col_letter = get_column_letter(cell.column)

    # Skip validation on a header-only sheet: "X2:X1" is not a usable range.
    if target_col_letter and override_col_letter and main_sheet.max_row >= 2:
        dv = DataValidation(
            type="list",
            formula1=f'=INDIRECT(SUBSTITUTE(${target_col_letter}2, " ", "_"))',
            allowBlank=True,
            showErrorMessage=False  # CRITICAL: This allows the user to manually type an override!
        )
        dv.add(f"{override_col_letter}2:{override_col_letter}{main_sheet.max_row}")
        main_sheet.add_data_validation(dv)

    ref_sheet.sheet_state = 'hidden'
    wb.save(blueprint_path)
    print("Dropdowns successfully injected!")


def _ensure_manual_refs_file(manual_refs_path):
    """Create an empty manual-reference JSON file if none exists yet."""
    if os.path.exists(manual_refs_path):
        return
    # os.makedirs('') raises FileNotFoundError, so only create a parent
    # directory when the path actually has one (e.g. not for "manual.json").
    parent_dir = os.path.dirname(manual_refs_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(manual_refs_path, 'w', encoding='utf-8') as f:
        json.dump({}, f)


def _extract_uniques(ref_data):
    """Extract display values from list-style or dict-style references."""
    if isinstance(ref_data, dict):
        return list(ref_data.values())
    if isinstance(ref_data, list):
        return ref_data
    return []


def _build_dropdown_lists(official_refs, manual_refs):
    """Merge official and manual values per column for the Blueprint dropdowns."""
    master_lists = {}
    for category in COLUMNS_CONFIG:
        off_items = _extract_uniques(official_refs.get(category, []))
        man_items = _extract_uniques(manual_refs.get(category, []))
        # Keep strings only before de-duplicating: set() would raise on
        # unhashable values (lists/dicts), and inject_searchable_dropdowns
        # discards non-string items anyway.
        master_lists[category] = list(
            {x for x in off_items + man_items if x and isinstance(x, str)}
        )
    return master_lists


def _format_blueprint(blueprint_path):
    """Style the Blueprint header row and color-code the Confidence column."""
    bp_wb = openpyxl.load_workbook(blueprint_path)
    bp_sheet = bp_wb.active

    header_fill = PatternFill(start_color="1F4E78", end_color="1F4E78", fill_type="solid")
    header_font = Font(color="FFFFFF", bold=True)
    # Checked in this order; the first keyword found in the cell text wins.
    confidence_fills = (
        ("HIGH", PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")),
        ("MEDIUM", PatternFill(start_color="FFEB9C", end_color="FFEB9C", fill_type="solid")),
        ("LOW", PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")),
    )

    conf_col_idx = None
    for col_idx in range(1, bp_sheet.max_column + 1):
        cell = bp_sheet.cell(row=1, column=col_idx)
        cell.fill = header_fill
        cell.font = header_font
        if cell.value == "Confidence":
            conf_col_idx = col_idx
        bp_sheet.column_dimensions[get_column_letter(col_idx)].width = 30

    if conf_col_idx:
        for row_idx in range(2, bp_sheet.max_row + 1):
            cell = bp_sheet.cell(row=row_idx, column=conf_col_idx)
            val = str(cell.value).upper()
            for keyword, fill in confidence_fills:
                if keyword in val:
                    cell.fill = fill
                    break

    bp_wb.save(blueprint_path)


def _write_cleaned_sheet(input_path, source_sheet_name, output_sheet_name, data):
    """Copy the source sheet and overwrite its columns with cleaned values.

    The source sheet is duplicated (replacing any existing sheet named
    ``output_sheet_name``) so formatting is preserved, then every column
    listed in COLUMNS_CONFIG that has a matching ``Cleaned_<name>`` column in
    ``data`` is overwritten on the copy. The workbook is saved back to
    ``input_path``.
    """
    wb = openpyxl.load_workbook(input_path)
    source_sheet = wb[source_sheet_name]
    if output_sheet_name in wb.sheetnames:
        del wb[output_sheet_name]
    new_sheet = wb.copy_worksheet(source_sheet)
    new_sheet.title = output_sheet_name

    # Map header text -> column index; the last duplicate header wins.
    header_to_col = {}
    for c in range(1, new_sheet.max_column + 1):
        header = new_sheet.cell(row=1, column=c).value
        if header:
            header_to_col[header] = c

    for col_name in COLUMNS_CONFIG:
        cleaned_col_name = f"Cleaned_{col_name}"
        if cleaned_col_name not in data.columns or col_name not in header_to_col:
            continue
        target_col = header_to_col[col_name]
        for excel_row, new_value in enumerate(data[cleaned_col_name], start=_FIRST_DATA_ROW):
            new_sheet.cell(row=excel_row, column=target_col).value = (
                None if pd.isna(new_value) else new_value
            )

    wb.save(input_path)


def main():
    """Run the normalization pass: clean columns, export the review Blueprint,
    and write the cleaned values to a copy of the source sheet."""
    # Parse CLI/UI arguments before loading any expensive model assets.
    args = parse_cli_args()
    source_sheet_name = args.sheet
    output_sheet_name = args.output_sheet
    available_models = (
        [m.strip() for m in args.models.split(",") if m.strip()]
        if args.models
        else AVAILABLE_MODELS
    )

    # Cheap input checks run first so a bad invocation fails before the model
    # is loaded and before any values are sent to Groq.
    if source_sheet_name == output_sheet_name:
        raise ValueError("Output sheet name cannot match the source sheet name.")
    if not os.path.exists(args.refs):
        raise FileNotFoundError(f"Official references file not found: {args.refs}")
    _ensure_manual_refs_file(args.manual_refs)

    official_refs = load_json_safe(args.refs)
    manual_refs = load_json_safe(args.manual_refs)
    validate_official_refs(official_refs)

    # Manual memory should only contain values not already covered by official refs.
    memory_pruned = prune_manual_refs_against_official(manual_refs, official_refs)
    if memory_pruned:
        print(f"[INFO] Removed {memory_pruned} manual reference duplicate(s) already covered by official refs.")

    print("Loading AI Model (this may take a few seconds)...")
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # The router owns Groq fallback order and rate-limit switching.
    router = GroqRouter(api_key=GROQ_API_KEY, available_models=available_models)

    print(f"Loading Excel dataset from {args.input}, sheet '{source_sheet_name}'...")
    data = pd.read_excel(args.input, sheet_name=source_sheet_name, skiprows=[1])

    # Every uncertain or changed value is logged here for human review.
    blueprint_records = []

    # Run each configured column through the normalization pipeline. Degree
    # values are clustered within each institution instead of matched to refs.
    for col, pattern in COLUMNS_CONFIG.items():
        if col == "Degree":
            # Prefer the cleaned institution column when an earlier iteration produced it.
            inst_col = 'Cleaned_Institution' if 'Cleaned_Institution' in data.columns else 'Institution'
            data = cluster_degrees_by_institution(
                df=data,
                degree_col=col,
                inst_col=inst_col,
                model=model,
                master_cache=master_cache,
                blueprint_data=blueprint_records,
                threshold=DEFAULT_SIMILARITY_THRESHOLD
            )
        else:
            data = process_column(
                df=data,
                column_name=col,
                model=model,
                groq_router=router,
                official_refs=official_refs,
                manual_refs=manual_refs,
                master_cache=master_cache,
                split_pattern=pattern,
                blueprint_data=blueprint_records
            )

    print("\nSaving all memory files...")
    with open(args.manual_refs, 'w', encoding='utf-8') as f:
        json.dump(manual_refs, f, indent=4, ensure_ascii=False)

    # Export the review workbook only when there is something to inspect.
    if blueprint_records:
        bp_df = pd.DataFrame(blueprint_records)
        bp_df.to_excel(args.blueprint, index=False)

        # Basic formatting helps reviewers scan confidence levels quickly.
        _format_blueprint(args.blueprint)
        print(f"[!] Saved and formatted {len(bp_df)} rows requiring review to {args.blueprint}")

        master_lists = _build_dropdown_lists(official_refs, manual_refs)
        inject_searchable_dropdowns(args.blueprint, master_lists)
    else:
        print("[!] No blueprint generated. All matches were HIGH confidence!")

    # Copy the source sheet to preserve formatting, then overwrite cleaned columns.
    print("\nOpening original Excel file to preserve formatting...")
    _write_cleaned_sheet(args.input, source_sheet_name, output_sheet_name, data)

    print(f"\nSuccess! Initial pass saved. Please review {args.blueprint}.")


if __name__ == "__main__":
    main()