In [None]:
# Import pandas for DataFrame manipulation
import pandas as pd
# Import numpy for numerical operations
import numpy as np
# Import torch for tensor operations and device handling
import torch
# Import MBART model and tokenizer from Hugging Face Transformers
from transformers import MBartForConditionalGeneration, MBart50TokenizerFast
# Import cosine similarity for comparing embeddings
from sklearn.metrics.pairwise import cosine_similarity
# Import tqdm to show progress bars for loops
from tqdm import tqdm
# Import regex utilities for tokenization and cleaning
import re

In [None]:
# --- Configuration ---
# NOTE(review): the values below look like placeholders; presumably each one must be
# replaced with a real model id, language code, or path before running — confirm.
MODEL_NAME = "your/model/name"  # passed to from_pretrained() for both the model and the tokenizer
SRC_LANG_CODE = "src_lang_code"  # assigned to tokenizer.src_lang for source-side sentences
TGT_LANG_CODE = "tgt_lang_code"  # assigned to tokenizer.src_lang when embedding target-side sentences
CORPUS_FILE = "your/corpus/here.csv"  # parallel corpus CSV; load_data requires 'English' and 'Tagin' columns
DICT_FILE = "your/bilingual/dictionary/here.csv"  # dictionary CSV; load_data requires 'English' and 'Tagin' columns

In [None]:
# Hyperparameters for the Knowledge Score (KS_i)
# They are passed to knowledge_based_filtering, which combines the three signals as
#   KS_i = alpha * PPL_i + beta * Sim_i + gamma * Lex_i
# You would tune these based on empirical performance
ALPHA = 0.1  # weight of the perplexity-based term (PPL_i)
BETA = 0.3  # weight of the semantic-similarity term (Sim_i)
GAMMA = 0.6  # weight of the lexical-match term (Lex_i)
PERCENTILE_THRESHOLD = 70 # Filter threshold: keep pairs whose KS_i is at or above this percentile

In [None]:
def preprocess_text(text):
    """
    Normalize one raw text cell.

    Non-string input (NaN, numbers, None) yields an empty string. Strings are
    lower-cased, every character other than ASCII letters, digits, whitespace
    and apostrophes is removed, and whitespace runs are collapsed to a single
    space with leading/trailing whitespace stripped.

    Args:
        text: Value taken from a DataFrame cell; may be any type.

    Returns:
        The cleaned string, or "" when `text` is not a string.
    """
    if not isinstance(text, str):
        return ""
    text = text.lower()
    # Remove unwanted symbols (keep ASCII alphanumerics, whitespace and apostrophes).
    # NOTE(review): this also drops non-ASCII letters (diacritics etc.) — confirm
    # that is intended for the target-language column.
    text = re.sub(r"[^a-zA-Z0-9\s']", "", text)
    # Collapse whitespace only AFTER symbol removal: deleting a symbol that sat
    # between two spaces ("a - b") would otherwise leave a double space behind,
    # and deleting a leading/trailing symbol would leave stray edge whitespace.
    text = re.sub(r"\s+", " ", text).strip()
    return text


