| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
| import pandas as pd |
| from textstat import textstat |
| from xgboost import XGBClassifier |
|
|
| from helpers import load_json, load_pickle |
|
|
| from masking_regex import mask_split as regex_mask_split |
| from masking_spacy import Config as SpacyMaskingConfig, _apply_ner_mask, _build_linguistic_record, load_nlp_model |
| from normalization import normalize_text, Config as NormalizationConfig |
| from features_statistical import extract_split_statistics, Config as StatisticalConfig |
| from features_tfidf import record_to_tfidf_text, Config as TFIDFConfig |
| from features_ngram import Config as NGramConfig, build_space_free_char_ngrams, record_to_pos_sequence |
| from model_training import Config as TrainingConfig |
|
|
|
|
| |
def _coerce_tfidf_config(payload: dict[str, Any]) -> TFIDFConfig:
    """Build a ``TFIDFConfig`` from a JSON-style mapping.

    A list-valued ``ngram_range`` is turned into a tuple before the config
    is constructed. The caller's mapping is left untouched because the
    function works on a shallow copy.
    """
    kwargs = dict(payload)
    ngram_range = kwargs.get("ngram_range")
    if isinstance(ngram_range, list):
        kwargs["ngram_range"] = tuple(ngram_range)
    return TFIDFConfig(**kwargs)
def _coerce_ngram_config(payload: dict[str, Any]) -> NGramConfig:
    """Build an ``NGramConfig`` from a JSON-style mapping.

    A list-valued ``pos_ngram_range`` is turned into a tuple before the
    config is constructed. The caller's mapping is left untouched because
    the function works on a shallow copy.
    """
    kwargs = dict(payload)
    pos_range = kwargs.get("pos_ngram_range")
    if isinstance(pos_range, list):
        kwargs["pos_ngram_range"] = tuple(pos_range)
    return NGramConfig(**kwargs)
def _coerce_statistical_config(payload: dict[str, Any]) -> StatisticalConfig:
    """Build a ``StatisticalConfig`` from a JSON-style mapping.

    A list-valued ``phrase_role_dependency_labels`` is turned into a tuple
    before the config is constructed. The caller's mapping is left
    untouched because the function works on a shallow copy.
    """
    kwargs = dict(payload)
    labels = kwargs.get("phrase_role_dependency_labels")
    if isinstance(labels, list):
        kwargs["phrase_role_dependency_labels"] = tuple(labels)
    return StatisticalConfig(**kwargs)
def _coerce_training_config(payload: dict[str, Any]) -> TrainingConfig:
    """Build a ``TrainingConfig`` from a JSON-style mapping.

    No field needs conversion; the mapping is shallow-copied and expanded
    into keyword arguments.
    """
    kwargs = dict(payload)
    return TrainingConfig(**kwargs)
