| import logging |
| import pickle |
| import re |
| import shutil |
| from functools import lru_cache |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| from huggingface_hub import snapshot_download |
| from sklearn.linear_model import LogisticRegression, LogisticRegressionCV |
|
|
| from config import Config |
|
|
# Logger named after this module; used for artifact discovery/loading messages.
LOGGER = logging.getLogger(__name__)
|
|
|
|
# Classifier display name -> pickle file name looked up inside the resolved
# model directory. Iteration order is the order models are loaded in.
MODEL_FILES = {
    "Logistic Regression": "Logistic_Regression.pkl",
    "Random Forest": "Random_Forest.pkl",
    "Linear SVC": "Linear_SVC.pkl",
    "Ridge Classifier": "Ridge_Classifier.pkl",
    "Multinomial NB": "Multinomial_NB.pkl",
    "Bernoulli NB": "Bernoulli_NB.pkl",
}
|
|
# Model names that load_artifacts() reports as unavailable without reading
# their pickle from disk. Empty by default.
SKIP_MODELS = set()

# Hugging Face repository id and access token, both read from project config;
# they are passed to snapshot_download() when no local artifacts are found.
REPO_ID = Config.REPO_ID_LANG
HF_TOKEN = Config.HF_TOKEN

# Sub-folder name probed inside candidate directories and the downloaded snapshot.
NEPALI_SUBDIR = "Nepali_model"

# Vectorizer pickles that must all exist for a directory to count as usable.
REQUIRED_BASE_FILES = ("word_vectorizer.pkl", "char_vectorizer.pkl")
|
|
|
|
| |
# Preference order used by get_default_top_models(): earlier names win.
# "Gradient Boosting" has no entry in MODEL_FILES, so it is never loaded and
# is filtered out of the ranking at lookup time.
DEFAULT_MODEL_RANKING = [
    "Gradient Boosting",
    "Logistic Regression",
    "Linear SVC",
    "Ridge Classifier",
    "Bernoulli NB",
    "Random Forest",
    "Multinomial NB",
]
|
|
|
|
def _patch_legacy_logistic_model(model):
    """Ensure an unpickled logistic model carries a ``multi_class`` attribute.

    Only ``LogisticRegression`` / ``LogisticRegressionCV`` instances that lack
    the attribute are touched; they get ``multi_class = "auto"``. Every other
    object is returned unchanged.

    Args:
        model: Any unpickled estimator.

    Returns:
        The same ``model`` object (patched in place when applicable).
    """
    if not isinstance(model, (LogisticRegression, LogisticRegressionCV)):
        return model
    if not hasattr(model, "multi_class"):
        model.multi_class = "auto"
    return model