def load_data(corpus_file, dict_file):
    """
    Load, clean, and prepare the parallel corpus and the bilingual dictionary.

    Both CSV files must contain the columns 'English' and 'Tagin'. Every cell in
    those columns is passed through `preprocess_text`. Corpus rows and dictionary
    entries that end up with an empty side after cleaning are discarded.

    Args:
        corpus_file: Path to the parallel-corpus CSV.
        dict_file: Path to the bilingual-dictionary CSV.

    Returns:
        (raw_corpus, word_dictionary): the cleaned corpus DataFrame with a fresh
        0..n-1 index, and a dict mapping cleaned 'English' entries to cleaned
        'Tagin' entries (for duplicate keys the last row wins).

    Raises:
        ValueError: if either file cannot be read or a required column is missing.
    """
    # --- Load the CSVs safely ---
    try:
        raw_corpus = pd.read_csv(corpus_file)
        word_dictionary = pd.read_csv(dict_file)
    except Exception as e:
        # Chain the original exception so the real cause stays in the traceback.
        raise ValueError(f"Error loading files: {e}") from e

    # --- Ensure expected columns exist ---
    required_corpus_cols = {'English', 'Tagin'}
    required_dict_cols = {'English', 'Tagin'}

    if not required_corpus_cols.issubset(raw_corpus.columns):
        raise ValueError(f"Corpus file must contain columns: {required_corpus_cols}")
    if not required_dict_cols.issubset(word_dictionary.columns):
        raise ValueError(f"Dictionary file must contain columns: {required_dict_cols}")

    # --- Drop fully-empty rows, then turn remaining NaN cells into "" ---
    raw_corpus = raw_corpus.dropna(how='all').fillna("")

    # --- Apply text preprocessing ---
    raw_corpus["English"] = raw_corpus["English"].apply(preprocess_text)
    raw_corpus["Tagin"] = raw_corpus["Tagin"].apply(preprocess_text)

    # --- Remove corpus rows that are empty on either side after cleaning ---
    corpus_has_both = (raw_corpus["English"].str.strip() != "") & (raw_corpus["Tagin"].str.strip() != "")
    raw_corpus = raw_corpus[corpus_has_both].reset_index(drop=True)

    # --- Clean dictionary entries ---
    dict_english = word_dictionary["English"].apply(preprocess_text)
    dict_tagin = word_dictionary["Tagin"].apply(preprocess_text)

    # An entry with an empty key can never match a sentence, and an entry with an
    # empty translation carries no evidence, so neither belongs in the mapping
    # (or in the entry count reported below).
    dict_has_both = (dict_english != "") & (dict_tagin != "")

    # --- Convert dictionary to mapping ---
    word_dictionary = dict(zip(dict_english[dict_has_both], dict_tagin[dict_has_both]))

    print(f"Loaded {len(raw_corpus)} sentence pairs and {len(word_dictionary)} dictionary entries.")
    return raw_corpus, word_dictionary

In [None]:
# Run the loader once on the configured files; the returned (corpus, dictionary)
# pair is not assigned to any name in this cell.
load_data(CORPUS_FILE,DICT_FILE)

In [None]:
# Function for Step 2: Perplexity (PPL)
@torch.no_grad()
def calculate_perplexity(sentence, model, tokenizer, device):
    """
    Compute the perplexity of `sentence` under `model`.

    The sentence is tokenized with `tokenizer.src_lang` set to SRC_LANG_CODE and
    fed to the model as both input and labels; perplexity is exp() of the loss
    the model returns.

    Args:
        sentence: Source-language sentence to score.
        model: Callable accepting `input_ids` and `labels` and returning an
            object with a `loss` tensor.
        tokenizer: Tokenizer exposing a settable `src_lang` attribute.
        device: Device the input ids are moved to.

    Returns:
        The perplexity as a float, or float('inf') if anything raises (the error
        is printed, not propagated).
    """
    try:
        # The language must be selected BEFORE tokenizing. Setting it afterwards
        # would encode this sentence with whatever language an earlier call left
        # on the shared tokenizer (calculate_semantic_similarity leaves the
        # target code there).
        tokenizer.src_lang = SRC_LANG_CODE

        # Tokenize; truncation keeps the sequence within 512 tokens.
        input_ids = tokenizer(
            sentence,
            return_tensors="pt",
            max_length=512,
            truncation=True
        ).input_ids.to(device)

        # The labels are the input tokens themselves (language-modeling style).
        labels = input_ids.clone()

        # Use -100 to exclude the first position from the loss.
        # NOTE(review): presumably position 0 is the language-code token, and the
        # model derives its decoder inputs from `labels` — confirm that masking
        # here does not disturb that shift.
        labels[:, 0] = -100

        outputs = model(input_ids=input_ids, labels=labels)

        # Perplexity is exp(loss); the loss is presumably the mean per-token
        # negative log-likelihood — verify against the model documentation.
        return torch.exp(outputs.loss).item()
    except Exception as e:
        print(f"Error calculating PPL for: '{sentence}'. Error: {e}")
        return float('inf')  # Return a very high PPL for errors/bad sentences