|
|
| @dataclass(slots=True) |
| class PredictionResult: |
| probability_same: float |
| predicted_label: int |
| threshold: float |
| normalized_text1: str |
| normalized_text2: str |
| masked_text1: str |
| masked_text2: str |
| def to_dict(self) -> dict[str, Any]: |
| label = "Same author" if self.predicted_label == 1 else "Different author" |
| return { |
| "label": label, |
| "probability": self.probability_same, |
| "threshold": self.threshold, |
| "normalized_text1": self.normalized_text1, |
| "normalized_text2": self.normalized_text2, |
| "masked_text1": self.masked_text1, |
| "masked_text2": self.masked_text2, |
| } |
|
|
|
|
|
|
| |
class Inference:
    """Score a pair of texts with the saved authorship-verification pipeline.

    Construction reads the saved configs, vectorizers, decision threshold,
    feature spec and metrics from ``<project_root>/saved``. The classifier
    and the spaCy pipeline are loaded on first use and then kept on the
    instance.
    """

    def __init__(self, project_root: str | Path | None = None) -> None:
        """Load every saved artifact except the classifier and the NLP model.

        Args:
            project_root: Directory that contains the ``saved`` folder. When
                omitted, the parent of the directory holding this file is
                used.
        """
        root = Path(project_root) if project_root is not None else Path(__file__).resolve().parents[1]
        self.project_root = root
        self.saved_dir = root / "saved"
        self.model_dir = self.saved_dir / "model"

        self.normalization_config = NormalizationConfig(
            **load_json(self.saved_dir / "normalization" / "normalization_config.json")
        )

        # Inference handles one pair at a time: silence logging and keep
        # spaCy in a single process regardless of what training saved.
        spacy_payload = load_json(self.saved_dir / "masking" / "spacy_config.json")
        spacy_payload["verbose"] = False
        spacy_payload["nlp_n_process"] = 1
        self.spacy_config = SpacyMaskingConfig(**spacy_payload)

        # NOTE(review): the statistical config is read from the "masking"
        # directory - confirm that this is where training writes it.
        statistical_payload = load_json(self.saved_dir / "masking" / "statistical_config.json")
        statistical_payload["verbose"] = False
        self.statistical_config = _coerce_statistical_config(statistical_payload)

        tfidf_payload = load_json(self.saved_dir / "tfidf_features" / "tfidf_config.json")
        tfidf_payload["verbose"] = False
        self.tfidf_config = _coerce_tfidf_config(tfidf_payload)

        ngram_payload = load_json(self.saved_dir / "ngram_features" / "ngram_config.json")
        ngram_payload["verbose"] = False
        self.ngram_config = _coerce_ngram_config(ngram_payload)

        training_payload = load_json(self.model_dir / "training_config.json")
        self.training_config = _coerce_training_config(training_payload)

        # NOTE(review): load_pickle presumably unpickles these files, which
        # can execute arbitrary code - they must come from a trusted run.
        self.tfidf_vectorizer = load_pickle(self.saved_dir / "tfidf_features" / "vectorizer.pkl")
        self.char_vectorizer = load_pickle(self.saved_dir / "ngram_features" / "char_vectorizer.pkl")
        self.pos_vectorizer = load_pickle(self.saved_dir / "ngram_features" / "pos_vectorizer.pkl")

        # Populated by _load_model() on the first prediction.
        self.model = None
        self.threshold = float(load_json(self.model_dir / "threshold.json")["threshold"])

        feature_spec = load_json(self.model_dir / "feature_spec.json")
        self.suffixes: list[str] = feature_spec["suffixes"]
        # Each suffix names one per-text feature; the pair is its text1 and
        # text2 column.
        self.pairwise_column_pairs = [(f"text1_{suffix}", f"text2_{suffix}") for suffix in self.suffixes]

        self.metrics = load_json(self.model_dir / "metrics.json")
        # Populated by _mask_one_text() on first use.
        self.nlp = None

    def _load_model(self) -> XGBClassifier:
        """Return the classifier, loading ``model.json`` on the first call.

        Raises:
            FileNotFoundError: If ``model.json`` is absent from the model
                directory.
        """
        if self.model is None:
            model_path = self.model_dir / "model.json"
            if not model_path.exists():
                raise FileNotFoundError(f"Missing '{model_path}'")
            model = XGBClassifier()
            model.load_model(model_path)
            self.model = model
        return self.model

    def _predict_positive_proba(self, X: np.ndarray) -> float:
        """Return the class-1 probability for the first row of ``X``."""
        model = self._load_model()
        return float(model.predict_proba(X)[0, 1])

    def _mask_one_text(self, text: str) -> tuple[str, dict[str, Any]]:
        """Run the NLP pipeline on ``text``; return masked text and record.

        The NLP model is loaded on the first call and reused afterwards.
        """
        if self.nlp is None:
            self.nlp = load_nlp_model(config=self.spacy_config)
        doc = self.nlp(text)
        masked_text, _ = _apply_ner_mask(text, doc)
        record = _build_linguistic_record(doc)
        return masked_text, record

    def _build_pairwise_vector(self, feature_df: pd.DataFrame) -> np.ndarray:
        """Build the per-feature pairwise block from the first row.

        For every suffix in the feature spec the output holds two
        consecutive values: ``|text1 - text2|`` and ``text1 * text2``.
        Columns missing from ``feature_df`` count as 0.0.

        Returns:
            A ``(1, 2 * len(pairwise_column_pairs))`` float32 array.
        """
        row_values = feature_df.iloc[0].to_dict()
        pairs = self.pairwise_column_pairs
        left = np.array([row_values.get(name, 0.0) for name, _ in pairs], dtype=np.float32)
        right = np.array([row_values.get(name, 0.0) for _, name in pairs], dtype=np.float32)

        X_pair = np.empty((1, 2 * len(pairs)), dtype=np.float32)
        # Interleave: even slots carry the absolute difference, odd slots
        # the product, matching the per-pair layout [abs_diff, product].
        X_pair[0, 0::2] = np.abs(left - right)
        X_pair[0, 1::2] = left * right
        return X_pair

    def _family_suffix_groups(self) -> dict[str, list[str]]:
        """Partition the feature suffixes into the four feature families.

        The key order (tfidf, char_ngrams, pos_ngrams, scalar) fixes the
        layout of the global pairwise block, so it must not change.
        """
        groups: dict[str, list[str]] = {
            "tfidf": [],
            "char_ngrams": [],
            "pos_ngrams": [],
            "scalar": [],
        }
        for suffix in self.suffixes:
            if suffix.startswith("tfidf_"):
                family = "tfidf"
            elif suffix.startswith("char") and "_tfidf_" in suffix:
                family = "char_ngrams"
            elif suffix.startswith("pos") and "_tfidf_" in suffix:
                family = "pos_ngrams"
            else:
                # Anything that is not a vectorizer column is a scalar.
                family = "scalar"
            groups[family].append(suffix)
        return groups

    def _build_global_pairwise_vector(self, feature_df: pd.DataFrame) -> np.ndarray:
        """Build the per-family similarity block from the first row.

        For each non-empty family the output holds cosine similarity, L1
        distance and L2 distance between the text1 and text2 vectors.
        Columns missing from ``feature_df`` count as 0.0.

        Returns:
            A ``(1, 3 * number_of_non_empty_families)`` float32 array.
        """
        row_values = feature_df.iloc[0].to_dict()
        values: list[float] = []

        for family_suffixes in self._family_suffix_groups().values():
            if not family_suffixes:
                continue

            left = np.array(
                [row_values.get(f"text1_{suffix}", 0.0) for suffix in family_suffixes],
                dtype=np.float32,
            )
            right = np.array(
                [row_values.get(f"text2_{suffix}", 0.0) for suffix in family_suffixes],
                dtype=np.float32,
            )

            denominator = float(np.linalg.norm(left) * np.linalg.norm(right))
            # A zero vector has no direction; report cosine 0.0 rather than
            # dividing by zero.
            cosine = float(np.dot(left, right) / denominator) if denominator > 0 else 0.0
            diff = left - right
            l1 = float(np.abs(diff).sum())
            l2 = float(np.linalg.norm(diff))

            values.extend([cosine, l1, l2])

        return np.array(values, dtype=np.float32).reshape(1, -1)

    @staticmethod
    def _matrix_to_frame(matrix: np.ndarray, prefix: str) -> pd.DataFrame:
        """Wrap a dense matrix in a frame with ``<prefix>_<00000>`` columns."""
        columns = [f"{prefix}_{index:05d}" for index in range(matrix.shape[1])]
        return pd.DataFrame(matrix, columns=columns)

    def _readability_features(self, masked_df: pd.DataFrame) -> pd.DataFrame:
        """Return a one-row frame of readability scores for both texts."""
        metrics = (
            ("flesch_kincaid_grade", textstat.flesch_kincaid_grade),
            ("gunning_fog", textstat.gunning_fog),
            ("smog", textstat.smog_index),
            ("coleman_liau", textstat.coleman_liau_index),
        )
        row: dict[str, float] = {}
        for column in ("text1", "text2"):
            text = masked_df.iloc[0][column]
            for name, metric in metrics:
                row[f"{column}_readability_{name}"] = round(metric(text), 5)
        return pd.DataFrame([row])

    def predict(self, text1: str, text2: str, threshold: float | None = None) -> PredictionResult:
        """Predict whether two texts were written by the same author.

        The texts are normalized, regex-masked and NER-masked. The feature
        families enabled in the training config are then extracted and
        combined into the pairwise blocks passed to the classifier.

        Args:
            text1: First text.
            text2: Second text.
            threshold: Decision threshold; the saved threshold is used when
                omitted.

        Returns:
            A ``PredictionResult`` whose label is 1 when the probability is
            greater than or equal to the threshold.

        Raises:
            ValueError: If neither pairwise block is enabled in the training
                config.
        """
        threshold_value = self.threshold if threshold is None else float(threshold)

        config = self.training_config
        use_local = config.include_local_pairwise
        use_global = config.include_global_pairwise
        # Fail before the expensive NLP and feature work when the config
        # cannot yield a model input.
        if not (use_local or use_global):
            raise ValueError("At least one of include_local_pairwise or include_global_pairwise must be True.")

        pair_df = pd.DataFrame([{
            "text1": normalize_text(text1, config=self.normalization_config),
            "text2": normalize_text(text2, config=self.normalization_config),
            "same": 0,  # placeholder label column
        }])

        regex_masked_df, _ = regex_mask_split(pair_df)

        masked_text1, record1 = self._mask_one_text(regex_masked_df.iloc[0]["text1"])
        masked_text2, record2 = self._mask_one_text(regex_masked_df.iloc[0]["text2"])

        # reset_index yields a new frame whose only row is labelled 0, so
        # the label-based writes below hit the row that iloc[0] reads.
        masked_df = regex_masked_df.reset_index(drop=True)
        masked_df.at[0, "text1"] = masked_text1
        masked_df.at[0, "text2"] = masked_text2

        split_cache = {"text1": [record1], "text2": [record2]}
        # Feature frames are gathered here and concatenated once at the end.
        frames: list[pd.DataFrame] = []

        if config.include_statistical:
            frames.append(
                extract_split_statistics(
                    masked_df,
                    split_cache=split_cache,
                    split_name="inference",
                    config=self.statistical_config,
                )
            )

        if config.include_tfidf:
            for column in ("text1", "text2"):
                docs = [record_to_tfidf_text(record, config=self.tfidf_config) for record in split_cache[column]]
                tfidf_matrix = self.tfidf_vectorizer.transform(docs).toarray()
                frames.append(self._matrix_to_frame(tfidf_matrix, f"{column}_tfidf"))

        for column in ("text1", "text2"):
            if config.include_char_ngrams:
                char_n = self.ngram_config.char_ngram_n
                char_docs = [
                    " ".join(build_space_free_char_ngrams(text, n=char_n))
                    for text in masked_df[column].tolist()
                ]
                char_matrix = self.char_vectorizer.transform(char_docs).toarray()
                frames.append(self._matrix_to_frame(char_matrix, f"{column}_char{char_n}_tfidf"))

            if config.include_pos_ngrams:
                pos_docs = [" ".join(record_to_pos_sequence(record)) for record in split_cache[column]]
                pos_matrix = self.pos_vectorizer.transform(pos_docs).toarray()
                # The configured range is interpolated as-is into the name.
                pos_prefix = f"{column}_pos{self.ngram_config.pos_ngram_range}_tfidf"
                frames.append(self._matrix_to_frame(pos_matrix, pos_prefix))

        if config.include_readability:
            frames.append(self._readability_features(masked_df))

        if frames:
            feature_df = pd.concat([frame.reset_index(drop=True) for frame in frames], axis=1)
        else:
            feature_df = pd.DataFrame()

        blocks: list[np.ndarray] = []
        if use_local:
            blocks.append(self._build_pairwise_vector(feature_df))
        if use_global:
            blocks.append(self._build_global_pairwise_vector(feature_df))

        X = np.hstack(blocks).astype(np.float32)
        probability_same = self._predict_positive_proba(X)
        predicted_label = int(probability_same >= threshold_value)

        return PredictionResult(
            probability_same=probability_same,
            predicted_label=predicted_label,
            threshold=threshold_value,
            normalized_text1=pair_df.iloc[0]["text1"],
            normalized_text2=pair_df.iloc[0]["text2"],
            masked_text1=masked_df.iloc[0]["text1"],
            masked_text2=masked_df.iloc[0]["text2"],
        )
|
|