|
|
|
|
class NepaliRichFeatures:
    """Hand-crafted burstiness and stylometry features for Nepali text.

    The extractor holds no state. ``transform`` emits one row per text with the
    five burstiness columns followed by the nine stylometry columns, in the key
    order of the dictionaries returned by the two ``extract_*`` methods.
    """

    # Sentence terminators: danda, exclamation mark, question mark.
    _SENTENCE_BREAK = re.compile(r"[।!?]")
    # Characters counted towards the punctuation ratio.
    _PUNCTUATION = ("।", "?", "!", ",")

    @staticmethod
    def _sentences(raw: str) -> list[str]:
        """Split ``raw`` on sentence terminators, dropping blank fragments."""
        fragments = (
            piece.strip() for piece in NepaliRichFeatures._SENTENCE_BREAK.split(raw)
        )
        return [piece for piece in fragments if piece]

    @staticmethod
    def extract_burstiness(text: str) -> dict:
        """Summarise how sentence length (in words) varies across ``text``.

        Args:
            text: Input text; it is coerced with ``str()`` first.

        Returns:
            Dict with ``burst_mean``, ``burst_std``, ``burst_max``,
            ``burst_min`` and ``burst_range`` as floats. All values are 0.0
            when the text contains no non-blank sentence.
        """
        keys = ("burst_mean", "burst_std", "burst_max", "burst_min", "burst_range")
        word_counts = [
            len(sentence.split())
            for sentence in NepaliRichFeatures._sentences(str(text))
        ]
        if not word_counts:
            return dict.fromkeys(keys, 0.0)

        longest = np.max(word_counts)
        shortest = np.min(word_counts)
        return {
            "burst_mean": float(np.mean(word_counts)),
            "burst_std": float(np.std(word_counts)),
            "burst_max": float(longest),
            "burst_min": float(shortest),
            "burst_range": float(longest - shortest),
        }

    @staticmethod
    def extract_stylometry(text: str) -> dict:
        """Compute size, diversity, punctuation and diacritic statistics.

        Word, character and sentence counts are floored at 1 so the ratios
        derived from them never divide by zero; the floored counts are also
        what gets returned.

        Args:
            text: Input text; it is coerced with ``str()`` first.

        Returns:
            Dict with ``num_words``, ``num_chars``, ``num_sentences``,
            ``avg_word_len``, ``avg_sent_len``, ``lexical_diversity``,
            ``punct_ratio``, ``rep_bigram_ratio`` and ``diacritic_ratio``.
        """
        raw = str(text)
        words = raw.split()

        num_words = max(len(words), 1)
        num_chars = max(len(raw), 1)
        num_sentences = max(len(NepaliRichFeatures._sentences(raw)), 1)

        if words:
            avg_word_len = float(np.mean([len(word) for word in words]))
        else:
            avg_word_len = 0.0

        # Share of adjacent word pairs that repeat an earlier pair.
        bigrams = [f"{left} {right}" for left, right in zip(words, words[1:])]
        if bigrams:
            rep_bigram_ratio = 1.0 - len(set(bigrams)) / len(bigrams)
        else:
            rep_bigram_ratio = 0.0

        punct_count = sum(raw.count(mark) for mark in NepaliRichFeatures._PUNCTUATION)
        # Code points U+093E..U+094D inclusive.
        diacritic_count = sum("\u093e" <= ch <= "\u094d" for ch in raw)

        return {
            "num_words": num_words,
            "num_chars": num_chars,
            "num_sentences": num_sentences,
            "avg_word_len": avg_word_len,
            "avg_sent_len": num_words / num_sentences,
            "lexical_diversity": len(set(words)) / num_words,
            "punct_ratio": punct_count / num_chars,
            "rep_bigram_ratio": rep_bigram_ratio,
            "diacritic_ratio": diacritic_count / num_chars,
        }

    def transform(self, texts):
        """Turn one text or an iterable of texts into a float32 feature matrix.

        Args:
            texts: A single string (treated as a one-element batch) or an
                iterable of texts.

        Returns:
            ``numpy.ndarray`` of dtype ``float32`` with one row per text.
        """
        batch = [texts] if isinstance(texts, str) else texts
        rows = [
            {**self.extract_burstiness(item), **self.extract_stylometry(item)}
            for item in batch
        ]
        return pd.DataFrame(rows).values.astype(np.float32)
|
|
|
|
def _repo_root() -> Path:
    """Return the directory two levels above the folder holding this file."""
    module_path = Path(__file__).resolve()
    return module_path.parents[2]
|
|
|
|
def _has_required_artifacts(path: Path) -> bool:
    """Tell whether ``path`` is a usable model directory.

    A directory is usable when it contains every file listed in
    ``REQUIRED_BASE_FILES`` and at least one of the model pickles named in
    ``MODEL_FILES``.

    Args:
        path: Candidate directory; it may not exist.

    Returns:
        ``True`` only if ``path`` is an existing directory meeting both
        conditions.
    """
    if not path.is_dir():
        return False
    for required in REQUIRED_BASE_FILES:
        if not (path / required).exists():
            return False
    return any((path / model_file).exists() for model_file in MODEL_FILES.values())
|
|
|
|
def _candidate_model_dirs() -> list[Path]:
    """List the directories to probe for Nepali artifacts, in priority order.

    Order: the configured ``Config.Nepali_model_folder`` (when set) and its
    ``NEPALI_SUBDIR`` child, then the in-repo default folder and its
    ``NEPALI_SUBDIR`` child, then the notebook's ``saved_models`` folder.

    Returns:
        Candidate paths; none of them is guaranteed to exist.
    """
    repo = _repo_root()

    # Each root is tried as-is and with the NEPALI_SUBDIR folder beneath it.
    roots = []
    if Config.Nepali_model_folder:
        roots.append(Path(Config.Nepali_model_folder))
    roots.append(repo / "features" / "Model" / "Nepali_model")

    candidates = []
    for root in roots:
        candidates += [root, root / NEPALI_SUBDIR]

    notebook_models = (
        repo / "notebook" / "ai_vs_human_nepali" / "final_model" / "saved_models"
    )
    candidates.append(notebook_models)
    return candidates