In [None]:
def normalize_inverse_ppl(ppl_scores, epsilon=1e-6):
    """
    Min-max normalize inverse perplexity (1 / PPL_i) to [0, 1].

    The scaling is relative to the scores passed in, so it is only informative
    when given several scores at once; a single value (or any set of identical
    values) has zero range and maps to 0.

    Args:
        ppl_scores: Perplexity value(s); a scalar or any array-like.
        epsilon: Added to each perplexity before inversion to avoid dividing by zero.

    Returns:
        A float64 numpy array of the same shape as the input. NaN and infinite
        perplexities map to 0. Empty input returns an empty array; zero-range
        input returns all zeros.
    """
    ppl_scores = np.array(ppl_scores, dtype=np.float64)

    # Nothing to scale; np.min / np.max would raise on a zero-size array.
    if ppl_scores.size == 0:
        return np.zeros_like(ppl_scores)

    # Treat NaN and -inf like +inf so every invalid perplexity inverts to 0 below.
    ppl_scores = np.nan_to_num(ppl_scores, nan=np.inf, posinf=np.inf, neginf=np.inf)

    # Compute inverse PPL (fluency measure)
    inv_ppl = 1.0 / (ppl_scores + epsilon)

    # Remove any remaining NaNs/Infs from inverse scores
    inv_ppl = np.nan_to_num(inv_ppl, nan=0.0, posinf=0.0, neginf=0.0)

    inv_min = np.min(inv_ppl)
    inv_max = np.max(inv_ppl)

    # Handle zero-range case: all scores are the same
    if np.isclose(inv_max, inv_min) or np.isnan(inv_max - inv_min):
        return np.zeros_like(inv_ppl)

    # Normal min-max scaling
    inv_ppl_norm = (inv_ppl - inv_min) / (inv_max - inv_min)
    return np.clip(inv_ppl_norm, 0.0, 1.0)



In [None]:
# Function for Step 3: Semantic Similarity (Sim)

def calculate_semantic_similarity(s_i, t_i, model, tokenizer, device):
    """
    Compute the cosine similarity between the encoder embeddings of a source
    and a target sentence, rescaled from [-1, 1] to [0, 1].

    Each sentence is embedded by mean-pooling the encoder's last hidden state
    over all token positions except the first and the last.

    Args:
        s_i: Source sentence (tokenized with SRC_LANG_CODE).
        t_i: Target sentence (tokenized with TGT_LANG_CODE).
        model: Model exposing `model.encoder(**inputs).last_hidden_state`.
        tokenizer: Tokenizer exposing a settable `src_lang` attribute.
        device: Device the tokenized inputs are moved to.

    Returns:
        A float in [0, 1]. Returns 0.0 when the similarity is not finite or when
        any step raises; in the latter case the error is printed so failures are
        visible instead of silently looking like "dissimilar" pairs.
    """
    def get_embedding(sentence, lang_code):
        # Select the language before tokenizing this sentence.
        tokenizer.src_lang = lang_code
        inputs = tokenizer(
            sentence,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True
        ).to(device)

        with torch.no_grad():
            encoder_output = model.model.encoder(**inputs).last_hidden_state

        # Drop the first and last positions before pooling (presumably the
        # language-code and end-of-sequence tokens — confirm against the tokenizer).
        mean_embedding = encoder_output[:, 1:-1, :].mean(dim=1).squeeze()

        return mean_embedding.cpu().detach().numpy().reshape(1, -1)

    try:
        emb_s = get_embedding(s_i, SRC_LANG_CODE)
        emb_t = get_embedding(t_i, TGT_LANG_CODE)

        sim_raw = cosine_similarity(emb_s, emb_t)[0][0]

        # A NaN would slip through the max/min clamp below as 1.0 (comparisons
        # with NaN are always False), so reject non-finite values explicitly.
        if not np.isfinite(sim_raw):
            return 0.0

        sim_normalized = (sim_raw + 1) / 2
        return float(max(0.0, min(1.0, sim_normalized)))

    except Exception as e:
        # Keep the best-effort 0.0 fallback, but report the failure.
        print(f"Error calculating Sim for: '{s_i}' and '{t_i}'. Error: {e}")
        return 0.0

