|
|
|
|
|
|
# Environment toggles. NOTE: these MUST be set before the transformers /
# torch imports below, otherwise the TF / telemetry switches have no effect.
import os


# Keep TensorFlow's C++ logger quiet (3 = errors only) and disable the
# oneDNN custom-ops notice.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"

# Belt-and-braces flags to keep transformers from pulling in TensorFlow
# (PyTorch-only pipeline).
os.environ["TRANSFORMERS_NO_TF"] = "1"

os.environ["TRANSFORMERS_NO_TF2"] = "1"

os.environ["USE_TF"] = "0"

# Opt out of MLflow auto-integration and Hugging Face telemetry.
os.environ["DISABLE_MLFLOW_INTEGRATION"] = "1"

os.environ["DISABLE_TELEMETRY"] = "1"
|
|
|
|
|
| import sys
|
| import torch
|
| import torch.nn.functional as F
|
| from transformers import AutoTokenizer, AutoModel
|
| from sentence_transformers import SentenceTransformer
|
| from typing import Dict, Tuple, Optional
|
| import logging
|
| from pathlib import Path
|
| import warnings
|
|
|
|
|
# Silence library warning spam (transformers / torch deprecation noise).
warnings.filterwarnings("ignore")


# Log to both a UTF-8 file and stdout so runs are auditable after the fact.
# encoding="utf-8" matters: log messages contain non-ASCII characters.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("fusion_pipeline.log", encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)
|
|
|
|
|
class AkademikForgeFusion:
    """
    3-model fusion pipeline for plagiarism detection:
    - BGE-M3 (multi-lingual embedding)
    - SBERT Multilingual (semantic similarity)
    - IndoBERT Fine-Tuned (plagiarism classifier)

    Each model yields a cosine similarity between the two texts; the three
    scores are fused with a weighted sum and mapped to a verdict label.
    """

    def __init__(
        self,
        device: Optional[str] = None,
        weights: Optional[Dict[str, float]] = None,
        cache_dir: str = "Models",
    ):
        """
        Args:
            device: torch device string ("cuda"/"cpu"); auto-selects CUDA
                when available if None.
            weights: fusion weights per model; must sum to 1.0 (+/- 0.01).
            cache_dir: directory holding the locally downloaded models.

        Raises:
            FileNotFoundError: when ``cache_dir`` or a model folder is missing.
            ValueError: when the fusion weights do not sum to 1.0.
        """
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"🔧 Using device: {self.device}")

        self.weights = weights or {
            "bge_m3": 0.35,
            "sbert": 0.35,
            "indobert": 0.30,
        }
        self._validate_weights()

        # Score cut-offs used by _get_label().
        self.thresholds = {
            "high": 0.85,
            "medium": 0.70,
            "low": 0.0,
        }

        self.cache_dir = Path(cache_dir)
        if not self.cache_dir.exists():
            raise FileNotFoundError(
                f"❌ Model directory '{cache_dir}' tidak ditemukan!"
            )

        self._load_models()

        logger.info("✅ Pipeline initialization completed!\n")

    def _validate_weights(self, weights: Optional[Dict[str, float]] = None) -> None:
        """Check that fusion weights sum to 1.0 within a small float tolerance.

        Validates ``self.weights`` when *weights* is None; otherwise validates
        the candidate mapping — used by update_weights() so state is only
        mutated after the new weights pass.

        Raises:
            ValueError: when the weights do not sum to ~1.0.
        """
        candidate = self.weights if weights is None else weights
        total = sum(candidate.values())
        if not (0.99 <= total <= 1.01):
            raise ValueError(f"❌ Total weights harus = 1.0, got {total}")

    def _load_models(self):
        """Load all three models from the local cache, logging each step.

        Raises:
            FileNotFoundError: when any model directory is missing.
        """
        try:
            logger.info("📦 Loading BGE-M3...")
            bge_path = self.cache_dir / "bge-m3"
            if not bge_path.exists():
                raise FileNotFoundError(f"BGE-M3 not found at {bge_path}")

            # local_files_only=True prevents any network fetch attempt.
            self.bge_tokenizer = AutoTokenizer.from_pretrained(
                str(bge_path),
                local_files_only=True,
            )
            self.bge_model = AutoModel.from_pretrained(
                str(bge_path),
                local_files_only=True,
            ).to(self.device)
            self.bge_model.eval()
            logger.info("✓ BGE-M3 loaded")

            logger.info("📦 Loading SBERT Multilingual...")
            sbert_path = self.cache_dir / "sbert-multilingual"
            if not sbert_path.exists():
                raise FileNotFoundError(f"SBERT not found at {sbert_path}")

            self.sbert = SentenceTransformer(
                str(sbert_path),
                device=self.device,
            )
            logger.info("✓ SBERT Multilingual loaded")

            logger.info("📦 Loading IndoBERT Fine-Tuned...")
            indo_path = self.cache_dir / "indobert-finetuned"
            if not indo_path.exists():
                raise FileNotFoundError(f"IndoBERT not found at {indo_path}")

            self.indo_tokenizer = AutoTokenizer.from_pretrained(
                str(indo_path),
                local_files_only=True,
            )
            self.indo_model = AutoModel.from_pretrained(
                str(indo_path),
                local_files_only=True,
            ).to(self.device)
            self.indo_model.eval()
            logger.info("✓ IndoBERT Fine-Tuned loaded")

        except Exception as e:
            logger.error(f"❌ Error loading models: {str(e)}")
            raise

    def _validate_text(self, text: str, name: str = "text") -> str:
        """Validate one input text and return it stripped.

        Args:
            text: the raw input text.
            name: parameter name used in error/log messages.

        Raises:
            TypeError: when *text* is not a string.
            ValueError: when *text* is empty or whitespace-only.
        """
        if not isinstance(text, str):
            raise TypeError(f"{name} harus berupa string, got {type(text)}")

        text = text.strip()
        if not text:
            raise ValueError(f"{name} tidak boleh kosong")

        # Very short texts still run, but similarity is unreliable.
        if len(text) < 10:
            logger.warning(
                f"⚠️ {name} terlalu pendek (<10 karakter), hasil mungkin tidak akurat"
            )

        return text

    @torch.no_grad()
    def embed_bge(self, text: str) -> torch.Tensor:
        """Embed *text* with BGE-M3 (mean-pooled last hidden state) on CPU."""
        try:
            tokens = self.bge_tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            ).to(self.device)

            output = self.bge_model(**tokens)
            # Mean pooling over the token dimension -> shape (1, hidden).
            emb = output.last_hidden_state.mean(dim=1)
            return emb.cpu()

        except Exception as e:
            logger.error(f"❌ Error in BGE embedding: {str(e)}")
            raise

    @torch.no_grad()
    def embed_indobert(self, text: str) -> torch.Tensor:
        """Embed *text* with IndoBERT (first-token [CLS] vector) on CPU."""
        try:
            tokens = self.indo_tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            ).to(self.device)

            output = self.indo_model(**tokens)
            # [CLS] (first token) representation -> shape (1, hidden).
            emb = output.last_hidden_state[:, 0, :]
            return emb.cpu()

        except Exception as e:
            logger.error(f"❌ Error in IndoBERT embedding: {str(e)}")
            raise

    def embed_sbert(self, text: str) -> torch.Tensor:
        """Embed *text* with SBERT; returns the sentence vector on CPU."""
        try:
            emb = self.sbert.encode(
                text,
                convert_to_tensor=True,
                show_progress_bar=False,
            )
            return emb.cpu()

        except Exception as e:
            logger.error(f"❌ Error in SBERT embedding: {str(e)}")
            raise

    def cosine_similarity(self, a: torch.Tensor, b: torch.Tensor) -> float:
        """Cosine similarity between two embeddings, clamped to [0, 1].

        Negative cosines are clamped to 0 so the weighted fusion score
        stays within [0, 1].
        """
        # F.cosine_similarity normalizes its inputs internally, so the
        # explicit F.normalize pre-pass the original did was redundant.
        similarity = F.cosine_similarity(a, b, dim=-1).item()
        return max(0.0, min(1.0, similarity))

    def _get_label(self, score: float) -> Tuple[str, str]:
        """Map a fusion score to a (label, human-readable description) pair."""
        if score >= self.thresholds["high"]:
            return "Plagiarisme Tinggi", "🔴 TERDETEKSI PLAGIARISME"
        elif score >= self.thresholds["medium"]:
            return "Mirip / Parafrase", "🟡 KEMUNGKINAN PARAFRASE"
        else:
            return "Unik / Tidak Mirip", "🟢 KONTEN ORIGINAL"

    def compare(
        self,
        text1: str,
        text2: str,
        return_embeddings: bool = False,
        verbose: bool = True,
    ) -> Dict:
        """Compare two texts and return per-model + fused similarity.

        Args:
            text1: first text (validated and stripped before use).
            text2: second text.
            return_embeddings: include the raw embedding pairs in the result.
            verbose: log per-model progress.

        Returns:
            Dict with keys "similarity" (per-model scores + "fusion_score"),
            "label", "description", "weights_used", and — when requested —
            "embeddings".

        Raises:
            TypeError / ValueError: from input validation.
        """
        try:
            text1 = self._validate_text(text1, "text1")
            text2 = self._validate_text(text2, "text2")

            if verbose:
                logger.info("\n🔄 Generating embeddings...")

            if verbose:
                logger.info(" → BGE-M3...")
            bge1 = self.embed_bge(text1)
            bge2 = self.embed_bge(text2)
            sim_bge = self.cosine_similarity(bge1, bge2)

            if verbose:
                logger.info(" → SBERT Multilingual...")
            sbert1 = self.embed_sbert(text1)
            sbert2 = self.embed_sbert(text2)
            sim_sbert = self.cosine_similarity(sbert1, sbert2)

            if verbose:
                logger.info(" → IndoBERT Fine-Tuned...")
            indo1 = self.embed_indobert(text1)
            indo2 = self.embed_indobert(text2)
            sim_indo = self.cosine_similarity(indo1, indo2)

            # Weighted late fusion of the three similarity scores.
            fusion_score = (
                sim_bge * self.weights["bge_m3"]
                + sim_sbert * self.weights["sbert"]
                + sim_indo * self.weights["indobert"]
            )

            label, description = self._get_label(fusion_score)

            result = {
                "similarity": {
                    "bge_m3": round(sim_bge, 4),
                    "sbert_multilingual": round(sim_sbert, 4),
                    "indobert": round(sim_indo, 4),
                    "fusion_score": round(fusion_score, 4),
                },
                "label": label,
                "description": description,
                # Copy so later update_weights() calls can't mutate results
                # that were already returned to callers.
                "weights_used": dict(self.weights),
            }

            if return_embeddings:
                result["embeddings"] = {
                    "bge_m3": (bge1, bge2),
                    "sbert": (sbert1, sbert2),
                    "indobert": (indo1, indo2),
                }

            return result

        except Exception as e:
            logger.error(f"❌ Error in comparison: {str(e)}")
            raise

    def batch_compare(
        self,
        text_pairs: list,
        verbose: bool = False,
    ) -> list:
        """Run compare() over a list of (text1, text2) pairs.

        The per-pair progress line is logged here only when ``verbose`` is
        False, because compare() logs its own progress when verbose.
        """
        results = []
        total = len(text_pairs)

        logger.info(f"\n📊 Processing {total} text pairs...")

        for idx, (text1, text2) in enumerate(text_pairs, 1):
            if not verbose:
                logger.info(f" [{idx}/{total}] Processing...")

            result = self.compare(text1, text2, verbose=verbose)
            results.append(result)

        logger.info("✅ Batch processing completed!")
        return results

    def update_thresholds(
        self,
        high: Optional[float] = None,
        medium: Optional[float] = None,
    ):
        """Override one or both label thresholds.

        NOTE: no ordering check (medium < high) is performed, matching the
        original behavior.
        """
        if high is not None:
            self.thresholds["high"] = high
        if medium is not None:
            self.thresholds["medium"] = medium

        logger.info(f"✓ Thresholds updated: {self.thresholds}")

    def update_weights(self, weights: Dict[str, float]):
        """Replace the fusion weights, validating BEFORE assignment.

        The original assigned first and validated afterwards, which left the
        instance holding invalid weights whenever validation raised.

        Raises:
            ValueError: when the new weights do not sum to ~1.0 (state is
                left unchanged in that case).
        """
        self._validate_weights(weights)
        self.weights = weights
        logger.info(f"✓ Weights updated: {self.weights}")

    def __repr__(self) -> str:
        return (
            "AkademikForgeFusion(\n"
            f"  device={self.device},\n"
            f"  weights={self.weights},\n"
            f"  thresholds={self.thresholds}\n"
            ")"
        )
|
|
|
|
|
def main():
    """Entry-point placeholder — no demo/CLI wiring implemented yet."""
    ...
|
|
|
|
|
# Run the (placeholder) entry point only when executed as a script.
if __name__ == "__main__":
    main()
|
|
|