|
|
|
|
def _download_nepali_artifacts() -> None:
    """Download the model snapshot and copy the Nepali artifacts into place.

    The snapshot of ``REPO_ID`` is fetched with ``snapshot_download``. If it
    contains a ``NEPALI_SUBDIR`` folder only that folder is copied, otherwise
    the whole snapshot is. Files go to ``Config.Nepali_model_folder`` when it
    is set, else to ``<repo>/features/Model/Nepali_model``; existing files in
    the target are overwritten by same-named files from the snapshot.

    Raises:
        ValueError: If no repository id is configured.
    """
    if not REPO_ID:
        # The message names the Nepali model: this module never touches the
        # English one, so the previous wording pointed at the wrong setting.
        raise ValueError("Nepali model repo id is not configured")

    repo = _repo_root()
    target_dir = (
        Path(Config.Nepali_model_folder)
        if Config.Nepali_model_folder
        else repo / "features" / "Model" / "Nepali_model"
    )

    snapshot_path = Path(snapshot_download(repo_id=REPO_ID, token=HF_TOKEN))
    nested_dir = snapshot_path / NEPALI_SUBDIR
    source_dir = nested_dir if nested_dir.is_dir() else snapshot_path

    target_dir.mkdir(parents=True, exist_ok=True)
    # dirs_exist_ok lets the copy merge into a target that already exists.
    shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
|
|
|
|
def resolve_model_dir() -> Path:
    """Locate a directory holding the Nepali artifacts, downloading if needed.

    Candidate directories are scanned in priority order. If none qualifies,
    the artifacts are downloaded once and the scan is repeated.

    Returns:
        The first candidate directory that has the required artifacts.

    Raises:
        FileNotFoundError: If no candidate qualifies even after the download.
    """

    def _first_usable():
        # First candidate passing the artifact check, or None.
        return next(
            (
                candidate
                for candidate in _candidate_model_dirs()
                if _has_required_artifacts(candidate)
            ),
            None,
        )

    found = _first_usable()
    if found is not None:
        return found

    LOGGER.info("Nepali artifacts not found locally; downloading from %s", REPO_ID)
    _download_nepali_artifacts()

    found = _first_usable()
    if found is None:
        raise FileNotFoundError(
            "Nepali model directory not found. Set Nepali_model env or add expected artifacts."
        )
    return found
|
|
|
|
@lru_cache(maxsize=1)
def load_artifacts():
    """Load every available model plus the vectorizers, once per process.

    The result is memoised by ``lru_cache``, so later calls return the same
    dict object without touching the disk again.

    Returns:
        Dict with keys ``model_dir`` (str), ``models`` (name -> estimator),
        ``unavailable_models`` (name -> reason), ``word_vectorizer``,
        ``char_vectorizer`` and ``rich_transformer``.
    """
    model_dir = resolve_model_dir()
    LOGGER.info("Loading Nepali artifacts from %s", model_dir)

    def _unpickle(file_path):
        # NOTE(review): pickle.load runs arbitrary code from the file; the
        # artifacts (local or downloaded) must come from a trusted source.
        with open(file_path, "rb") as handle:
            return pickle.load(handle)

    models = {}
    unavailable = {}
    for model_name, file_name in MODEL_FILES.items():
        file_path = model_dir / file_name
        if model_name in SKIP_MODELS:
            unavailable[model_name] = "Skipped due to large artifact size"
        elif not file_path.exists():
            unavailable[model_name] = "Missing model file"
        else:
            models[model_name] = _patch_legacy_logistic_model(_unpickle(file_path))

    return {
        "model_dir": str(model_dir),
        "models": models,
        "unavailable_models": unavailable,
        "word_vectorizer": _unpickle(model_dir / "word_vectorizer.pkl"),
        "char_vectorizer": _unpickle(model_dir / "char_vectorizer.pkl"),
        "rich_transformer": NepaliRichFeatures(),
    }
|
|
|
|
def get_available_models():
    """Return the names of the successfully loaded models, in load order."""
    return list(load_artifacts()["models"])
|
|
|
|
def get_default_top_models(top_k: int = 2) -> list[str]:
    """Return up to ``top_k`` loaded model names, best-ranked first.

    Loaded models are ordered by ``DEFAULT_MODEL_RANKING``. If none of the
    loaded models appears in that ranking, the loaded models are returned in
    load order instead.

    Args:
        top_k: Maximum number of names wanted; values below 1 are treated
            as 1 in both branches.

    Returns:
        A list of model names, empty only when no model is loaded.
    """
    available = get_available_models()
    # Same clamp for both branches: a zero or negative top_k must not produce
    # an empty result or slice from the end of the list.
    limit = max(1, top_k)

    loaded = set(available)
    ranked = [name for name in DEFAULT_MODEL_RANKING if name in loaded]
    if ranked:
        return ranked[:limit]
    # Slice the ordered list, not the set, so the fallback is deterministic.
    return available[:limit]
|
|