In [None]:
# Function for Step 4: Lexical Match (Lex)
def calculate_lexical_match(s_i, t_i, word_dictionary):
    """
    Compute a dictionary-based lexical match score for one sentence pair.

    Score = (number of source words covered by at least one dictionary phrase
             whose translation is present in the target)
            / (total words in the source sentence)

    A dictionary phrase counts when its words occur consecutively in the source
    sentence AND every word of its translation occurs somewhere in the target
    sentence. Matching is case-insensitive and works on `\\w+` tokens.

    Args:
        s_i: Source sentence.
        t_i: Target sentence.
        word_dictionary: Mapping from source phrase to its expected translation.

    Returns:
        A float in [0, 1]; 0.0 when the source sentence has no words.
    """
    def tokenize(text):
        # Lower-case and split into word tokens.
        return re.findall(r'\b\w+\b', text.lower())

    source_words = tokenize(s_i)
    total_source_words = len(source_words)
    # An empty source sentence cannot be covered at all.
    if total_source_words == 0:
        return 0.0

    source_vocab = set(source_words)
    t_tokens = set(tokenize(t_i))

    # Positions of source words already credited. Using a set means a word is
    # counted once even when several entries (e.g. "good morning" and "good")
    # match it, so iteration order over the dictionary does not affect the score.
    covered_indices = set()

    for phrase, translation in word_dictionary.items():
        # Ignore malformed entries rather than failing on .lower().
        if not isinstance(phrase, str) or not isinstance(translation, str):
            continue

        phrase_words = tokenize(phrase)
        # Cheap rejection: an empty phrase, or one whose first word is absent
        # from the source, cannot match.
        if not phrase_words or phrase_words[0] not in source_vocab:
            continue

        translation_words = tokenize(translation)
        # An empty translation is no evidence (all() of nothing would be True).
        if not translation_words:
            continue
        if not all(word in t_tokens for word in translation_words):
            continue

        # Compare token windows instead of padded substrings so that adjacent
        # repeats ("go go") are both found.
        span = len(phrase_words)
        for start in range(total_source_words - span + 1):
            if source_words[start:start + span] == phrase_words:
                covered_indices.update(range(start, start + span))

    return len(covered_indices) / total_source_words

In [None]:
# Main Algorithm Implementation
def knowledge_based_filtering(raw_corpus, word_dictionary, alpha, beta, gamma, percentile_threshold):
    """
    Score every sentence pair and keep those at or above a percentile cut-off.

    For each pair the Knowledge Score is
        KS_i = alpha * PPL_i + beta * Sim_i + gamma * Lex_i
    where PPL_i is the source perplexity, inverted and min-max normalized
    across the whole corpus, Sim_i is the encoder-embedding similarity and
    Lex_i is the dictionary match score.

    Args:
        raw_corpus: DataFrame with 'English' (source) and 'Tagin' (target) columns.
        word_dictionary: Mapping from source phrase to expected translation.
        alpha: Weight of the perplexity term.
        beta: Weight of the similarity term.
        gamma: Weight of the lexical term.
        percentile_threshold: Percentile (0-100) of KS_i used as the cut-off τ_K.

    Returns:
        DataFrame with columns 'src_lang', 'tgt_lang', 'KS_i' containing the
        pairs whose KS_i >= τ_K. An empty corpus yields an empty DataFrame with
        those columns (no model is loaded in that case).
    """
    output_columns = ['src_lang', 'tgt_lang', 'KS_i']

    # Nothing to score: np.percentile would fail on an empty column.
    if len(raw_corpus) == 0:
        print("Raw corpus is empty; nothing to filter.")
        return pd.DataFrame(columns=output_columns)

    # 1. Load Model and Tokenizer (Step 1 of the algorithm's loop)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Loading mBART-tgj-base model to {device}...")
    model = MBartForConditionalGeneration.from_pretrained(MODEL_NAME).to(device)
    tokenizer = MBart50TokenizerFast.from_pretrained(MODEL_NAME)

    # Ensure source/target language codes are in the tokenizer vocabulary
    vocab = tokenizer.vocab
    if SRC_LANG_CODE not in vocab or TGT_LANG_CODE not in vocab:
        print(f"Warning: Language codes {SRC_LANG_CODE} or {TGT_LANG_CODE} not found in base mBART-tgj-base vocab.")
        print("Using placeholder language codes. Results may not be accurate.")

    records = []
    raw_ppl_scores = []

    # 2. Iterate through the corpus (Lines 1-6)
    print("Processing corpus to calculate scores...")
    sentence_pairs = zip(raw_corpus['English'], raw_corpus['Tagin'])
    for s_i, t_i in tqdm(sentence_pairs, total=len(raw_corpus), desc="Calculating KS"):
        # Line 2: raw perplexity (lower is better). Normalization is deferred
        # until every sentence has been scored: min-max scaling a single value
        # always yields 0, which would silently remove the alpha term from KS_i.
        raw_ppl_scores.append(calculate_perplexity(s_i, model, tokenizer, device))

        records.append({
            'src_lang': s_i,
            'tgt_lang': t_i,
            # Line 3: Sim_i (higher is better)
            'Sim_i': calculate_semantic_similarity(s_i, t_i, model, tokenizer, device),
            # Line 4: Lex_i (higher is better)
            'Lex_i': calculate_lexical_match(s_i, t_i, word_dictionary),
        })

    scored_corpus = pd.DataFrame(records)

    # Inverse perplexity scaled to [0, 1] over the whole corpus, so that, like
    # Sim_i and Lex_i, higher means better.
    scored_corpus['PPL_i'] = normalize_inverse_ppl(raw_ppl_scores, epsilon=1e-6)

    # Line 5: Derive Knowledge Score (KS_i)
    scored_corpus['KS_i'] = (
        alpha * scored_corpus['PPL_i']
        + beta * scored_corpus['Sim_i']
        + gamma * scored_corpus['Lex_i']
    )

    # 3. Determine Threshold and Filter (Lines 7-9)
    # Line 7: Find the requested percentile of the Knowledge Scores
    tau_K = np.percentile(scored_corpus['KS_i'], percentile_threshold)
    print(f"\n{percentile_threshold}th Percentile Knowledge Score (τ_K): {tau_K:.4f}")

    # Line 8: Filter the corpus
    D_filtered = scored_corpus[scored_corpus['KS_i'] >= tau_K].copy()

    # Final cleanup of columns and return
    D_filtered = D_filtered[output_columns]
    print(f"Raw corpus size: {len(raw_corpus)}")
    print(f"Filtered corpus size (KS_i >= τ_K): {len(D_filtered)}")

    return D_filtered

In [None]:
# --- Execution ---
# Runs the full pipeline: load the data, score and filter it, save the result.
# The guard keeps this from running if the code is ever imported as a module.
if __name__ == '__main__':
    # 1. Load and preprocess the corpus and dictionary files
    raw_corpus, word_dictionary = load_data(CORPUS_FILE, DICT_FILE)

    # 2. Compute KS_i for every pair and filter by percentile
    filtered_corpus = knowledge_based_filtering(
        raw_corpus,
        word_dictionary,
        ALPHA, BETA, GAMMA,
        PERCENTILE_THRESHOLD
    )

    # 3. Save the filtered corpus. The file name is derived from the configured
    # threshold so it cannot disagree with the cut-off actually applied
    # (with PERCENTILE_THRESHOLD = 70 this is "tgj_corpus_filtered_70th.csv").
    output_file = f"tgj_corpus_filtered_{PERCENTILE_THRESHOLD}th.csv"
    filtered_corpus.to_csv(output_file, index=False)

    print(f"\nFiltering complete. Results saved to {output_file}")

    # Show only the first rows, as the label promises.
    print("\nFiltered Corpus Head:")
    print(filtered_corpus.head())